我正在尝试在我的Akka Http服务器上创建一个端点,它使用外部服务告诉用户它的IP地址(我知道这可以更轻松地执行,但我这样做是一个挑战).
在最上层没有使用流的代码是这样的:
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val requestHandler: HttpRequest => Future[HttpResponse] = { case HttpRequest(GET, Uri.Path("/"), _, _, _) => Http().singleRequest(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))).flatMap { response => response.entity.dataBytes.runFold(ByteString(""))(_ ++ _) map { string => HttpResponse(entity = HttpEntity(MediaTypes.`text/html`, "" + string.utf8String + "
")) } } case _: HttpRequest => Future(HttpResponse(404, entity = "Unknown resource!")) } Http().bindAndHandleAsync(requestHandler, "localhost", 8080)
它工作正常.然而,作为一个挑战,我想限制自己只使用流(没有Future
).
这是我认为我会用于这种方法的布局:
Source[Request] -> Flow[Request, Request] -> Flow[Request, Response] ->Flow[Response, Response]
并且也适用于404路线Source[Request] -> Flow[Request, Response]
.现在,如果我的Akka Stream知识对我有用,我需要使用一个Flow.fromGraph
这样的东西,然而,这是我被困住的地方.
在一个Future
我可以为各种端点做一个简单的map和flatMap,但在流中意味着将Flow分成多个Flow并且我不太确定我是如何做到的.我想过使用UnzipWith和Options或通用广播.
对此主题的任何帮助将不胜感激.
如果有必要,我不这样做吗?- http://doc.akka.io/docs/akka-stream-and-http-experimental/2.0-M2/scala/stream-customize.html
你不需要使用Flow.fromGraph
.相反,使用的单一流程flatMapConcat
将起作用:
//an outgoing connection flow val checkIPFlow = Http().outgoingConnection("checkip.amazonaws.com") //converts the final html String to an HttpResponse def byteStrToResponse(byteStr : ByteString) = HttpResponse(entity = new Default(MediaTypes.`text/html`, byteStr.length, Source.single(byteStr))) val reqResponseFlow = Flow[HttpRequest].flatMapConcat[HttpResponse]( _ match { case HttpRequest(GET, Uri.Path("/"), _, _, _) => Source.single(HttpRequest(GET, Uri("http://checkip.amazonaws.com/"))) .via(checkIPFlow) .mapAsync(1)(_.entity.dataBytes.runFold(ByteString(""))(_ ++ _)) .map("" + _.utf8String + "
") .map(ByteString.apply) .map(byteStrToResponse) case _ => Source.single(HttpResponse(404, entity = "Unknown resource!")) })
然后,可以使用此Flow绑定到传入的请求:
Http().bindAndHandle(reqResponseFlow, "localhost", 8080)
所有没有期货......