我试图让RxJs循环遍历我的流中的Observable,直到它处于某种状态,然后让流继续.具体来说,我正在将同步do/while循环转换为RxJs,但我假设同样的答案也可以用于for或while循环.
我认为我可以使用doWhile(),但似乎条件函数无法访问流中的项目,这似乎打败了我的目的.
我不完全确定正确的反应术语对于我想要的是什么,但这里有一个我想要的例子:
var source = new Rx.Observable.of({val: 0, counter: 3});
source.map(o => {
o.counter--;
console.log('Counter: ' + o.counter);
if (!o.counter) {
o.val = "YESS!";
}
return o;
})
.doWhile(o => {
return o.counter > 0;
})
.subscribe(
function (x) {
console.log('Next: ' + x.val);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
预期的产出是:
Counter: 3 Counter: 2 Counter: 1 Counter: 0 Next: YESS! Completed
假设这是一个可解决的问题,我不清楚如何标记循环时想要返回的"开始".
有一个扩展运算符,通过允许您递归调用选择器函数来使您关闭.在这种情况下,返回一个空的observable将是你的休息.见jsbin:
var source = Rx.Observable.return({val: 0, counter: 3}) .expand(value => { if(!value.counter) return Rx.Observable.empty(); value.counter -= 1; if(!value.counter) value.val = 'YESS'; return Rx.Observable.return(value) }) .subscribe(value => console.log(value.counter ? 'Counter: ' + value.counter : 'Next: ' + value.val));
不完全是你想要的但是关闭,使用expand
运算符和信令结束递归Rx.Observable.empty
(http://jsfiddle.net/naaycu71/3/):
var source = new Rx.Observable.of({val: 0, counter: 3}); source.expand(function(o) { console.log('Counter: ' + o.counter); o.counter--; return (o.counter >= 0) ? Rx.Observable.just(o) : Rx.Observable.empty() }) .subscribe( function (x) { console.log('Next: ' , x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
输出:
Next: Object {val: 0, counter: 3} Counter: 3 Next: Object {val: 0, counter: 2} Counter: 2 Next: Object {val: 0, counter: 1} Counter: 1 Next: Object {val: 0, counter: 0} Counter: 0 Completed