当前位置:  开发笔记 > 编程语言 > 正文

c#全双工异步命名管道.NET

如何解决《c#全双工异步命名管道.NET》经验,为你挑选了1个好方法。

我试图在2台不同的机器(仅)上实现全双工客户端 - 服务器通信方案,其中每个端点(客户端或服务器)可以随时发送内容,异步(非阻塞管道),以及另一端将拿起并阅读它.

我不想将答案转给我除了命名管道之外的任何其他技术,我知道其他技术,但我想要回答这个特定的问题.(我已经在不同的论坛上看过很多次这个问题了,我一直看到回应建议使用其他技术.我觉得这个问题接近于粗鲁?)

我读过Named Pipes只能单向或者锁定,但我猜这可能是错的.我认为管道是基于套接字的,我无法想象底层套接字只是单向的.

这个问题的任何答案都需要解决这些问题才能真正发挥作用:

    答案需要解决异步管道,我不能使用同步解决方案.

    答案需要证明或允许管道保持开放的事实.我已经厌倦了阅读管道打开的示例,传输了一个字符串,然后管道立即关闭.我想要一个答案,假设管道保持打开状态并随机传输大量垃圾,并不断重复.没有挂起.

    基于C#的解决方案

我很遗憾听起来很苛刻,但是经过几天的互联网搜索后,我仍然没有找到一个很好的例子,我不想使用WFC.如果你知道这个答案的细节并且回答得很好,那么这个话题将成为未来几年的真正赢家,我敢肯定.如果我搞清楚的话,我会自己发布答案.

如果您要撰写并说"您需要使用两个管道",请解释原因,以及您如何知道这是真的,因为我读过的任何内容都没有解释为什么会出现这种情况.

谢谢!



1> eric frazer..:

您不必使用两个管道.我在网上发现了很多答案,说明你需要使用两根管子.我四处乱逛,熬夜,尝试再试一次,想出怎么做,这很简单,但你必须把一切都弄好(特别是要按照正确的呼叫顺序),否则它就行不通.另一个技巧是始终确保您有一个未完成的读取通话,否则它也会锁定.在你知道有人在读书之前不要写.除非先设置事件,否则不要启动读取呼叫.那种事.

这是我正在使用的管道类.它可能不够强大,无法处理管道错误,闭包和溢出.

好吧我不知道这里有什么问题,但格式有些偏差!VVVV

namespace Squall
{
    public interface PipeSender
    {
        Task SendCommandAsync(PipeCommandPlusString pCmd);
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ClientPipe : BasicPipe
    {
        NamedPipeClientStream m_pPipe;

        public ClientPipe(string szServerName, string szPipeName)
            : base("Client")
        {
            m_szPipeName = szPipeName; // debugging
            m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe); // inform base class what to read/write from
        }

        public void Connect()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
            m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
            StartReadingAsync();
        }

        // the client's pipe index is always 0
        internal override int PipeId() { return 0; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ServerPipe : BasicPipe
    {
        public event EventHandler GotConnectionEvent;

        NamedPipeServerStream m_pPipe;
        int m_nPipeId;

        public ServerPipe(string szPipeName, int nPipeId)
            : base("Server")
        {
            m_szPipeName = szPipeName;
            m_nPipeId = nPipeId;
            m_pPipe = new NamedPipeServerStream(
                szPipeName,
                PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Message,
                PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe);
            m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
        }

        static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
        {
            ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
            pThis.GotPipeConnection(pAsyncResult);
        }

        void GotPipeConnection(IAsyncResult pAsyncResult)
        {
            m_pPipe.EndWaitForConnection(pAsyncResult);

            Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");

            if (GotConnectionEvent != null)
            {
                GotConnectionEvent(this, new EventArgs());
            }

            // lodge the first read request to get us going
            //
            StartReadingAsync();
        }

        internal override int PipeId() { return m_nPipeId; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public abstract class BasicPipe : PipeSender
    {
        public static int MaxLen = 1024 * 1024; // why not
        protected string m_szPipeName;
        protected string m_szDebugPipeName;

        public event EventHandler ReadDataEvent;
        public event EventHandler PipeClosedEvent;

        protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];

        PipeStream m_pPipeStream;

        public BasicPipe(string szDebugPipeName)
        {
            m_szDebugPipeName = szDebugPipeName;
        }

        protected void SetPipeStream(PipeStream p)
        {
            m_pPipeStream = p;
        }

        protected string FullPipeNameDebug()
        {
            return m_szDebugPipeName + "-" + m_szPipeName;
        }

        internal abstract int PipeId();

        public void Close()
        {
            m_pPipeStream.WaitForPipeDrain();
            m_pPipeStream.Close();
            m_pPipeStream.Dispose();
            m_pPipeStream = null;
        }

        // called when Server pipe gets a connection, or when Client pipe is created
        public void StartReadingAsync()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");

            // okay we're connected, now immediately listen for incoming buffers
            //
            byte[] pBuffer = new byte[MaxLen];
            m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");

                int ReadLen = t.Result;
                if (ReadLen == 0)
                {
                    Debug.WriteLine("Got a null read length, remote pipe was closed");
                    if (PipeClosedEvent != null)
                    {
                        PipeClosedEvent(this, new EventArgs());
                    }
                    return;
                }

                if (ReadDataEvent != null)
                {
                    ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
                }
                else
                {
                    Debug.Assert(false, "something happened");
                }

                // lodge ANOTHER read request
                //
                StartReadingAsync();

            });
        }

