当前位置:  开发笔记 > 编程语言 > 正文

处理基于线路的网络I/O流的好方法是什么?

如何解决《处理基于线路的网络I/O流的好方法是什么?》经验,为你挑选了1个好方法。

注意:让我为这个问题的长度道歉,我不得不把很多信息都写进去.我希望这不会导致太多人简单地浏览它并做出假设.请完整阅读.谢谢.

我有一个通过套接字进入的数据流.这些数据是面向行的.

我正在使用.NET的APM(异步编程方法)(BeginRead等).这排除了使用基于流的I/O,因为异步I/O是基于缓冲区的.可以重新打包数据并将其发送到流,例如内存流,但也存在问题.

问题是我的输入流(我无法控制)没有给我任何关于流有多长的信息.它只是一个看起来像这样的换行符流:

COMMAND\n
...Unpredictable number of lines of data...\n
END COMMAND\n
....repeat....

因此,使用APM,并且由于我不知道任何给定数据集将持续多长时间,因此很可能数据块将跨越需要多次读取的缓冲区边界,但这些多次读取也将跨越多个数据块.

例:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

我的第一个想法是使用StringBuilder并简单地将缓冲线附加到SB.这在某种程度上起作用,但我发现很难提取数据块.我尝试使用StringReader来读取新的数据但是无法知道你是否得到一个完整的行,因为StringReader在最后一个块的末尾返回一个部分行,然后返回null.没有办法知道返回的内容是否是完整的新行数据.

例:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb);
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."

更糟糕的是,如果我继续追加数据,缓冲区会变得越来越大,因为这可能会持续数周或数月,而这不是一个好的解决方案.

我的下一个想法是在读取它时从SB中删除数据块.这需要编写我自己的ReadLine函数,但后来我在读写期间卡住了数据.此外,较大的数据块(可能包含数百个读取和数兆字节的数据)需要扫描整个缓冲区以查找换行符.它不高效且非常丑陋.

我正在寻找具有StreamReader/Writer简单性和易于异步I/O的东西.

我的下一个想法是使用MemoryStream,并将数据块写入内存流,然后将StreamReader附加到流并使用ReadLine,但我再次知道缓冲区中的最后一次读取是完整的行还是不,再加上从流中删除"陈旧"数据就更难了.

我还考虑过使用带有同步读取的线程.这样做的好处是,使用StreamReader时,除了断开的连接情况外,它总是从ReadLine()返回一个完整的行.然而,这在取消连接时存在问题,并且某些类型的网络问题可能导致长时间挂起的阻塞套接字.我正在使用异步IO,因为我不想在程序阻塞数据接收的生命周期中占用一个线程.

这种联系持久.数据将随着时间的推移而持续流动.在初始连接期间,存在大量数据,并且一旦完成该流程,套接字保持打开以等待实时更新.我不确切地知道初始流程何时"完成",因为唯一的方法是不再发送更多数据.这意味着我不能等待在处理之前完成初始数据加载,我几乎在实时处理"实时"处理.

那么,任何人都可以提出一个很好的方法来处理这种情况,而不是过于复杂吗?我真的希望它尽可能简单和优雅,但由于所有边缘情况,我不断提出越来越复杂的解决方案.我想我想要的是某种FIFO,在其中我可以轻松地追加更多数据,同时从中弹出符合特定条件的数据(即换行终止字符串).



1> Noldorin..:

这是一个非常有趣的问题.对我来说,解决方案一直是使用一个单独的线程和同步操作,如你所建议的那样.(我设法解决了使用锁和许多异常处理程序阻塞套接字的大多数问题.)尽管如此,使用内置异步操作通常也是可取的,因为它允许真正的操作系统级异步I/O,所以我理解你的观点.

好吧,我已经去写了一堂课来完成我认为你需要的东西(我会以相对干净的方式说).让我知道你的想法.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable Process(byte[] newData)
    {
        // Note: replace the following encoding method with whatever you are reading.
        // The trick here is to add an extra line break to the new data so that the algorithm recognises
        // a single line break at the end of the new data.
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
                // so that the buffer does not have to be expanded in most/all situations. 
                // Change it to whatever seems appropiate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
                _buffer = null;
                GC.Collect();
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

应为每个NetworkStream创建此类的实例,并且每当收到新数据时都应调用Process函数(在BeginRead的回调方法中,在调用我想象的下一个BeginRead之前).

注意:我只用测试数据验证了这段代码,而不是通过网络传输的实际数据.但是,我不希望有任何差异......

此外,警告该类当然不是线程安全的,但只要在处理完当前数据之后不再执行BeginRead(正如我假设您正在做的那样),应该没有任何问题.

希望这对你有用.如果有剩余问题,请告诉我,我会尝试修改解决方案来处理它们.(尽管仔细阅读,但我错过的问题可能会有些微妙!)

推荐阅读
雨天是最美
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有