我需要实现一个库来请求vk.com API.问题是API每秒只支持3个请求.我想让API异步.
重要: API应支持从多个线程安全访问.
我的想法是实现一个名为throttler的类,它允许不超过3个请求/秒并延迟其他请求.
接口是下一个:
public interface IThrottler : IDisposable { TaskThrottle (Func > task); }
用法就像
var audio = await throttler.Throttle(() => api.MyAudio()); var messages = await throttler.Throttle(() => api.ReadMessages()); var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId)); /// Here should be delay because 3 requests executed var photo = await throttler.Throttle(() => api.MyPhoto());
如何实施throttler?
目前我将其实现为由后台线程处理的队列.
public TaskThrottle (Func > task) { /// TaskRequest has method Run() to run task /// TaskRequest uses TaskCompletionSource to provide new task /// which is resolved when queue processed til this element. var request = new TaskRequest (task); requestQueue.Enqueue(request); return request.ResultTask; }
这是缩短处理队列的后台线程循环的代码:
private void ProcessQueue(object state) { while (true) { IRequest request; while (requestQueue.TryDequeue(out request)) { /// Delay method calculates actual delay value and calls Thread.Sleep() Delay(); request.Run(); } } }
是否可以在没有后台线程的情况下实现它?
因此,我们将首先解决一个更简单的问题,即创建一个可同时处理多达N个任务的队列,而不是限制每秒启动的N个任务,并在此基础上构建:
public class TaskQueue { private SemaphoreSlim semaphore; public TaskQueue() { semaphore = new SemaphoreSlim(1); } public TaskQueue(int concurrentRequests) { semaphore = new SemaphoreSlim(concurrentRequests); } public async TaskEnqueue (Func > taskGenerator) { await semaphore.WaitAsync(); try { return await taskGenerator(); } finally { semaphore.Release(); } } public async Task Enqueue(Func taskGenerator) { await semaphore.WaitAsync(); try { await taskGenerator(); } finally { semaphore.Release(); } } }
我们还将使用以下帮助器方法将a的结果与TaskCompletionSource
`Task:
public static void Match(this TaskCompletionSource tcs, Task task) { task.ContinueWith(t => { switch (t.Status) { case TaskStatus.Canceled: tcs.SetCanceled(); break; case TaskStatus.Faulted: tcs.SetException(t.Exception.InnerExceptions); break; case TaskStatus.RanToCompletion: tcs.SetResult(t.Result); break; } }); } public static void Match (this TaskCompletionSource tcs, Task task) { Match(tcs, task.ContinueWith(t => default(T))); }
现在,对于我们的实际解决方案,我们可以做的是每次我们需要执行限制操作时,我们创建一个TaskCompletionSource
,然后进入我们TaskQueue
并添加一个启动任务的项目,将TCS与其结果匹配,不等待它,然后将任务队列延迟1秒钟.然后,任务队列将不允许任务启动,直到在过去的第二秒中不再启动N个任务,而操作的结果本身与create相同Task
:
public class Throttler { private TaskQueue queue; public Throttler(int requestsPerSecond) { queue = new TaskQueue(requestsPerSecond); } public TaskEnqueue (Func > taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource (); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } public Task Enqueue (Func taskGenerator) { TaskCompletionSource tcs = new TaskCompletionSource (); var unused = queue.Enqueue(() => { tcs.Match(taskGenerator()); return Task.Delay(TimeSpan.FromSeconds(1)); }); return tcs.Task; } }