        protected Task WriteByteArray(byte[] pBytes)
        {
            // this will start writing, but does it copy the memory before returning?
            return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
        }

        public Task SendCommandAsync(PipeCommandPlusString pCmd)
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
            string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
            byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
            Task t = WriteByteArray(pSerializedCmd);
            return t;
        }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public class PipeEventArgs
    {
        public byte[] m_pData;
        public int m_nDataLen;

        public PipeEventArgs(byte[] pData, int nDataLen)
        {
            // is this a copy, or an alias copy? I can't remember right now.
            m_pData = pData;
            m_nDataLen = nDataLen;
        }
    }

    /******************************************************************************
     * if we're just going to send a string back and forth, then we can use this
     * class. It it allows us to get the bytes as a string. sort of silly.
     ******************************************************************************/

    [Serializable]
    public class PipeCommandPlusString
    {
        public string m_szCommand;  // must be public to be serialized
        public string m_szString;   // ditto

        public PipeCommandPlusString(string sz, string szString)
        {
            m_szCommand = sz;
            m_szString = szString;
        }

        public string GetCommand()
        {
            return m_szCommand;
        }

        public string GetTransmittedString()
        {
            return m_szString;
        }
    }
}

这是我的管道测试,在一个进程上运行.它也运行在两个进程上,我查了一下

namespace NamedPipeTest
{
    public partial class Form1 : Form
    {
        SynchronizationContext _context;
        Thread m_pThread = null;
        volatile bool m_bDieThreadDie;
        ServerPipe m_pServerPipe;
        ClientPipe m_pClientPipe;

        public Form1()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            _context = SynchronizationContext.Current;

            m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
            m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;

            // m_pThread = new Thread(StaticThreadProc);
            // m_pThread.Start( this );
        }

        private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            Debug.WriteLine("Server: Pipe was closed, shutting down");

            // have to post this on the main thread
            _context.Post(delegate
            {
                Close();
            }, null);
        }

        private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            // this gets called on an anonymous thread

            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject(szBytes);
            string szValue = pCmd.GetTransmittedString();

            if (szValue == "CONNECT")
            {
                Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
                // fire off an async write
                Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
            }
        }

        static void StaticThreadProc(Object o)
        {
            Form1 pThis = o as Form1;
            pThis.ThreadProc();
        }

        void ThreadProc()
        {
            m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
            m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Connect();

            PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
            int Counter = 1;
            while (Counter++ < 10)
            {
                Debug.WriteLine("Counter = " + Counter);
                m_pClientPipe.SendCommandAsync(pCmd);
                Thread.Sleep(3000);
            }

            while (!m_bDieThreadDie)
            {
                Thread.Sleep(1000);
            }

            m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Close();
            m_pClientPipe = null;
        }

        private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            // wait around for server to shut us down
        }

        private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject(szBytes);
            string szValue = pCmd.GetTransmittedString();

            Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());

            if (szValue == "CONNECTED")
            {
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
                m_pClientPipe.SendCommandAsync(pCmdToSend);
            }
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (m_pThread != null)
            {
                m_bDieThreadDie = true;
                m_pThread.Join();
                m_bDieThreadDie = false;
            }

            m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
            m_pServerPipe.Close();
            m_pServerPipe = null;

        }
    }
}

推荐阅读
女女的家_747
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有