当前位置:  开发笔记 > 编程语言 > 正文

RxJava:观察套接字发出的消息

如何解决《RxJava:观察套接字发出的消息》经验,为你挑选了1个好方法。

我仍在尝试学习RxJava。有一件事我现在无法束手无策。每一篇试图学习我如何使用Rx的文章都向我展示了如何基于已经“可预测”的源(即一定数量的项目的序列(单个值或简单的Iterable))创建Observable。 。

通常,您会看到一些类似的内容 Observable.just()

Observable observerable = Observable.just("Hello, world!");

Observable.from()

Observable.from("apple", "orange", "banana").subscribe(fruit -> System.out.println(fruit));

很好,但是下面的用例呢?

我有不断通过套接字推送的消息(我没有构建它,我只是在集成)。我需要“观察”通过套接字推送的数据序列。

很多人似乎都指向Obserable.using()(这里是一个例子),但是我也不认为这是正确的解决方案。通过套接字推送的消息是不完整的,因为它们具有最大长度。我需要自己“构造”消息,因此需要在每次从套接字推送之间进行缓冲。

换句话说,我正在寻找一种方法来根据从套接字推入的数据自己构造消息,然后将其推入Observable。我一直在寻找在整个地方执行此操作的正确方法,但是我似乎找不到合适的解决方案。



1> Artem Zinnat..:

怎么样Observable通过完全自定义的行为吗?

Observable.create(subscriber -> {
  Socket socket = getSocket();
  socket.subscribe(new SocketListener() {

    @Override public void onNewFrame(Frame frame) {
      // Process frame and prepare payload to the subscriber.

      if (payloadReadyForExternalObserver) {
        if (subscriber.isUnsubscribed()) {
          // Subscriber unsubscribed, let's close the socket.
          socket.close();
        } else {
          subscriber.onNext(payload);
        }
      }
    }

    @Override public void onSocketError(IOException exception) {
      subscriber.onError(exception); // Terminal state.
    }

    @Override public void onSocketClosed() {
      subscriber.onCompleted(); // Terminal state.
    }
  });
})

但是请确保您正确实施了Observable合同,有关更多信息,请阅读https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators

推荐阅读
wangtao
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有