在我的场景中,客户端发送"再见"websocket消息,我需要在服务器端关闭先前建立的连接.
来自akka-http 文档:
通过从服务器逻辑中取消传入连接流(例如,将其下游连接到Sink.cancelled,将其上游连接到Source.empty),可以关闭连接.也可以通过取消IncomingConnection源连接来关闭服务器的套接字.
但是我不清楚如何考虑到这一点Sink
并Source
在协商新连接时设置一次:
(get & path("ws")) { optionalHeaderValueByType[UpgradeToWebsocket]() { case Some(upgrade) ? val connectionId = UUID() complete(upgrade.handleMessagesWithSinkSource(sink, source)) case None ? reject(ExpectedWebsocketRequestRejection) } }
kiritsuku.. 5
提示:该答案基于akka-stream-experimental
版本2.0-M2
。该API在其他版本中可能会略有不同。
关闭连接的一种简单方法是使用PushStage
:
import akka.stream.stage._ val closeClient = new PushStage[String, String] { override def onPush(elem: String, ctx: Context[String]) = elem match { case "goodbye" ? // println("Connection closed") ctx.finish() case msg ? ctx.push(msg) } }
在客户端或服务器端接收到的每个元素(通常每个通过的元素Flow
)都经过这样的Stage
组件。在Akka中,称为完整抽象GraphStage
,有关更多信息,请参见官方文档。
使用a,PushStage
我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye
收到消息,我们将完成上下文,否则我们将通过push
方法转发值。
现在,我们可以closeClient
通过该transform
方法将组件连接到任意流:
val connection = Tcp().outgoingConnection(address, port) val flow = Flow[ByteString] .via(Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .transform(() ? closeClient) .map(_ ? StdIn.readLine("> ")) .map(_ + "\n") .map(ByteString(_)) connection.join(flow).run()
上面的流接收a ByteString
并返回a ByteString
,这意味着它可以connection
通过该join
方法连接到。在流程内部,我们首先将字节转换为字符串,然后再将其发送到closeClient
。如果PushStage
没有完成流,则该元素将在流中转发,在该流中该元素将被丢弃并由stdin的某些输入替代,然后通过导线将其发送回去。如果流完成,将删除阶段组件之后的所有其他流处理步骤-现在关闭流。
提示:该答案基于akka-stream-experimental
版本2.0-M2
。该API在其他版本中可能会略有不同。
关闭连接的一种简单方法是使用PushStage
:
import akka.stream.stage._ val closeClient = new PushStage[String, String] { override def onPush(elem: String, ctx: Context[String]) = elem match { case "goodbye" ? // println("Connection closed") ctx.finish() case msg ? ctx.push(msg) } }
在客户端或服务器端接收到的每个元素(通常每个通过的元素Flow
)都经过这样的Stage
组件。在Akka中,称为完整抽象GraphStage
,有关更多信息,请参见官方文档。
使用a,PushStage
我们可以观察具体的传入元素的值,然后相应地转换上下文。在上面的示例中,一旦goodbye
收到消息,我们将完成上下文,否则我们将通过push
方法转发值。
现在,我们可以closeClient
通过该transform
方法将组件连接到任意流:
val connection = Tcp().outgoingConnection(address, port) val flow = Flow[ByteString] .via(Framing.delimiter( ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) .map(_.utf8String) .transform(() ? closeClient) .map(_ ? StdIn.readLine("> ")) .map(_ + "\n") .map(ByteString(_)) connection.join(flow).run()
上面的流接收a ByteString
并返回a ByteString
,这意味着它可以connection
通过该join
方法连接到。在流程内部,我们首先将字节转换为字符串,然后再将其发送到closeClient
。如果PushStage
没有完成流,则该元素将在流中转发,在该流中该元素将被丢弃并由stdin的某些输入替代,然后通过导线将其发送回去。如果流完成,将删除阶段组件之后的所有其他流处理步骤-现在关闭流。