Netty入门基础

为熟悉 Netty 的使用,写此博客备忘。

1.首先写服务端代码,所依赖的类在后文给出:

/**
 * Demo server entry point.
 *
 * <p>Creates a {@code ServerBootstrap} on an NIO server socket channel factory backed by two
 * cached thread pools, installs {@code MessageServerPipelineFactory}, and binds to
 * {@code localhost:9550}.
 */
public class MessageServer {

    /**
     * Starts the server.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
        ConstantStep.getInsatnce().printStep();
        System.out.println("MessageServer start..");

        NioServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool());
        ServerBootstrap server = new ServerBootstrap(channelFactory);

        // Handlers are supplied by the pipeline factory. The alternative is to register them one
        // by one through bootstrap.getPipeline().addLast(name, handler); use one approach or the
        // other, not both.
        server.setPipelineFactory(new MessageServerPipelineFactory());

        // Bind and start to accept incoming connections.
        InetSocketAddress listenAddress = new InetSocketAddress("localhost", 9550);
        server.bind(listenAddress);
    }
}

2.相应的客户端:

/**
 * Demo client entry point.
 *
 * <p>Connects to {@code 127.0.0.1:9550} with the handlers supplied by
 * {@code MessageClientPipelineFactory}, then blocks until the channel is closed.
 */
public class MessageClient {

    /**
     * Starts the client.
     *
     * @param args command-line arguments; not read
     */
    public static void main(String[] args) {
        ConstantStep.getInsatnce().printStep();
        System.out.println("MessageClient start..");
        start();
    }

    /**
     * Opens the connection, waits for it to close, and releases the bootstrap's resources.
     */
    private static void start() {
        String host = "127.0.0.1";
        int port = 9550;
        ClientBootstrap bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
        try {
            // Set up the event pipeline factory. A bootstrap must not be configured with both a
            // pipeline factory and bootstrap.getPipeline().addLast(...) calls.
            bootstrap.setPipelineFactory(new MessageClientPipelineFactory());

            // Start the connection attempt and wait for its outcome so a failure is reported
            // instead of being silently treated like a normal close.
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
            future.awaitUninterruptibly();
            if (!future.isSuccess()) {
                System.out.println("MessageClient: connect to " + host + ":" + port
                        + " failed: " + future.getCause());
                return;
            }

            // Wait until the connection is closed. Nothing is written afterwards: the channel is
            // already closed at that point, so any write would fail.
            future.getChannel().getCloseFuture().awaitUninterruptibly();
        } finally {
            // Shut down the thread pools so the JVM can exit, on the failure path as well.
            bootstrap.releaseExternalResources();
        }
    }
}

3.服务端的handler

/**
 * Server-side handler that echoes every received buffer back to the sender and keeps a running
 * count of the bytes it has seen.
 *
 * <p>{@link #messageReceived} casts the message to {@code ChannelBuffer}, so this handler must
 * receive raw buffers (no decoder that changes the message type may run before it).
 */
public class MessageServerHandler extends SimpleChannelUpstreamHandler {

    // Looked up by class name: java.util.logging.Logger.getLogger takes a String, and
    // Level.WARNING below is the java.util.logging level.
    private static final Logger logger = Logger.getLogger(MessageServerHandler.class.getName());

    /** Total number of readable bytes received by this handler instance. */
    private final AtomicLong transferredBytes = new AtomicLong();

    /**
     * Logs the received buffer, adds its size to the running total, and writes it back.
     *
     * @param cxt   handler context; not used
     * @param event event carrying the received {@code ChannelBuffer}
     */
    @Override
    public void messageReceived(ChannelHandlerContext cxt, MessageEvent event) {
        ConstantStep.getInsatnce().printStep();
        // Demo delay. An interrupt is remembered and restored only after the echo has been
        // written, so the write itself does not run with the interrupt flag set.
        boolean interrupted = pause(100);
        try {
            ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
            int length = buffer.readableBytes();
            long total = transferredBytes.addAndGet(length);
            System.out.println("MessageServerHandler : messageReceived, length = " + length
                    + ", total = " + total);
            event.getChannel().write(buffer);
            System.out.println("Server: messageReceived ," + buffer
                    + ",time = " + System.currentTimeMillis());
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Logs the failure and closes the channel it occurred on.
     *
     * @param cxt   handler context; not used
     * @param event event carrying the cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext cxt, ExceptionEvent event) {
        ConstantStep.getInsatnce().printStep();
        logger.log(Level.WARNING, "******ServerHandler: exceptionCaught", event.getCause());
        System.out.println("ServerHandler: exceptionCaught ******"
                + event.getCause().getLocalizedMessage());
        event.getChannel().close();
    }

    /**
     * Prints the id of the opened channel, then delegates to the superclass.
     */
    @Override
    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event)
            throws Exception {
        ConstantStep.getInsatnce().printStep();
        System.out.println("ServerHandler........channelOpen, channel id = "
                + event.getChannel().getId());
        super.channelOpen(ctx, event);
    }

