我正在使用RxParse来解析查询的异步加载但是当我使用subscribeOn(Schedulers.io())订阅我的observable时,我的onCompleted方法永远不会在主线程上调用.而不是这个,我的onCompleted方法在工作线程池内调用.如果我使用observeOn(AndroidSchedulers.mainThread),一切都会工作,但我的onNextMethod也将在主线程上调用,我不想要它.
我的代码有问题吗?
我的代码有什么问题吗?
ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(AndroidSchedulers.handlerThread(new Handler())) .subscribe( new Subscriber() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { // ... worker thread (but here is ok) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) );
Tanis.7x.. 9
首先你要明白之间的差别subscribeOn()
和observeOn()
.这两个完全不同的运算符会影响Rx链的不同部分.
subscribeOn()
指定Observable将在哪里工作.这不会影响在那里onNext()
,onError()
和onComplete()
执行.
observeOn()
指定回调(例如onNext()
)的执行位置.它不会影响Observable的工作位置.
所有回调都将在同一个线程上发生.您不能指定某个回调在一个线程上发生,而某些回调在另一个线程上发生,通过任何RxJava API.如果这是您想要的行为,您将不得不在回调中自己实现它.
首先你要明白之间的差别subscribeOn()
和observeOn()
.这两个完全不同的运算符会影响Rx链的不同部分.
subscribeOn()
指定Observable将在哪里工作.这不会影响在那里onNext()
,onError()
和onComplete()
执行.
observeOn()
指定回调(例如onNext()
)的执行位置.它不会影响Observable的工作位置.
所有回调都将在同一个线程上发生.您不能指定某个回调在一个线程上发生,而某些回调在另一个线程上发生,通过任何RxJava API.如果这是您想要的行为,您将不得不在回调中自己实现它.
不幸的是,订阅在所有方法的同一个线程中(onNext
,onError
和onCompleted
但你可以在方法Schedulers.io()
内部和内部观察onNext(T t)
,创建一个新的Observable
,听听MainThread
如下:
ParseObservable.find(myQuery) .map(myMapFunc()) .subscribeOn(Schedulers.io()) .subscribe( new Subscriber() { @Override public void onError(Throwable e) { Log.e("error","error",e); } @Override public void onNext(T t) { Observable.just(t) .observeOn(AndroidSchedulers.mainThread()) .subscribe((t) -> { // do something in MainThread }) } public void onCompleted() { // ... worker thread again instead of mainThread } } ) );
我希望它有所帮助!