那些熟悉lmax环形缓冲区( disruptor )的人都知道,该数据结构的最大优点之一就是它批准了事件,当我们有一个可以利用批处理的消费者时,系统可以自动调整负载,你抛出的事件越多越好.
我想我们无法通过Observable实现相同的效果(定位批处理功能).我已经尝试过Observable.buffer,但这是非常不同的,缓冲区将等待并且在预期的事件数量没有到达时不会发出批处理.我们想要的是完全不同的.
如果subriber正在等待一个批次Observable
,当一个项目到达流时,它会发出一个由订阅者处理的单个元素批处理,而正在处理其他元素到达并收集到下一个批处理中,一旦订阅者完成执行它获得下一批与自上次处理开始以来已经到达的事件数...
因此,如果我们的订户足够快以一次处理一个事件,它将这样做,如果负载变高,它仍将具有相同的处理频率,但每次更多事件(从而解决背压问题)...不同缓冲区将坚持并等待批次填满.
有什么建议?或者我应该使用环形缓冲区?
RxJava和Disruptor代表两种不同的编程方法.
我对Disruptor没有经验,但基于视频会话,它基本上是一个大缓冲区,生产者发出像firehose这样的数据,消费者旋转/产生/阻塞直到数据可用.
另一方面,RxJava旨在实现非阻塞事件传递.我们也有ringbuffers,特别是在observeOn中,它充当生产者和消费者之间的异步边界,但是它们要小得多,我们通过应用协同例程方法避免缓冲区溢出和缓冲膨胀.协同程序归结为发送到回调的回调,因此你可以回调我们的回调,按你的节奏发送一些数据.此类请求的频率决定了起搏.
有些数据源不支持此类合作流,并且onBackpressureXXX
如果下游请求速度不够快,则需要其中一个运算符缓冲/丢弃值.
如果您认为可以比一个接一个地更有效地批量处理数据,则可以使用buffer
具有重载的运算符来指定缓冲区的持续时间:例如,您可以拥有10 ms的数据,与方式无关许多价值在此期间到达.
通过请求频率控制批量大小是棘手的,并且可能具有不可预见的后果.通常,问题在于,如果您request(n)
来自批处理源,则表明您可以处理n个元素,但源现在必须创建大小为1的n个缓冲区(因为类型是Observable
).相反,如果没有调用请求,则运算符缓冲数据,从而产生更长的缓冲区.这些行为会在处理中引入额外的开销,如果你真的可以跟上并且还必须将冷源变成一个firehose(因为否则你拥有的本质上>
buffer(1)
),这本身现在可能导致缓冲膨胀.