    /**
     * Prints the name of this handler's context, then delegates to the superclass.
     */
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
            throws Exception {
        ConstantStep.getInsatnce().printStep();
        System.out.println("ServerHandler........channelConnected, context name = "
                + ctx.getName());
        super.channelConnected(ctx, e);
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the sleep was cut short by an interrupt
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }
}
4.客户端的handler

/**
 * Client-side handler: sends a fixed payload once the channel is connected and afterwards sends
 * every received message straight back, with artificial delays so the flow is easy to follow.
 */
public class MessageClientHandler extends SimpleChannelUpstreamHandler {

    // Looked up by class name, which is the form java.util.logging.Logger.getLogger accepts.
    private static final Logger logger = Logger.getLogger(MessageClientHandler.class.getName());

    /** Payload sent on connect. The array is shared with every buffer from nextMessage(). */
    private static final byte[] content =
            {'A', '2', '3', '4', '5', '7', '8', '9', 'e', 'a', 'b', 'c', 'd', 'f', 'g', 'h', 'i', 'j'};

    /**
     * Sends the first message as soon as the channel is connected, then delegates to the
     * superclass. Replies are sent back again by {@link #messageReceived}.
     */
    @Override
    public void channelConnected(ChannelHandlerContext cxt, ChannelStateEvent event)
            throws Exception {
        ConstantStep.getInsatnce().printStep();
        logger.info("channelConnected: client try to connect server...");
        event.getChannel().write(nextMessage());
        super.channelConnected(cxt, event);
    }

    /**
     * Prints the received message and, after a pause, writes it back to the channel.
     *
     * @param ctx   handler context; not used
     * @param event event carrying the received message
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
        ConstantStep.getInsatnce().printStep();
        // Demo delays. An interrupt is remembered and restored only after the write, so the
        // write itself does not run with the interrupt flag set.
        boolean interrupted = pause(100);
        try {
            System.out.println("Client: messageReceived ," + event.getMessage()
                    + ",time = " + System.currentTimeMillis());
            interrupted |= pause(3000);
            event.getChannel().write(event.getMessage());
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Prints the failure message and closes the channel.
     *
     * @param ctx   handler context whose channel is closed
     * @param event event carrying the cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
        ConstantStep.getInsatnce().printStep();
        System.out.println(event.getCause().getMessage());
        ctx.getChannel().close();
    }

    /**
     * Builds a string consisting of {@code size} copies of the letter {@code A}.
     *
     * @param size number of characters; zero or a negative value yields an empty string
     * @return the generated string
     */
    public static String getTestString(int size) {
        ConstantStep.getInsatnce().printStep();
        // Presize to the requested length instead of a fixed guess.
        StringBuilder sb = new StringBuilder(Math.max(size, 0));
        for (int i = 0; i < size; i++) {
            sb.append('A');
        }
        return sb.toString();
    }

    /**
     * Returns a buffer wrapping the fixed payload.
     *
     * @return a buffer backed by the shared payload array (wrapped, not copied)
     */
    public static ChannelBuffer nextMessage() {
        ConstantStep.getInsatnce().printStep();
        return ChannelBuffers.wrappedBuffer(content);
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis how long to sleep, in milliseconds
     * @return {@code true} if the sleep was cut short by an interrupt
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }
}

5.MessageServerPipelineFactory:

/**
 * Supplies the {@code ChannelPipeline} installed on the server bootstrap.
 */
public class MessageServerPipelineFactory implements ChannelPipelineFactory {

