Struggling with a "ValueError: too many values to unpack" error when running the code below, which is meant to build a histogram of values for each key:
    %pyspark
    import datetime
    from pyspark.sql import SQLContext, Row

    def featVecSms( x ):
        sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
        hourOfDay = int( sttm.strftime( '%H' ) )
        dayOfWeek = int( sttm.strftime( '%w' ) )
        dayOfMonth = int( sttm.strftime( '%d' ) )
        duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
        duration = duration.total_seconds()
        service = x[3]
        resultCode = int( x[4] )
        msc = x[5]
        actionMap = { "0":'fsm', "1":'fsm', "2000":'sri', "2001":'sri', "2100":'sri',
                      "2101":'sri', "2102":'fsm', "2200":'sri', "2201":'sri', "2202":'fsm',
                      "2203":'fsm', "2204":'fsm', "2205":'fsm', "2206":'fsm', "2207":'sri',
                      "2208":'sri', "2209":'sri', "2210":'fsm', "2211":'fsm', "2212":'fsm',
                      "2213":'fsm', "2214":'fsm', "2215":'sri', "2216":'fsm' }
        action = actionMap.get( x[4] )
        return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode, msc, action )

    textFile = sc.textFile("/export/sampleMsesAll.txt")
    enTuples = textFile.map(lambda x: x.split("', u'"))
    msRec = enTuples.map( featVecSms )

    def countByCrit( accVal, currVal, idx ):
        accVal[ int( currVal[ idx ] ) ] = accVal( [ int( currVal[ idx ] ) ] ) + 1
        return accVal

    def countByTod( accVal, currVal ):
        return countByCrit( accVal, currVal, 1 )

    todmap = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]

    msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
    #.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )
    msTodSuccess.collect()
It throws this error:
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times,
    most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException:
    Traceback (most recent call last):
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
        process()
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
        return func(split, prev_func(split, iterator))
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
        return f(iterator)
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
        merger.mergeValues(iterator)
      File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
        for k, v in iterator:
    ValueError: too many values to unpack
The data looks like this:
    $ head -15 /export/sampleMses10M.txt/part-00000
    (u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
    (u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
    (u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
    (u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
    (u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
    (u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')
This sample has only 123k records, but the real application will have to handle tens of millions.
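For reference, a minimal sketch (plain Python, not part of the question's code) of what the split("', u'") above produces for one line copied from the sample:

    line = "(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')"
    fields = line.split("', u'")
    # fields == ["(u'263775998314",           # leading "(u'" stays attached
    #            '20151119T180719.000349',
    #            '20151120T074928.837095',
    #            'GoodMorning',
    #            '2210',
    #            "263775998314')"]            # trailing "')" stays attached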
The problem with your code lies in the types you use.
First of all, all *byKey methods operate on pairwise RDDs. In Python this means an RDD whose elements are tuples of length 2 (or other structures, let's call them pairs) that can be unpacked like this:

    k, v = pair

msRec, whose elements have length 9, clearly does not work here.
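A minimal sketch, assuming the 9-tuples produced by featVecSms above, of reshaping msRec into such pairs (the name msPairs is mine, not from the question):

    # Key each record by its first field (the subscriber ID) so that
    # *byKey transformations receive (key, value) pairs.
    msPairs = msRec.map(lambda x: (x[0], x[1:]))

    # Each element can now be unpacked as k, v = pair, where v is the
    # remaining 8-tuple (hourOfDay, dayOfWeek, dayOfMonth, duration,
    # service, resultCode, msc, action).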
The second problem is that you are using the wrong transformation. Let's look at the signature of foldByKey in Scala:

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
where V is the type of the values in RDD[(K, V)]. As you can see, both zeroValue and the return type of the function have to match the type of the values, which is clearly not the case here.
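For contrast, a toy example (not from the question) where the types do line up: the zero value, both arguments of the function, and its result are all plain integers:

    nums = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
    sums = nums.foldByKey(0, lambda acc, x: acc + x)  # V is int throughout
    sums.collect()  # e.g. [('a', 3), ('b', 3)]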
If the result type differs from the input type, you should use combineByKey or aggregateByKey instead, as in the sketch below.
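A sketch of the intended per-hour histogram using aggregateByKey on the hypothetical msPairs RDD defined above; it assumes that "success" means resultCode >= 0 (index 5 of the value tuple) and that hourOfDay sits at index 0:

    def addRecord(acc, v):
        # seqFunc: fold one record into a per-key histogram
        # (the accumulator and value types are allowed to differ)
        acc[v[0]] += 1                      # v[0] is hourOfDay, 0..23
        return acc

    def mergeHists(acc1, acc2):
        # combFunc: merge two partial histograms for the same key
        return [a + b for a, b in zip(acc1, acc2)]

    zero = [0] * 24                         # one bucket per hour of day

    msTodSuccess = msPairs.filter(lambda kv: kv[1][5] >= 0) \
                          .aggregateByKey(zero, addRecord, mergeHists)
    msTodSuccess.collect()

Unlike foldByKey, aggregateByKey lets the accumulator type (a 24-element list) differ from the value type (a record tuple), which is exactly what building a histogram per key requires.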