我在一个多线程 TCP 服务器中使用 Monitor.Wait 和 Monitor.Pulse 进行互锁时遇到了问题。为了演示这个问题，下面是我的服务器代码：
/// <summary>
/// TCP server from the question: runs an accept loop on its own thread and
/// passes the stream of each accepted client to an <see cref="IHandler"/>.
/// </summary>
/// <remarks>
/// NOTE(review): this version can lose wake-ups. Monitor.Pulse does nothing when
/// no thread is waiting, and Monitor locks are re-entrant. When
/// BeginAcceptTcpClient finds a connection already pending, it may run the
/// Accept callback synchronously on the ThreadStart thread, which still holds
/// the lock on sync. lock(sync) inside Accept therefore does not block, the Pulse
/// fires before Monitor.Wait is reached, and the following Wait blocks until some
/// other Pulse (e.g. the one in Stop) arrives. A stateful signal such as
/// AutoResetEvent avoids this.
/// </remarks>
public class Server
{
    TcpListener listener;   // accepts incoming connections
    Object sync;            // monitor object for Wait/Pulse between ThreadStart, Accept and Stop
    IHandler handler;       // receives the stream of each accepted client
    bool running;           // accept-loop flag; written by ThreadStart/Stop, read by Accept on the callback thread

    /// <summary>
    /// Creates a listener on <paramref name="port"/> bound to the first address
    /// returned for this host's name. The listener is not started here.
    /// </summary>
    /// <param name="handler">Receives the stream of each accepted client.</param>
    /// <param name="port">TCP port to listen on.</param>
    public Server(IHandler handler, int port)
    {
        this.handler = handler;
        // NOTE(review): AddressList[0] may be an IPv6 address depending on the machine — confirm.
        IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
        listener = new TcpListener(address, port);
        sync = new Object();
        running = false;
    }

    /// <summary>Runs the <c>ThreadStart</c> accept loop on a new thread.</summary>
    public void Start()
    {
        Thread thread = new Thread(ThreadStart);
        thread.Start();
    }

    /// <summary>
    /// Stops the listener, clears <c>running</c> and pulses <c>sync</c> so the
    /// accept loop wakes from Monitor.Wait and exits.
    /// </summary>
    public void Stop()
    {
        lock (sync)
        {
            listener.Stop();
            running = false;
            Monitor.Pulse(sync);
        }
    }

    /// <summary>
    /// Accept loop: starts the listener, then repeatedly posts an asynchronous
    /// accept and waits on <c>sync</c> for Accept (or Stop) to pulse it.
    /// </summary>
    void ThreadStart()
    {
        if (!running)
        {
            listener.Start();
            running = true;
            lock (sync)
            {
                while (running)
                {
                    try
                    {
                        // The lock on sync is still held here. If the callback runs
                        // synchronously on this thread, its Pulse happens before the Wait below.
                        listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                        Monitor.Wait(sync); // Release lock and wait for a pulse
                    }
                    catch (Exception e)
                    {
                        Console.WriteLine(e.Message);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Completion callback for BeginAcceptTcpClient. Wakes the accept loop, then,
    /// if still running, ends the accept and hands the client's stream to the handler.
    /// The client is disposed when the handler returns.
    /// </summary>
    /// <param name="result">Async result; its AsyncState is the listener passed to BeginAcceptTcpClient.</param>
    void Accept(IAsyncResult result)
    {
        // Let the server continue listening
        // NOTE(review): this Pulse is lost if ThreadStart is not yet inside Monitor.Wait
        // (e.g. when this callback runs synchronously on the ThreadStart thread).
        lock (sync)
        {
            Monitor.Pulse(sync);
        }
        // running is read outside the lock here.
        if (running)
        {
            // This local shadows the listener field; it is the same object passed as AsyncState.
            TcpListener listener = (TcpListener)result.AsyncState;
            using (TcpClient client = listener.EndAcceptTcpClient(result))
            {
                handler.Handle(client.GetStream());
            }
        }
    }
}
这是我的客户端代码:
/// <summary>
/// Test driver: starts two servers with an echo handler, sends seven messages
/// to alternating ports, then stops both servers.
/// </summary>
class Client
{
    /// <summary>
    /// Handler that reads the stream until end-of-stream and prints the received
    /// bytes decoded as ASCII.
    /// </summary>
    class EchoHandler : IHandler
    {
        public void Handle(Stream stream)
        {
            System.Console.Out.Write("Echo Handler: ");
            StringBuilder sb = new StringBuilder();
            byte[] buffer = new byte[1024];
            int count;
            // Read returns 0 once the peer has closed its sending side.
            while ((count = stream.Read(buffer, 0, buffer.Length)) > 0)
            {
                sb.Append(Encoding.ASCII.GetString(buffer, 0, count));
            }
            System.Console.Out.WriteLine(sb.ToString());
            System.Console.Out.Flush();
        }
    }

    // First address for this host's name; may be IPv4 or IPv6 depending on the machine.
    static IPAddress localhost = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];

    public static int Main()
    {
        Server server1 = new Server(new EchoHandler(), 1000);
        Server server2 = new Server(new EchoHandler(), 1001);
        server1.Start();
        server2.Start();
        Console.WriteLine("Press return to test...");
        Console.ReadLine();
        // Note interleaved ports
        SendMsg("Test1", 1000);
        SendMsg("Test2", 1001);
        SendMsg("Test3", 1000);
        SendMsg("Test4", 1001);
        SendMsg("Test5", 1000);
        SendMsg("Test6", 1001);
        SendMsg("Test7", 1000);
        Console.WriteLine("Press return to terminate...");
        Console.ReadLine();
        server1.Stop();
        server2.Stop();
        return 0;
    }

    /// <summary>
    /// Connects to <c>localhost</c>:<paramref name="port"/>, sends
    /// <paramref name="msg"/> encoded as ASCII, and closes the connection.
    /// </summary>
    /// <param name="msg">Text to send.</param>
    /// <param name="port">Destination TCP port.</param>
    public static void SendMsg(String msg, int port)
    {
        IPEndPoint endPoint = new IPEndPoint(localhost, port);
        byte[] buffer = Encoding.ASCII.GetBytes(msg);
        // Use the endpoint's own address family. The host's first address may be
        // IPv6, and a hard-coded InterNetwork (IPv4) socket cannot connect to it.
        using (Socket s = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp))
        {
            s.Connect(endPoint);
            s.Send(buffer);
            // Signal end-of-data so the handler's read loop sees end-of-stream.
            s.Shutdown(SocketShutdown.Send);
        }
    }
}
客户端发送七条消息,但服务器只打印四条:
Press return to test... Press return to terminate... Echo Handler: Test1 Echo Handler: Test3 Echo Handler: Test2 Echo Handler: Test4
我怀疑 Monitor 允许 Pulse（在服务器的 Accept 方法中）在 Wait（在 ThreadStart 方法中）之前发生。然而，ThreadStart 在调用 Monitor.Wait() 之前应该一直持有 sync 对象上的锁，只有在那之后 Accept 方法才能获得该锁并发出 Pulse。如果在服务器的 Stop() 方法中注释掉以下两行：
//listener.Stop(); //running = false;
那么在调用服务器的 Stop() 方法时，剩余的消息就会显示出来（也就是说，唤醒服务器的 sync 对象会使它分派剩余的传入消息）。在我看来，这只可能是 ThreadStart 和 Accept 方法之间的竞争条件造成的，但 sync 对象上的锁本应防止这种情况发生。
有任何想法吗?
非常感谢,西蒙.
PS：我知道输出的顺序是乱的等等，我专门想问的是锁与 Monitor 之间的竞争条件。干杯，SH。
问题在于你把 Pulse/Wait 当作信号来使用。真正的信号（例如 AutoResetEvent）带有状态：它会一直保持有信号状态，直到有线程调用 WaitOne()。而在没有任何线程等待时调用 Pulse 只是一个空操作（no-op）。
再加上同一个线程可以多次获取同一把锁（Monitor 锁是可重入的）。由于你使用的是异步编程模型，Accept 回调可能由执行 BeginAcceptTcpClient 的同一个线程直接调用。
让我来说明一下.我注释掉了第二台服务器,并更改了服务器上的一些代码.
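void ThreadStart()
{
    // Instrumented copy of the question's accept loop: logs the managed thread
    // id before BeginAccept and before Wait, so the output shows which thread
    // performs each step.
    if (!running)
    {
        listener.Start();
        running = true;
        lock (sync)
        {
            while (running)
            {
                try
                {
                    Console.WriteLine("BeginAccept [{0}]", Thread.CurrentThread.ManagedThreadId);
                    // The lock on sync is held across this call. If the callback runs
                    // synchronously on this thread, its Pulse happens before the Wait below.
                    listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                    Console.WriteLine("Wait [{0}]", Thread.CurrentThread.ManagedThreadId);
                    Monitor.Wait(sync); // Release lock and wait for a pulse
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}

void Accept(IAsyncResult result)
{
    // Instrumented accept callback: logs the id of the thread issuing the Pulse.
    // Let the server continue listening
    // NOTE(review): when this runs on the same thread as ThreadStart, lock(sync)
    // re-enters the already-held lock and the Pulse finds no waiter, so it is lost.
    lock (sync)
    {
        Console.WriteLine("Pulse [{0}]", Thread.CurrentThread.ManagedThreadId);
        Monitor.Pulse(sync);
    }
    if (running)
    {
        TcpListener localListener = (TcpListener)result.AsyncState;
        using (TcpClient client = localListener.EndAcceptTcpClient(result))
        {
            handler.Handle(client.GetStream());
        }
    }
}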
我的运行输出如下。如果你自己运行这段代码，具体的线程 ID 会有所不同，但整体模式应该是相同的。
Press return to test... BeginAccept [3] Wait [3] Press return to terminate... Pulse [5] BeginAccept [3] Pulse [3] Echo Handler: Test1 Echo Handler: Test3 Wait [3]
如你所见，Pulse 被调用了两次。第一次来自另一个线程（Pulse [5]），它唤醒了第一个 Wait。随后线程 3 又执行了一次 BeginAccept。由于此时已经有一个挂起（pending）的传入连接，它决定直接在当前线程上调用 Accept 回调。因为 Accept 由同一个线程调用，而该线程已经持有这把锁，所以 lock(sync) 不会阻塞。于是它立即在一个空的等待队列上调用了 Pulse [3]，这次脉冲就此丢失。
两个处理程序都被调用，两条消息也都得到了处理。
到这里为止一切正常。但随后 ThreadStart 继续运行，并无限期地进入了等待状态，因为本应唤醒它的那次 Pulse 已经丢失了。
这里的根本问题在于你试图把 Monitor 当作信号来使用。Monitor 不会记住状态，所以第二次脉冲就丢失了。
不过有一个简单的解决方案：使用 AutoResetEvent。它是真正的信号，会记住自己的状态。
/// <summary>
/// Creates a server that will listen on <paramref name="port"/> at the first
/// address returned for this host's name. Each accepted connection's stream is
/// passed to <paramref name="handler"/>. The listener is not started here.
/// </summary>
/// <param name="handler">Receives the stream of each accepted client.</param>
/// <param name="port">TCP port to listen on.</param>
public Server(IHandler handler, int port)
{
    this.handler = handler;
    IPAddress address = Dns.GetHostEntry(Dns.GetHostName()).AddressList[0];
    listener = new TcpListener(address, port);
    running = false;
    // Auto-reset: a Set issued while nobody is waiting stays signaled until the
    // next WaitOne consumes it, unlike Monitor.Pulse, which is then a no-op.
    _event = new AutoResetEvent(false);
}

/// <summary>Runs the <c>ThreadStart</c> accept loop on a new thread.</summary>
public void Start()
{
    Thread thread = new Thread(ThreadStart);
    thread.Start();
}

/// <summary>
/// Stops accepting connections and signals the accept loop so it can observe
/// <c>running == false</c> and exit.
/// </summary>
public void Stop()
{
    // Clear the flag BEFORE stopping the listener. Closing the listener completes
    // the pending BeginAcceptTcpClient, and the resulting Accept callback should
    // already see running == false instead of calling EndAcceptTcpClient on a
    // stopped listener.
    running = false;
    listener.Stop();
    _event.Set();
}

/// <summary>
/// Accept loop: starts the listener, then repeatedly posts one asynchronous
/// accept and blocks until Accept (or Stop) signals the event.
/// </summary>
void ThreadStart()
{
    if (!running)
    {
        listener.Start();
        running = true;
        while (running)
        {
            try
            {
                listener.BeginAcceptTcpClient(new AsyncCallback(Accept), listener);
                // Safe even if Accept already ran synchronously on this thread:
                // the event stays signaled until this WaitOne consumes it.
                _event.WaitOne();
            }
            catch (Exception e)
            {
                // A failure after Stop() (listener closed underneath us) is part of
                // normal shutdown; only report errors while the server should be running.
                if (running)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}

/// <summary>
/// Completion callback for BeginAcceptTcpClient. Signals the accept loop to post
/// the next accept, then, if still running, ends this accept and hands the
/// client's stream to the handler. The client is disposed afterwards.
/// </summary>
/// <param name="result">Async result; its AsyncState is the listener passed to BeginAcceptTcpClient.</param>
void Accept(IAsyncResult result)
{
    // Let the server continue listening
    _event.Set();

    if (!running)
    {
        return;
    }

    TcpListener localListener = (TcpListener)result.AsyncState;
    TcpClient client;
    try
    {
        client = localListener.EndAcceptTcpClient(result);
    }
    catch (ObjectDisposedException)
    {
        // The listener was stopped between the running check and EndAcceptTcpClient.
        return;
    }
    catch (SocketException e)
    {
        Console.WriteLine(e.Message);
        return;
    }

    using (client)
    {
        try
        {
            handler.Handle(client.GetStream());
        }
        catch (Exception e)
        {
            // This runs on an async-callback thread: an unhandled exception here
            // would terminate the process, so log it like ThreadStart does.
            Console.WriteLine(e.Message);
        }
    }
}