在RxJava中,有Observable.OnSubscribe,当订阅了observable时会调用你的动作.
RxJs中有等价物吗?
自从提出这个问题以来,他们似乎已经把页面放下了.谷歌的缓存版本就在这里.
通过组合RxJs5的现有功能,可以轻松实现此功能.
Rx.Observable.prototype.doOnSubscribe = function (onSubscribe) {
return Rx.Observable.empty()
.do(null,null, onSubscribe)
.concat(this);
};
Rx.Observable.from([1,2,3])
.doOnSubscribe(() => console.log('subscribed to stream'))
.subscribe(console.log, null, () => console.log('completed'))
确实很像Mark的答案,但是如果您使用的是rxjs
版本5或更高版本,我宁愿使用纯函数而不是修补原型。
按照Ray的建议empty
concat
使用defer
,无需破解即可实现
import {defer} from 'rxjs/observable/defer';
import {Observable} from 'rxjs/Observable';
/** Example
import {from} from 'rxjs/observable/from';
from([1, 2, 3])
.pipe(doOnSubscribe(() => console.log('subscribed to stream')))
.subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe(onSubscribe: () => void): (source: Observable) => Observable {
return function inner(source: Observable): Observable {
return defer(() => {
onSubscribe();
return source;
});
};
}
https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d
import {empty} from 'rxjs/observable/empty';
import {concat, tap} from 'rxjs/operators';
import {Observable} from 'rxjs/Observable';
/** Example
import {from} from 'rxjs/observable/from';
from([1, 2, 3])
.pipe(doOnSubscribe(() => console.log('subscribed to stream')))
.subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe(callback: () => void): (source: Observable) => Observable {
return function inner(source: Observable): Observable {
return empty().pipe(
tap(null, null, callback),
concat(source)
);
};
}
另一个以Mark的答案为基础的例子是使用工厂函数:
Rx.Observable.prototype.doOnSubscribe = function(onSubscribe) {
const source = this;
return Rx.Observable.defer(() => {
onSubscribe();
return source;
});
}