Apache Spark - MLlib - K-Means input format

I am trying to run a K-Means job, but model training fails and I get kicked out of the Spark Scala shell before I see any results. I am not sure whether the input format is the problem or something else. I am using Spark 1.0.0, and my input file (400 MB) looks like this:

ID,Category,PruductSize,PurchaseAMount
86252,3711,15.4,4.18
86252,3504,28,1.25
86252,3703,10.75,8.85
86252,3703,10.5,5.55
86252,2201,64,2.79
12262064,7203,32,8.49
etc.

I am not sure whether I can use the first two columns, because the MLlib example file only uses floating-point numbers. So I also tried using just the last two columns (see the parsing sketch after the sample below):

16 2.49
64 3.29
56 1
etc.
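
For reference, a comma-separated file like the one above cannot be parsed with split(' '). Below is a minimal sketch of how it could be loaded instead, under the assumption that the header row has to be skipped and only the last two numeric columns (product size and purchase amount) are used:

import org.apache.spark.mllib.linalg.Vectors

// Assumption: drop the "ID,Category,..." header and keep only the last
// two columns of each record as a dense feature vector.
val raw = sc.textFile("data/outkmeanssm.txt")
val header = raw.first()
val parsed = raw
  .filter(_ != header)                                          // skip the header row
  .map(_.split(','))                                            // the file is comma-separated, not space-separated
  .map(cols => Vectors.dense(cols(2).toDouble, cols(3).toDouble))
parsed.cache()                                                  // K-Means iterates over the data several times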

The code and the resulting error, which is the same in both cases, are below:

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> 

scala> // Load and parse the data

scala> val data = sc.textFile("data/outkmeanssm.txt")
14/08/07 16:15:37 INFO MemoryStore: ensureFreeSpace(35456) called with curMem=0, maxMem=318111744
14/08/07 16:15:37 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 34.6 KB, free 303.3 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14

scala> val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MappedRDD[2] at map at <console>:16

scala> 

scala> // Cluster the data into two classes using KMeans

scala> val numClusters = 2
numClusters: Int = 2

scala> val numIterations = 20
numIterations: Int = 20

scala> val clusters = KMeans.train(parsedData, numClusters, numIterations)
14/08/07 16:15:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/07 16:15:38 WARN LoadSnappy: Snappy native library not loaded
14/08/07 16:15:38 INFO FileInputFormat: Total input paths to process : 1
14/08/07 16:15:38 INFO SparkContext: Starting job: takeSample at KMeans.scala:260
14/08/07 16:15:38 INFO DAGScheduler: Got job 0 (takeSample at KMeans.scala:260) with 7 output partitions (allowLocal=false)
14/08/07 16:15:38 INFO DAGScheduler: Final stage: Stage 0(takeSample at KMeans.scala:260)
14/08/07 16:15:38 INFO DAGScheduler: Parents of final stage: List()
14/08/07 16:15:38 INFO DAGScheduler: Missing parents: List()
14/08/07 16:15:38 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[6] at map at KMeans.scala:123), which has no missing parents
14/08/07 16:15:39 INFO DAGScheduler: Submitting 7 missing tasks from Stage 0 (MappedRDD[6] at map at KMeans.scala:123)
14/08/07 16:15:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 7 tasks
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:0 as 2221 bytes in 3 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:1 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:2 as TID 2 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:2 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:3 as TID 3 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:3 as 2221 bytes in 1 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:4 as TID 4 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:4 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:5 as TID 5 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:5 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO TaskSetManager: Starting task 0.0:6 as TID 6 on executor localhost: localhost (PROCESS_LOCAL)
14/08/07 16:15:39 INFO TaskSetManager: Serialized task 0.0:6 as 2221 bytes in 0 ms
14/08/07 16:15:39 INFO Executor: Running task ID 4
14/08/07 16:15:39 INFO Executor: Running task ID 1
14/08/07 16:15:39 INFO Executor: Running task ID 5
14/08/07 16:15:39 INFO Executor: Running task ID 6
14/08/07 16:15:39 INFO Executor: Running task ID 0
14/08/07 16:15:39 INFO Executor: Running task ID 3
14/08/07 16:15:39 INFO Executor: Running task ID 2
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO BlockManager: Found block broadcast_0 locally
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:0+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:100663296+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:201326592+24305610
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:33554432+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:67108864+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:134217728+33554432
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:167772160+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_0 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:0+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_2 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:67108864+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_1 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:33554432+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_4 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:134217728+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_6 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:201326592+24305610
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_3 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:100663296+33554432
14/08/07 16:15:39 INFO CacheManager: Partition rdd_3_5 not found, computing it
14/08/07 16:15:39 INFO HadoopRDD: Input split: file:/Users/admin/BD_Tools/spark-1.0.0/data/outkmeanssm.txt:167772160+33554432
14/08/07 16:16:53 ERROR Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
    at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
14/08/07 16:16:59 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError: Java heap space
    at scala.collection.mutable.ResizableArray$class.ensureSize(ResizableArray.scala:99)
    at scala.collection.mutable.ArrayBuffer.ensureSize(ArrayBuffer.scala:47)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:83)
    at scala.collection.mutable.ArrayBuffer.$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:695)
14/08/07 16:17:00 WARN TaskSetManager: Lost TID 5 (task 0.0:5)
Chairs-MacBook-Pro:spark-1.0.0 admin$ 
Chairs-MacBook-Pro:spark-1.0.0 admin$ // Evaluate clustering by computing Within Set Sum of Squared Errors
-bash: //: is a directory
Chairs-MacBook-Pro:spark-1.0.0 admin$ val WSSSE = clusters.computeCost(parsedData)
-bash: syntax error near unexpected token `('
Chairs-MacBook-Pro:spark-1.0.0 admin$ println("Within Set Sum of Squared Errors = " + WSSSE)

What am I missing?


person user3400996    schedule 07.08.2014


Answers (1)


The "java.lang.OutOfMemoryError: Java heap space" error you are running into is thrown when the application tries to put more data into the in-memory heap area than the JVM can fit into the Java heap space.

This happens because applications deployed on the Java Virtual Machine are only allowed to use a limited amount of memory, and this limit is specified when the application starts. To make things a bit more complex, the JVM's memory is divided into two different regions, one of which is called the heap, and it is the heap you have exhausted.
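
As a quick sanity check (not part of the original answer), you can see from inside the Spark shell how much heap the driver JVM was actually started with:

// Maximum heap the current JVM can grow to, in megabytes.
val maxHeapMb = Runtime.getRuntime.maxMemory / (1024 * 1024)
println("Max heap available to this JVM: " + maxHeapMb + " MB")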

The first solution should be obvious: when you have run out of a particular resource, you increase the availability of that resource. In our case, when your application does not have enough Java heap memory to run properly, the fix is as simple as altering the JVM launch configuration and adding (or increasing, if already present) the following flag:

-Xmx1024m
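
For Spark you normally do not set -Xmx by hand. A sketch of how the same idea can be applied (the option and property names below are standard Spark settings, not something taken from the question): give the driver more memory when launching the shell, or set executor memory through SparkConf in a standalone application.

// When starting the interactive shell, raise the driver heap directly:
//   ./bin/spark-shell --driver-memory 2g
//
// In a standalone application, the executor heap can be set through SparkConf:
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("KMeansInputExample")      // hypothetical application name
  .set("spark.executor.memory", "2g")    // heap available to each executor
val sc = new SparkContext(conf)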
person Ivo    schedule 08.08.2014