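I am surprised by a stack overflow in my `Async`-based program. I suspect the main problem is the following function, which is supposed to compose two async computations so that they execute in parallel, and to wait for both to complete: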
```fsharp
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        // Start both computations as children, then wait for each.
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }
```
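A small usage sketch of the combinator above. The `demo` function, the child names and the 50 ms sleeps are illustrative assumptions, not part of the original program. Both children are started before either one is awaited, so the combined computation returns only after both have run:

```fsharp
open System.Collections.Concurrent

// The combinator from the question, with its type arguments written out.
let ( <|> ) (a: Async<unit>) (b: Async<unit>) : Async<unit> =
    async {
        // Start both children first so they can run concurrently...
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        // ...then wait for each one to complete.
        do! x
        do! y
    }

// Hypothetical demo: each child records its name after a short sleep.
let demo () =
    let log = ConcurrentQueue<string>()
    let child name =
        async {
            do! Async.Sleep 50
            log.Enqueue name
        }
    (child "a" <|> child "b") |> Async.RunSynchronously
    // Completion order may vary between runs, so sort before returning.
    log |> Seq.sort |> List.ofSeq
```

Calling `demo ()` returns `["a"; "b"]`.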
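With this definition, I have the following `mapReduce` program, which tries to exploit parallelism in both the `map` and the `reduce` parts. Informally, the idea is to spark `N` mappers and `N-1` reducers that share a channel, wait for them to finish, and then read the result from the channel. I have my own `Channel` implementation; here I replaced it with `ConcurrentBag` to keep the code shorter (the problem affects both):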
```fsharp
let mapReduce (map : 'T1 -> Async<'T2>)
              (reduce : 'T2 -> 'T2 -> Async<'T2>)
              (input : seq<'T1>) : Async<'T2> =
    let bag = System.Collections.Concurrent.ConcurrentBag()
    // Poll the bag until a value is available.
    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _ ->
                do! Async.Sleep 100
                return! read ()
        }
    let write x =
        bag.Add x
        async.Return ()
    // Take two partial results, combine them, and put the result back.
    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            return bag.Add r
        }
    let work =
        input
        |> Seq.map (fun x -> async.Bind(map x, write))
        |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
    async {
        do! work
        return! read ()
    }
```
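A string-only sketch of the nesting that `Seq.reduce` produces for `work`. The `shape` and `maxDepth` helpers are hypothetical and build no async values. Because `Seq.reduce` folds from the left, and operators beginning with `<` (such as `<|>`) are left-associative in F#, each step wraps the previous chain in two more `<|>` applications:

```fsharp
// Mimic the reduction in mapReduce on strings: m1 <|> m2 <|> reducer
// parses as (m1 <|> m2) <|> reducer.
let shape n =
    [ 1 .. n ]
    |> List.map (sprintf "m%d")
    |> Seq.reduce (fun m1 m2 -> sprintf "((%s <|> %s) <|> reducer)" m1 m2)

// Maximum parenthesis nesting depth of a string.
let maxDepth (s: string) =
    let step (depth, best) c =
        match c with
        | '(' -> depth + 1, max best (depth + 1)
        | ')' -> depth - 1, best
        | _ -> depth, best
    s |> Seq.fold step (0, 0) |> snd
```

Here `shape 3` is `"((((m1 <|> m2) <|> reducer) <|> m3) <|> reducer)"`, and in general the depth is 2(n - 1), so `work` is nested thousands of levels deep at n = 10000. This only illustrates the shape of the composed computation; it does not by itself show where the stack is consumed.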
Now, the following basic test starts throwing a `StackOverflowException` at n = 10000:
```fsharp
let test n =
    let map x = async.Return x
    let reduce x y = async.Return (x + y)
    mapReduce map reduce [0..n]
    |> Async.RunSynchronously
```
EDIT: An alternative implementation of the `<|>` combinator makes the test succeed for N = 10000:
```fsharp
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    Async.FromContinuations(fun (ok, _, _) ->
        let count = ref 0
        // The first child to finish bumps the counter;
        // the second one calls the success continuation.
        let ok () =
            lock count (fun () ->
                match !count with
                | 0 -> incr count
                | _ -> ok ())
        Async.Start <|
            async {
                do! a
                return ok ()
            }
        Async.Start <|
            async {
                do! b
                return ok ()
            })
```
This is really surprising to me, because this is what I assumed `Async.StartChild` was doing. Any thoughts on which solution would be best?
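For comparison, the counting technique of the `FromContinuations` combinator in the edit can also be sketched with `Interlocked.Decrement` instead of `lock`. This is a hypothetical variant, not code from the question, and it makes no claim about which approach is best. Like the edited version, it ignores the error and cancellation continuations, so an exception in `a` or `b` never reaches the caller's error continuation:

```fsharp
open System.Threading

// Hypothetical Interlocked-based variant of the FromContinuations combinator:
// both children are started on the thread pool, and whichever finishes
// second resumes the caller. Like the original, errors are not forwarded.
let ( <|> ) (a: Async<unit>) (b: Async<unit>) : Async<unit> =
    Async.FromContinuations(fun (ok, _, _) ->
        // Number of children that have not completed yet.
        let remaining = ref 2
        let finish () =
            // Decrement atomically; only the call that reaches 0 resumes the caller.
            if Interlocked.Decrement(&remaining.contents) = 0 then ok ()
        let runThenFinish (child: Async<unit>) =
            async {
                do! child
                finish ()
            }
        Async.Start(runThenFinish a)
        Async.Start(runThenFinish b))
```

A lock-free counter avoids holding a lock while the continuation runs, but both versions rely on the same idea of starting the children independently and joining them through a shared counter.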