在Netty中使用Apache Commons FileUpload

/**
 * 用Netty来实现上传
 */
public class NettyFileUpload extends FileUpload {

	private NettyRequestContext context;

	public static final boolean isMultipartContent(HttpRequest request) {
		if (HttpMethod.POST != request.getMethod()) {
			return false;
		}
		if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) {
			return false;
		}
		String contentType = request.getHeaders("Content-Type").get(0);
		if (contentType == null) {
			return false;
		}
		if (contentType.toLowerCase().startsWith("multipart/")) {
			return true;
		}
		return false;
	}

	public NettyFileUpload(NettyRequestContext context) {
		this.context = context;
	}

	public NettyFileUpload(FileItemFactory fileItemFactory) {
		super(fileItemFactory);
	}

	public FileItemIterator getItemIterator() throws FileUploadException, IOException {
		return super.getItemIterator(context);
	}
}


/**
 * {@link RequestContext} backed by values taken from a Netty request.
 *
 * <p>All values are supplied by the caller at construction time.
 */
public class NettyRequestContext implements RequestContext {

	/** Character encoding reported to the upload parser. */
	private final String encoding;

	/** Value of the request's Content-Type header. */
	private final String contentType;

	/** Declared body length as given by the caller. */
	private final int contentLength;

	/** Stream carrying the uploaded content. */
	private final InputStream inputStream;

	/**
	 * @param encoding character encoding of the request
	 * @param contentType content type of the request
	 * @param contentLength body length
	 * @param inputStream stream delivering the body
	 */
	public NettyRequestContext(String encoding, String contentType,
			int contentLength, InputStream inputStream) {
		this.inputStream = inputStream;
		this.contentLength = contentLength;
		this.contentType = contentType;
		this.encoding = encoding;
	}

	@Override
	public String getCharacterEncoding() {
		return this.encoding;
	}

	@Override
	public String getContentType() {
		return this.contentType;
	}

	@Override
	public int getContentLength() {
		return this.contentLength;
	}

	/**
	 * Returns the stream handed to the constructor. The request's own stream
	 * cannot be used directly because the body may arrive as HttpChunks.
	 */
	@Override
	public InputStream getInputStream() throws IOException {
		return this.inputStream;
	}

	@Override
	public String toString() {
		StringBuilder text = new StringBuilder();
		text.append("ContentLength=").append(getContentLength());
		text.append(", ContentType=").append(getContentType());
		return text.toString();
	}

	/**
	 * Closes the stream returned by {@link #getInputStream()}.
	 *
	 * @throws IOException if closing the stream fails
	 */
	public void closeInputStream() throws IOException {
		InputStream stream = getInputStream();
		stream.close();
	}
}


/**
 * Blocking {@link InputStream} fed with {@link HttpChunk}s.
 *
 * <p>One thread hands chunks in through {@link #putChunk(HttpChunk)} while
 * another thread reads the bytes. Reads block until a chunk with readable
 * content arrives; end of stream is reported once a chunk flagged as last
 * has been drained.
 */
public class NettyChunkInputStream extends InputStream {

