我正在写Kinesis Firehose流的记录,最终由Amazon Kinesis Firehose写入S3文件.
我的记录对象看起来像
ItemPurchase { String personId, String itemId }
数据写入S3看起来像:
{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}
没有COMMA分离.
像Json阵列一样没有启动支架
[
没有结束支持,如在Json阵列中
]
我想读取这些数据获取ItemPurchase对象的列表.
Listpurchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
读取此数据的正确方法是什么?
令我感到困惑的是,Amazon Firehose以这种方式将JSON消息转储到S3,并且不允许您设置分隔符或其他任何内容。
最终,我发现解决该问题的技巧是使用JSON raw_decode方法处理文本文件
这将允许您读取一堆串联的JSON记录,而它们之间没有任何分隔符。
Python代码:
import json decoder = json.JSONDecoder() with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file: content = content_file.read() content_length = len(content) decode_index = 0 while decode_index < content_length: try: obj, decode_index = decoder.raw_decode(content, decode_index) print("File index:", decode_index) print(obj) except JSONDecodeError as e: print("JSONDecodeError:", e) # Scan forward and keep trying to decode decode_index += 1
我也有同样的问题,这是我如何解决的.
将"} {"替换为"} \n {"
由"\n"分割的行.
input_json_rdd.map(lambda x : re.sub("}{", "}\n{", x, flags=re.UNICODE)) .flatMap(lambda line: line.split("\n"))
嵌套的json对象有几个"}",因此用"}"分割线并不能解决问题.