基于netty写的网络通信框架

最近在做一个项目,用到了远程调用方式,开始采用的是rmi,后来经过测试,rmi可能无法达到项目的一些性能上的要求,于是采用了基于tcp/udp的netty,但是直接用netty开发,有些麻烦了,我们想把服务抽取出来部署在远程服务器上,开发的兄弟们只是在自己的项目中负责调用一下,就跟rmi类似,非常方便。

但是又有一个问题,调用的兄弟需要在web中请求这种tcp服务,netty内部是异步处理机制,http是伪长连接,调用结束后,异步请求还没有返回,http连接就断开了,返回的是null。所以这个问题要解决一下。

下面说下封装的各个类的代码吧

首先当客户端对远程服务器发起tcp请求时,这时候请求一般会到达服务器端的handler里,我写的这个handler继承了netty的SimpleChannelUpstreamHandler,代码如下:

public abstract class ChannelServerHandler extends SimpleChannelUpstreamHandler {
	private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelServerHandler.class);
	protected final Map<String, InvokeHandler> handlers = new HashMap<String, InvokeHandler>();
	protected final Map<String, Method> initMethods = new HashMap<String, Method>();

	public ChannelServerHandler() {
		WSCFInit.register(handlers, initMethods);
	}
	
	protected abstract void processor(Channel channel, Object message);


	@Override
	public final void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		Transport t = (Transport) e.getMessage();
		String className = t.getClazz();
		String methodName = t.getMethod();
		logger.info("Invoke Handler:" + className + ", Invoke Method:" + methodName);
		processor(ctx.getChannel(), t);
	}

里面有几个变量需要解释一下,handlers是开发tcp服务端的handler存放的map,initMethods是里面需要调用的方法,通过WSCFInit类来进行初始化工作。

它主要做了如下工作,在服务器端Server启动的时候,扫描固定包下的handler和他们的方法,然后以clazz+method的方式存放在handlers和initMthods这两个map中。

Reflections reflections = new Reflections("packagename");
			Set<Class<?>> annotated = reflections.getTypesAnnotatedWith((Class<? extends Annotation>) annClass);
			Iterator<Class<?>> it = annotated.iterator();
			while (it.hasNext()) {
				Class<?> next = it.next();
				if (next.isAnnotationPresent(Handler.class)) {
					Annotation ann = (Annotation) next.getAnnotation((Class<? extends Annotation>) annClass);
					handlers.put(((Handler) ann).name(), (InvokeHandler) next.newInstance());

					Method[] methods = next.getDeclaredMethods();
					for (Method method : methods) {
						if (method.isAnnotationPresent(Remote.class)) {
							Remote path = method.getAnnotation(Remote.class);
							initMethods.put(((Handler) ann).name() + path.url(), method);
						}
					}

				}
			}

protectedabstractvoidprocessor(Channelchannel,Objectmessage);这个方法具体的逻辑是由它的子类来处理的。

再看一下ServerHandler类里面processor的代码,这个类继承了ChannelServerHandler

@Override
	protected void processor(Channel channel, Object message) {
		Transport transport = (Transport) message;

		InvokeHandler handler = handlers.get(transport.getClazz());

		Object[] params = (Object[]) transport.getMessage();
		Object ret = null;
		try {
			Method method = initMethods.get(transport.getClazz() + transport.getMethod());
			if (method == null) {
			} else {
				ret = method.invoke(handler, params);
			}
			ServerSender sender = new ServerSender(channel, transport);
			sender.send(ret);
		} catch (Exception e) {
			throw new IllegalAccessError(e.getMessage());
		}

	}

客户端向服务器端发起的请求真正处理的逻辑在这个方法里面,这个方法在处理完调用了相应的服务端handler进行响应后,会将需要返回给客户端的信息封装在transport这个对象然后传递出去,这个对象是封装服务器和客户端通信消息的。

那么Transport这个类定义了些什么内容呢

public final class Transport implements Serializable {

	private static final long serialVersionUID = 1675991188209117209L;
	private String clazz;
	private String method;
	private Object message;
	private String key;

clazz是要调用的handler的注解名,method是要调用的方法的注解名,message是封装通信消息的,key这个后面再说,是可以多次调用重复的url进行的唯一标识。

ok,处理服务端信息的handler看完了,我们再来看看客户端的

public abstract class ChannelClientHandler extends SimpleChannelUpstreamHandler {
	private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelClientHandler.class);
	public final Map<String, ResultHandler> ret = new ConcurrentHashMap<String, ResultHandler>();

	protected abstract void processor(Channel channel, Object message);

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		processor(ctx.getChannel(), e.getMessage());
	}
	

}

ret是封装服务器端返回结果的,它的子类负责实现processor方法。

子类代码如下

@Override
	protected void processor(Channel channel, Object message) {
		Transport t = (Transport) message;
		String key = t.getClazz() + t.getMethod() + t.getKey();
		ResultHandler r = ret.remove(key);
		r.processor(t.getMessage());
		
	}

ResultHandler这个是我定义的一个接口,专门处理异步返回的结果,可以通过匿名函数调用

public interface ResultHandler<T> {
	
	public void processor(T message);

}

上面说了如何处理netty内部的异步机制,让主线程能够等待异步返回的结果

处理代码如下

public Object get(String url, Object... params) {
		class Result {
			public Object o;
		}

		final Result ret = new Result();
		synchronized (ret) {
			try {
				invoke(url, params, new ResultHandler() {
					@Override
					public void processor(Object message) {
						synchronized (ret) {
							ret.o = message;
							ret.notify();
						}
					}
				});
				ret.wait();
			} catch (InterruptedException e) {
			} 
			return ret.o;
		}
	}