	/** Bounded hand-off queue between the producing and the consuming thread. */
	private final BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128);

	/** Chunk currently being drained; touched by the reading thread only. */
	private HttpChunk currentChunk = null;

	private volatile boolean closed;

	/**
	 * Queues a chunk for the reader, blocking while the queue is full.
	 *
	 * @param chunk the chunk to append
	 * @return {@code true} once the chunk has been queued
	 * @throws IOException if the stream is closed or the thread is interrupted while waiting
	 */
	public boolean putChunk(HttpChunk chunk) throws IOException {
		if (closed) {
			throw new IOException(" this inputstream has been closed!");
		}
		try {
			chunkQueue.put(chunk);
		} catch (InterruptedException e) {
			// Keep the interrupt visible to the caller's thread.
			Thread.currentThread().interrupt();
			throw new IOException(e);
		}
		return true;
	}

	/**
	 * Reads one byte, blocking until data or the end of the stream is reached.
	 *
	 * @return the byte as an int in 0-255, or -1 at end of stream
	 * @throws IOException if the stream is closed or the thread is interrupted
	 */
	@Override
	public int read() throws IOException {
		HttpChunk chunk = readableChunk();
		if (chunk == null) {
			return -1;
		}
		// InputStream.read() must return an int between 0 and 255.
		return chunk.getContent().readByte() & 0xFF;
	}

	/**
	 * Reads up to {@code len} bytes from the current chunk in one call.
	 * Blocks only until at least one byte is available; it never waits for
	 * further chunks just to fill the buffer.
	 *
	 * @param b destination buffer
	 * @param off start offset in {@code b}
	 * @param len maximum number of bytes to read
	 * @return number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
	 * @throws IOException if the stream is closed or the thread is interrupted
	 */
	@Override
	public int read(byte[] b, int off, int len) throws IOException {
		if (b == null) {
			throw new NullPointerException("b");
		}
		if (off < 0 || len < 0 || len > b.length - off) {
			throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", size=" + b.length);
		}
		if (len == 0) {
			return 0;
		}
		HttpChunk chunk = readableChunk();
		if (chunk == null) {
			return -1;
		}
		int count = Math.min(len, chunk.getContent().readableBytes());
		chunk.getContent().readBytes(b, off, count);
		return count;
	}

	/**
	 * Returns the chunk to read from next, skipping drained and empty chunks.
	 *
	 * @return a chunk with readable content, or {@code null} at end of stream
	 */
	private HttpChunk readableChunk() throws IOException {
		if (closed) {
			throw new IOException(" this inputstream has been closed!");
		}
		try {
			HttpChunk chunk = getChunk();
			while (!chunk.getContent().readable()) {
				if (chunk.isLast()) {
					return null;
				}
				// An empty chunk that is not the last one is not the end of the data.
				nextChunk();
				chunk = currentChunk;
			}
			return chunk;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new IOException(e);
		}
	}

	/** Returns the current chunk, waiting for the first one if none was taken yet. */
	private HttpChunk getChunk() throws InterruptedException {
		if (currentChunk == null) {
			currentChunk = chunkQueue.take();
		}

		return currentChunk;
	}

	/** Replaces the current chunk with the next queued one, waiting if necessary. */
	private void nextChunk() throws InterruptedException {
		currentChunk = chunkQueue.take();
	}

	/**
	 * @return the bytes still readable in the current chunk, or 0 if no chunk
	 *         has been taken yet; queued chunks are not counted
	 */
	@Override
	public int available() throws IOException {
		HttpChunk chunk = currentChunk;
		return chunk == null ? 0 : chunk.getContent().readableBytes();
	}

	/**
	 * Marks the stream closed and drops queued chunks. The queue object is
	 * kept so that a concurrent {@link #putChunk(HttpChunk)} cannot hit a
	 * null reference; clearing it also releases a producer blocked on a full queue.
	 */
	@Override
	public void close() throws IOException {
		closed = true;
		chunkQueue.clear();
	}

	/** @return {@code true} once {@link #close()} has been called */
	public boolean isClosed() {
		return closed;
	}

}

应用:

/**
 * Upstream handler that streams multipart uploads sent to
 * {@code /upload/uploadfile} into Commons FileUpload.
 *
 * <p>The request body is pushed into a {@link NettyChunkInputStream} while an
 * {@link UploadTask} on a worker pool parses it. The task itself writes the
 * HTTP response when it finishes, so the thread delivering messages never
 * waits for the parse to complete. Other messages are passed upstream.
 */
public class NettyUploadHandler extends SimpleChannelUpstreamHandler {
	/** Worker pool running the blocking multipart parsing. */
	private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(32);

	/** Charset announced in the response's Content-Type header. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

	/** True while chunks of a chunked upload are still expected. */
	private boolean hasReadChunk;

	/** Stream of the chunked upload in progress; null when there is none. */
	private NettyChunkInputStream chunkStream;

