当前位置:  开发笔记 > 大数据 > 正文

当消息"完全处理"时,Storm如何知道?

如何解决《当消息"完全处理"时,Storm如何知道?》经验,为你挑选了1个好方法。

(还有关于超时和maxSpoutPending的几个问题)

我在Storm文档中看到很多关于消息被完全处理的引用.但是我的KafkaSpout如何知道邮件何时完全处理?

希望它能够识别我的螺栓的连接方式,所以当我的Stream中的最后一个螺栓产生一个元组时,喷口知道我的消息何时被处理?

否则,我会想象在超时期限到期后,检查消息的确认状态,如果由acking /锚定XOR指示,则认为它被处理.但我希望事实并非如此?

我还有关于maxTuplesPending和超时配置的相关问题.

如果我将maxTuplePending设置为10k,那么我是否正确认为每个spout实例将继续发出元组,直到该spout实例在飞行中跟踪10k元组,10k元组尚未完全处理?当一个当前的飞行消息被完全处理时,会发出一个新的元组?

最后,这是否与超时配置有关?在发出新消息之前,spout是否以任何方式等待配置的超时?或者,如果消息处于停滞状态/处理速度慢,超时配置是否会发挥作用,导致它因超时而失败?

更简洁(或希望更清楚),是否有效将我的超时设置为30分钟,除非消息不会失败,除非最终的螺栓在30分钟内被激活?还是有其他影响,例如影响喷口排放率的超时配置?

对不起漫长而漫无边际的问题,我们深表歉意.提前感谢您的回复.

*编辑以进一步澄清

这是我担心的原因,因为我的消息不一定贯穿整个Stream.

假设我有螺栓A,B,C,D.大多数时候消息将从A-> B - > - > D传递.但是我有一些消息故意停在螺栓A上.A会激活它们但不会发出它们(因为我的业务逻辑,在那些情况下我确实需要进一步处理消息).

那么我的KafkaSpout是否会知道已经完成处理的A消息但未从A发出的消息?在这种情况下,一旦Bolt A完成,我希望从喷口发出另一条消息.



1> Matthias J. ..:

Storm通过UDF代码必须使用的锚定机制在整个拓扑中跟踪元组.这种锚定导致所谓的元组树,树的根是由喷口发出的元组,所有其他节点(以树结构连接)表示使用输入元组作为锚点的螺栓发出的元(这只是一个逻辑模型,并没有在Storm中以这种方式实现).

例如,Spout发出一个句子元组,由句子中的第一个螺栓分开,一些单词由第二个螺栓过滤,单词计数由第三个螺栓应用.最后,接收器螺栓将结果写入文件.树看起来像这样:

"this is an example sentence" -+-> "this" 
                               +-> "is" 
                               +-> "an"
                               +-> "example" -> "example",1 -> "example",1
                               +-> "sentence" -> "sentence",1 -> "sentence",1

初始句子由spout发出,用作bolt1的锚点,用于发出的所有标记,并由bolt1获取.Bolt2过滤掉"this","is"和"an",并且仅仅是三个元组."示例"和"句子"只是被转发,用作输出元组的锚点,然后被激活.在bolt2中也是如此,最终的接收器螺栓只会响应所有传入的元组.

此外,Storm追踪所有元组的所有特征,即来自中间螺栓和下沉螺栓.首先,spout将输出元组的ID发送给acker任务.每次使用元组作为锚点时,acker也会获得一个消息,其中包含锚元组ID和输出元组ID(由Storm自动生成).来自螺栓的行动也会进行与XOR相同的acker任务.如果收到所有的ack - 即,对于spout和所有递归锚定的输出元组 - (XOR结果将为零),acker向spout发送消息,表明元组已完全处理并且后退调用Spout.ack(MessageId)发生(即,当完全处理元组时,立即进行反向调用).此外,如果acker注册的元组长于超时,则ackers会定期检查.如果发生这种情况,acker将丢弃元组ID,并向元组发送一条消息,表示元组失败(导致调用Spout.fail(MessageId)).

此外,Spouts会保留飞行中所有元组的计数,Spout.nextTuple()如果此计数超过maxTuplesPending参数,则停止呼叫.据我所知,该参数全局应用,即每个喷口任务的局部计数总结,并将全局计数与参数进行比较(不确定如何详细实施).

所以timeout参数独立于maxTuplesPending.


再次感谢所有的信息和努力,这一切都很有意义(大多数情况下).我仍然有点困惑,可能我可以解释一下令我困惑的场景:Spout发出一个元组,该集合现在有一个锚点.我接收并确认元组,该组现在是空的.我可能会很快发出元组,为集合添加一个新的achor.风暴如何知道等待呼叫Spout.ack?当我第一次听到元组时,它不会检测到空集吗?
你的观察是正确的.在一个元组被激活之后,你不能再将其用作锚.因此,只要您想使用元组作为锚点,就需要在相应的螺栓中推迟ack调用.
推荐阅读
小妖694_807
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有