Spark MLLIB TFIDF Текстовая кластеризация Python

Я новичок в Spark и пытаюсь объединить новостные статьи в кластеры, используя Spark API в Python. Новостные статьи были просканированы и сохранены в локальной папке /input/. Он содержит около 100 небольших текстовых файлов.

В качестве первого шага я настроил свой SparkContent

sconf= SparkConf().setMaster("local").setAppName("My App")
sc= SparkContext(conf=sconf)

Затем я создаю HashingTF и загружаю свои данные с помощью sc.wholeTextFiles(). Каталог — это путь к папке, содержащей текстовые файлы.

htf=HashingTF()
txtdata=sc.wholeTextFiles(directory)

Теперь я хочу разделить каждый текстовый файл отдельно и вывести TF-IDF для каждого файла. Первая проблема заключается в том, что функция разделения не работает для txtdata. Я использую следующую функцию:

split_data=txtdata.map(lambda x: x.split(" "))

Я получаю следующую ошибку:

split_data=sc.wholeTextFiles(directory).map(lambda x: x.split(" "))
AttributeError: 'tuple' object has no attribute 'split'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Наконец, я планирую запустить:

temp=htf.transform(split_data) temp.cache() idf = IDF().fit(temp)
tfidf = idf.transform(temp)

person Schon    schedule 19.02.2015    source источник
comment
Не знаком с искрой, но в сообщениях об ошибках в основном говорится, что x - это не строка (вопреки вашим ожиданиям), а скорее кортеж. Попробуйте отладить свой код, возможно, вам просто нужно распаковать кортеж в x как filename, content = x. Вам нужно будет определить именованную функцию, поскольку ее нельзя решить с помощью лямбда-выражений. def splitter(x): ..., а затем txtdata.map(splitter)   -  person Matt    schedule 19.02.2015
comment
Мы пытаемся кластеризовать строку/текст. Следовательно, хотелось бы получить подробную информацию о том, как вы закодировали то же самое. Мы пробуем это на Spark EC2.   -  person Chaitanya Bapat    schedule 20.07.2016


Ответы (1)


Функция wholeTextFiles возвращает СДР из (filename, string) пар. Итак, сначала вам нужно сделать что-то вроде split_data=txtdata.map(lambda (k, v): v.split(" "))

person Nick Pentreath    schedule 20.02.2015