当前位置:  开发笔记 > 编程语言 > 正文

具有支持多线程的限制的异步任务的队列

如何解决《具有支持多线程的限制的异步任务的队列》经验,为你挑选了1个好方法。

我需要实现一个库来请求vk.com API.问题是API每秒只支持3个请求.我想让API异步.

重要: API应支持从多个线程安全访问.

我的想法是实现一个名为throttler的类,它允许不超过3个请求/秒并延迟其他请求.

接口是下一个:

public interface IThrottler : IDisposable
{
    Task Throttle(Func> task);
}

用法就像

var audio = await throttler.Throttle(() => api.MyAudio());
var messages = await throttler.Throttle(() => api.ReadMessages());
var audioLyrics = await throttler.Throttle(() => api.AudioLyrics(audioId));
/// Here should be delay because 3 requests executed
var photo = await throttler.Throttle(() => api.MyPhoto());

如何实施throttler?

目前我将其实现为由后台线程处理的队列.

public Task Throttle(Func> task)
{
    /// TaskRequest has method Run() to run task
    /// TaskRequest uses TaskCompletionSource to provide new task 
    /// which is resolved when queue processed til this element.
    var request = new TaskRequest(task);

    requestQueue.Enqueue(request);

    return request.ResultTask;
}

这是缩短处理队列的后台线程循环的代码:

private void ProcessQueue(object state)
{
    while (true)
    {
        IRequest request;
        while (requestQueue.TryDequeue(out request))
        {
            /// Delay method calculates actual delay value and calls Thread.Sleep()
            Delay();
            request.Run();
        }

    }
}

是否可以在没有后台线程的情况下实现它?



1> Servy..:

因此,我们将首先解决一个更简单的问题,即创建一个可同时处理多达N个任务的队列,而不是限制每秒启动的N个任务,并在此基础上构建:

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }
    public TaskQueue(int concurrentRequests)
    {
        semaphore = new SemaphoreSlim(concurrentRequests);
    }

    public async Task Enqueue(Func> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

我们还将使用以下帮助器方法将a的结果与TaskCompletionSource`Task:

public static void Match(this TaskCompletionSource tcs, Task task)
{
    task.ContinueWith(t =>
    {
        switch (t.Status)
        {
            case TaskStatus.Canceled:
                tcs.SetCanceled();
                break;
            case TaskStatus.Faulted:
                tcs.SetException(t.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                tcs.SetResult(t.Result);
                break;
        }

    });
}

public static void Match(this TaskCompletionSource tcs, Task task)
{
    Match(tcs, task.ContinueWith(t => default(T)));
}

现在,对于我们的实际解决方案,我们可以做的是每次我们需要执行限制操作时,我们创建一个TaskCompletionSource,然后进入我们TaskQueue并添加一个启动任务的项目,将TCS与其结果匹配,不等待它,然后将任务队列延迟1秒钟.然后,任务队列将不允许任务启动,直到在过去的第二秒中不再启动N个任务,而操作的结果本身与create相同Task:

public class Throttler
{
    private TaskQueue queue;
    public Throttler(int requestsPerSecond)
    {
        queue = new TaskQueue(requestsPerSecond);
    }
    public Task Enqueue(Func> taskGenerator)
    {
        TaskCompletionSource tcs = new TaskCompletionSource();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
    public Task Enqueue(Func taskGenerator)
    {
        TaskCompletionSource tcs = new TaskCompletionSource();
        var unused = queue.Enqueue(() =>
        {
            tcs.Match(taskGenerator());
            return Task.Delay(TimeSpan.FromSeconds(1));
        });
        return tcs.Task;
    }
}


@STO不,重要的是不要*使用它.这就是重点.队列仅关注*启动*给定操作后的1秒延迟.它根本不关心操作本身*何时完成*.那是你的前提.因此,只要在过去的第二个任务中启动了N个任务,它就可以确保您无法启动任务,并且让自下一个任务从最早的任务**启动后的1秒开始.
推荐阅读
yzh148448
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有