注意:让我为这个问题的长度道歉,我不得不把很多信息都写进去.我希望这不会导致太多人简单地浏览它并做出假设.请完整阅读.谢谢.
我有一个通过套接字进入的数据流.这些数据是面向行的.
我正在使用.NET的APM(异步编程方法)(BeginRead等).这排除了使用基于流的I/O,因为异步I/O是基于缓冲区的.可以重新打包数据并将其发送到流,例如内存流,但也存在问题.
问题是我的输入流(我无法控制)没有给我任何关于流有多长的信息.它只是一个看起来像这样的换行符流:
COMMAND\n ...Unpredictable number of lines of data...\n END COMMAND\n ....repeat....
因此,使用APM,并且由于我不知道任何给定数据集将持续多长时间,因此很可能数据块将跨越需要多次读取的缓冲区边界,但这些多次读取也将跨越多个数据块.
例:
Byte buffer[1024] = ".................blah\nThis is another l" [another read] "ine\n.............................More Lines..."
我的第一个想法是使用StringBuilder并简单地将缓冲线附加到SB.这在某种程度上起作用,但我发现很难提取数据块.我尝试使用StringReader来读取新的数据但是无法知道你是否得到一个完整的行,因为StringReader在最后一个块的末尾返回一个部分行,然后返回null.没有办法知道返回的内容是否是完整的新行数据.
例:
// Note: no newline at the end StringBuilder sb = new StringBuilder("This is a line\nThis is incomp.."); StringReader sr = new StringReader(sb); string s = sr.ReadLine(); // returns "This is a line" s = sr.ReadLine(); // returns "This is incomp.."
更糟糕的是,如果我继续追加数据,缓冲区会变得越来越大,因为这可能会持续数周或数月,而这不是一个好的解决方案.
我的下一个想法是在读取它时从SB中删除数据块.这需要编写我自己的ReadLine函数,但后来我在读写期间卡住了数据.此外,较大的数据块(可能包含数百个读取和数兆字节的数据)需要扫描整个缓冲区以查找换行符.它不高效且非常丑陋.
我正在寻找具有StreamReader/Writer简单性和易于异步I/O的东西.
我的下一个想法是使用MemoryStream,并将数据块写入内存流,然后将StreamReader附加到流并使用ReadLine,但我再次知道缓冲区中的最后一次读取是完整的行还是不,再加上从流中删除"陈旧"数据就更难了.
我还考虑过使用带有同步读取的线程.这样做的好处是,使用StreamReader时,除了断开的连接情况外,它总是从ReadLine()返回一个完整的行.然而,这在取消连接时存在问题,并且某些类型的网络问题可能导致长时间挂起的阻塞套接字.我正在使用异步IO,因为我不想在程序阻塞数据接收的生命周期中占用一个线程.
这种联系持久.数据将随着时间的推移而持续流动.在初始连接期间,存在大量数据,并且一旦完成该流程,套接字保持打开以等待实时更新.我不确切地知道初始流程何时"完成",因为唯一的方法是不再发送更多数据.这意味着我不能等待在处理之前完成初始数据加载,我几乎在实时处理"实时"处理.
那么,任何人都可以提出一个很好的方法来处理这种情况,而不是过于复杂吗?我真的希望它尽可能简单和优雅,但由于所有边缘情况,我不断提出越来越复杂的解决方案.我想我想要的是某种FIFO,在其中我可以轻松地追加更多数据,同时从中弹出符合特定条件的数据(即换行终止字符串).
这是一个非常有趣的问题.对我来说,解决方案一直是使用一个单独的线程和同步操作,如你所建议的那样.(我设法解决了使用锁和许多异常处理程序阻塞套接字的大多数问题.)尽管如此,使用内置异步操作通常也是可取的,因为它允许真正的操作系统级异步I/O,所以我理解你的观点.
好吧,我已经去写了一堂课来完成我认为你需要的东西(我会以相对干净的方式说).让我知道你的想法.
using System; using System.Collections.Generic; using System.IO; using System.Text; public class AsyncStreamProcessor : IDisposable { protected StringBuilder _buffer; // Buffer for unprocessed data. private bool _isDisposed = false; // True if object has been disposed public AsyncStreamProcessor() { _buffer = null; } public IEnumerableProcess(byte[] newData) { // Note: replace the following encoding method with whatever you are reading. // The trick here is to add an extra line break to the new data so that the algorithm recognises // a single line break at the end of the new data. using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine)) { // Read all lines from new data, returning all but the last. // The last line is guaranteed to be incomplete (or possibly complete except for the line break, // which will be processed with the next packet of data). string line, prevLine = null; while ((line = newDataReader.ReadLine()) != null) { if (prevLine != null) { yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine; _buffer = null; } prevLine = line; } // Store last incomplete line in buffer. if (_buffer == null) // Note: the (* 2) gives you the prediction of the length of the incomplete line, // so that the buffer does not have to be expanded in most/all situations. // Change it to whatever seems appropiate. _buffer = new StringBuilder(prevLine, prevLine.Length * 2); else _buffer.Append(prevLine); } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (!_isDisposed) { if (disposing) { // Dispose managed resources. _buffer = null; GC.Collect(); } // Dispose native resources. // Remember that object has been disposed. _isDisposed = true; } } }
应为每个NetworkStream创建此类的实例,并且每当收到新数据时都应调用Process函数(在BeginRead的回调方法中,在调用我想象的下一个BeginRead之前).
注意:我只用测试数据验证了这段代码,而不是通过网络传输的实际数据.但是,我不希望有任何差异......
此外,警告该类当然不是线程安全的,但只要在处理完当前数据之后不再执行BeginRead(正如我假设您正在做的那样),应该没有任何问题.
希望这对你有用.如果有剩余问题,请告诉我,我会尝试修改解决方案来处理它们.(尽管仔细阅读,但我错过的问题可能会有些微妙!)