    /**
     * Creates a pipeline containing, in insertion order, "handler", "decoder" and "encoder".
     *
     * <p>NOTE(review): the handler is registered before the codec handlers, so it is the first
     * entry in the pipeline. MessageServerHandler casts incoming messages to ChannelBuffer, which
     * relies on this order; confirm against the Netty 3 ChannelPipeline traversal rules before
     * reordering.
     *
     * @return the newly built pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline chain = Channels.pipeline();

        // Handlers are appended one after another; mind the order.
        chain.addLast("handler", new MessageServerHandler());
        chain.addLast("decoder", new ServerMessageDecoder());
        chain.addLast("encoder", new ServerMessageEncoder());

        return chain;
    }

}

6.类似的MessageClientPipelineFactory

/**
 * Supplies the {@code ChannelPipeline} installed on the client bootstrap.
 */
public class MessageClientPipelineFactory implements ChannelPipelineFactory {

    /**
     * Creates a pipeline containing, in insertion order, "handler", "decoder" and "encoder".
     *
     * <p>NOTE(review): the handler is registered before the codec handlers, so it is the first
     * entry in the pipeline; confirm against the Netty 3 ChannelPipeline traversal rules before
     * reordering.
     *
     * @return the newly built pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline chain = Channels.pipeline();

        chain.addLast("handler", new MessageClientHandler());
        chain.addLast("decoder", new MessageDecoder());
        chain.addLast("encoder", new MessageEncoder());

        return chain;
    }

}

7.ServerMessageDecoder:

/**
 * Frame decoder that turns every readable byte currently in the buffer into one {@code String}.
 */
public class ServerMessageDecoder extends FrameDecoder {

    /**
     * Charset used to decode the received bytes. Fixed so the result does not depend on the JVM
     * default charset; the sending side has to encode with the same charset.
     */
    private static final java.nio.charset.Charset TEXT_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Consumes all readable bytes of {@code buffer} and returns them as text.
     *
     * @param ctx     handler context; not used
     * @param channel channel the data arrived on; not used
     * @param buffer  accumulated inbound bytes
     * @return the readable bytes decoded as a UTF-8 string
     * @throws Exception if the demo delay is interrupted
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer) throws Exception {
        ConstantStep.getInsatnce().printStep();
        Thread.sleep(100);
        System.out.println("ServerDDDDD: MessageDecoder: decode() ,msg : "
                + buffer.getClass()
                + ",time = " + System.currentTimeMillis());
        // Length-prefixed framing matching the encoder (4-byte int header) is left disabled:
        //   if (buffer.readableBytes() < 4) return null;               // header incomplete, wait
        //   int dataLength = buffer.getInt(buffer.readerIndex());
        //   if (buffer.readableBytes() < dataLength + 4) return null;  // body incomplete, wait
        //   buffer.skipBytes(4);
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.readBytes(bytes);
        return new String(bytes, TEXT_CHARSET);
    }
}

8.ServerMessageEncoder:

/**
 * Converts a {@code String} message into a buffer holding a 4-byte length header followed by the
 * string's bytes; any other message type is passed through unchanged.
 *
 * <p>NOTE(review): despite its name this class extends {@code OneToOneDecoder} and overrides
 * {@code decode}. If the intent is to transform outgoing writes, Netty 3's
 * {@code OneToOneEncoder} is presumably the right base class; confirm before changing.
 */
public class ServerMessageEncoder extends OneToOneDecoder {

    /**
     * Charset used to turn text into bytes. Fixed so the bytes on the wire do not depend on the
     * JVM default charset; the receiving side has to decode with the same charset.
     */
    private static final java.nio.charset.Charset TEXT_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Transforms one message.
     *
     * @param ctx     handler context; not used
     * @param channel channel the message belongs to; not used
     * @param msg     message to transform
     * @return {@code msg} itself when it is not a {@code String}, otherwise a new buffer with the
     *         byte length as an int followed by the UTF-8 bytes
     * @throws Exception if the demo delay is interrupted
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
            Object msg) throws Exception {
        ConstantStep.getInsatnce().printStep();
        Thread.sleep(100);
        System.out.println("Server EEEEE: MessageEncoder: decode() ,msg : "
                + msg.getClass() + ", info = " + msg
                + ",time = " + System.currentTimeMillis());
        if (!(msg instanceof String)) {
            return msg;
        }
        // Packet layout: 4-byte int header with the payload length, then the payload bytes.
        byte[] bytes = ((String) msg).getBytes(TEXT_CHARSET);
        ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
        cb.writeInt(bytes.length);
        cb.writeBytes(bytes);
        return cb;
    }
}

9.MessageEncoder:

/**
 * Converts a {@code String} message into a buffer holding a 4-byte length header followed by the
 * string's bytes; any other message type is passed through unchanged.
 *
 * <p>NOTE(review): despite its name this class extends {@code OneToOneDecoder} and overrides
 * {@code decode}. If the intent is to transform outgoing writes, Netty 3's
 * {@code OneToOneEncoder} is presumably the right base class; confirm before changing.
 */
public class MessageEncoder extends OneToOneDecoder {