	/**
	 * Routes requests to the upload logic, chunks of a running upload to its
	 * stream, and everything else to the next handler.
	 */
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		Object message = e.getMessage();
		if (message instanceof HttpRequest) {
			handleHttpRequest(ctx, e);
		} else if (hasReadChunk && message instanceof HttpChunk) {
			handleHttpChunk(e);
		} else {
			// e.g. chunks belonging to a request that was not an upload
			ctx.sendUpstream(e);
		}
	}

	/** Starts an upload for matching requests; forwards all other requests. */
	private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException {
		HttpRequest request = (HttpRequest) e.getMessage();
		if (isUploadFile(request)) {
			handleUploadRequest(request, e.getChannel());
		} else {
			ctx.sendUpstream(e);
		}
	}

	/**
	 * Creates a fresh stream and context for this upload and hands them to a
	 * worker. Each upload gets its own stream so a keep-alive connection can
	 * carry several uploads.
	 */
	private void handleUploadRequest(HttpRequest request, Channel channel) throws IOException {
		NettyChunkInputStream stream = new NettyChunkInputStream();
		NettyRequestContext context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, stream);
		if (request.isChunked()) {
			chunkStream = stream;
			hasReadChunk = true;
		} else {
			stream.putChunk(new DefaultHttpChunk(request.getContent()));
			// Explicit end marker: presumably a chunk built from a non-empty
			// buffer is not flagged as last (TODO confirm against the Netty
			// version in use), and without it the reader would wait forever.
			stream.putChunk(HttpChunk.LAST_CHUNK);
		}
		EXECUTOR.execute(new UploadTask(context, channel));
	}

	/** Feeds one chunk of the running upload to the worker. */
	private void handleHttpChunk(MessageEvent e) throws IOException {
		HttpChunk chunk = (HttpChunk) e.getMessage();
		NettyChunkInputStream stream = chunkStream;
		if (chunk.isLast()) {
			// Body complete: the next message on this channel is a new request.
			hasReadChunk = false;
			chunkStream = null;
		}
		if (stream == null || stream.isClosed()) {
			// The worker already finished (and answered); drop the rest of the body.
			return;
		}
		try {
			stream.putChunk(chunk);
		} catch (IOException ex) {
			// The worker may close the stream between the check and the put.
			if (!stream.isClosed()) {
				throw ex;
			}
		}
	}

	private boolean isUploadFile(HttpRequest request) {
		return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request);
	}

	/** Sends the upload result as a JSON body with status 200. */
	private void writeResult(Channel channel, Map<String, String> result) {
		String json = JsonUtil.beanToJson(result);
		// Encode with the charset the Content-Type header announces.
		byte[] data = json.getBytes(UTF_8);
		ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
		HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
		response.setContent(buffer);
		response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
		response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes()));
		channel.write(response);
	}

	/** Sends an empty 500 response when the upload could not be processed. */
	private void writeError(Channel channel) {
		HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
				HttpResponseStatus.INTERNAL_SERVER_ERROR);
		response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, "0");
		channel.write(response);
	}

	/**
	 * Prints the failure, ends any running upload so its worker is not left
	 * waiting for chunks that will never arrive, and closes the channel.
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
		e.getCause().printStackTrace();
		NettyChunkInputStream stream = chunkStream;
		hasReadChunk = false;
		chunkStream = null;
		if (stream != null && !stream.isClosed()) {
			try {
				stream.putChunk(HttpChunk.LAST_CHUNK);
			} catch (IOException ignored) {
				// the worker closed the stream in the meantime; nothing left to unblock
			}
		}
		e.getChannel().close();
	}

	/**
	 * Parses one upload on a worker thread and writes the response when done.
	 */
	class UploadTask implements Runnable {

		private final NettyRequestContext context;
		private final Channel channel;

		/**
		 * @param context the context whose stream carries the request body
		 * @param channel the channel the response is written to
		 */
		public UploadTask(NettyRequestContext context, Channel channel) {
			super();
			this.context = context;
			this.channel = channel;
		}

		@Override
		public void run() {
			long start = System.currentTimeMillis();
			Map<String, String> result = null;
			boolean succeeded = false;

			try {

				NettyFileUpload upload = new NettyFileUpload(context);
				FileItemIterator iter = upload.getItemIterator();

				while (iter.hasNext()) {
					FileItemStream item = iter.next();
					// item processing logic goes here

				}
				// NOTE(review): `handler` is not declared anywhere in this
				// snippet; presumably the application object that processed
				// the items above - confirm before use.
				result = handler.getResult();
				succeeded = true;
				long end = System.currentTimeMillis();
				System.out.println("spend time : " + (end - start));

			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				// Always close the stream, also on failure, so the chunk
				// producer sees that the upload is over.
				try {
					context.closeInputStream();
				} catch (IOException e) {
					e.printStackTrace();
				}
				if (succeeded) {
					writeResult(channel, result);
				} else {
					writeError(channel);
				}
			}
		}

	}
}

使用NettyChunkInputStream时,必须由一个线程调用putChunk(...)写入数据,同时由另一个线程通过getInputStream()读取并消耗数据。

PS:可以在NettyChunkInputStream中重写InputStream.read(bs,offset,len),避免每次调用read()都进行边界判断,使之效率更高。

相关推荐