TPL Dataflow - Parallel&Async processing, while keeping order

I created a TPL Dataflow pipeline that consists of three TransformBlocks and an ActionBlock at the end.

var loadXml = new TransformBlock<Job, Job>(job => { ... }); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }); // Parsing, validating, calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);

Each block fills the Job object with the processed data, since I need not only the data itself but also general information for the response messages. Essentially, I pass in the path to an XML file and get back a Response object that tells me whether everything went right.

How can I make it so that when two or more files come in that take some time to read from the HDD, they are read in parallel and asynchronously while keeping the order in which they arrived? If file1 takes much longer, file2 would have to wait for file1 to finish before its data is passed on to the next block. The next block should then validate the data in parallel and asynchronously as well, again keeping the order for the block after it.

Right now it looks like all files are processed sequentially, even if I call SendAsync on the head block.
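
A possible reason for the sequential behaviour, offered as an assumption rather than a diagnosis: the snippet above constructs its blocks without options, and `ExecutionDataflowBlockOptions.MaxDegreeOfParallelism` defaults to 1, so each block handles one message at a time no matter how the messages are sent. The sketch below measures how many delegates are in flight at once; `PeakConcurrency` and `MeasureAsync` are hypothetical names, and `Task.Delay` stands in for the file read.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PeakConcurrency
{
    // Runs 8 simulated reads through one TransformBlock and returns the
    // highest number of delegates that were in flight at the same time.
    public static async Task<int> MeasureAsync(ExecutionDataflowBlockOptions options)
    {
        int inFlight = 0, peak = 0;
        object gate = new object();

        var block = new TransformBlock<int, int>(async id =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50);               // stand-in for reading a file
            lock (gate) { inFlight--; }
            return id;
        }, options);

        // A sink is needed so the TransformBlock's output is drained and it can complete.
        var sink = new ActionBlock<int>(_ => { });
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in Enumerable.Range(1, 8))
            await block.SendAsync(id);
        block.Complete();
        await sink.Completion;
        return peak;
    }

    public static async Task Main()
    {
        // Default options: MaxDegreeOfParallelism is 1.
        int sequential = await MeasureAsync(new ExecutionDataflowBlockOptions());
        // Up to 4 messages may be processed concurrently.
        int parallel = await MeasureAsync(
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
        Console.WriteLine($"default peak: {sequential}, MaxDegreeOfParallelism=4 peak: {parallel}");
    }
}
```

With default options the peak cannot exceed 1; with `MaxDegreeOfParallelism = 4` it can reach 4, while the block still emits its results in input order.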

EDIT: I wrote a small test class for the pipeline. It has 3 stages. I want the first TransformBlock to keep reading files as they come in (SendAsync from a FileSystemWatcher) and to output them, once done, in the order they came in. That means if File1 is a large file and File2 and File3 come in, both are read while File1 is still being processed, but File2 and File3 have to wait before they can be sent to the second TransformBlock, because File1 is still being read. Stage 2 should work the same way. Stage 3, on the other hand, needs to take the objects generated from File1 and save them to the database, which can be done in parallel and asynchronously. However, the objects from File1 need to be processed before those from File2 and File3, so the file contents as a whole need to be processed sequentially, in the order they came in. I tried to do that by limiting the third TransformBlock with MaxDegreeOfParallelism and BoundedCapacity both set to 1, but this seems to fail and does not really keep the order in the Console.WriteLine output.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Xml;
using System.Linq;

namespace OrderProcessing
{
    public class Job
    {
        public string Path { get; set; }

        public XmlDocument Document { get; set; }

        public List<object> BusinessObjects { get; set; }

        public int ReturnCode { get; set; }

        public int ID { get; set; }
    }

    public class Test
    {
        ITargetBlock<Job> pathBlock = null;

        CancellationTokenSource cancellationTokenSource;

        Random rnd = new Random();

        private bool ReadDocument(Job job)
        {
            Console.WriteLine($"ReadDocument {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Read the document
            job.Document = new XmlDocument();

            // Some checking
            return true;
        }

        private bool ValidateXml(Job job)
        {
            Console.WriteLine($"ValidateXml {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            // Check XML against XSD and perform remaining checks
            job.BusinessObjects = new List<object>();

            // Just for tests
            job.BusinessObjects.Add(new object());
            job.BusinessObjects.Add(new object());

            // Parse Xml and create business objects
            return true;
        }

        private bool ProcessJob(Job job)
        {
            Console.WriteLine($"ProcessJob {DateTime.Now.TimeOfDay} | Thread {Thread.CurrentThread.ManagedThreadId} is processing Job Id: {job.ID}");

            // Throw OperationCanceledException if cancellation is requested.
            cancellationTokenSource.Token.ThrowIfCancellationRequested();

            Parallel.ForEach(job.BusinessObjects, bO =>
            {
                ImportObject(bO);
            });


            // Import the job
            return true;
        }

        private object ImportObject(object o)
        {
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            return new object();
        }

        private void CreateResponse(Job job)
        {
            if(job.ReturnCode == 100)
            {
                Console.WriteLine("ID {0} was successfully imported.", job.ID);

            }
            else
            {
                Console.WriteLine("ID {0} failed to import.", job.ID);
            }

            // Create response XML with returncodes
        }

        ITargetBlock<Job> CreateJobProcessingPipeline()
        {
            var loadXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ReadDocument(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());

            var validateXml = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ValidateXml(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100;
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, TransformBlockOptions());


            var importJob = new TransformBlock<Job, Job>(job =>
            {
                try
                {
                    if(ProcessJob(job))
                    {
                        // For later error handling
                        job.ReturnCode = 100; // success
                    }
                    else
                    {
                        job.ReturnCode = 200;
                    }

                    return job;
                }
                catch(OperationCanceledException)
                {
                    job.ReturnCode = 300;
                    return job;
                }
            }, ActionBlockOptions());

            var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var validationFailed = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());
            var reportImport = new ActionBlock<Job>(job => CreateResponse(job), ActionBlockOptions());

            //
            // Connect the pipeline
            //
            loadXml.LinkTo(validateXml, job => job.ReturnCode == 100);
            loadXml.LinkTo(loadingFailed);

            validateXml.LinkTo(importJob, Job => Job.ReturnCode == 100);
            validateXml.LinkTo(validationFailed);

            importJob.LinkTo(reportImport);

            // Return the head of the network.
            return loadXml;
        }

        public void Start()
        {
            cancellationTokenSource = new CancellationTokenSource();

            pathBlock = CreateJobProcessingPipeline();
        }

        public async void AddJob(string path, int id)
        {
            Job j = new Job();
            j.Path = path;
            j.ID = id;

            await pathBlock.SendAsync(j);
        }

        static ExecutionDataflowBlockOptions TransformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions ActionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        public void Cancel()
        {
            if(cancellationTokenSource != null)
                cancellationTokenSource.Cancel();
        }
    }

    class Program
    {
        private static String InputXml = @"C:\XML\Part.xml";
        private static Test _Pipeline;

        static void Main(string[] args)
        {
            _Pipeline = new Test();
            _Pipeline.Start();


            var data = Enumerable.Range(1, 100);

            foreach(var d in data)
                _Pipeline.AddJob(InputXml, d);

            //Wait before closing the application so we can see the results.
            Console.ReadLine();
        }
    }
}


EDIT2: After I made one change, setting BoundedCapacity to Unbounded, I got everything in the order it was sent into the pipeline. So it wasn't really out of order before; messages were being discarded, I guess?
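
One possible explanation, offered as an assumption rather than a diagnosis of the code above: a full bounded target does not drop a message, it postpones it, and the source then offers the same message to the next linked target. With `validateXml.LinkTo(importJob, ...)` followed by the unfiltered `validateXml.LinkTo(validationFailed)`, a job that arrives while `importJob` is full could therefore be routed to `validationFailed` and skip `ProcessJob`. The sketch below reproduces that routing in isolation; `LinkFallthrough`, `busy`, `fallback` and `release` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkFallthrough
{
    // Returns the items seen by the bounded target and by the fallback target.
    public static async Task<(List<int> Busy, List<int> Fallback)> RunAsync()
    {
        var seenByBusy = new List<int>();
        var seenByFallback = new List<int>();
        var release = new TaskCompletionSource<bool>();

        // Bounded target: keeps its single slot occupied until 'release' is set.
        var busy = new ActionBlock<int>(async x =>
        {
            seenByBusy.Add(x);
            await release.Task;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        // Unfiltered second link, in the role of validationFailed.
        var fallback = new ActionBlock<int>(x => seenByFallback.Add(x));

        var source = new BufferBlock<int>();
        source.LinkTo(busy, x => true);   // offered first
        source.LinkTo(fallback);          // offered next when 'busy' postpones

        for (int i = 1; i <= 3; i++) source.Post(i);
        source.Complete();
        await source.Completion;          // all three items have been handed off

        release.SetResult(true);          // only now let 'busy' finish its item
        busy.Complete();
        fallback.Complete();
        await Task.WhenAll(busy.Completion, fallback.Completion);
        return (seenByBusy, seenByFallback);
    }

    public static async Task Main()
    {
        var (busy, fallback) = await RunAsync();
        Console.WriteLine($"busy: {string.Join(",", busy)}; fallback: {string.Join(",", fallback)}");
    }
}
```

In this sketch item 1 stays with `busy`, and items 2 and 3 fall through to `fallback` even though the first link's predicate matched them. If this is what happens in the pipeline, giving the second link an explicit predicate (for example `job => job.ReturnCode != 100`) would keep successful jobs waiting for `importJob` instead of falling through.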

If I make sure that EnsureOrdered is true and use a MaxDegreeOfParallelism of 8 in the last TransformBlock, items are no longer in order, as the excerpt of the output below shows. But this is where they need to be in order, since I'm saving data to the database in the order it came in. It doesn't really matter whether items are in order when they leave the last TransformBlock, so I guess I can't keep parallelism here?

ValidateXml 08:27:24.2855461 | Thread 21 is processing Job Id: 36
ValidateXml 08:27:24.2855461 | Thread 28 is processing Job Id: 37
+++ ProcessJob 08:27:24.2880490 | Thread 33 is processing Job Id: 9
ReadDocument 08:27:24.2855461 | Thread 6 is processing Job Id: 56
ValidateXml 08:27:25.2853094 | Thread 19 is processing Job Id: 38
ReadDocument 08:27:25.2853094 | Thread 13 is processing Job Id: 58
+++ ProcessJob 08:27:25.2868091 | Thread 34 is processing Job Id: 13
ReadDocument 08:27:25.2858087 | Thread 16 is processing Job Id: 59
+++ ProcessJob 08:27:25.2858087 | Thread 25 is processing Job Id: 10
+++ ProcessJob 08:27:25.2858087 | Thread 29 is processing Job Id: 12
ReadDocument 08:27:25.2853094 | Thread 11 is processing Job Id: 57
ReadDocument 08:27:25.2873097 | Thread 15 is processing Job Id: 60
ValidateXml 08:27:25.2853094 | Thread 22 is processing Job Id: 40
ValidateXml 08:27:25.2853094 | Thread 23 is processing Job Id: 39
+++ ProcessJob 08:27:25.2858087 | Thread 30 is processing Job Id: 11
ValidateXml 08:27:26.2865381 | Thread 21 is processing Job Id: 41
ReadDocument 08:27:26.2865381 | Thread 14 is processing Job Id: 61
ValidateXml 08:27:26.2865381 | Thread 20 is processing Job Id: 42
ValidateXml 08:27:26.2865381 | Thread 26 is processing Job Id: 43
ReadDocument 08:27:26.2865381 | Thread 17 is processing Job Id: 62
ReadDocument 08:27:26.2870374 | Thread 12 is processing Job Id: 63
+++ ProcessJob 08:27:26.2870374 | Thread 24 is processing Job Id: 14
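
The interleaved `ProcessJob` lines above are expected: with `MaxDegreeOfParallelism` greater than 1, `EnsureOrdered` only orders what the block emits, not when each delegate starts or finishes. One common way to keep some parallelism and still write in arrival order is to do the order-independent work in the parallel block and the ordered write in a following block with `MaxDegreeOfParallelism = 1`. A minimal sketch under that assumption; `OrderedSink`, `prepare` and `save` are hypothetical names and the delays are artificial.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedSink
{
    // Returns the ids in the order the single-threaded 'save' stage saw them.
    public static async Task<List<int>> RunAsync(int count)
    {
        var saved = new List<int>();

        // Parallel stage: later ids get shorter delays, so they tend to finish first.
        // EnsureOrdered defaults to true, so results are still emitted in input order.
        var prepare = new TransformBlock<int, int>(async id =>
        {
            await Task.Delay((count - id) * 10);
            return id;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        // Ordered stage: one message at a time, in the order 'prepare' emits them.
        var save = new ActionBlock<int>(id => saved.Add(id),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        prepare.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in Enumerable.Range(1, count))
            await prepare.SendAsync(id);
        prepare.Complete();
        await save.Completion;
        return saved;
    }

    public static async Task Main()
    {
        var saved = await RunAsync(16);
        Console.WriteLine(string.Join(" ", saved));
    }
}
```

The trade-off is that the final write itself is serialized; only the preparation runs in parallel.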

EDIT3: Output after using @JSteward's latest code.

ReadDocument 09:01:03.9363340 JobId: 1
ReadDocument 09:01:03.9368357 JobId: 5
ReadDocument 09:01:03.9373347 JobId: 6
ReadDocument 09:01:03.9368357 JobId: 8
ReadDocument 09:01:03.9363340 JobId: 4
ReadDocument 09:01:03.9373347 JobId: 3
ReadDocument 09:01:03.9373347 JobId: 7
ReadDocument 09:01:03.9368357 JobId: 2
ReadDocument 09:01:05.2037570 JobId: 9
ReadDocument 09:01:05.3108413 JobId: 10
ReadDocument 09:01:05.5678177 JobId: 11
ReadDocument 09:01:05.6308763 JobId: 12
ValidateXml 09:01:05.6338782 JobId: 1
ValidateXml 09:01:06.3754174 JobId: 2
ReadDocument 09:01:06.3764184 JobId: 13
ReadDocument 09:01:06.3764184 JobId: 14
ReadDocument 09:01:07.3756634 JobId: 15
ReadDocument 09:01:07.3756634 JobId: 18
ValidateXml 09:01:07.3756634 JobId: 3
ValidateXml 09:01:07.3756634 JobId: 4
ReadDocument 09:01:07.3756634 JobId: 17
ReadDocument 09:01:07.3756634 JobId: 16
ReadDocument 09:01:08.3753887 JobId: 19
ReadDocument 09:01:08.3753887 JobId: 20
ValidateXml 09:01:08.3753887 JobId: 5
ProcessJob 09:01:08.3763906 JobId: 1
ReadDocument 09:01:09.3744411 JobId: 21
ReadDocument 09:01:09.3749410 JobId: 24
ProcessJob 09:01:09.3749410 JobId: 2
ReadDocument 09:01:09.3749410 JobId: 22
ReadDocument 09:01:09.3749410 JobId: 23
ReadDocument 09:01:10.3752061 JobId: 25
ReadDocument 09:01:10.3752061 JobId: 27
ValidateXml 09:01:10.3752061 JobId: 6
ValidateXml 09:01:10.3752061 JobId: 7
ValidateXml 09:01:10.3752061 JobId: 8
ReadDocument 09:01:10.3752061 JobId: 26
ReadDocument 09:01:11.3759294 JobId: 29
ReadDocument 09:01:11.3759294 JobId: 28
ValidateXml 09:01:11.3764278 JobId: 10
ReadDocument 09:01:11.3759294 JobId: 31
ValidateXml 09:01:11.3759294 JobId: 9
ReadDocument 09:01:11.3759294 JobId: 30
ValidateXml 09:01:12.3751553 JobId: 11
ReadDocument 09:01:12.3751553 JobId: 33
ValidateXml 09:01:12.3751553 JobId: 12
ReadDocument 09:01:12.3751553 JobId: 34
ReadDocument 09:01:12.3751553 JobId: 32
ValidateXml 09:01:13.3753842 JobId: 13
ValidateXml 09:01:13.3753842 JobId: 14
ValidateXml 09:01:13.3753842 JobId: 16
ReadDocument 09:01:13.3753842 JobId: 35
ReadDocument 09:01:13.3753842 JobId: 36
ValidateXml 09:01:13.3753842 JobId: 15
ReadDocument 09:01:14.3756414 JobId: 37
ValidateXml 09:01:14.3756414 JobId: 19
ValidateXml 09:01:14.3756414 JobId: 18
ValidateXml 09:01:14.3756414 JobId: 17
ReadDocument 09:01:14.3756414 JobId: 40
ReadDocument 09:01:14.3756414 JobId: 38
ReadDocument 09:01:14.3756414 JobId: 39
ProcessJob 09:01:14.3761419 JobId: 3
SendToDataBase 09:01:14.3806453 JobId: 1
SendToDataBase 09:01:14.3821472 JobId: 2
ProcessJob 09:01:14.3821472 JobId: 4
ValidateXml 09:01:15.3763758 JobId: 20
ReadDocument 09:01:15.3763758 JobId: 42
ValidateXml 09:01:15.3763758 JobId: 21
ReadDocument 09:01:15.3773772 JobId: 43
ReadDocument 09:01:15.3763758 JobId: 41
ValidateXml 09:01:15.3768800 JobId: 22
ReadDocument 09:01:15.3773772 JobId: 44
ValidateXml 09:01:16.3761117 JobId: 23
ValidateXml 09:01:16.3761117 JobId: 26
ValidateXml 09:01:16.3761117 JobId: 24
ValidateXml 09:01:16.3761117 JobId: 25
ReadDocument 09:01:16.3761117 JobId: 45
ReadDocument 09:01:16.3761117 JobId: 46
ProcessJob 09:01:16.3761117 JobId: 5
ReadDocument 09:01:17.3758334 JobId: 47
ValidateXml 09:01:17.3763315 JobId: 28
ValidateXml 09:01:17.3763315 JobId: 27
ReadDocument 09:01:17.3763315 JobId: 49
ReadDocument 09:01:17.3763315 JobId: 48
ProcessJob 09:01:17.3763315 JobId: 6
ValidateXml 09:01:17.3763315 JobId: 29
ReadDocument 09:01:17.3763315 JobId: 50
ReadDocument 09:01:18.3755786 JobId: 51
ReadDocument 09:01:18.3755786 JobId: 52
<<<
ProcessJob 09:01:18.3770792 JobId: 10
ProcessJob 09:01:18.3770792 JobId: 9
ProcessJob 09:01:18.3755786 JobId: 7
>>>
ReadDocument 09:01:18.3755786 JobId: 53
ValidateXml 09:01:18.3755786 JobId: 32
ValidateXml 09:01:18.3755786 JobId: 31
ValidateXml 09:01:18.3755786 JobId: 30
ReadDocument 09:01:18.3760794 JobId: 54
ProcessJob 09:01:18.3755786 JobId: 8
ValidateXml 09:01:19.3753274 JobId: 34
ValidateXml 09:01:19.3753274 JobId: 33
ReadDocument 09:01:19.3758261 JobId: 56
ReadDocument 09:01:19.3758261 JobId: 55
ValidateXml 09:01:19.3758261 JobId: 35
ValidateXml 09:01:20.3752782 JobId: 36
ValidateXml 09:01:20.3752782 JobId: 37
ProcessJob 09:01:20.3757709 JobId: 11
ReadDocument 09:01:20.3752782 JobId: 57
ValidateXml 09:01:20.3752782 JobId: 38
ReadDocument 09:01:20.3757709 JobId: 58
ReadDocument 09:01:20.3757709 JobId: 59
ProcessJob 09:01:21.3757202 JobId: 12
ValidateXml 09:01:21.3757202 JobId: 39
ReadDocument 09:01:21.3757202 JobId: 62
ReadDocument 09:01:21.3757202 JobId: 61
ReadDocument 09:01:21.3757202 JobId: 60
ReadDocument 09:01:22.3764154 JobId: 63
ReadDocument 09:01:22.3764154 JobId: 64
ReadDocument 09:01:22.3764154 JobId: 65
ProcessJob 09:01:22.3794167 JobId: 16
ValidateXml 09:01:22.3764154 JobId: 40
ValidateXml 09:01:22.3764154 JobId: 42
ReadDocument 09:01:22.3764154 JobId: 66
ValidateXml 09:01:22.3774149 JobId: 43
ProcessJob 09:01:22.3764154 JobId: 13
ValidateXml 09:01:22.3764154 JobId: 41
ProcessJob 09:01:22.3779160 JobId: 15
SendToDataBase 09:01:22.3784159 JobId: 3
ProcessJob 09:01:22.3764154 JobId: 14
ValidateXml 09:01:22.3859209 JobId: 44
SendToDataBase 09:01:22.4309993 JobId: 4
SendToDataBase 09:01:22.4460051 JobId: 5
SendToDataBase 09:01:22.4465047 JobId: 6
ReadDocument 09:01:23.3760112 JobId: 67
ValidateXml 09:01:23.3760112 JobId: 46
ValidateXml 09:01:23.3760112 JobId: 47
ReadDocument 09:01:23.3760112 JobId: 68
ValidateXml 09:01:23.3760112 JobId: 45
ProcessJob 09:01:23.3760112 JobId: 17
ValidateXml 09:01:24.3762581 JobId: 48
ReadDocument 09:01:24.3762581 JobId: 69
ProcessJob 09:01:24.3762581 JobId: 18
ProcessJob 09:01:24.3762581 JobId: 19
ReadDocument 09:01:24.3762581 JobId: 70
CreateResponse 09:01:24.3777606 JobId: 58
CreateResponse 09:01:24.3994684 JobId: 59
CreateResponse 09:01:24.4059908 JobId: 60
CreateResponse 09:01:24.4114777 JobId: 61
CreateResponse 09:01:24.4134789 JobId: 62
ValidateXml 09:01:25.3759607 JobId: 49
ValidateXml 09:01:25.3759607 JobId: 51
ProcessJob 09:01:25.3784627 JobId: 22
ValidateXml 09:01:25.3759607 JobId: 52
ProcessJob 09:01:25.3759607 JobId: 20
ValidateXml 09:01:25.3774629 JobId: 53
ValidateXml 09:01:25.3759607 JobId: 50
ValidateXml 09:01:25.3774629 JobId: 54
ReadDocument 09:01:25.3759607 JobId: 72
ReadDocument 09:01:25.3774629 JobId: 73
ReadDocument 09:01:25.3759607 JobId: 71
ReadDocument 09:01:25.3779625 JobId: 74
ProcessJob 09:01:25.3759607 JobId: 21
SendToDataBase 09:01:25.3774629 JobId: 7
CreateResponse 09:01:25.3759607 JobId: 39
SendToDataBase 09:01:25.4398495 JobId: 8
SendToDataBase 09:01:25.4448555 JobId: 9
SendToDataBase 09:01:25.4478565 JobId: 10
SendToDataBase 09:01:25.4483570 JobId: 11
CreateResponse 09:01:25.4448555 JobId: 42
CreateResponse 09:01:25.4608868 JobId: 43
SendToDataBase 09:01:25.4553682 JobId: 12
CreateResponse 09:01:25.4613665 JobId: 44
CreateResponse 09:01:25.4698849 JobId: 45
ReadDocument 09:01:26.3754874 JobId: 75
ReadDocument 09:01:26.3754874 JobId: 76
ReadDocument 09:01:26.3754874 JobId: 78
ValidateXml 09:01:26.3754874 JobId: 55
ProcessJob 09:01:26.3759876 JobId: 24
ProcessJob 09:01:26.3754874 JobId: 23
ReadDocument 09:01:26.3754874 JobId: 77
SendToDataBase 09:01:26.3759876 JobId: 13
SendToDataBase 09:01:26.3980055 JobId: 14
SendToDataBase 09:01:26.3985045 JobId: 15
SendToDataBase 09:01:26.4020099 JobId: 16
ReadDocument 09:01:27.3762164 JobId: 79
ValidateXml 09:01:27.3762164 JobId: 56
ProcessJob 09:01:27.3762164 JobId: 26
ReadDocument 09:01:27.3762164 JobId: 82
ProcessJob 09:01:27.3762164 JobId: 25
ReadDocument 09:01:27.3762164 JobId: 81
ReadDocument 09:01:27.3762164 JobId: 80
ValidateXml 09:01:27.3762164 JobId: 63
ValidateXml 09:01:27.3777165 JobId: 64
ProcessJob 09:01:27.3767157 JobId: 27
ValidateXml 09:01:27.3762164 JobId: 57
SendToDataBase 09:01:27.3777165 JobId: 17
SendToDataBase 09:01:27.4327571 JobId: 18
SendToDataBase 09:01:27.4357587 JobId: 19
ReadDocument 09:01:28.3761410 JobId: 83
ProcessJob 09:01:28.3761410 JobId: 28
ProcessJob 09:01:28.3761410 JobId: 29
ValidateXml 09:01:28.3761410 JobId: 66
SendToDataBase 09:01:28.3761410 JobId: 20
ProcessJob 09:01:28.3761410 JobId: 30
ValidateXml 09:01:28.3761410 JobId: 67
ValidateXml 09:01:28.3761410 JobId: 65
SendToDataBase 09:01:28.3861483 JobId: 21
SendToDataBase 09:01:28.4141687 JobId: 22
ReadDocument 09:01:28.6079764 JobId: 84
ReadDocument 09:01:28.6552491 JobId: 85
ReadDocument 09:01:28.7047606 JobId: 86
ValidateXml 09:01:28.7327861 JobId: 68
ProcessJob 09:01:28.7327861 JobId: 31
ReadDocument 09:01:29.1285484 JobId: 87
ProcessJob 09:01:29.1894672 JobId: 32
SendToDataBase 09:01:29.1894672 JobId: 23
SendToDataBase 09:01:29.1944706 JobId: 24
ReadDocument 09:01:29.3910070 JobId: 88
ValidateXml 09:01:29.5569691 JobId: 69
ReadDocument 09:01:29.5995036 JobId: 89
ValidateXml 09:01:29.6085095 JobId: 70
ReadDocument 09:01:29.6581266 JobId: 90
ValidateXml 09:01:29.8797899 JobId: 71
ValidateXml 09:01:30.1244519 JobId: 72
ValidateXml 09:01:30.1584763 JobId: 73
ReadDocument 09:01:30.2100312 JobId: 91
ProcessJob 09:01:30.2490536 JobId: 33
ProcessJob 09:01:30.2950865 JobId: 34
ReadDocument 09:01:30.3290995 JobId: 92
ProcessJob 09:01:30.3636350 JobId: 35
SendToDataBase 09:01:30.3636350 JobId: 25
SendToDataBase 09:01:30.3701300 JobId: 26
SendToDataBase 09:01:30.3706299 JobId: 27
ProcessJob 09:01:30.4987430 JobId: 36
ReadDocument 09:01:30.5642707 JobId: 93
ReadDocument 09:01:30.6088035 JobId: 94
ValidateXml 09:01:30.7213868 JobId: 74
ReadDocument 09:01:30.7544106 JobId: 95
ProcessJob 09:01:30.7544106 JobId: 37
SendToDataBase 09:01:30.7544106 JobId: 28
ProcessJob 09:01:31.1091681 JobId: 38
SendToDataBase 09:01:31.1091681 JobId: 29
SendToDataBase 09:01:31.1151730 JobId: 30
ValidateXml 09:01:31.2012468 JobId: 75
ValidateXml 09:01:31.2827940 JobId: 76
ValidateXml 09:01:31.3143168 JobId: 77
ValidateXml 09:01:31.4073842 JobId: 78
ReadDocument 09:01:31.4369059 JobId: 96
ReadDocument 09:01:31.4699302 JobId: 97
ProcessJob 09:01:31.7201123 JobId: 40
SendToDataBase 09:01:31.7201123 JobId: 31
ProcessJob 09:01:32.1569310 JobId: 41
SendToDataBase 09:01:32.1569310 JobId: 32
ValidateXml 09:01:32.3650822 JobId: 79
ValidateXml 09:01:32.3650822 JobId: 80
ProcessJob 09:01:32.3966047 JobId: 46
ReadDocument 09:01:32.4236247 JobId: 98
ReadDocument 09:01:32.4831869 JobId: 99
ValidateXml 09:01:32.5607342 JobId: 81
ReadDocument 09:01:32.5777363 JobId: 100
ProcessJob 09:01:33.1461630 JobId: 47
ProcessJob 09:01:33.2081967 JobId: 48
SendToDataBase 09:01:33.2081967 JobId: 33
SendToDataBase 09:01:33.2137015 JobId: 34
SendToDataBase 09:01:33.2172021 JobId: 35
ValidateXml 09:01:33.2347146 JobId: 82
ValidateXml 09:01:33.4228519 JobId: 83
ProcessJob 09:01:33.4228519 JobId: 49
ValidateXml 09:01:33.4373638 JobId: 84
ProcessJob 09:01:33.4878995 JobId: 50
SendToDataBase 09:01:33.4878995 JobId: 36
ProcessJob 09:01:33.5819674 JobId: 51
ValidateXml 09:01:33.6239992 JobId: 85
ProcessJob 09:01:33.6239992 JobId: 52
SendToDataBase 09:01:33.6239992 JobId: 37
SendToDataBase 09:01:33.6295082 JobId: 38
ValidateXml 09:01:33.6870563 JobId: 86
ValidateXml 09:01:33.7125626 JobId: 87
ProcessJob 09:01:34.1238635 JobId: 53
ProcessJob 09:01:34.5796949 JobId: 54
<<<
SendToDataBase 09:01:34.5796949 JobId: 40
SendToDataBase 09:01:34.5856995 JobId: 41
SendToDataBase 09:01:34.5887008 JobId: 46
>>>
ValidateXml 09:01:34.7951688 JobId: 88
ValidateXml 09:01:34.9162007 JobId: 89
ProcessJob 09:01:34.9541705 JobId: 55
ValidateXml 09:01:35.0464443 JobId: 90
ProcessJob 09:01:35.3634898 JobId: 56
ProcessJob 09:01:35.3795024 JobId: 57
ValidateXml 09:01:35.5165095 JobId: 91
ValidateXml 09:01:35.8614345 JobId: 92
ProcessJob 09:01:35.9985415 JobId: 63
ValidateXml 09:01:36.0481807 JobId: 93
ProcessJob 09:01:36.0763064 JobId: 64
ProcessJob 09:01:36.0993229 JobId: 65
SendToDataBase 09:01:36.0993229 JobId: 47
SendToDataBase 09:01:36.1048270 JobId: 48
ValidateXml 09:01:36.1572079 JobId: 94
ValidateXml 09:01:36.3791015 JobId: 95
ProcessJob 09:01:36.4212607 JobId: 66
SendToDataBase 09:01:36.4212607 JobId: 49
SendToDataBase 09:01:36.4267655 JobId: 50
SendToDataBase 09:01:36.4272654 JobId: 51
SendToDataBase 09:01:36.4322913 JobId: 52
SendToDataBase 09:01:36.4327837 JobId: 53
ProcessJob 09:01:36.5149796 JobId: 67
SendToDataBase 09:01:36.5149796 JobId: 54
ValidateXml 09:01:36.6861048 JobId: 96
ValidateXml 09:01:36.7845716 JobId: 97
ValidateXml 09:01:37.0175979 JobId: 98
ValidateXml 09:01:37.3788835 JobId: 99
ValidateXml 09:01:37.6477046 JobId: 100
ProcessJob 09:01:37.8269808 JobId: 68
SendToDataBase 09:01:37.8269808 JobId: 55
ProcessJob 09:01:37.8940108 JobId: 69
ProcessJob 09:01:38.2955556 JobId: 70
ProcessJob 09:01:38.3110583 JobId: 71
SendToDataBase 09:01:38.3110583 JobId: 56
SendToDataBase 09:01:38.3125586 JobId: 57
CreateResponse 09:01:38.4551538 JobId: 95
CreateResponse 09:01:38.4925304 JobId: 96
ProcessJob 09:01:38.5382532 JobId: 72
ProcessJob 09:01:38.9129894 JobId: 73
SendToDataBase 09:01:38.9129894 JobId: 63
SendToDataBase 09:01:38.9185062 JobId: 64
SendToDataBase 09:01:38.9189949 JobId: 65
ProcessJob 09:01:38.9852121 JobId: 74
ProcessJob 09:01:39.0317458 JobId: 75
SendToDataBase 09:01:39.0317458 JobId: 66
SendToDataBase 09:01:39.0377511 JobId: 67
ProcessJob 09:01:39.6129381 JobId: 76
SendToDataBase 09:01:39.6129381 JobId: 68
ProcessJob 09:01:39.7833004 JobId: 77
SendToDataBase 09:01:39.7833004 JobId: 69
ProcessJob 09:01:39.8740443 JobId: 78
ProcessJob 09:01:40.3145731 JobId: 79
SendToDataBase 09:01:40.3145731 JobId: 70
SendToDataBase 09:01:40.3205708 JobId: 71
ProcessJob 09:01:40.4912084 JobId: 80
ProcessJob 09:01:40.5307205 JobId: 81
SendToDataBase 09:01:40.5317212 JobId: 72
ProcessJob 09:01:40.5652454 JobId: 82
ProcessJob 09:01:41.2902736 JobId: 83
ProcessJob 09:01:41.2902736 JobId: 84
ProcessJob 09:01:41.3598244 JobId: 85
SendToDataBase 09:01:41.3598244 JobId: 73
SendToDataBase 09:01:41.3663284 JobId: 74
SendToDataBase 09:01:41.3713317 JobId: 75
SendToDataBase 09:01:41.3718392 JobId: 76
SendToDataBase 09:01:41.3723328 JobId: 77
ProcessJob 09:01:42.2677493 JobId: 86
SendToDataBase 09:01:42.2677493 JobId: 78
ProcessJob 09:01:42.6466081 JobId: 87
ProcessJob 09:01:42.8947969 JobId: 88
SendToDataBase 09:01:42.8947969 JobId: 79
ProcessJob 09:01:43.0012509 JobId: 89
ProcessJob 09:01:43.1513589 JobId: 90
ProcessJob 09:01:43.4545800 JobId: 91
SendToDataBase 09:01:43.4545800 JobId: 80
SendToDataBase 09:01:43.4600832 JobId: 81
SendToDataBase 09:01:43.4605919 JobId: 82
ProcessJob 09:01:43.5946813 JobId: 92
ProcessJob 09:01:44.1731027 JobId: 93
SendToDataBase 09:01:44.1731027 JobId: 83
SendToDataBase 09:01:44.1786068 JobId: 84
SendToDataBase 09:01:44.1816090 JobId: 85
ProcessJob 09:01:44.4678171 JobId: 94
SendToDataBase 09:01:44.4678171 JobId: 86
ProcessJob 09:01:45.3426043 JobId: 97
SendToDataBase 09:01:45.3426043 JobId: 87
ProcessJob 09:01:45.3751270 JobId: 98
ProcessJob 09:01:45.7363757 JobId: 99
ProcessJob 09:01:45.7809216 JobId: 100
SendToDataBase 09:01:45.7809216 JobId: 88
SendToDataBase 09:01:45.7879270 JobId: 89
SendToDataBase 09:01:45.7925566 JobId: 90
SendToDataBase 09:01:45.8776726 JobId: 91
SendToDataBase 09:01:45.8776726 JobId: 92
SendToDataBase 09:01:46.5813640 JobId: 93
SendToDataBase 09:01:46.5813640 JobId: 94
SendToDataBase 09:01:47.7407165 JobId: 97
SendToDataBase 09:01:47.7407165 JobId: 98
SendToDataBase 09:01:48.4382058 JobId: 99
SendToDataBase 09:01:48.7357557 JobId: 100

1> Matthew Wats..:

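You can do this if you link a TransformBlock to an ActionBlock.

This is easiest to demonstrate with a compilable console application.

This application processes a sequence of integers, but you can replace the integers with a custom unit-of-work class.

(I adapted this code from a utility I wrote for multithreaded file compression using the relatively slow LZMA compression algorithm. That utility had to read the input data sequentially from a file, pass it in blocks to a queue where multiple threads processed the blocks in any order, and finally output the compressed blocks to a queue that had to preserve the original block order.)

Sample code: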

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    class Program
    {
        public static void Main()
        {
            var data = Enumerable.Range(1, 100);
            var task = Process(data);

            Console.WriteLine("Waiting for task to complete");
            task.Wait();
            Console.WriteLine("Task complete.");
        }

        public static async Task Process(IEnumerable<int> data)
        {
            var queue = new TransformBlock<int, int>(block => process(block), transformBlockOptions());
            var writer = new ActionBlock<int>(block => write(block), actionBlockOptions());

            queue.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

            await enqueDataToProcessAndAwaitCompletion(data, queue);

            await writer.Completion;
        }

        static int process(int block)
        {
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is processing block {block}");
            emulateWorkload();
            return -block;
        }

        static void write(int block)
        {
            Console.WriteLine("Output: " + block);
        }

        static async Task enqueDataToProcessAndAwaitCompletion(IEnumerable<int> data, TransformBlock<int, int> queue)
        {
            await enqueueDataToProcess(data, queue);
            queue.Complete();
        }

        static async Task enqueueDataToProcess(IEnumerable<int> data, ITargetBlock<int> queue)
        {
            foreach (var item in data)
                await queue.SendAsync(item);
        }


        static ExecutionDataflowBlockOptions transformBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 8,
                BoundedCapacity = 32
            };
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 1
            };
        }

        static Random rng = new Random();
        static object locker = new object();

        static void emulateWorkload()
        {
            int delay;

            lock (locker)
            {
                delay = rng.Next(250, 750);
            }

            Thread.Sleep(delay);
        }
    }
}

Output:

Waiting for task to complete
Thread 8 is processing block 8
Thread 5 is processing block 2
Thread 6 is processing block 6
Thread 4 is processing block 5
Thread 7 is processing block 7
Thread 10 is processing block 4
Thread 9 is processing block 1
Thread 3 is processing block 3
Thread 3 is processing block 9
Thread 8 is processing block 10
Thread 5 is processing block 11
Thread 6 is processing block 12
Thread 9 is processing block 13
Thread 10 is processing block 14
Thread 7 is processing block 15
Thread 8 is processing block 16
Thread 4 is processing block 17
Thread 5 is processing block 18
Thread 3 is processing block 19
Thread 9 is processing block 20
Thread 8 is processing block 21
Output: -1
Output: -2
Output: -3
Output: -4
Output: -5
Output: -6
Output: -7
Output: -8
Output: -9
Output: -10
Output: -11
Output: -12
Output: -13
Thread 6 is processing block 22
Thread 10 is processing block 23
Output: -14
Thread 7 is processing block 24
Output: -15
Output: -16
Thread 6 is processing block 25
Output: -17
Thread 4 is processing block 26
Thread 5 is processing block 27
----------------->SNIP<-----------------
Thread 10 is processing block 93
Thread 8 is processing block 94
Output: -83
Thread 4 is processing block 95
Output: -84
Output: -85
Output: -86
Output: -87
Thread 3 is processing block 96
Output: -88
Thread 6 is processing block 97
Thread 5 is processing block 98
Thread 10 is processing block 99
Thread 9 is processing block 100
Output: -89
Output: -90
Output: -91
Output: -92
Output: -93
Output: -94
Output: -95
Output: -96
Output: -97
Output: -98
Output: -99
Output: -100
Task complete.
Press any key to continue . . .

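Note how multiple threads process the "blocks" in any order, but the output order is the same as the input order.

It is important to set the options for the output action block as in the actionBlockOptions() method, with MaxDegreeOfParallelism and BoundedCapacity both set to 1.

This is what causes the output to be serialized in the correct order. If you set BoundedCapacity and MaxDegreeOfParallelism to more than 1 for the output, it may be emitted in the wrong order.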



2> JSteward..:

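@Matthew Watson has a good suggestion. I just want to add that, unless you are using the Microsoft.Tpl.Dataflow package, there is no need to limit the final action block to a MaxDegreeOfParallelism and BoundedCapacity of 1. The newer and correct System.Threading.Tasks.Dataflow adds an EnsureOrdered property to the execution block options. Although this does not seem to be documented on MSDN, you can find the property and its usage in the TPL Dataflow source.

Below is a sample and test demonstrating this behavior; changing EnsureOrdered to false in the execution options will cause the test to fail. The default value is true, so it does not need to be set explicitly for ordered behavior.

Edit: As @Matthew Watson pointed out below, while EnsureOrdered maintains order between propagator blocks, once inside the action block the messages may be processed in any order.

EDIT2: Of note: if the ActionBlock has MaxDegreeOfParallelism and BoundedCapacity set to one, but EnsureOrdered is false, the test will fail and the results will be out of order.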

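using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

[TestFixture]
public class TestRunner {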

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private ActionBlock<Message> action;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 2,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        action = new ActionBlock<Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Output  Id: {x.Id} Value: {x.Value}");

            //this delay will cause the messages to be unordered
            Task.Delay(rnd.Next(1000, 3000)).Wait();

            OutputMessages.Add(x);
        }, options);
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);
        xForm1.LinkTo(action, options);
    }

    public Task PostData(IEnumerable<Message> data) {

        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return action.Completion;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}

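Edit: Unfortunately we do not get direct access to the internal ReorderingBuffer. So an alternative to using an ActionBlock with BoundedCapacity and MaxDegreeOfParallelism equal to 1 is to link the ordered output of the TransformBlock to a stream. Note that in the code above, a delay in the parallelism-enabled ActionBlock causes the results to be out of order, but in the code below a delay in the stream processing does not disturb the order. This gives essentially the same behavior as a synchronous ActionBlock, and the stream could feed another portion of the mesh, and so on.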

[TestFixture]
public class TestRunner {

    [Test]
    public void TestPipeline() {
        var data = Enumerable.Range(0, 30).Select(x => new Message(x, x)).ToList();

        var target = new MyDataflow();
        target.PostData(data).Wait();

        Assert.IsTrue(data.SequenceEqual(target.OutputMessages));
    }
}

public class MyDataflow {

    private static Random rnd = new Random();

    private BufferBlock<Message> buffer;
    private TransformBlock<Message, Message> xForm1;
    private IObservable<Message> output;
    private TaskCompletionSource<bool> areWeDoneYet;
    public IList<Message> OutputMessages { get; set; }

    public MyDataflow() {
        OutputMessages = new List<Message>();
        CreatePipeline();
        LinkPipeline();
    }

    public void CreatePipeline() {
        var options = new ExecutionDataflowBlockOptions() {
            BoundedCapacity = 13,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        buffer = new BufferBlock<Message>();

        xForm1 = new TransformBlock<Message, Message>(x => {
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Started Id: {x.Id}");
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            Console.WriteLine($"{DateTime.Now.TimeOfDay} - Finished Id: {x.Id}");
            return x;
        }, options);

        output = xForm1.AsObservable();

        areWeDoneYet = new TaskCompletionSource<bool>();
    }

    public void LinkPipeline() {
        var options = new DataflowLinkOptions() {
            PropagateCompletion = true
        };

        buffer.LinkTo(xForm1, options);

        // The Subscribe(onNext, onCompleted) lambda overload comes from the System.Reactive package.
        var subscription = output.Subscribe(msg => {
            Task.Delay(rnd.Next(1000, 3000)).Wait();
            OutputMessages.Add(msg);
        }, () => areWeDoneYet.SetResult(true));
    }

    public Task PostData(IEnumerable<Message> data) {
        foreach (var item in data) {
            buffer.Post(item);
        }
        buffer.Complete();
        return areWeDoneYet.Task;
    }
}

public class Message {
    public Message(int id, int value) {
        this.Id = id;
        this.Value = value;
    }
    public int Id { get; set; }
    public int Value { get; set; }
}
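
To see the ordering behavior in isolation, here is a minimal sketch (not part of the original answer). It assumes the System.Threading.Tasks.Dataflow package; the class name OrderedPipelineSketch and the delays are made up for illustration. The TransformBlock runs in parallel and finishes later items first, while the ActionBlock keeps its default MaxDegreeOfParallelism of 1, so the collected results should come out in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedPipelineSketch
{
    // Sends 0..count-1 through a parallel TransformBlock and collects
    // the transformed values in a sequential ActionBlock.
    public static async Task<List<int>> RunAsync(int count)
    {
        var results = new List<int>();

        var transform = new TransformBlock<int, int>(async n =>
        {
            // Later items get shorter delays, so they tend to finish first.
            await Task.Delay((count - n) * 5);
            return n * 10;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = true // already the default; spelled out for clarity
        });

        // Default options: one message at a time, so List<int>.Add is safe here.
        var collect = new ActionBlock<int>(n => results.Add(n));

        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
            await transform.SendAsync(i);

        transform.Complete();
        await collect.Completion;
        return results;
    }

    public static async Task Main()
    {
        var results = await RunAsync(10);
        Console.WriteLine(string.Join(", ", results));
    }
}
```

Setting EnsureOrdered to false in this sketch lets the transform emit results as they finish, which is the case where the order is no longer guaranteed.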

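Edit2: Also, my pipeline is supposed to have 3 stages; how can I link those? So that when the first block finishes the first file, it starts outputting data to the next block, which again works in parallel and async.

This is not driven by how they are linked, but by the ExecutionDataflowBlockOptions. With the options shown below, the first block will work through its tasks according to the number of files posted and their respective processing times. As they finish, they are output to the next processing stage, or to an ActionBlock that handles the failure based on the Job.ReturnCode predicate, and the same goes for the following stages. You can also modify the ActionBlock options to handle your multiple success/failure TransformBlocks.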

var options = new ExecutionDataflowBlockOptions() {
    BoundedCapacity = 10,
    MaxDegreeOfParallelism = 10,
    EnsureOrdered = true
};
var loadXml = new TransformBlock<Job, Job>(job => { ... }, options); // I/O
var validateData = new TransformBlock<Job, Job>(job => { ... }, options); // Parsing&Validating&Calculations
var importJob = new TransformBlock<Job, Job>(job => { ... }, options); // Saving to database

var loadingFailed = new ActionBlock<Job>(job => CreateResponse(job));
var validationFailed = new ActionBlock<Job>(job => CreateResponse(job));
var reportImport = new ActionBlock<Job>(job => CreateResponse(job));

loadXml.LinkTo(validateData, job => job.ReturnCode == 100);
loadXml.LinkTo(loadingFailed);

validateData.LinkTo(importJob, Job => Job.ReturnCode == 100);
validateData.LinkTo(validationFailed);

importJob.LinkTo(reportImport);
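
For anyone who wants to run the three-stage layout end to end, here is a hedged sketch of it. The Job class, the simulated failure for every fifth Id, and the log strings are hypothetical placeholders rather than the OP's code; PropagateCompletion is added so the final blocks can be awaited.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for the Job type from the question.
public class Job
{
    public int Id { get; set; }
    public int ReturnCode { get; set; } = 100; // 100 means "success so far"
}

public static class ThreeStageSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<Job> jobs)
    {
        var log = new List<string>();
        var gate = new object();
        // Three ActionBlocks write to the same list, so guard it with a lock.
        void Report(string text) { lock (gate) log.Add(text); }

        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 10,
            MaxDegreeOfParallelism = 10,
            EnsureOrdered = true
        };

        var loadXml = new TransformBlock<Job, Job>(async job =>
        {
            await Task.Delay(10);                       // placeholder for file I/O
            if (job.Id % 5 == 0) job.ReturnCode = 200;  // simulated load failure
            return job;
        }, options);
        var validateData = new TransformBlock<Job, Job>(job => job, options); // placeholder
        var importJob = new TransformBlock<Job, Job>(job => job, options);    // placeholder

        var loadingFailed = new ActionBlock<Job>(job => Report($"load failed: {job.Id}"));
        var validationFailed = new ActionBlock<Job>(job => Report($"validation failed: {job.Id}"));
        var reportImport = new ActionBlock<Job>(job => Report($"imported: {job.Id}"));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        loadXml.LinkTo(validateData, link, job => job.ReturnCode == 100);
        loadXml.LinkTo(loadingFailed, link);
        validateData.LinkTo(importJob, link, job => job.ReturnCode == 100);
        validateData.LinkTo(validationFailed, link);
        importJob.LinkTo(reportImport, link);

        foreach (var job in jobs)
            await loadXml.SendAsync(job);
        loadXml.Complete();

        await Task.WhenAll(loadingFailed.Completion,
                           validationFailed.Completion,
                           reportImport.Completion);
        return log;
    }

    public static async Task Main()
    {
        var jobs = Enumerable.Range(1, 10).Select(i => new Job { Id = i });
        foreach (var line in await RunAsync(jobs))
            Console.WriteLine(line);
    }
}
```

The "imported" lines should appear in ascending Id order relative to each other, because each TransformBlock reorders its output and reportImport handles one message at a time. How the failure lines interleave with them is not deterministic.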

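Edit 3, in response to the source code added by the OP: by setting MaxDegreeOfParallelism and BoundedCapacity to 1 you lose the ordered behavior in the last transform block. Let me be clear: do not do this to "ensure order"; it only fights the library. Here is the relevant excerpt from TransformBlock: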

            // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
            // However, a developer can override this with EnsureOrdered == false.
            if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
            {
                _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) => ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
            }
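
As a rough mental model of what such a reordering buffer does, here is a simplified sketch. It is not the library's ReorderingBuffer; the names SimpleReorderingBuffer and ReorderingDemo are invented for illustration. Finished items are parked under their sequence id and released only once every earlier id has been released.

```csharp
using System;
using System.Collections.Generic;

// Conceptual sketch only; the real ReorderingBuffer is internal to the library.
public sealed class SimpleReorderingBuffer<T>
{
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private readonly Action<T> _emit;
    private readonly object _gate = new object();
    private long _nextId; // id of the next item allowed to leave the buffer

    public SimpleReorderingBuffer(Action<T> emit)
    {
        _emit = emit;
    }

    // Called when the item with the given sequence id has finished processing.
    public void Add(long id, T item)
    {
        lock (_gate)
        {
            _pending[id] = item;

            // Release every parked item that is now next in line.
            while (_pending.TryGetValue(_nextId, out var next))
            {
                _pending.Remove(_nextId);
                _nextId++;
                _emit(next);
            }
        }
    }
}

public static class ReorderingDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        var buffer = new SimpleReorderingBuffer<string>(output.Add);

        buffer.Add(2, "c"); // parked: ids 0 and 1 are still missing
        buffer.Add(0, "a"); // releases "a"
        buffer.Add(1, "b"); // releases "b", then the parked "c"

        return output; // a, b, c
    }
}
```

This is why a slow first item holds back the items behind it: they finish, but they stay parked until the earlier id arrives.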

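Here is a run with 20 data points, with the code modified to use parallelism in the final TBlock. It has been converted to basic CSV for viewing in Excel, i.e. replace " " => "," :)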

Function,TimeStamp/Inserted JobId,Other,Other,Other,Other,Other,Other,Other,JobId From functions
ReadDocument,04:54.0,|,Thread,6,is,processing,Job,Id:,1
ReadDocument,04:54.0,|,Thread,11,is,processing,Job,Id:,2
ReadDocument,04:56.0,|,Thread,13,is,processing,Job,Id:,3
ReadDocument,04:56.0,|,Thread,6,is,processing,Job,Id:,4
ReadDocument,04:57.0,|,Thread,11,is,processing,Job,Id:,5
ReadDocument,04:57.0,|,Thread,14,is,processing,Job,Id:,6
ReadDocument,04:58.0,|,Thread,15,is,processing,Job,Id:,7
ReadDocument,04:58.0,|,Thread,6,is,processing,Job,Id:,8
ReadDocument,04:59.0,|,Thread,11,is,processing,Job,Id:,9
ReadDocument,04:59.0,|,Thread,16,is,processing,Job,Id:,10
ReadDocument,05:00.0,|,Thread,17,is,processing,Job,Id:,12
ReadDocument,05:00.0,|,Thread,15,is,processing,Job,Id:,11
ReadDocument,05:01.0,|,Thread,16,is,processing,Job,Id:,13
ReadDocument,05:01.0,|,Thread,18,is,processing,Job,Id:,14
ReadDocument,05:02.0,|,Thread,15,is,processing,Job,Id:,15
ReadDocument,05:02.0,|,Thread,17,is,processing,Job,Id:,20
ValidateXml,05:02.0,|,Thread,19,is,processing,Job,Id:,1
ReadDocument,05:02.0,|,Thread,14,is,processing,Job,Id:,17
ReadDocument,05:02.0,|,Thread,13,is,processing,Job,Id:,16
ReadDocument,05:02.0,|,Thread,11,is,processing,Job,Id:,18
ReadDocument,05:02.0,|,Thread,6,is,processing,Job,Id:,19
ValidateXml,05:03.0,|,Thread,16,is,processing,Job,Id:,2
ValidateXml,05:03.0,|,Thread,20,is,processing,Job,Id:,3
ValidateXml,05:04.0,|,Thread,11,is,processing,Job,Id:,4
ValidateXml,05:04.0,|,Thread,21,is,processing,Job,Id:,7
ValidateXml,05:04.0,|,Thread,18,is,processing,Job,Id:,5
ValidateXml,05:04.0,|,Thread,15,is,processing,Job,Id:,6
ValidateXml,05:04.5,|,Thread,16,is,processing,Job,Id:,8
ValidateXml,05:04.5,|,Thread,6,is,processing,Job,Id:,9
ValidateXml,05:04.6,|,Thread,19,is,processing,Job,Id:,10
ProcessJob,05:04.6,|,Thread,14,is,processing,Job,Id:,2
ProcessJob,05:04.6,|,Thread,22,is,processing,Job,Id:,1
ValidateXml,05:05.5,|,Thread,18,is,processing,Job,Id:,11
ValidateXml,05:05.6,|,Thread,20,is,processing,Job,Id:,12
ProcessJob,05:05.6,|,Thread,23,is,processing,Job,Id:,3
ValidateXml,05:06.5,|,Thread,6,is,processing,Job,Id:,13
ValidateXml,05:06.5,|,Thread,21,is,processing,Job,Id:,15
ID,1,was,successfully,imported.,,,,,
ValidateXml,05:06.5,|,Thread,16,is,processing,Job,Id:,14
ValidateXml,05:06.8,|,Thread,15,is,processing,Job,Id:,17
ProcessJob,05:06.8,|,Thread,24,is,processing,Job,Id:,4
ValidateXml,05:06.8,|,Thread,11,is,processing,Job,Id:,16
ProcessJob,05:06.8,|,Thread,22,is,processing,Job,Id:,5
ProcessJob,05:07.5,|,Thread,17,is,processing,Job,Id:,6
ProcessJob,05:07.5,|,Thread,25,is,processing,Job,Id:,8
ValidateXml,05:07.5,|,Thread,19,is,processing,Job,Id:,18
ProcessJob,05:07.5,|,Thread,14,is,processing,Job,Id:,7
ValidateXml,05:08.5,|,Thread,16,is,processing,Job,Id:,19
ProcessJob,05:08.5,|,Thread,23,is,processing,Job,Id:,9
ValidateXml,05:08.5,|,Thread,18,is,processing,Job,Id:,20
ProcessJob,05:09.5,|,Thread,19,is,processing,Job,Id:,10
ID,2,was,successfully,imported.,,,,,
ProcessJob,05:09.5,|,Thread,15,is,processing,Job,Id:,11
ID,3,was,successfully,imported.,,,,,
ProcessJob,05:10.6,|,Thread,14,is,processing,Job,Id:,12
ProcessJob,05:10.9,|,Thread,25,is,processing,Job,Id:,13
ProcessJob,05:11.0,|,Thread,24,is,processing,Job,Id:,14
ID,4,was,successfully,imported.,,,,,
ProcessJob,05:11.1,|,Thread,17,is,processing,Job,Id:,15
ProcessJob,05:11.3,|,Thread,22,is,processing,Job,Id:,16
ID,5,was,successfully,imported.,,,,,
ID,6,was,successfully,imported.,,,,,
ID,7,was,successfully,imported.,,,,,
ID,8,was,successfully,imported.,,,,,
ProcessJob,05:11.6,|,Thread,19,is,processing,Job,Id:,17
ProcessJob,05:11.7,|,Thread,23,is,processing,Job,Id:,18
ID,9,was,successfully,imported.,,,,,
ID,10,was,successfully,imported.,,,,,
ProcessJob,05:12.0,|,Thread,14,is,processing,Job,Id:,19
ProcessJob,05:12.4,|,Thread,15,is,processing,Job,Id:,20
ID,11,was,successfully,imported.,,,,,
ID,12,was,successfully,imported.,,,,,
ID,13,was,successfully,imported.,,,,,
ID,14,was,successfully,imported.,,,,,
ID,15,was,successfully,imported.,,,,,
ID,16,was,successfully,imported.,,,,,
ID,17,was,successfully,imported.,,,,,
ID,18,was,successfully,imported.,,,,,
ID,19,was,successfully,imported.,,,,,
ID,20,was,successfully,imported.,,,,,

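One last note: returning a bool success flag from functions and mapping exceptions to return codes can be problematic, but that is outside the scope of this question. You can get a lot of good advice on best practices by posting your code on Stack Exchange Code Review.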
