当前位置:  开发笔记 > 编程语言 > 正文

读取Amazon Kinesis Firehose流写入s3的数据

如何解决《读取AmazonKinesisFirehose流写入s3的数据》经验,为你挑选了2个好方法。

我正在写Kinesis Firehose流的记录,最终由Amazon Kinesis Firehose写入S3文件.

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}

数据写入S3看起来像:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有COMMA分离.

像Json阵列一样没有启动支架

[

没有结束支持,如在Json阵列中

]

我想读取这些数据获取ItemPurchase对象的列表.

List purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

读取此数据的正确方法是什么?



1> Tom Chapin..:

令我感到困惑的是,Amazon Firehose以这种方式将JSON消息转储到S3,并且不允许您设置分隔符或其他任何内容。

最终,我发现解决该问题的技巧是使用JSON raw_decode方法处理文本文件

这将允许您读取一堆串联的JSON记录,而它们之间没有任何分隔符。

Python代码:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1



2> Xuehua Jiang..:

我也有同样的问题,这是我如何解决的.

    将"} {"替换为"} \n {"

    由"\n"分割的行.

    input_json_rdd.map(lambda x : re.sub("}{", "}\n{", x, flags=re.UNICODE))
                  .flatMap(lambda line: line.split("\n"))
    

嵌套的json对象有几个"}",因此用"}"分割线并不能解决问题.


我考虑过做类似的事情,但是我认为,如果JSON对象中的字符串之一恰好包含} {,那么该技术将会失效。也许,如果您遍历每个字符,如果您打了“”(表示输入或离开字符串),请切换一个布尔值,计算您所在的对象的级别(在字符串外部看到{时增加,在字符串外部看到}时减少字符串),然后将对象的末尾视为水平计数器再次达到0时。
推荐阅读
135369一生真爱_890
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有