Netty入门基础
熟悉使用netty,写博客备忘
1.首先写服务端代码,依赖的类后面体现
public class MessageServer {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageServer start..");
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// 与下面的addLast操作选择一个即可
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageServerHandler());
// Bind and start to accept incoming connections
bootstrap.bind(new InetSocketAddress("localhost",9550));
}
}
2.相应的客户端:
public class MessageClient {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageClient start..");
start();
}
private static void start() {
String host = "127.0.0.1";
int port = 9550;
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
// 不能既有MessageServerPipelineFactory,又有bootstrap.getPipeline().addLast()这个方法
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageClientHandler());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit
future.getChannel().write("--------Shut down thread pools to exit: future getChannel");
//release resource
bootstrap.releaseExternalResources();
}
}
3.服务端的handler
public class MessageServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageServerHandler.class);
private final AtomicLong transferredBytes = new AtomicLong();
@Override
public void messageReceived(ChannelHandlerContext cxt, MessageEvent event){
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// /ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
transferredBytes.addAndGet(((ChannelBuffer) event.getMessage()).readableBytes());
System.out.println("MessageServerHandler : messageReceived, length = "
+ ((ChannelBuffer) event.getMessage()).readableBytes()
+ transferredBytes.byteValue());
event.getChannel().write(event.getMessage());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
logger.log(Level.WARNING, "******ServerHandler: exceptionCaught", event.getCause());
System.out.println("ServerHandler: exceptionCaught ******" + event.getCause().getLocalizedMessage());
event.getChannel().close();
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelOpen, channel id = " + event.getChannel().getId());
super.channelOpen(ctx, event);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelConnected, context name = " + ctx.getName());
super.channelConnected(ctx, e);
}
}
4.客户端的handler
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageClientHandler.class);
//private ChannelBuffer responseBuffer = ChannelBuffers.dynamicBuffer();
private static byte[] content = {'A','2','3','4','5','7','8','9','e','a','b','c','d','f','g','h','i','j'};
@Override
public void channelConnected(ChannelHandlerContext cxt, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
//System.out.println("当连接到服务器的时候,就开始发送256字节的字符串");
logger.info("channelConnected: client try to connect server...");
//当messageReceived接收到服务器的消息时,又把消息发送给服务器
event.getChannel().write(nextMessage());//this.getTestString(256)
super.channelConnected(cxt, event);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
//System.out.println("Client: messageReceived , buffer.readableBytes = " + buffer.readableBytes());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
event.getChannel().write(event.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
//logger.log(Level.WARNING, "CLIENT: exceptionCaught " , event.getCause());
System.out.println(event.getCause().getMessage());
ctx.getChannel().close();
}
public static String getTestString(int size) {
ConstantStep.getInsatnce().printStep();
StringBuilder sb = new StringBuilder(20);
for(int i=0; i<size; i++) {
sb.append("A");
}
return sb.toString();
}
public static ChannelBuffer nextMessage() {
ConstantStep.getInsatnce().printStep();
return ChannelBuffers.wrappedBuffer(content);
}
}
5.MessageServerPipelineFactory:
public class MessageServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//在链条 加上各种处理器 ,注意顺序
pipeline.addLast("handler", new MessageServerHandler());
pipeline.addLast("decoder", new ServerMessageDecoder());
pipeline.addLast("encoder", new ServerMessageEncoder());
return pipeline;
}
}
6.类似的MessageClientPipelineFactory
public class MessageClientPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler", new MessageClientHandler());
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
return pipeline;
}
}
7.ServerMessageDecoder:
public class ServerMessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("ServerDDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
8.ServerMessageEncoder:
public class ServerMessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("Server EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
9.MessageEncoder:
public class MessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
10:MessageDecoder:
public class MessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);//测试用
System.out.println("DDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
11:ConstantStep
public class ConstantStep {
private int step = 0;
private static ConstantStep instance = new ConstantStep();
private ConstantStep() {
}
public synchronized static ConstantStep getInsatnce(){//
return instance;
}
public synchronized void printStep() {
System.out.println(" now step : " + step++);
}
}