我在我的Android应用程序中使用RxJava,我想从数据库加载数据.
通过这种方式,我创建了一个新的Observable,使用Observable.create()
它返回一个列表EventLog
public Observable> loadEventLogs() { return Observable.create(new Observable.OnSubscribe
>() { @Override public void call(Subscriber super List
> subscriber) { List logs = new Select().from(DBEventLog.class).execute(); List eventLogs = new ArrayList<>(logs.size()); for (int i = 0; i < logs.size(); i++) { eventLogs.add(new EventLog(logs.get(i))); } subscriber.onNext(eventLogs); } }); }
虽然它可以正常工作,但我读到使用Observable.create()
它实际上并不是Rx Java的最佳实践(参见此处).
所以我用这种方式改变了这个方法.
public Observable> loadEventLogs() { return Observable.fromCallable(new Func0
>() { @Override public List
call() { List logs = new Select().from(DBEventLog.class).execute(); List eventLogs = new ArrayList<>(logs.size()); for (int i = 0; i < logs.size(); i++) { eventLogs.add(new EventLog(logs.get(i))); } return eventLogs; } }); }
这是使用Rx Java的更好方法吗?为什么?这两种方法实际上有什么区别?
而且,由于数据库加载了一个元素列表,有意义一次发出整个列表?或者我应该一次发出一个项目?
这两种方法可能看起来相似并且行为相似,但是fromCallable
处理背压的困难,而create
版本没有.处理OnSubscribe
实施中的背压范围从简单到完全的心灵融化; 但是,如果省略,您可能会MissingBackpressureException
沿着异步边界(例如observeOn
)或甚至连续边界(例如concat
).
RxJava试图为尽可能多的工厂和运营商提供适当的背压支持,但是,有不少工厂和运营商无法支持它.
手动OnSubscribe
实现的第二个问题是缺少取消支持,特别是如果您生成大量onNext
呼叫.其中许多可以由标准工厂方法(例如from
)或帮助程序类(例如SyncOnSubscribe
)替换,以处理所有复杂性.
您可能会发现很多介绍和示例(仍然)使用create
有两个原因.
通过展示事件推送如何以强制方式工作,引入基于推送的数据流要容易得多.在我看来,这些来源create
按比例花费太多时间,而不是谈论标准工厂方法,并展示如何安全地实现某些常见任务(如你的).
这些示例中的许多都是在RxJava不需要背压支持或甚至正确的同步取消支持时创建的,或者只是从Rx.NET示例中移植(迄今为止不支持背压和同步取消工作,C#I提供猜测.)通过调用onNext生成值当时是无忧无虑的.但是,这样的使用会导致缓冲区膨胀和内存使用过多,因此,Netflix团队提出了一种限制内存使用的方法,要求观察者说明他们愿意继续进行多少项目.这被称为背压.
对于第二个问题,即如果要创建List或值序列,则取决于您的来源.如果您的源支持某种迭代或单个数据元素的流式传输(例如JDBC),您可以只挂钩并逐个发出(请参阅参考资料SyncOnSubscribe
).如果它不支持它或者你需要它以List形式,那么保持它原样.您可以随时通过toList
并flatMapIterable
在必要时在两种表单之间进行转换.