当前位置:  开发笔记 > 编程语言 > 正文

C#Begin/EndReceive - 如何读取大数据?

如何解决《C#Begin/EndReceive-如何读取大数据?》经验,为你挑选了2个好方法。

当以1024块的数据块读取数据时,如何继续从接收大于1024字节的消息的套接字读取,直到没有数据为止?我应该只使用BeginReceive来读取数据包的长度前缀,然后一旦检索到它,使用Receive()(在异步线程中)读取数据包的其余部分?或者还有另一种方式吗?

编辑:

我认为Jon Skeet的链接有解决方案,但是有一些关于该代码的speedbump.我使用的代码是:

public class StateObject
{
    public Socket workSocket = null;
    public const int BUFFER_SIZE = 1024;
    public byte[] buffer = new byte[BUFFER_SIZE];
    public StringBuilder sb = new StringBuilder();
}

public static void Read_Callback(IAsyncResult ar)
{
    StateObject so = (StateObject) ar.AsyncState;
    Socket s = so.workSocket;

    int read = s.EndReceive(ar);

    if (read > 0) 
    {
        so.sb.Append(Encoding.ASCII.GetString(so.buffer, 0, read));

        if (read == StateObject.BUFFER_SIZE)
        {
            s.BeginReceive(so.buffer, 0, StateObject.BUFFER_SIZE, 0, 
                    new AyncCallback(Async_Send_Receive.Read_Callback), so);
            return;
        }
    }

    if (so.sb.Length > 0)
    {
        //All of the data has been read, so displays it to the console
        string strContent;
        strContent = so.sb.ToString();
        Console.WriteLine(String.Format("Read {0} byte from socket" + 
        "data = {1} ", strContent.Length, strContent));
    }
    s.Close();
}

现在这个纠正在大多数情况下工作正常,但是当数据包的大小是缓冲区的倍数时它会失败.原因是如果缓冲区在读取时被填充,则假设有更多数据; 但同样的问题和以前一样.例如,2字节缓冲区在4字节数据包上填充两次,并假设有更多数据.然后阻止,因为没有什么可以阅读.问题是接收功能不知道数据包的结尾是什么时候.


这让我想到了两个可能的解决方案:我可以有一个数据包结束分隔符,或者我可以读取数据包标题来查找长度然后接收到这个数量(正如我最初的建议).

但是,每个都存在问题.我不喜欢使用分隔符的想法,因为用户可以某种方式将其用于应用程序的输入字符串中的数据包并将其搞砸.它对我来说似乎有点草率.

长度标题听起来不错,但我打算使用协议缓冲区 - 我不知道数据的格式.有长度标题吗?这是多少字节?这会是我自己实现的吗?等等..

我该怎么办?



1> Jon Skeet..:

否 - BeginReceive从回调处理程序再次调用,直到EndReceive返回0.基本上,您应该继续异步接收,假设您希望获得异步IO的最大好处.

如果你查看MSDN页面,Socket.BeginReceive你会看到一个这样的例子.(不可否认,这并不像现在这么容易.)



2> Matt Davis..:

荡.鉴于已经权衡过的要人,我甚至对回复这个问题犹豫不决,但现在就去了.温柔,O Ones!

没有阅读Marc博客的好处(由于公司的互联网政策,它已被封锁),我将提供"另一种方式".

在我看来,技巧是将数据的接收与该数据的处理分开.

我使用像这样定义的StateObject类.它与MSDN StateObject实现的不同之处在于它不包含StringBuilder对象,BUFFER_SIZE常量是私有的,并且为了方便起见它包含一个构造函数.

public class StateObject
{
    private const int BUFFER_SIZE = 65535;
    public byte[] Buffer = new byte[BUFFER_SIZE];
    public readonly Socket WorkSocket = null;

    public StateObject(Socket workSocket)
    {
        WorkSocket = workSocket;
    }
}

我还有一个Packet类,它只是一个缓冲区和时间戳的包装器.

public class Packet
{
    public readonly byte[] Buffer;
    public readonly DateTime Timestamp;

    public Packet(DateTime timestamp, byte[] buffer, int size)
    {
        Timestamp = timestamp;
        Buffer = new byte[size];
        System.Buffer.BlockCopy(buffer, 0, Buffer, 0, size);
    }
}

我的ReceiveCallback()函数看起来像这样.

public static ManualResetEvent PacketReceived = new ManualResetEvent(false);
public static List PacketList = new List();
public static object SyncRoot = new object();
public static void ReceiveCallback(IAsyncResult ar)
{
    try {
        StateObject so = (StateObject)ar.AsyncState;
        int read = so.WorkSocket.EndReceive(ar);

        if (read > 0) {
            Packet packet = new Packet(DateTime.Now, so.Buffer, read);
            lock (SyncRoot) {
                PacketList.Add(packet);
            }
            PacketReceived.Set();
        }

        so.WorkSocket.BeginReceive(so.Buffer, 0, so.Buffer.Length, 0, ReceiveCallback, so);
    } catch (ObjectDisposedException) {
        // Handle the socket being closed with an async receive pending
    } catch (Exception e) {
        // Handle all other exceptions
    }
}

请注意,此实现绝对不会处理接收到的数据,也不会对应该接收多少字节进行任何预测.它只接收套接字上发生的任何数据(最多65535字节)并将该数据存储在数据包列表中,然后立即将另一个异步接收排队.

由于处理每个异步接收的线程中不再发生处理,因此数据显然将由不同的线程处理,这就是通过lock语句同步Add()操作的原因.此外,处理线程(无论是主线程还是其他专用线程)需要知道何时有数据要处理.为此,我通常使用ManualResetEvent,这就是我上面所示的.

以下是处理的工作原理.

static void Main(string[] args)
{
    Thread t = new Thread(
        delegate() {
            List packets;
            while (true) {
                PacketReceived.WaitOne();
                PacketReceived.Reset();
                lock (SyncRoot) {
                    packets = PacketList;
                    PacketList = new List();
                }

                foreach (Packet packet in packets) {
                    // Process the packet
                }
            }
        }
    );
    t.IsBackground = true;
    t.Name = "Data Processing Thread";
    t.Start();
}

这是我用于所有套接字通信的基本基础结构.它在接收数据和处理数据之间提供了很好的分离.

至于你遇到的另一个问题,重要的是要记住这种方法,每个Packet实例不一定代表应用程序上下文中的完整消息.数据包实例可能包含部分消息,单个消息或多个消息,并且您的消息可能跨越多个数据包实例.我已经解决了如何知道您在此处发布的相关问题中收到完整消息的时间.

推荐阅读
凹凸曼00威威_694
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有