
NullPointerException in org.apache.spark.ml.feature.Tokenizer


I want to compute TF-IDF features separately on the title and description fields, then combine those features with a VectorAssembler so that the final classifier can operate on them.

With a single, linear flow it works fine:

titleTokenizer -> titleHashingTF -> VectorAssembler

But I need two of them:

titleTokenizer       -> titleHashingTF        \
                                                -> VectorAssembler
descriptionTokenizer -> descriptionHashingTF  /

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StringIndexer, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

import org.apache.log4j.{Level, Logger}


object SimplePipeline {
    def main(args: Array[String]) {

        // setup boilerplate
        val conf = new SparkConf()
            .setAppName("Pipeline example")
        val sc = new SparkContext(conf)
        val spark = SparkSession
            .builder()
            .appName("Session for SimplePipeline")
            .getOrCreate()

        val all_df = spark.read.json("file:///Users/me/data.json")
        val numLabels = all_df.count()

        // split into training and testing
        val Array(training, testing) = all_df.randomSplit(Array(0.75, 0.25))
        val nTraining = training.count()
        val nTesting = testing.count()

        println(s"Loaded $nTraining training labels...")
        println(s"Loaded $nTesting  testing labels...")

        // convert string labels to integers
        val indexer = new StringIndexer()
            .setInputCol("rating")
            .setOutputCol("label")

        // tokenize our string inputs
        val titleTokenizer = new Tokenizer()
            .setInputCol("title")
            .setOutputCol("title_words")
        val descriptionTokenizer = new Tokenizer()
            .setInputCol("description")
            .setOutputCol("description_words")

        // count term frequencies
        val titleHashingTF = new HashingTF()
            .setNumFeatures(1000)
            .setInputCol(titleTokenizer.getOutputCol)
            .setOutputCol("title_tfs")
        val descriptionHashingTF = new HashingTF()
            .setNumFeatures(1000)
            .setInputCol(descriptionTokenizer.getOutputCol)
            .setOutputCol("description_tfs")

        // combine features together
        val assembler = new VectorAssembler()
            .setInputCols(Array(titleHashingTF.getOutputCol, descriptionHashingTF.getOutputCol))
            .setOutputCol("features")

        // set params for our model
        val lr = new LogisticRegression()
            .setMaxIter(10)
            .setRegParam(0.01)

        // pipeline that combines all stages
        val stages = Array(indexer, titleTokenizer, titleHashingTF, descriptionTokenizer, descriptionHashingTF, assembler, lr)
        val pipeline = new Pipeline().setStages(stages)

        // Fit the pipeline to training documents.
        val model = pipeline.fit(training)

        // Make predictions.
        val predictions = model.transform(testing)

        // Select example rows to display.
        predictions.select("label", "rawPrediction", "prediction").show()

        sc.stop()
    }
}

My data file is just newline-delimited JSON objects:

{"title" : "xxxxxx", "description" : "yyyyy" .... }
{"title" : "zzzzzz", "description" : "zxzxzx" .... }

The error I get is long and hard to follow, but the important part (I think) is the java.lang.NullPointerException:

ERROR Executor: Exception in task 0.0 in stage 9.0 (TID 12)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
    ... 23 more

How should I construct my pipeline correctly to do this?

(I'm also new to Scala.)



1> user6910411:

The problem here is that you haven't validated your data, and some of the values are NULL. It is easy to reproduce:

import spark.implicits._  // needed for toDF outside spark-shell

val df = Seq((1, Some("abcd bcde cdef")), (2, None)).toDF("id", "description")

val tokenizer = new Tokenizer().setInputCol("description")
tokenizer.transform(df).foreach(_ => ())
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$1: (string) => array)
  at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1072)
...
Caused by: java.lang.NullPointerException
  at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
...
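Before choosing a fix, it can help to check how many rows are actually affected. A quick diagnostic (my addition, assuming the column names from the question):

import org.apache.spark.sql.functions.col

// Count rows where either text column is null
all_df.filter(col("title").isNull || col("description").isNull).count()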

You can, for example, drop the affected rows:

tokenizer.transform(df.na.drop(Array("description")))

or replace the nulls with empty strings:

tokenizer.transform(df.na.fill(Map("description" -> "")))

whichever makes more sense in your application.
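Applied to the pipeline in the question, the cleaning has to happen before both fit and transform, because na.fill is not itself a pipeline stage. A minimal sketch (reusing the names from the question's code; filling with empty strings is just one of the two options above):

// Replace NULLs in both text columns so the Tokenizer UDFs
// never receive a null input
val fillMap = Map("title" -> "", "description" -> "")

val model = pipeline.fit(training.na.fill(fillMap))
val predictions = model.transform(testing.na.fill(fillMap))

Note that because the fill happens outside the Pipeline, a saved PipelineModel will not clean its input by itself; any DataFrame you score later needs the same treatment.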
