我开发了一个用于HDFS数据摄取的NiFi流程原型.现在我想提高整体表现,但似乎我无法真正前进.
流接收输入csv文件(每行有80个字段),在行级别拆分,对字段应用一些转换(使用顺序执行的4个自定义处理器),将新行缓冲到csv文件中,将它们输出到HDFS.我以这样的方式开发了处理器:当读取每个单独的记录并且将其字段移动到flowfile属性时,仅访问流文件的内容一次.测试已在亚马逊EC2 m4.4xlarge实例(16核CPU,64 GB RAM)上进行.
这是我到目前为止所尝试的:
将flowfile存储库和内容存储库移动到不同的SSD驱动器上
将原产地存储库移到内存中(NiFi无法跟上事件发生率)
根据配置最佳实践配置系统
我已经尝试为每个处理器分配多个线程,以便达到不同数量的总线程
我已经尝试增加nifi.queue.swap.threshold并将背压设置为永远不会达到交换限制
尝试了不同的JVM内存设置,从8到32 GB(与G1GC结合使用)
我试过增加实例规范,没有任何改变
从我执行的监控来看,它看起来像磁盘不是瓶颈(它们在很长一段时间内基本上处于空闲状态,显示计算实际上是在内存中执行)并且平均CPU负载低于60%.
我能得到的最多的是215K行/分钟,这是3,5k行/秒. 就数量而言,它仅为4,7 MB/s.我的目标是肯定比这更大的东西.就像比较一样,我创建了一个读取文件的流程,将其拆分为行,将它们合并为块并将数据合并到磁盘上.在这里,我得到12k行/秒,或17 MB/s.看起来也不会太快,让我觉得我可能做错了.有没有人有关于如何提高表现的建议?在集群上运行NiFi而不是随实例规范增长,我将从中受益多少?谢谢你们
事实证明,糟糕的表现是开发的自定义处理器和合并内容处理器的组合.在hortonworks社区论坛上反映的同样问题得到了有趣的反馈.
关于第一个问题,建议是将SupportsBatching
注释添加到处理器.这允许处理器将多个提交批处理在一起,并允许NiFi用户通过配置菜单中的处理器执行来支持延迟或吞吐量.其他信息可以在这里的文档中找到.
另一个发现是MergeContent
内置处理器本身似乎没有最佳性能,因此如果可能,应考虑修改流程并避免合并阶段.