我正在尝试使用rx-java构建一个健壮的处理管道,但我遇到了一个问题.
这是一个例子:
public static void main(String[] args) { AtomicInteger div = new AtomicInteger(-1); Observable.just(1, 1, 1).map(item -> 1 / div.getAndIncrement()) .retry().subscribe(item -> System.out.println(item)); }
在这种情况下的输出是4项,因为非流可观察被重放,但这不相关所以为了简单起见,请忽略它.我添加了评论,显示了达到结果的计算和重新订阅的重点:
-1 // 1 / -1 // 1/0 (error) - resubscribes to observable 1 // 1 / 1 0 // 1 / 2 0 // 1 / 3
发生这种情况是因为retry
操作员(作为所有重试操作符)在传递错误通知后导致重新订阅.
我的预期输出是:
-1 // 1 / -1 // 1/0 (error) - resubscribe but resume erroneous item (1) 1 // 1 / 1 0 // 1 / 0
当传递错误通知时,重新订阅过程应该包括流中的错误项(在同一项上重试) - 因为错误是外部的并且没有嵌入在已处理的项中(因此重新处理是有意义的).
这是一些外部错误(如数据库连接)的情况,我希望那些未处理的项目会被延迟重新处理.我知道使用标准重试操作符可以重新订阅,但所有这些操作都会放弃错误的项目.
我还考虑将我的所有处理包装在try-catch中,我怀疑错误是可能的,但是我的处理代码中添加了样板代码,我不想这样做.
所以我的问题是:有没有一种标准方法可以重试失败的项目?
我已经考虑过做(未经测试)的事情:
someSubject.flatMap( item-> Observable.just(item) .doOnError(err -> someSubject.onNext(item))).onErrorX...
并压制错误......
但这似乎不自然,在我的用例中很昂贵(为每个项目创建一个可观察的).
是否有操作符或操作符组合可能导致重试将错误的项目传递回observable的开头而不会"破坏"或将项目包装在不同的可观察对象中?
这也是我习惯使用异步重试的重试样式.
这通常不适用于RxJava.如果元素的处理失败,则没有从该位置恢复的内置方式.您可以做的最好的事情是尝试捕获有问题的函数回调并手动重试.第二个最好的事情是使用flatMap
可能有问题的计算是Observable
可以单独重试的内部的地方:
source.flatMap(v -> Observable.just(v).map(v -> v / counter.getAndIncrement()).retry() )