我是Netty的新手.
我正在寻找一些示例.(最好使用Camel Netty组件和Spring,但这不是必须的)
特别是一个接收(消费)TCP消息的Netty示例应用程序.
另外,我如何编写可以测试这个netty应用程序的JUnit测试?
谢谢,Dar
我假设您仍想与Camel集成.我会先看一下Camel的文档.在此之后,你就需要开始动手尝试了.我有一个例子,在其中我创建了一个充当Netty服务器的Camel处理器(Processor).Netty组件的工作方式是:From端点是作为消费者的服务器,To端点是作为生产者的客户端.而我需要的是一个作为服务器的To端点,但该组件并不支持这种用法.于是我简单地将Camel Processor实现为一个Spring bean,它在初始化时启动Netty服务器.不过,JBoss上的Netty文档和示例都非常好,值得仔细阅读一遍.
这是我精简后的例子.它是一个向所有已连接的客户端发送消息的服务器.如果您是Netty的新手,我强烈建议您浏览一下我上面提到的那些示例:
public class NettyServer implements Processor { private final ChannelGroup channelGroup = new DefaultChannelGroup(); private NioServerSocketChannelFactory serverSocketChannelFactory = null; private final ExecutorService executor = Executors.newCachedThreadPool(); private String listenAddress = "0.0.0.0"; // overridden by spring-osgi value private int listenPort = 51501; // overridden by spring-osgi value @Override public void process(Exchange exchange) throws Exception { byte[] bytes = (byte[]) exchange.getIn().getBody(); // send over the wire sendMessage(bytes); } public synchronized void sendMessage(byte[] message) { ChannelBuffer cb = ChannelBuffers.copiedBuffer(message); //writes to all clients connected. this.channelGroup.write(cb); } private class NettyServerHandler extends SimpleChannelUpstreamHandler { @Override public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelOpen(ctx, e); //add client to the group. NettyServer.this.channelGroup.add(e.getChannel()); } // Perform an automatic recon. @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { super.channelConnected(ctx, e); // do something here when a clien connects. } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { // Do something when a message is received... } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { // Log the exception/ } } private class PublishSocketServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { // need to set the handler. 
return Channels.pipeline(new NettyServerHandler()); } } // called by spring to start the server public void init() { try { this.serverSocketChannelFactory = new NioServerSocketChannelFactory(this.executor, this.executor); final ServerBootstrap serverBootstrap = new ServerBootstrap(this.serverSocketChannelFactory); serverBootstrap.setPipelineFactory(new PublishSocketServerPipelineFactory()); serverBootstrap.setOption("reuseAddress", true); final InetSocketAddress listenSocketAddress = new InetSocketAddress(this.listenAddress, this.listenPort); this.channelGroup.add(serverBootstrap.bind(listenSocketAddress)); } catch (Exception e) { } } // called by spring to shut down the server. public void destroy() { try { this.channelGroup.close(); this.serverSocketChannelFactory.releaseExternalResources(); this.executor.shutdown(); } catch (Exception e) { } } // injected by spring public void setListenAddress(String listenAddress) { this.listenAddress = listenAddress; } // injected by spring public void setListenPort(int listenPort) { this.listenPort = listenPort; }
}