我试图在2台不同的机器(仅)上实现全双工客户端 - 服务器通信方案,其中每个端点(客户端或服务器)可以随时发送内容,异步(非阻塞管道),以及另一端将拿起并阅读它.
我不想将答案转给我除了命名管道之外的任何其他技术,我知道其他技术,但我想要回答这个特定的问题.(我已经在不同的论坛上看过很多次这个问题了,我一直看到回应建议使用其他技术.我觉得这个问题接近于粗鲁?)
我读过Named Pipes只能单向或者锁定,但我猜这可能是错的.我认为管道是基于套接字的,我无法想象底层套接字只是单向的.
这个问题的任何答案都需要解决这些问题才能真正发挥作用:
答案需要解决异步管道,我不能使用同步解决方案.
答案需要证明或允许管道保持开放的事实.我已经厌倦了阅读管道打开的示例,传输了一个字符串,然后管道立即关闭.我想要一个答案,假设管道保持打开状态并随机传输大量垃圾,并不断重复.没有挂起.
基于C#的解决方案
我很遗憾听起来很苛刻,但是经过几天的互联网搜索后,我仍然没有找到一个很好的例子,我不想使用WFC.如果你知道这个答案的细节并且回答得很好,那么这个话题将成为未来几年的真正赢家,我敢肯定.如果我搞清楚的话,我会自己发布答案.
如果您要撰写并说"您需要使用两个管道",请解释原因,以及您如何知道这是真的,因为我读过的任何内容都没有解释为什么会出现这种情况.
谢谢!
您不必使用两个管道.我在网上发现了很多答案,说明你需要使用两根管子.我四处乱逛,熬夜,尝试再试一次,想出怎么做,这很简单,但你必须把一切都弄好(特别是要按照正确的呼叫顺序),否则它就行不通.另一个技巧是始终确保您有一个未完成的读取通话,否则它也会锁定.在你知道有人在读书之前不要写.除非先设置事件,否则不要启动读取呼叫.那种事.
这是我正在使用的管道类.它可能不够强大,无法处理管道错误,闭包和溢出.
好吧我不知道这里有什么问题,但格式有些偏差!VVVV
namespace Squall { public interface PipeSender { Task SendCommandAsync(PipeCommandPlusString pCmd); } /****************************************************************************** * * * * ******************************************************************************/ public class ClientPipe : BasicPipe { NamedPipeClientStream m_pPipe; public ClientPipe(string szServerName, string szPipeName) : base("Client") { m_szPipeName = szPipeName; // debugging m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous); base.SetPipeStream(m_pPipe); // inform base class what to read/write from } public void Connect() { Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server"); m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout. StartReadingAsync(); } // the client's pipe index is always 0 internal override int PipeId() { return 0; } } /****************************************************************************** * * * * ******************************************************************************/ public class ServerPipe : BasicPipe { public event EventHandlerGotConnectionEvent; NamedPipeServerStream m_pPipe; int m_nPipeId; public ServerPipe(string szPipeName, int nPipeId) : base("Server") { m_szPipeName = szPipeName; m_nPipeId = nPipeId; m_pPipe = new NamedPipeServerStream( szPipeName, PipeDirection.InOut, NamedPipeServerStream.MaxAllowedServerInstances, PipeTransmissionMode.Message, PipeOptions.Asynchronous); base.SetPipeStream(m_pPipe); m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this); } static void StaticGotPipeConnection(IAsyncResult pAsyncResult) { ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe; pThis.GotPipeConnection(pAsyncResult); } void GotPipeConnection(IAsyncResult pAsyncResult) { m_pPipe.EndWaitForConnection(pAsyncResult); Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection"); if (GotConnectionEvent != null) { GotConnectionEvent(this, new EventArgs()); } // lodge the first read request to get us going // StartReadingAsync(); } internal override int PipeId() { return m_nPipeId; } } /****************************************************************************** * * * * ******************************************************************************/ public abstract class BasicPipe : PipeSender { public static int MaxLen = 1024 * 1024; // why not protected string m_szPipeName; protected string m_szDebugPipeName; public event EventHandler ReadDataEvent; public event EventHandler PipeClosedEvent; protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen]; PipeStream m_pPipeStream; public BasicPipe(string szDebugPipeName) { m_szDebugPipeName = szDebugPipeName; } protected void SetPipeStream(PipeStream p) { m_pPipeStream = p; } protected string FullPipeNameDebug() { return m_szDebugPipeName + "-" + m_szPipeName; } internal abstract int PipeId(); public void Close() { m_pPipeStream.WaitForPipeDrain(); m_pPipeStream.Close(); m_pPipeStream.Dispose(); m_pPipeStream = null; } // called when Server pipe gets a connection, or when Client pipe is created public void StartReadingAsync() { Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync"); // okay we're connected, now immediately listen for incoming buffers // byte[] pBuffer = new byte[MaxLen]; m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t => { Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request"); int ReadLen = t.Result; if (ReadLen == 0) { Debug.WriteLine("Got a null read length, remote pipe was closed"); if (PipeClosedEvent != null) { PipeClosedEvent(this, new EventArgs()); } return; } if (ReadDataEvent != null) { ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen)); } else { Debug.Assert(false, "something happened"); } // lodge ANOTHER read request // StartReadingAsync(); }); } protected Task WriteByteArray(byte[] pBytes) { // this will start writing, but does it copy the memory before returning? return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length); } public Task SendCommandAsync(PipeCommandPlusString pCmd) { Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); string szSerializedCmd = JsonConvert.SerializeObject(pCmd); byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd); Task t = WriteByteArray(pSerializedCmd); return t; } } /****************************************************************************** * * * * ******************************************************************************/ public class PipeEventArgs { public byte[] m_pData; public int m_nDataLen; public PipeEventArgs(byte[] pData, int nDataLen) { // is this a copy, or an alias copy? I can't remember right now. m_pData = pData; m_nDataLen = nDataLen; } } /****************************************************************************** * if we're just going to send a string back and forth, then we can use this * class. It it allows us to get the bytes as a string. sort of silly. ******************************************************************************/ [Serializable] public class PipeCommandPlusString { public string m_szCommand; // must be public to be serialized public string m_szString; // ditto public PipeCommandPlusString(string sz, string szString) { m_szCommand = sz; m_szString = szString; } public string GetCommand() { return m_szCommand; } public string GetTransmittedString() { return m_szString; } } }
这是我的管道测试,在一个进程上运行.它也运行在两个进程上,我查了一下
namespace NamedPipeTest { public partial class Form1 : Form { SynchronizationContext _context; Thread m_pThread = null; volatile bool m_bDieThreadDie; ServerPipe m_pServerPipe; ClientPipe m_pClientPipe; public Form1() { InitializeComponent(); } private void Form1_Load(object sender, EventArgs e) { _context = SynchronizationContext.Current; m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0); m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent; m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent; // m_pThread = new Thread(StaticThreadProc); // m_pThread.Start( this ); } private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e) { Debug.WriteLine("Server: Pipe was closed, shutting down"); // have to post this on the main thread _context.Post(delegate { Close(); }, null); } private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e) { // this gets called on an anonymous thread byte[] pBytes = e.m_pData; string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length); PipeCommandPlusString pCmd = JsonConvert.DeserializeObject(szBytes); string szValue = pCmd.GetTransmittedString(); if (szValue == "CONNECT") { Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client"); PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED"); // fire off an async write Task t = m_pServerPipe.SendCommandAsync(pCmdToSend); } } static void StaticThreadProc(Object o) { Form1 pThis = o as Form1; pThis.ThreadProc(); } void ThreadProc() { m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE"); m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent; m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent; m_pClientPipe.Connect(); PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT"); int Counter = 1; while (Counter++ < 10) { Debug.WriteLine("Counter = " + Counter); m_pClientPipe.SendCommandAsync(pCmd); Thread.Sleep(3000); } while (!m_bDieThreadDie) { Thread.Sleep(1000); } m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent; m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent; m_pClientPipe.Close(); m_pClientPipe = null; } private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e) { // wait around for server to shut us down } private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e) { byte[] pBytes = e.m_pData; string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen); PipeCommandPlusString pCmd = JsonConvert.DeserializeObject (szBytes); string szValue = pCmd.GetTransmittedString(); Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString()); if (szValue == "CONNECTED") { PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA"); m_pClientPipe.SendCommandAsync(pCmdToSend); } } private void Form1_FormClosing(object sender, FormClosingEventArgs e) { if (m_pThread != null) { m_bDieThreadDie = true; m_pThread.Join(); m_bDieThreadDie = false; } m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent; m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent; m_pServerPipe.Close(); m_pServerPipe = null; } } }