我正在尝试使用Akka HTTP来基本验证我的请求.碰巧我有一个外部资源来进行身份验证,因此我必须对此资源进行休息调用.
这需要一些时间,并且在处理时,我的API的其余部分似乎被阻止,等待此调用.我用一个非常简单的例子重现了这个:
// used dispatcher: implicit val system = ActorSystem() implicit val executor = system.dispatcher implicit val materializer = ActorMaterializer() val routes = (post & entity(as[String])) { e => complete { Future{ Thread.sleep(5000) e } } } ~ (get & path(Segment)) { r => complete { "get" } }
如果我发布到日志端点,我的get端点也会等待5秒,这是日志端点所指示的.
这是预期的行为,如果是,如何在不阻止整个API的情况下进行阻止操作?
你观察到的是预期的行为 - 当然它非常糟糕.很好,已知的解决方案和最佳实践可以防范它.在这个答案中,我想花一些时间来解释这个问题的简短,长久,然后深入 - 享受阅读!
简短的回答:" 不要阻止路由基础设施! ",总是使用专门的调度程序来阻止操作!
观察到的症状的原因:问题是您正在使用context.dispatcher
阻塞期货执行的调度程序.路由基础结构使用相同的调度程序(简单地称为"一堆线程")来实际处理传入的请求 - 因此,如果阻止所有可用线程,则最终会使路由基础结构处于饥饿状态.(争论和基准测试的一点是,如果Akka HTTP可以保护这一点,我会将其添加到我的研究todo-list中).
必须特别小心处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们将执行分离到不同的执行程序如此简单),如Akka文档部分所述:阻塞需要仔细管理.
我想引起注意的其他事情是,如果可能的话,应该尽量避免阻止API - 如果你的长时间运行操作不是真正的一个操作,而是一系列操作,你可以将它们分成不同的参与者或顺序未来.无论如何,只是想指出 - 如果可能的话,避免这种阻止呼叫,但如果你必须 - 那么下面将解释如何正确处理这些.
深入分析和解决方案:
现在我们知道出了什么问题,从概念上讲,让我们看看上面的代码究竟是什么,以及如何正确解决这个问题:
颜色=线程状态:
绿松石 - 睡觉
橙色 - 等待
绿色 - RUNNABLE
现在让我们调查3段代码以及调度程序的影响以及应用程序的性能.要强制执行此操作,应用程序已置于以下负载下:
[a]继续请求GET请求(参见上面的代码中的初始问题),它没有阻止
[b]然后经过一段时间的火灾2000 POST请求,这将导致5秒阻塞,然后返回未来
1) [bad]
坏代码上的调度程序行为:
// BAD! (due to the blocking in Future): implicit val defaultDispatcher = system.dispatcher val routes: Route = post { complete { Future { // uses defaultDispatcher Thread.sleep(5000) // will block on the default dispatcher, System.currentTimeMillis().toString // starving the routing infra } } }
所以我们将我们的app暴露给[a]加载,你可以看到许多akka.actor.default-dispatcher线程 - 他们正在处理请求 - 小绿色片段,而橙色意味着其他人实际上在那里闲置.
然后我们启动[b]加载,这导致阻塞这些线程 - 你可以看到早期线程"default-dispatcher-2,3,4"在之前空闲之后进入阻塞状态.我们还观察到池增长 - 新线程启动"default-dispatcher-18,19,20,21 ......"但是他们立即进入睡眠状态(!) - 我们在这里浪费宝贵的资源!
此类启动线程的数量取决于默认的调度程序配置,但可能不会超过50左右.由于我们刚刚解雇了2k阻塞操作,我们将整个线程池挨饿 - 阻塞操作占主导地位,因此路由infra没有可用于处理其他请求的线程 - 非常糟糕!
让我们做一些事情(这是一个Akka最佳实践btw - 总是隔离阻塞行为,如下所示):
2) [good!]
调度程序行为良好的结构化代码/调度程序:
在您的application.conf
配置中,此调度程序专用于阻止行为:
my-blocking-dispatcher { type = Dispatcher executor = "thread-pool-executor" thread-pool-executor { // in Akka previous to 2.4.2: core-pool-size-min = 16 core-pool-size-max = 16 max-pool-size-min = 16 max-pool-size-max = 16 // or in Akka 2.4.2+ fixed-pool-size = 16 } throughput = 100 }
您应该在Akka Dispatchers文档中阅读更多内容,以了解各种选项.但重点是我们选择了一个ThreadPoolExecutor
具有线程硬限制的线程,它可以用于阻塞操作.大小设置取决于您的应用程序的功能以及服务器具有的核心数.
接下来我们需要使用它,而不是默认的:
// GOOD (due to the blocking in Future): implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher") val routes: Route = post { complete { Future { // uses the good "blocking dispatcher" that we configured, // instead of the default dispatcher – the blocking is isolated. Thread.sleep(5000) System.currentTimeMillis().toString } } }
我们使用相同的负载对应用程序施压,首先是一些正常的请求,然后我们添加阻塞的请求.这是ThreadPools在这种情况下的行为方式:
所以最初普通请求很容易被默认调度程序处理,你可以在那里看到一些绿线 - 这是实际的执行(我实际上并没有把服务器置于高负载下,所以它主要是空闲的).
现在,当我们开始发布阻塞操作时,my-blocking-dispatcher-*
启动并启动配置线程的数量.它处理所有睡眠.此外,在这些线程上发生一段时间后,它会关闭它们.如果我们用另一堆阻塞命中服务器,那么池会启动新的线程来处理睡眠() - 但同时 - 我们不会浪费我们宝贵的线程"只是呆在那里没做什么".
使用此设置时,正常GET请求的吞吐量没有受到影响,他们仍然很乐意在(仍然相当免费)默认调度程序上提供服务.
这是处理反应式应用程序中任何类型阻塞的推荐方法.它经常被称为"批量处理"(或"隔离")应用程序的不良行为部分,在这种情况下,不良行为是睡眠/阻塞.
3) [workaround-ish]
blocking
正确应用时的调度程序行为:
在这个例子中,我们使用scaladoc forscala.concurrent.blocking
method,它可以在遇到阻塞操作时提供帮助.它通常会导致更多的线程被旋转以在阻塞操作中存活.
// OK, default dispatcher but we'll use `blocking` implicit val dispatcher = system.dispatcher val routes: Route = post { complete { Future { // uses the default dispatcher (it's a Fork-Join Pool) blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, // but at the cost of exploding the number of threads (which eventually // may also lead to starvation problems, but on a different layer) Thread.sleep(5000) System.currentTimeMillis().toString } } } }
该应用程序将表现如下:
您会注意到创建了很多新线程,这是因为阻止提示"哦,这将是阻塞,因此我们需要更多线程".这导致我们被阻止的总时间小于1)示例,但是在阻塞操作完成后我们有数百个线程无所事事......当然,它们最终会被关闭(FJP会这样做) ),但有一段时间我们将运行大量(不受控制的)线程,与2)解决方案相反,我们确切地知道我们专门用于阻塞行为的线程数.
总结:永远不要阻止默认调度员:-)
最佳实践是使用中所示的模式2)
,为可用的阻塞操作提供调度程序,并在那里执行它们.
希望这有帮助,快乐的hakking!
讨论了Akka HTTP版本:2.0.1
使用的Profiler:很多人私下问我这个回答是什么用于在上面的图片中可视化Thread状态的探查器,所以在这里添加这些信息:我使用的是YourKit,这是一个很棒的商业探测器(OSS免费),使用OpenJDK中的免费VisualVM可以获得相同的结果.