当前位置:  开发笔记 > 编程语言 > 正文

Akka HTTP:将来阻止阻止服务器

如何解决《AkkaHTTP:将来阻止阻止服务器》经验,为你挑选了1个好方法。

我正在尝试使用Akka HTTP来基本验证我的请求.碰巧我有一个外部资源来进行身份验证,因此我必须对此资源进行休息调用.

这需要一些时间,并且在处理时,我的API的其余部分似乎被阻止,等待此调用.我用一个非常简单的例子重现了这个:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

如果我发布到日志端点,我的get端点也会等待5秒,这是日志端点所指示的.

这是预期的行为,如果是,如何在不阻止整个API的情况下进行阻止操作?



1> Konrad 'ktos..:

你观察到的是预期的行为 - 当然它非常糟糕.很好,已知的解决方案和最佳实践可以防范它.在这个答案中,我想花一些时间来解释这个问题的简短,长久,然后深入 - 享受阅读!

简短的回答:" 不要阻止路由基础设施! ",总是使用专门的调度程序来阻止操作!

观察到的症状的原因:问题是您正在使用context.dispatcher阻塞期货执行的调度程序.路由基础结构使用相同的调度程序(简单地称为"一堆线程")来实际处理传入的请求 - 因此,如果阻止所有可用线程,则最终会使路由基础结构处于饥饿状态.(争论和基准测试的一点是,如果Akka HTTP可以保护这一点,我会将其添加到我的研究todo-list中).

必须特别小心处理阻塞,以免影响同一调度程序的其他用户(这就是为什么我们将执行分离到不同的执行程序如此简单),如Akka文档部分所述:阻塞需要仔细管理.

我想引起注意的其他事情是,如果可能的话,应该尽量避免阻止API - 如果你的长时间运行操作不是真正的一个操作,而是一系列操作,你可以将它们分成不同的参与者或顺序未来.无论如何,只是想指出 - 如果可能的话,避免这种阻止呼叫,但如果你必须 - 那么下面将解释如何正确处理这些.

深入分析和解决方案:

现在我们知道出了什么问题,从概念上讲,让我们看看上面的代码究竟是什么,以及如何正确解决这个问题:

颜色=线程状态:

绿松石 - 睡觉

橙色 - 等待

绿色 - RUNNABLE

现在让我们调查3段代码以及调度程序的影响以及应用程序的性能.要强制执行此操作,应用程序已置于以下负载下:

[a]继续请求GET请求(参见上面的代码中的初始问题),它没有阻止

[b]然后经过一段时间的火灾2000 POST请求,这将导致5秒阻塞,然后返回未来

1) [bad]坏代码上的调度程序行为:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

所以我们将我们的app暴露给[a]加载,你可以看到许多akka.actor.default-dispatcher线程 - 他们正在处理请求 - 小绿色片段,而橙色意味着其他人实际上在那里闲置.

阻止是杀死默认调度程序

然后我们启动[b]加载,这导致阻塞这些线程 - 你可以看到早期线程"default-dispatcher-2,3,4"在之前空闲之后进入阻塞状态.我们还观察到池增长 - 新线程启动"default-dispatcher-18,19,20,21 ......"但是他们立即进入睡眠状态(!) - 我们在这里浪费宝贵的资源!

此类启动线程的数量取决于默认的调度程序配置,但可能不会超过50左右.由于我们刚刚解雇了2k阻塞操作,我们将整个线程池挨饿 - 阻塞操作占主导地位,因此路由infra没有可用于处理其他请求的线程 - 非常糟糕!

让我们做一些事情(这是一个Akka最佳实践btw - 总是隔离阻塞行为,如下所示):

2) [good!]调度程序行为良好的结构化代码/调度程序:

在您的application.conf配置中,此调度程序专用于阻止行为:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

您应该在Akka Dispatchers文档中阅读更多内容,以了解各种选项.但重点是我们选择了一个ThreadPoolExecutor具有线程硬限制的线程,它可以用于阻塞操作.大小设置取决于您的应用程序的功能以及服务器具有的核心数.

接下来我们需要使用它,而不是默认的:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

我们使用相同的负载对应用程序施压,首先是一些正常的请求,然后我们添加阻塞的请求.这是ThreadPools在这种情况下的行为方式:

阻塞池可以满足我们的需求

所以最初普通请求很容易被默认调度程序处理,你可以在那里看到一些绿线 - 这是实际的执行(我实际上并没有把服务器置于高负载下,所以它主要是空闲的).

现在,当我们开始发布阻塞操作时,my-blocking-dispatcher-*启动并启动配置线程的数量.它处理所有睡眠.此外,在这些线程上发生一段时间后,它会关闭它们.如果我们用另一堆阻塞命中服务器,那么池会启动新的线程来处理睡眠() - 但同时 - 我们不会浪费我们宝贵的线程"只是呆在那里没做什么".

使用此设置时,正常GET请求的吞吐量没有受到影响,他们仍然很乐意在(仍然相当免费)默认调度程序上提供服务.

这是处理反应式应用程序中任何类型阻塞的推荐方法.它经常被称为"批量处理"(或"隔离")应用程序的不良行为部分,在这种情况下,不良行为是睡眠/阻塞.

3) [workaround-ish]blocking正确应用时的调度程序行为:

在这个例子中,我们使用scaladoc forscala.concurrent.blocking method,它可以在遇到阻塞操作时提供帮助.它通常会导致更多的线程被旋转以在阻塞操作中存活.

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

该应用程序将表现如下:

阻塞导致更多线程被启动

您会注意到创建了很多新线程,这是因为阻止提示"哦,这将是阻塞,因此我们需要更多线程".这导致我们被阻止的总时间小于1)示例,但是在阻塞操作完成后我们有数百个线程无所事事......当然,它们最终会被关闭(FJP会这样做) ),但有一段时间我们将运行大量(不受控制的)线程,与2)解决方案相反,我们确切地知道我们专门用于阻塞行为的线程数.

总结:永远不要阻止默认调度员:-)

最佳实践是使用中所示的模式2),为可用的阻塞操作提供调度程序,并在那里执行它们.

希望这有帮助,快乐的hakking!

讨论了Akka HTTP版本:2.0.1

使用的Profiler:很多人私下问我这个回答是什么用于在上面的图片中可视化Thread状态的探查器,所以在这里添加这些信息:我使用的是YourKit,这是一个很棒的商业探测器(OSS免费),使用OpenJDK中的免费VisualVM可以获得相同的结果.

推荐阅读
Gbom2402851125
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有