Netty中使用Apache Common FileUpload
/** * 用Netty来实现上传 */ public class NettyFileUpload extends FileUpload { private NettyRequestContext context; public static final boolean isMultipartContent(HttpRequest request) { if (HttpMethod.POST != request.getMethod()) { return false; } if (request.getHeaders("Content-Type") == null && request.getHeaders("Content-Type").size() == 0) { return false; } String contentType = request.getHeaders("Content-Type").get(0); if (contentType == null) { return false; } if (contentType.toLowerCase().startsWith("multipart/")) { return true; } return false; } public NettyFileUpload(NettyRequestContext context) { this.context = context; } public NettyFileUpload(FileItemFactory fileItemFactory) { super(fileItemFactory); } public FileItemIterator getItemIterator() throws FileUploadException, IOException { return super.getItemIterator(context); } } public class NettyRequestContext implements RequestContext { private String encoding; private String contentType; private int contentLength = -1; /** * 上传的内容流 */ private InputStream inputStream; public NettyRequestContext(String encoding, String contentType, int contentLength, InputStream inputStream) { this.encoding = encoding; this.contentType = contentType; this.contentLength = contentLength; this.inputStream = inputStream; } @Override public String getCharacterEncoding() { return encoding; } @Override public String getContentType() { return contentType; } @Override public int getContentLength() { return contentLength; } @Override public InputStream getInputStream() throws IOException { // 不能直接用request的流,因为有HttpChunk return inputStream; } @Override public String toString() { return "ContentLength=" + this.getContentLength() + ", ContentType=" + this.getContentType(); } public void closeInputStream() throws IOException { getInputStream().close(); } } public class NettyChunkInputStream extends InputStream { private BlockingQueue<HttpChunk> chunkQueue = new ArrayBlockingQueue<HttpChunk>(128); private HttpChunk currentChunk = null; private volatile boolean closed; public boolean putChunk(HttpChunk chunk) throws IOException { if (!closed) { try { chunkQueue.put(chunk); } catch (InterruptedException e) { throw new IOException(e); } return true; } throw new IOException(" this inputstream has been closed!"); } @Override public int read() throws IOException { byte resultByte = -1; try { if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else if (!getChunk().isLast()) { nextChunk(); if (getChunk().getContent().readable()) { resultByte = getChunk().getContent().readByte(); } else { return -1; } } else { return -1; } } catch (InterruptedException e) { throw new IOException(e); } // InputStream.read()返回0-255之间的int return resultByte >= 0 ? resultByte : 256 + resultByte; } private HttpChunk getChunk() throws InterruptedException { if (currentChunk == null) { currentChunk = chunkQueue.take(); } return currentChunk; } private void nextChunk() throws InterruptedException { currentChunk = chunkQueue.take(); } @Override public int available() throws IOException { throw new UnsupportedOperationException("unsupport available()"); } @Override public void close() throws IOException { chunkQueue = null; closed = true; } public boolean isClosed() { return closed; } }
应用:
public class NettyUploadHandler extends SimpleChannelUpstreamHandler { private static ExecutorService EXECUTOR = Executors.newFixedThreadPool(32); private boolean hasReadChunk; private NettyChunkInputStream chunkStream = new NettyChunkInputStream(); private NettyRequestContext context; private volatile Map<String, String> resultMap = null; @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { if (!hasReadChunk) { handleHttpRequest(ctx, e); } else { handleHttpChunk(e); } } private void handleHttpRequest(ChannelHandlerContext ctx, MessageEvent e) throws IOException { HttpRequest request = (HttpRequest) e.getMessage(); if (isUploadFile(request)) { handleUploadRequest(request); } else { ctx.sendUpstream(e); } } private void handleUploadRequest(HttpRequest request) throws IOException { context = new NettyRequestContext("UTF-8", request.getHeader("Content-Type"), -1, chunkStream); if (request.isChunked()) { hasReadChunk = true; } else { HttpChunk chunk = new DefaultHttpChunk(request.getContent()); chunkStream.putChunk(chunk); } startUpload(); } private void handleHttpChunk(MessageEvent e) throws IOException { if (isUploadFinished()) { writeResult(e.getChannel()); return; } HttpChunk chunk = (HttpChunk) e.getMessage(); chunkStream.putChunk(chunk); if (chunk.isLast()) { for (;;) { if (isUploadFinished()) { writeResult(e.getChannel()); return; } } } } private boolean isUploadFinished() { return resultMap != null || chunkStream.isClosed(); } private boolean isUploadFile(HttpRequest request) { return request.getUri().equals("/upload/uploadfile") && NettyFileUpload.isMultipartContent(request); } private void startUpload() { EXECUTOR.execute(new UploadTask()); } private void writeResult(Channel channel) { String json = JsonUtil.beanToJson(resultMap); byte[] data = json.getBytes(); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); response.setContent(buffer); response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8"); response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buffer.readableBytes())); channel.write(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { e.getCause().printStackTrace(); } class UploadTask implements Runnable { public UploadTask() { super(); } @Override public void run() { long start = System.currentTimeMillis(); try { NettyFileUpload upload = new NettyFileUpload(context); FileItemIterator iter = upload.getItemIterator(); while (iter.hasNext()) { FileItemStream item = iter.next(); //这里处理逻辑 } resultMap = handler.getResult(); context.closeInputStream(); long end = System.currentTimeMillis(); System.out.println("spend time : " + (end - start)); } catch (Exception e) { e.printStackTrace(); } } } }
该NettyChunkInputStream必须一个线程来putChunk(...),另一个线程使用getInputStream()来消耗数据。
PS:可以在NettyChunkInputStream中重写InputStream.read(bs,offset,len),避免每次调用read()都进行边界判断,使之效率更高。
相关推荐
javamagicsun 2019-11-10
shumark 2014-07-07
RoyKings 2015-10-31
Sweetdream 2013-02-24
牧场SZShepherd 2012-03-20
南鹏飞技术 2016-06-11
ApachePHPMySQL 2012-02-15
dinux 2016-01-19
Andrewtao00 2011-10-23
shangsoft 2011-10-22
ziyifengfei 2015-07-28
ErixHao 2011-03-30
jacky的部落 2011-03-29
zrtlin 2014-07-24
jackyzhuyuanlu 2014-07-07
delmarks 2013-12-20