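My question is simple and concerns the Future.traverse method. I have a list of Strings, each of which is the URL of a web page. I also have a class that takes a URL, loads the page, and parses some data from it. All of this is wrapped in Future {}, so the result is processed asynchronously.
The class, simplified, looks like this: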
    class RatingRetriever(context: ExecutionContext) {
      def resolveFilmToRating(url: String): Future[Option[Double]] = {
        Future {
          // here it creates Selenium web driver, loads the url and parses it.
        }(context)
      }
    }
Then, in another object, I have this:
    implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
    .......
    val links: List[String] = films.map(film => film.asInstanceOf[WebElement].getAttribute("href"))
    val ratings: Future[List[Option[Double]]] =
      Future.traverse(links)(link => new RatingRetriever(executionContext).resolveFilmToRating(link))
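When it runs, I can clearly see that it goes through the collection sequentially. If I change the execution context from the fixed-size pool to a single-thread pool, the behavior is the same. So I really wonder how to make Future.traverse work in parallel. Can you advise?
Look at the source of traverse: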
    in.foldLeft(successful(cbf(in))) { (fr, a) => // we sequentially traverse Collection
      val fb = fn(a)                              // Your function comes here
      for (r <- fr; b <- fb) yield (r += b)       // Just add elem to builder
    }.map(_.result())                             // Getting the collection from builder
So how parallel the code is depends on your function fn. Consider two examples:
1) This code:
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object FutureTraverse extends App {
      def log(s: String) = println(s"${Thread.currentThread.getName}: $s")

      // The blocking sleep happens inside the Future body.
      def withDelay(i: Int) = Future {
        log(s"withDelay($i)")
        Thread.sleep(1000)
        i
      }

      val seq = 0 to 10

      // Background ticker that logs a dot once per second.
      Future {
        for (i <- 0 to 5) {
          log(".")
          Thread.sleep(1000)
        }
      }

      val resultSeq = Future.traverse(seq)(withDelay(_))

      Thread.sleep(6000)
    }
produces this output:
    ForkJoinPool-1-worker-5: .
    ForkJoinPool-1-worker-3: withDelay(0)
    ForkJoinPool-1-worker-1: withDelay(1)
    ForkJoinPool-1-worker-7: withDelay(2)
    ForkJoinPool-1-worker-5: .
    ForkJoinPool-1-worker-3: withDelay(3)
    ForkJoinPool-1-worker-1: withDelay(4)
    ForkJoinPool-1-worker-7: withDelay(5)
    ForkJoinPool-1-worker-5: .
    ForkJoinPool-1-worker-3: withDelay(6)
    ForkJoinPool-1-worker-1: withDelay(7)
    ForkJoinPool-1-worker-7: withDelay(8)
    ForkJoinPool-1-worker-5: .
    ForkJoinPool-1-worker-3: withDelay(9)
    ForkJoinPool-1-worker-1: withDelay(10)
    ForkJoinPool-1-worker-5: .
    ForkJoinPool-1-worker-5: .
2) Now change only the withDelay function, so that the sleep happens before the Future is created:
    def withDelay(i: Int) = {
      Thread.sleep(1000)
      Future {
        log(s"withDelay($i)")
        i
      }
    }
and you get sequential output:
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-5: withDelay(0)
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-1: withDelay(1)
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-1: withDelay(2)
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-1: withDelay(3)
    ForkJoinPool-1-worker-7: .
    ForkJoinPool-1-worker-1: withDelay(4)
    ForkJoinPool-1-worker-7: withDelay(5)
    ForkJoinPool-1-worker-1: withDelay(6)
    ForkJoinPool-1-worker-1: withDelay(7)
    ForkJoinPool-1-worker-7: withDelay(8)
    ForkJoinPool-1-worker-7: withDelay(9)
    ForkJoinPool-1-worker-7: withDelay(10)
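So Future.traverse is not necessarily parallel. It only calls your function for each element, one after another, to submit the tasks; any parallelism comes from the function you pass in. If that function blocks before it returns its Future, the whole traversal runs sequentially.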
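To make the difference between the two variants measurable instead of reading it off the log, here is a minimal sketch that counts how many task bodies are running at the same moment. The names TraverseOverlap, maxOverlap and tracked are made up for this illustration, and the pool size of 4 and the 200 ms sleep are arbitrary assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseOverlap {
  // Runs Future.traverse over 4 items and returns the largest number of
  // Future bodies that were observed running at the same time.
  def maxOverlap(blockInsideFuture: Boolean): Int = {
    val pool = Executors.newFixedThreadPool(4) // assumption: 4 threads
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val lock = new Object
    var running = 0
    var peak = 0

    // Wraps a Future body and records how many bodies are active.
    def tracked(work: => Unit): Unit = {
      lock.synchronized { running += 1; peak = math.max(peak, running) }
      try work finally lock.synchronized { running -= 1 }
    }

    def fn(i: Int): Future[Int] =
      if (blockInsideFuture)
        Future { tracked(Thread.sleep(200)); i }    // variant 1: sleep inside the Future
      else {
        Thread.sleep(200)                           // variant 2: sleep on the calling thread
        Future { tracked(()); i }
      }

    try {
      Await.result(Future.traverse(List(1, 2, 3, 4))(fn), 10.seconds)
      lock.synchronized(peak)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"sleep inside Future: peak overlap = ${maxOverlap(true)}")
    println(s"sleep before Future: peak overlap = ${maxOverlap(false)}")
  }
}
```

With the sleep inside the Future, the peak should be greater than 1 (typically 4 on this pool). With the sleep before the Future, each body finishes long before the next one is submitted, so the peak stays at 1.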
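Scala's Future.traverse does indeed work in parallel. How much runs in parallel is determined by the ExecutionContext! Under the hood, a Scala Future simply schedules a task on a java.util.concurrent.ExecutorService. If a thread is available, the task is executed right away. Otherwise, it is scheduled to run when a thread becomes available.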
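As a rough illustration of that scheduling behavior, the sketch below runs the same traversal on fixed pools of different sizes and collects the names of the threads that executed the task bodies. PoolThreads and threadsUsed are hypothetical names, and the task count and the 50 ms sleep are arbitrary assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolThreads {
  // Runs 6 short tasks via Future.traverse on a fixed pool of `size` threads
  // and returns the distinct names of the threads that ran a task body.
  def threadsUsed(size: Int): Set[String] = {
    val pool = Executors.newFixedThreadPool(size)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val names = Future.traverse((1 to 6).toList) { _ =>
        Future { Thread.sleep(50); Thread.currentThread.getName }
      }
      Await.result(names, 10.seconds).toSet
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"pool of 1: ${threadsUsed(1)}")
    println(s"pool of 3: ${threadsUsed(3)}")
  }
}
```

On a pool of one thread, every task body necessarily reports the same thread name, because the tasks queue up behind each other. On a larger pool, several different names should appear.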
It is hard to see where the parallelism comes from in the Future.traverse implementation:
    def traverse(in: M[A])(fn: A => Future[B]) =
      in.foldLeft(successful(cbf(in))) { (fr, a) =>
        val fb = fn(a)
        for (r <- fr; b <- fb) yield (r += b)
      }.map(_.result())
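But the trick here is that fb is defined before the for comprehension! Calling fn creates the Future instance, which schedules that Future to run immediately. The for comprehension then only waits for the future to complete and adds its result to the accumulator.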
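The effect of where fn is called can be sketched with two simplified, List-only versions of the fold. This is not the library's code: eagerTraverse and sequentialTraverse are hypothetical names, a Vector accumulator stands in for the builder, and the pool size and sleep time in main are arbitrary assumptions.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseSketch {
  // Same shape as the quoted source: fn(a) is called in the fold body,
  // so every Future is created and scheduled before earlier ones complete.
  def eagerTraverse[A, B](in: List[A])(fn: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] =
    in.foldLeft(Future.successful(Vector.empty[B])) { (fr, a) =>
      val fb = fn(a) // started now, without waiting for fr
      for (r <- fr; b <- fb) yield r :+ b
    }.map(_.toList)

  // For contrast: fn(a) is called inside the comprehension, so it only
  // runs once all previous results are available.
  def sequentialTraverse[A, B](in: List[A])(fn: A => Future[B])(
      implicit ec: ExecutionContext): Future[List[B]] =
    in.foldLeft(Future.successful(Vector.empty[B])) { (fr, a) =>
      for (r <- fr; b <- fn(a)) yield r :+ b
    }.map(_.toList)

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    def slow(i: Int): Future[Int] = Future { Thread.sleep(300); i * 2 }

    // Awaits the future and prints the result with the elapsed wall time.
    def timed[T](label: String)(f: => Future[T]): Unit = {
      val start = System.nanoTime()
      val result = Await.result(f, 10.seconds)
      println(s"$label: $result in ${(System.nanoTime() - start) / 1000000} ms")
    }

    try {
      timed("eager")(eagerTraverse(List(1, 2, 3))(slow))
      timed("sequential")(sequentialTraverse(List(1, 2, 3))(slow))
    } finally pool.shutdown()
  }
}
```

Both variants return the results in input order. The eager one should take roughly one sleep period on this pool, while the sequential one should take roughly three, since each call to fn waits for the previous result.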
The parallelism is easy to see by choosing a different ExecutionContext. With a single thread, the functions run one after another:
    val tp1 = java.util.concurrent.Executors.newFixedThreadPool(1)
    implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(tp1)
    Future.traverse((1 to 5)) { n => Future { sleep; println(n); n }}
    1
    2
    3
    4
    5
When the number of threads is increased, the functions run in parallel:
    import scala.util.Random
    import scala.concurrent.Future
    def sleep = Thread.sleep(100 + Random.nextInt(1000))
    val tp5 = java.util.concurrent.Executors.newFixedThreadPool(5)
    implicit val ec = scala.concurrent.ExecutionContext.fromExecutorService(tp5)
    Future.traverse((1 to 5)) { n => Future { sleep; println(n); n }}
    3
    2
    4
    5
    1