当前位置:  开发笔记 > 编程语言 > 正文

从服务器关闭akka-http websocket连接

如何解决《从服务器关闭akka-httpwebsocket连接》经验,为你挑选了1个好方法。

在我的场景中,客户端发送"再见"websocket消息,我需要在服务器端关闭先前建立的连接.

来自akka-http 文档:

通过从服务器逻辑中取消传入连接流(例如,将其下游连接到Sink.cancelled,将其上游连接到Source.empty),可以关闭连接.也可以通过取消IncomingConnection源连接来关闭服务器的套接字.

但是我不清楚如何考虑到这一点SinkSource在协商新连接时设置一次:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ?
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ?
      reject(ExpectedWebsocketRequestRejection)
  }
}

kiritsuku.. 5

提示:该答案基于akka-stream-experimental版本2.0-M2。该API在其他版本中可能会略有不同。


关闭连接的一种简单方法是使用PushStage

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ?
      // println("Connection closed")
      ctx.finish()
    case msg ?
      ctx.push(msg)
  }
}

在客户端或服务器端接收到的每个元素(通常每个通过的元素Flow)都经过这样的Stage组件。在Akka中,称为完整抽象GraphStage,有关更多信息,请参见官方文档。

使用a,PushStage我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye收到消息,我们将完成上下文,否则我们将通过push方法转发值。

现在,我们可以closeClient通过该transform方法将组件连接到任意流:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ? closeClient)
  .map(_ ? StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

上面的流接收a ByteString并返回a ByteString,这意味着它可以connection通过该join方法连接到。在流程内部,我们首先将字节转换为字符串,然后再将其发送到closeClient。如果PushStage没有完成流,则该元素将在流中转发,在该流中该元素将被丢弃并由stdin的某些输入替代,然后通过导线将其发送回去。如果流完成,将删除阶段组件之后的所有其他流处理步骤-现在关闭流。



1> kiritsuku..:

提示:该答案基于akka-stream-experimental版本2.0-M2。该API在其他版本中可能会略有不同。


关闭连接的一种简单方法是使用PushStage

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ?
      // println("Connection closed")
      ctx.finish()
    case msg ?
      ctx.push(msg)
  }
}

在客户端或服务器端接收到的每个元素(通常每个通过的元素Flow)都经过这样的Stage组件。在Akka中,称为完整抽象GraphStage,有关更多信息,请参见官方文档。

使用a,PushStage我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye收到消息,我们将完成上下文,否则我们将通过push方法转发值。

现在,我们可以closeClient通过该transform方法将组件连接到任意流:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ? closeClient)
  .map(_ ? StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

上面的流接收a ByteString并返回a ByteString,这意味着它可以connection通过该join方法连接到。在流程内部,我们首先将字节转换为字符串,然后再将其发送到closeClient。如果PushStage没有完成流,则该元素将在流中转发,在该流中该元素将被丢弃并由stdin的某些输入替代,然后通过导线将其发送回去。如果流完成,将删除阶段组件之后的所有其他流处理步骤-现在关闭流。

推荐阅读
可爱的天使keven_464
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有