当客户端调用get方法时候,就可以得到服务器端异步返回的结果了。但是对客户端来说,他感觉到的是同步的调用。

最后我定义了一个InvokeHandler,当开发者开发服务端程序时候,需要实现这个接口,定义自己的handler

类似如下

@Handler(name="testhandler")
public class Server1 implements InvokeHandler  
{  
	@Remote(url="test2")
	public String say(String msg) {
		System.out.println(msg);
		return "hi";
	}
	
	@Remote(url= "test2")
	public String say2(Person p) {
		System.out.println(p.getId());
		return p.getName();
	}
  
}

上面定义的这些注解,在WSCFInit初始化的时候会放到一个map里面,类似于spring的配置文件。

最后再说说客户端是怎么调用的,在连接好服务端ip和port后,通过如下调用方式就可以了

public class Client {
	private static ClientSender sender;
	
	public static void main(String[] args) {
		sender = ClientProxy.connect(ip, port);
		Object msg = sender.get("tcp://testhandler/test1", "hello");
		System.out.println(msg);

		Person p = new Person();
		p.setId(1);
		p.setName("zhangsan");
		Object o = sender.get("tcp://testhandler/test2", p);
		System.out.println(o);


	}
}

这样就ok了,也可以自定义要传输的是对象还是xml还是json。同时可以方便的定义自己的解码器。完成自己的业务需求。

接上文,这个服务是基于netty的,每connect一次,就会在服务器上建立一个tcp连接,就是一对pipe,如果不及时释放,那么建立的pipe会越来越多,严重浪费服务器的资源。但是如果释放了,就失去了tcp长连接的作用了。所以折中一下,为了减少连接数,保证客户端的固定连接,服务端不变,在客户端加入连接池功能。

public synchronized ClientSender getClientSender() {
		Channel channel = getChannel();
		if(!channel.isOpen()) {
			connectPool.remove(channel);
			channel = createConnect(address, port);
			connectPool.addLast(channel);
		}
		return new ClientSender(channel, handler.ret, this);
	}

ok,这样就可以用到连接池功能了。每个客户端可以用到固定连接数了。

那么客户端调用的时候需要自己动手创建ConnectPool了

public class ClientProxy {
	private static ConnectPool pool = null;

	public static void connect(String address, int port) {
		if (pool == null) {
			synchronized (ClientProxy.class) {
				if (pool == null) {
					pool = ConnectPool.createProxy(address, port);
				}
			}
		}
	}

	public static Object get(String url, Object... params) {
		if(pool == null) {
			throw new IllegalStateException("must invoke connect method first");
		}
		
		ClientSender sender = pool.getClientSender();
		Object msg =  sender.get(url, params);
		sender.free(); //归还连接
		return msg;
	}

}

调用方式就不再是上面那样了,要如下调用:

ClientProxy.connect(ip, port);
		Person p = new Person();
		p.setId(12);
		p.setName("zhangsan");
		
		Object msg =  ClientProxy.get("tcp://server/test1", p);
		
		System.out.println(msg);

来看下控制台

Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool
INFO: 当前连接池中连接数量:5
Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool
INFO: 连接池中剩余连接数量:4
zhangsan

上次加了连接池功能,后来又提出了一个需求,就是,原来是主线程一直在等待异步线程返回,如果没有返回,主线程就阻塞了,进行不下去。后来发现这个太受限制了。主线程可以先边做自己的事情边等待异步线程处理,符合nio的事件处理机制。于是要更改一下,

private static class Result {
		Object obj;
	}

	final Result f = new Result();
	private volatile boolean hasNotified = false; //异步线程是否结束的标志

	public void preGet(String url, Object... params) {
		invoke(url, params, new ResultHandler() {
			@Override
			public void processor(Object message) {
				synchronized (f) {
					f.obj = message;
					f.notify(); //异步线程结束,唤醒主线程
					hasNotified = true;
				}
			}
		});
	}

	public Object get() {
		synchronized (f) {
			if (!hasNotified) //如果异步线程没有结束,则主线程等待
				try {
					f.wait();
				} catch (InterruptedException e) {
					throw new RuntimeException(e);
				}
		}
		free(); // 释放连接
		return f.obj;
	}

测试如下:

ClientProxy.connect(ip, port);
		Person p = new Person();
		p.setId(12);
		p.setName("zhangsan");
		
		Future future =  ClientProxy.get("tcp://server/test1", p);
		String str = "hello "; //主线程继续做自己的事情,边做边等待异步返回
		
		System.out.println(str + future.get());

最后打印结果

Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool
INFO: 当前连接池中连接数量:5
Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool
INFO: 连接池中剩余连接数量:4
Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool
INFO: 归还连接后连接池中连接的数量:5
hello zhangsan

还有一种情况,主线程等待一段时间后,在规定时间内没有返回,主线程就不等待了。

所以代码改造下,加入超时功能

public Object get() {
		return get(0);
	}
	
	public Object get(long timeout) {
		synchronized (f) {
			if (!hasNotified)
				try {
					f.wait(timeout); //主线程等待一定时间
				} catch (InterruptedException e) {
					throw new IllegalStateException(e);
				}
		}
		free(); // 释放连接
		return f.obj;
	}

客户端调用,比如1个毫秒没返回,就不再等待了,主线程可以自行处理。

相关推荐