(还有关于超时和maxSpoutPending的几个问题)
我在Storm文档中看到很多关于消息被完全处理的引用.但是我的KafkaSpout如何知道邮件何时完全处理?
希望它能够识别我的螺栓的连接方式,所以当我的Stream中的最后一个螺栓产生一个元组时,喷口知道我的消息何时被处理?
否则,我会想象在超时期限到期后,检查消息的确认状态,如果由acking /锚定XOR指示,则认为它被处理.但我希望事实并非如此?
我还有关于maxTuplesPending和超时配置的相关问题.
如果我将maxTuplePending设置为10k,那么我是否正确认为每个spout实例将继续发出元组,直到该spout实例在飞行中跟踪10k元组,10k元组尚未完全处理?当一个当前的飞行消息被完全处理时,会发出一个新的元组?
最后,这是否与超时配置有关?在发出新消息之前,spout是否以任何方式等待配置的超时?或者,如果消息处于停滞状态/处理速度慢,超时配置是否会发挥作用,导致它因超时而失败?
更简洁(或希望更清楚),是否有效将我的超时设置为30分钟,除非消息不会失败,除非最终的螺栓在30分钟内被激活?还是有其他影响,例如影响喷口排放率的超时配置?
对不起漫长而漫无边际的问题,我们深表歉意.提前感谢您的回复.
*编辑以进一步澄清
这是我担心的原因,因为我的消息不一定贯穿整个Stream.
假设我有螺栓A,B,C,D.大多数时候消息将从A-> B - > - > D传递.但是我有一些消息故意停在螺栓A上.A会激活它们但不会发出它们(因为我的业务逻辑,在那些情况下我确实需要进一步处理消息).
那么我的KafkaSpout是否会知道已经完成处理的A消息但未从A发出的消息?在这种情况下,一旦Bolt A完成,我希望从喷口发出另一条消息.
Storm通过UDF代码必须使用的锚定机制在整个拓扑中跟踪元组.这种锚定导致所谓的元组树,树的根是由喷口发出的元组,所有其他节点(以树结构连接)表示使用输入元组作为锚点的螺栓发出的元组(这只是一个逻辑模型,并没有在Storm中以这种方式实现).
例如,Spout发出一个句子元组,由句子中的第一个螺栓分开,一些单词由第二个螺栓过滤,单词计数由第三个螺栓应用.最后,接收器螺栓将结果写入文件.树看起来像这样:
"this is an example sentence" -+-> "this" +-> "is" +-> "an" +-> "example" -> "example",1 -> "example",1 +-> "sentence" -> "sentence",1 -> "sentence",1
初始句子由spout发出,用作bolt1的锚点,用于发出的所有标记,并由bolt1获取.Bolt2过滤掉"this","is"和"an",并且仅仅是三个元组."示例"和"句子"只是被转发,用作输出元组的锚点,然后被激活.在bolt2中也是如此,最终的接收器螺栓只会响应所有传入的元组.
此外,Storm追踪所有元组的所有特征,即来自中间螺栓和下沉螺栓.首先,spout将输出元组的ID发送给acker任务.每次使用元组作为锚点时,acker也会获得一个消息,其中包含锚元组ID和输出元组ID(由Storm自动生成).来自螺栓的行动也会进行与XOR相同的acker任务.如果收到所有的ack - 即,对于spout和所有递归锚定的输出元组 - (XOR结果将为零),acker向spout发送消息,表明元组已完全处理并且后退调用Spout.ack(MessageId)
发生(即,当完全处理元组时,立即进行反向调用).此外,如果acker注册的元组长于超时,则ackers会定期检查.如果发生这种情况,acker将丢弃元组ID,并向元组发送一条消息,表示元组失败(导致调用Spout.fail(MessageId)
).
此外,Spouts会保留飞行中所有元组的计数,Spout.nextTuple()
如果此计数超过maxTuplesPending
参数,则停止呼叫.据我所知,该参数全局应用,即每个喷口任务的局部计数总结,并将全局计数与参数进行比较(不确定如何详细实施).
所以timeout
参数独立于maxTuplesPending
.