I want to apply TF-IDF separately to the title and description fields, and then combine those features with a VectorAssembler so that the final classifier can operate on them.
It works fine if I use a single serial flow:
titleTokenizer -> titleHashingTF -> VectorAssembler
But I need two of them feeding into the assembler:
titleTokenizer -> titleHashingTF ------------------\
                                                    -> VectorAssembler
descriptionTokenizer -> descriptionHashingTF ------/
Here is the code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.log4j.{Level, Logger}

object SimplePipeline {
  def main(args: Array[String]) {
    // setup boilerplate
    val conf = new SparkConf()
      .setAppName("Pipeline example")
    val sc = new SparkContext(conf)

    val spark = SparkSession
      .builder()
      .appName("Session for SimplePipeline")
      .getOrCreate()

    val all_df = spark.read.json("file:///Users/me/data.json")
    val numLabels = all_df.count()

    // split into training and testing
    val Array(training, testing) = all_df.randomSplit(Array(0.75, 0.25))

    val nTraining = training.count();
    val nTesting = testing.count();
    println(s"Loaded $nTraining training labels...");
    println(s"Loaded $nTesting testing labels...");

    // convert string labels to integers
    val indexer = new StringIndexer()
      .setInputCol("rating")
      .setOutputCol("label")

    // tokenize our string inputs
    val titleTokenizer = new Tokenizer()
      .setInputCol("title")
      .setOutputCol("title_words")

    val descriptionTokenizer = new Tokenizer()
      .setInputCol("description")
      .setOutputCol("description_words")

    // count term frequencies
    val titleHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(titleTokenizer.getOutputCol)
      .setOutputCol("title_tfs")

    val descriptionHashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(descriptionTokenizer.getOutputCol)
      .setOutputCol("description_tfs")

    // combine features together
    val assembler = new VectorAssembler()
      .setInputCols(Array(titleHashingTF.getOutputCol, descriptionHashingTF.getOutputCol))
      .setOutputCol("features")

    // set params for our model
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.01)

    // pipeline that combines all stages
    val stages = Array(indexer, titleTokenizer, titleHashingTF, descriptionTokenizer, descriptionHashingTF, assembler, lr);
    val pipeline = new Pipeline().setStages(stages);

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Make predictions.
    val predictions = model.transform(testing)

    // Select example rows to display.
    predictions.select("label", "rawPrediction", "prediction").show()

    sc.stop()
  }
}
My data file is simply a newline-delimited file of JSON objects:
{"title" : "xxxxxx", "description" : "yyyyy" .... } {"title" : "zzzzzz", "description" : "zxzxzx" .... }
The error I get is long and hard to read, but the important part (I think) is a java.lang.NullPointerException:
ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 12)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    ... 23 more
How should I construct my pipeline correctly to do this?
(I'm also new to Scala.)
The problem here is that you don't validate the data, and some of the values are NULL. This is easy to reproduce:
val df = Seq((1, Some("abcd bcde cdef")), (2, None)).toDF("id", "description")
val tokenizer = new Tokenizer().setInputCol("description")

tokenizer.transform(df).foreach(_ => ())
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
...
Caused by: java.lang.NullPointerException
at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
...
You can, for example, drop the rows containing nulls:
tokenizer.transform(df.na.drop(Array("description")))
or replace them with empty strings:
tokenizer.transform(df.na.fill(Map("description" -> "")))
whichever makes more sense in your application.
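
Applied to the pipeline in your question, that could look roughly like the sketch below (this assumes that only the title and description columns may contain nulls, and reuses the training, testing and pipeline variables you already defined):

// Replace nulls in the text columns with empty strings before fitting/predicting.
// (Sketch only: assumes "title" and "description" are the columns that can be null.)
val cleanTraining = training.na.fill(Map("title" -> "", "description" -> ""))
val cleanTesting  = testing.na.fill(Map("title" -> "", "description" -> ""))

val model = pipeline.fit(cleanTraining)
val predictions = model.transform(cleanTesting)

Using na.fill keeps all rows (a missing title or description just contributes no terms), whereas na.drop discards them entirely; pick based on how much data you can afford to lose.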