我正在将observable推入像这样的数组......
var tasks$ = []; tasks$.push(Observable.timer(1000)); tasks$.push(Observable.timer(3000)); tasks$.push(Observable.timer(10000));
我想要一个在所有任务$完成后发出的Observable.请记住,在实践中,任务$没有已知数量的Observable.
我已经尝试了,Observable.zip(tasks$).subscribe()
但是如果只有一个任务,这似乎失败了,并且让我相信ZIP需要偶数个元素才能按照我期望的方式工作.
我试过Observable.concat(tasks$).subscribe()
但是concat运算符的结果似乎是一个可观察数组......例如与输入基本相同.你甚至不能打电话订阅它.
在C#中,这将类似于Task.WhenAll()
.在ES6承诺它将类似于Promise.all()
.
我遇到了一些SO问题,但它们似乎都在等待已知数量的流(例如将它们映射到一起).
如果要编写在所有源可观察对象完成时发出的可观察对象,可以使用forkJoin
:
import { Observable } from 'rxjs/Observable'; import 'rxjs/add/observable/forkJoin'; import 'rxjs/add/operator/first'; var tasks$ = []; tasks$.push(Observable.timer(1000).first()); tasks$.push(Observable.timer(3000).first()); tasks$.push(Observable.timer(10000).first()); Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
您可以使用zip
。
组合多个Observable来创建一个Observable,其值是根据其每个输入Observable的值依次计算得出的。
const obsvA = this._service.getObjA(); const obsvB = this._service.getObjB(); // or with array // const obsvArray = [obsvA, obsvB]; const zip = Observable.zip(obsvA, obsvB); // or with array // const zip = Observable.zip(...obsvArray); zip.subscribe( result => console.log(result), // result is an array with the responses [respA, respB] );
注意事项:
不需要是偶数个可观察值。
zip
视觉上
zip运算符将订阅所有内部可观测对象,等待每个可观测对象发出值。一旦发生这种情况,将发出具有相应索引的所有值。这将持续到至少一个内部可观察到的对象完成为止。
当一个可观察对象抛出一个错误(或什至两个都发生)时,订阅将关闭(onComplete
在完成时被调用),并且使用一种onError
方法,您只会得到第一个错误。
zip.subscribe( result => console.log(result), // result is an array with the responses [respA, respB] error => console.log(error), // will return the error message of the first observable that throws error and then finish it () => console.log ('completed after first error or if first observable finishes) );