    /**
     * Charset used to turn text into bytes. Fixed so the bytes on the wire do not depend on the
     * JVM default charset; the receiving side has to decode with the same charset.
     */
    private static final java.nio.charset.Charset TEXT_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Transforms one message.
     *
     * @param ctx     handler context; not used
     * @param channel channel the message belongs to; not used
     * @param msg     message to transform
     * @return {@code msg} itself when it is not a {@code String}, otherwise a new buffer with the
     *         byte length as an int followed by the UTF-8 bytes
     * @throws Exception if the demo delay is interrupted
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
            Object msg) throws Exception {
        ConstantStep.getInsatnce().printStep();
        Thread.sleep(100);
        System.out.println("EEEEE: MessageEncoder: decode() ,msg : "
                + msg.getClass() + ", info = " + msg
                + ",time = " + System.currentTimeMillis());
        if (!(msg instanceof String)) {
            return msg;
        }
        // Packet layout: 4-byte int header with the payload length, then the payload bytes.
        byte[] bytes = ((String) msg).getBytes(TEXT_CHARSET);
        ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
        cb.writeInt(bytes.length);
        cb.writeBytes(bytes);
        return cb;
    }
}

10.MessageDecoder:

/**
 * Frame decoder that turns every readable byte currently in the buffer into one {@code String}.
 */
public class MessageDecoder extends FrameDecoder {

    /**
     * Charset used to decode the received bytes. Fixed so the result does not depend on the JVM
     * default charset; the sending side has to encode with the same charset.
     */
    private static final java.nio.charset.Charset TEXT_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    /**
     * Consumes all readable bytes of {@code buffer} and returns them as text.
     *
     * @param ctx     handler context; not used
     * @param channel channel the data arrived on; not used
     * @param buffer  accumulated inbound bytes
     * @return the readable bytes decoded as a UTF-8 string
     * @throws Exception if the demo delay is interrupted
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
            ChannelBuffer buffer) throws Exception {
        ConstantStep.getInsatnce().printStep();
        Thread.sleep(100); // for testing only
        System.out.println("DDDDD: MessageDecoder: decode() ,msg : "
                + buffer.getClass()
                + ",time = " + System.currentTimeMillis());
        // Length-prefixed framing matching the encoder (4-byte int header) is left disabled:
        //   if (buffer.readableBytes() < 4) return null;               // header incomplete, wait
        //   int dataLength = buffer.getInt(buffer.readerIndex());
        //   if (buffer.readableBytes() < dataLength + 4) return null;  // body incomplete, wait
        //   buffer.skipBytes(4);
        byte[] bytes = new byte[buffer.readableBytes()];
        buffer.readBytes(bytes);
        return new String(bytes, TEXT_CHARSET);
    }
}

11.ConstantStep:

/**
 * Singleton step counter: each call to {@link #printStep()} prints the current step number and
 * then increments it.
 */
public class ConstantStep {

    /** The single instance, created when the class is initialized. */
    private static final ConstantStep instance = new ConstantStep();

    /** Next step number to print; only accessed inside the synchronized printStep(). */
    private int step = 0;

    private ConstantStep() {
    }

    /**
     * Returns the shared instance. No locking is needed because the instance is assigned once
     * during class initialization and never changes.
     *
     * @return the singleton
     */
    public static ConstantStep getInstance() {
        return instance;
    }

    /**
     * Misspelled variant of {@link #getInstance()}, kept so existing callers continue to work.
     *
     * @return the singleton
     */
    public static ConstantStep getInsatnce() {
        return getInstance();
    }

    /**
     * Prints the current step number to standard output and advances the counter.
     */
    public synchronized void printStep() {
        System.out.println(" now step : " + step++);
    }

}

相关推荐