当前位置:  开发笔记 > 编程语言 > 正文

不使用Observable.create创建Observable

如何解决《不使用Observable.create创建Observable》经验,为你挑选了1个好方法。

我在我的Android应用程序中使用RxJava,我想从数据库加载数据.

通过这种方式,我创建了一个新的Observable,使用Observable.create()它返回一个列表EventLog

public Observable> loadEventLogs() {
    return Observable.create(new Observable.OnSubscribe>() {
        @Override
        public void call(Subscriber> subscriber) {
            List logs = new Select().from(DBEventLog.class).execute();
            List eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            subscriber.onNext(eventLogs);
        }
    });
}

虽然它可以正常工作,但我读到使用Observable.create()它实际上并不是Rx Java的最佳实践(参见此处).

所以我用这种方式改变了这个方法.

public Observable> loadEventLogs() {
    return Observable.fromCallable(new Func0>() {
        @Override
        public List call() {
            List logs = new Select().from(DBEventLog.class).execute();
            List eventLogs = new ArrayList<>(logs.size());
            for (int i = 0; i < logs.size(); i++) {
                eventLogs.add(new EventLog(logs.get(i)));
            }
            return eventLogs;
        }
    });
}

这是使用Rx Java的更好方法吗?为什么?这两种方法实际上有什么区别?

而且,由于数据库加载了一个元素列表,有意义一次发出整个列表?或者我应该一次发出一个项目?



1> akarnokd..:

这两种方法可能看起来相似并且行为相似,但是fromCallable处理背压的困难,而create版本没有.处理OnSubscribe实施中的背压范围从简单到完全的心灵融化; 但是,如果省略,您可能会MissingBackpressureException沿着异步边界(例如observeOn)或甚至连续边界(例如concat).

RxJava试图为尽可能多的工厂和运营商提供适当的背压支持,但是,有不少工厂和运营商无法支持它.

手动OnSubscribe实现的第二个问题是缺少取消支持,特别是如果您生成大量onNext呼叫.其中许多可以由标准工厂方法(例如from)或帮助程序类(例如SyncOnSubscribe)替换,以处理所有复杂性.

您可能会发现很多介绍和示例(仍然)使用create有两个原因.

    通过展示事件推送如何以强制方式工作,引入基于推送的数据流要容易得多.在我看来,这些来源create按比例花费太多时间,而不是谈论标准工厂方法,并展示如何安全地实现某些常见任务(如你的).

    这些示例中的许多都是在RxJava不需要背压支持或甚至正确的同步取消支持时创建的,或者只是从Rx.NET示例中移植(迄今为止不支持背压和同步取消工作,C#I提供猜测.)通过调用onNext生成值当时是无忧无虑的.但是,这样的使用会导致缓冲区膨胀和内存使用过多,因此,Netflix团队提出了一种限制内存使用的方法,要求观察者说明他们愿意继续进行多少项目.这被称为背压.

对于第二个问题,即如果要创建List或值序列,则取决于您的来源.如果您的源支持某种迭代或单个数据元素的流式传输(例如JDBC),您可以只挂钩并逐个发出(请参阅参考资料SyncOnSubscribe).如果它不支持它或者你需要它以List形式,那么保持它原样.您可以随时通过toListflatMapIterable在必要时在两种表单之间进行转换.


单元测试是一个很好的示例来源:https://github.com/ReactiveX/RxJava/blob/1.x/src/test/java/rx/observables/SyncOnSubscribeTest.java
推荐阅读
linjiabin43
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有