我们暂时考虑以下代码
Rx.Observable.merge( Rx.Observable.just(1), Rx.Observable.just(1).delay(1000) ).distinctUntilChanged() .subscribe(x => console.log(x))
我们希望1
只记录一次.但是,如果我们想要允许重复一个值,如果它的最后一次发射是可配置的时间量呢?我的意思是记录两个事件.
例如,拥有如下内容会很酷
Rx.Observable.merge( Rx.Observable.just(1), Rx.Observable.just(1).delay(1000) ).distinctUntilChanged(1000) .subscribe(x => console.log(x))
其中distinctUntilChanged()
接受某种超时以允许重复下一个元素.然而,这样的事情不存在,我想知道是否有人知道通过使用高级操作员实现这一点的优雅方式,而不会弄乱需要处理状态的过滤器
除非我误解,否则我很确定这可以通过以下相对直接的方式完成windowTime
:
Observable .merge( Observable.of(1), Observable.of(1).delay(250), // Ignored Observable.of(1).delay(700), // Ignored Observable.of(1).delay(2000), Observable.of(1).delay(2200), //Ignored Observable.of(2).delay(2300) ) // Converts the stream into a stream of streams each 1000 milliseconds long .windowTime(1000) // Flatten each of the streams and emit only the latest (there should only be one active // at a time anyway // We apply the distinctUntilChanged to the windows before flattening .switchMap(source => source.distinctUntilChanged()) .timeInterval() .subscribe( value => console.log(value), error => console.log('error: ' + error), () => console.log('complete') );
请参阅此处的示例(借用@ martin的示例输入)