当前位置:  开发笔记 > 编程语言 > 正文





import datetime
from pyspark.sql import SQLContext, Row

def featVecSms( x ):
    sttm = datetime.datetime.strptime( x[1], '%Y%m%dT%H%M%S.%f' )
    hourOfDay = int( sttm.strftime( '%H' ) )
    dayOfWeek = int( sttm.strftime( '%w' ) )
    dayOfMonth = int( sttm.strftime( '%d' ) )
    duration = datetime.datetime.strptime( x[2], '%Y%m%dT%H%M%S.%f' ) - sttm
    duration = duration.total_seconds()
    service = x[3]
    resultCode = int( x[4] )
    msc = x[5]
    actionMap = {
    action = actionMap.get( x[4] )
    return ( x[0], hourOfDay, dayOfWeek, dayOfMonth, duration, service, resultCode,  msc, action )

textFile = sc.textFile("/export/sampleMsesAll.txt")
enTuples = textFile.map(lambda x: x.split("', u'"))
msRec = enTuples.map( featVecSms )

def countByCrit( accVal, currVal, idx ):
    accVal[ int( currVal[ idx ] ) ] = accVal( [ int( currVal[ idx ] ) ] ) + 1
    return accVal

def countByTod( accVal, currVal ):
    return countByCrit( accVal, currVal, 1 )

todmap = [ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ]
msTodSuccess = msRec.filter( lambda x: x[2] >= 0 ).foldByKey( todmap, countByTod )
#.map( lambda x: ( x[0], reduce( lambda x,y: x + str(y), x[2], "" ) ) )



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 52.0 failed 1 times, most recent failure: Lost task 1.0 in stage 52.0 (TID 115, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/pyspark/rdd.py", line 1780, in combineLocally
  File "/export/edrsSmartRetry/code/spark-1.5.2/python/lib/pyspark.zip/pyspark/shuffle.py", line 266, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack


$ head -15 /export/sampleMses10M.txt/part-00000 
(u'263775998314', u'20151119T180719.000349', u'20151120T074928.837095', u'GoodMorning', u'2210', u'263775998314')
(u'263779563529', u'20151119T181318.000201', u'20151120T122346.432229', u'GoodMorning', u'2204', u'undefined')
(u'263783104169', u'20151120T092503.000629', u'20151120T111833.430649', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092316.000331', u'20151120T125251.794699', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T125514.904726', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T135521.395529', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092503.000629', u'20151120T145418.069707', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T092621.000557', u'20151120T145526.133207', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154208.000410', u'20151120T154345.379585', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T154647.354102', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T154904.993095', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154319.000636', u'20151120T164653.173588', u'Strangenet', u'2215', u'263770010027')
(u'263783104169', u'20151120T154406.000245', u'20151120T164909.888433', u'Strangenet', u'2215', u'263770010027')
(u'263774918225', u'20151120T090505.000269', u'20151120T102248.630188', u'StrangeCash', u'0', u'263770010027')
(u'263782099158', u'20151119T182038.000537', u'20151120T064040.240860', u'GoodMorning', u'0', u'263770010500')


1> zero323..:



k, v = pair


第二个问题是你使用了错误的转换.让我们来看看foldByKeyScala 中的签名:

def foldByKey(zeroValue: V)(func: (V, V) ? V): RDD[(K, V)] 

where V是值的类型(RDD[(K, V)]).正如您所看到的那样zeroValue,返回的函数类型应该与值的类型相同,这显然不是这里的情况.


DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有