我仍在尝试学习RxJava。有一件事我现在无法束手无策。每一篇试图学习我如何使用Rx的文章都向我展示了如何基于已经“可预测”的源(即一定数量的项目的序列(单个值或简单的Iterable))创建Observable。 。
通常,您会看到一些类似的内容 Observable.just()
Observableobserverable = Observable.just("Hello, world!");
或Observable.from()
:
Observable.from("apple", "orange", "banana").subscribe(fruit -> System.out.println(fruit));
很好,但是下面的用例呢?
我有不断通过套接字推送的消息(我没有构建它,我只是在集成)。我需要“观察”通过套接字推送的数据序列。
很多人似乎都指向Obserable.using()
(这里是一个例子),但是我也不认为这是正确的解决方案。通过套接字推送的消息是不完整的,因为它们具有最大长度。我需要自己“构造”消息,因此需要在每次从套接字推送之间进行缓冲。
换句话说,我正在寻找一种方法来根据从套接字推入的数据自己构造消息,然后将其推入Observable。我一直在寻找在整个地方执行此操作的正确方法,但是我似乎找不到合适的解决方案。
怎么样Observable
通过完全自定义的行为吗?
Observable.create(subscriber -> { Socket socket = getSocket(); socket.subscribe(new SocketListener() { @Override public void onNewFrame(Frame frame) { // Process frame and prepare payload to the subscriber. if (payloadReadyForExternalObserver) { if (subscriber.isUnsubscribed()) { // Subscriber unsubscribed, let's close the socket. socket.close(); } else { subscriber.onNext(payload); } } } @Override public void onSocketError(IOException exception) { subscriber.onError(exception); // Terminal state. } @Override public void onSocketClosed() { subscriber.onCompleted(); // Terminal state. } }); })
但是请确保您正确实施了Observable合同,有关更多信息,请阅读https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators