聊聊rocketmq的AsyncAppender
序
本文主要研究一下rocketmq的AsyncAppender
AsyncAppender
org/apache/rocketmq/logging/inner/LoggingBuilder.java
public static class AsyncAppender extends Appender implements Appender.AppenderPipeline { public static final int DEFAULT_BUFFER_SIZE = 128; private final List<LoggingEvent> buffer = new ArrayList<LoggingEvent>(); private final Map<String, DiscardSummary> discardMap = new HashMap<String, DiscardSummary>(); private int bufferSize = DEFAULT_BUFFER_SIZE; private final AppenderPipelineImpl appenderPipeline; private final Thread dispatcher; private boolean blocking = true; public AsyncAppender() { appenderPipeline = new AppenderPipelineImpl(); dispatcher = new Thread(new Dispatcher(this, buffer, discardMap, appenderPipeline)); dispatcher.setDaemon(true); dispatcher.setName("AsyncAppender-Dispatcher-" + dispatcher.getName()); dispatcher.start(); } public void addAppender(final Appender newAppender) { synchronized (appenderPipeline) { appenderPipeline.addAppender(newAppender); } } public void append(final LoggingEvent event) { if ((dispatcher == null) || !dispatcher.isAlive() || (bufferSize <= 0)) { synchronized (appenderPipeline) { appenderPipeline.appendLoopOnAppenders(event); } return; } event.getThreadName(); event.getRenderedMessage(); synchronized (buffer) { while (true) { int previousSize = buffer.size(); if (previousSize < bufferSize) { buffer.add(event); if (previousSize == 0) { buffer.notifyAll(); } break; } boolean discard = true; if (blocking && !Thread.interrupted() && Thread.currentThread() != dispatcher) { try { buffer.wait(); discard = false; } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (discard) { String loggerName = event.getLoggerName(); DiscardSummary summary = discardMap.get(loggerName); if (summary == null) { summary = new DiscardSummary(event); discardMap.put(loggerName, summary); } else { summary.add(event); } break; } } } } public void close() { synchronized (buffer) { closed = true; buffer.notifyAll(); } try { dispatcher.join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); SysLogger.error( "Got an InterruptedException while waiting for the " + "dispatcher to finish.", e); } synchronized (appenderPipeline) { Enumeration iter = appenderPipeline.getAllAppenders(); if (iter != null) { while (iter.hasMoreElements()) { Object next = iter.nextElement(); if (next instanceof Appender) { ((Appender) next).close(); } } } } } public Enumeration getAllAppenders() { synchronized (appenderPipeline) { return appenderPipeline.getAllAppenders(); } } public Appender getAppender(final String name) { synchronized (appenderPipeline) { return appenderPipeline.getAppender(name); } } public boolean isAttached(final Appender appender) { synchronized (appenderPipeline) { return appenderPipeline.isAttached(appender); } } public void removeAllAppenders() { synchronized (appenderPipeline) { appenderPipeline.removeAllAppenders(); } } public void removeAppender(final Appender appender) { synchronized (appenderPipeline) { appenderPipeline.removeAppender(appender); } } public void removeAppender(final String name) { synchronized (appenderPipeline) { appenderPipeline.removeAppender(name); } } public void setBufferSize(final int size) { if (size < 0) { throw new NegativeArraySizeException("size"); } synchronized (buffer) { bufferSize = (size < 1) ? 1 : size; buffer.notifyAll(); } } public int getBufferSize() { return bufferSize; } public void setBlocking(final boolean value) { synchronized (buffer) { blocking = value; buffer.notifyAll(); } } public boolean getBlocking() { return blocking; } }
- 初始化Dispatcher,构造器调用Dispatcher的start,然后close方法调用dispatcher.join()
- append方法会判断buffer够不够,够的话往buffer添加事件,不够则丢弃同时进行DiscardSummary统计
- dispatcher则从buffer消费日志然后做真正的append
DiscardSummary
private final class DiscardSummary { private LoggingEvent maxEvent; private int count; public DiscardSummary(final LoggingEvent event) { maxEvent = event; count = 1; } public void add(final LoggingEvent event) { if (event.getLevel().toInt() > maxEvent.getLevel().toInt()) { maxEvent = event; } count++; } public LoggingEvent createEvent() { String msg = MessageFormat.format( "Discarded {0} messages due to full event buffer including: {1}", count, maxEvent.getMessage()); return new LoggingEvent( "AsyncAppender.DONT_REPORT_LOCATION", Logger.getLogger(maxEvent.getLoggerName()), maxEvent.getLevel(), msg, null); } }
- 记录LoggingEvent及其丢弃的次数
Dispatcher
private class Dispatcher implements Runnable { private final AsyncAppender parent; private final List<LoggingEvent> buffer; private final Map<String, DiscardSummary> discardMap; private final AppenderPipelineImpl appenderPipeline; public Dispatcher( final AsyncAppender parent, final List<LoggingEvent> buffer, final Map<String, DiscardSummary> discardMap, final AppenderPipelineImpl appenderPipeline) { this.parent = parent; this.buffer = buffer; this.appenderPipeline = appenderPipeline; this.discardMap = discardMap; } public void run() { boolean isActive = true; try { while (isActive) { LoggingEvent[] events = null; synchronized (buffer) { int bufferSize = buffer.size(); isActive = !parent.closed; while ((bufferSize == 0) && isActive) { buffer.wait(); bufferSize = buffer.size(); isActive = !parent.closed; } if (bufferSize > 0) { events = new LoggingEvent[bufferSize + discardMap.size()]; buffer.toArray(events); int index = bufferSize; Collection<DiscardSummary> values = discardMap.values(); for (DiscardSummary value : values) { events[index++] = value.createEvent(); } buffer.clear(); discardMap.clear(); buffer.notifyAll(); } } if (events != null) { for (LoggingEvent event : events) { synchronized (appenderPipeline) { appenderPipeline.appendLoopOnAppenders(event); } } } } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } }
- 这里锁住buffer,然后调用buffer.toArray(events),得到LoggingEvent数组,之后循环遍历调用appenderPipeline.appendLoopOnAppenders(event)
AppenderPipelineImpl
public static class AppenderPipelineImpl implements AppenderPipeline { protected Vector<Appender> appenderList; public void addAppender(Appender newAppender) { if (newAppender == null) { return; } if (appenderList == null) { appenderList = new Vector<Appender>(1); } if (!appenderList.contains(newAppender)) { appenderList.addElement(newAppender); } } public int appendLoopOnAppenders(LoggingEvent event) { int size = 0; Appender appender; if (appenderList != null) { size = appenderList.size(); for (int i = 0; i < size; i++) { appender = appenderList.elementAt(i); appender.doAppend(event); } } return size; } public Enumeration getAllAppenders() { if (appenderList == null) { return null; } else { return appenderList.elements(); } } public Appender getAppender(String name) { if (appenderList == null || name == null) { return null; } int size = appenderList.size(); Appender appender; for (int i = 0; i < size; i++) { appender = appenderList.elementAt(i); if (name.equals(appender.getName())) { return appender; } } return null; } public boolean isAttached(Appender appender) { if (appenderList == null || appender == null) { return false; } int size = appenderList.size(); Appender a; for (int i = 0; i < size; i++) { a = appenderList.elementAt(i); if (a == appender) { return true; } } return false; } public void removeAllAppenders() { if (appenderList != null) { int len = appenderList.size(); for (int i = 0; i < len; i++) { Appender a = appenderList.elementAt(i); a.close(); } appenderList.removeAllElements(); appenderList = null; } } public void removeAppender(Appender appender) { if (appender == null || appenderList == null) { return; } appenderList.removeElement(appender); } public void removeAppender(String name) { if (name == null || appenderList == null) { return; } int size = appenderList.size(); for (int i = 0; i < size; i++) { if (name.equals((appenderList.elementAt(i)).getName())) { appenderList.removeElementAt(i); break; } } } }
- 这里的appendLoopOnAppenders方法,会挨个对appender进行doAppend操作
小结
rocketmq的这个AsyncAppender实现不是很高效,大量地锁住buffer进行append以及appendLoopOnAppenders
doc
相关推荐
IT农场 2020-11-13
LCFlxfldy 2020-08-17
ljcsdn 2020-07-27
LCFlxfldy 2020-07-05
lypgcs 2020-06-27
陈晨软件五千言 2020-06-17
qingyuerji 2020-06-14
MojitoBlogs 2020-06-14
lypgcs 2020-06-14
陈晨软件五千言 2020-06-14
meilongwhpu 2020-06-13
陈晨软件五千言 2020-06-11
qingyuerji 2020-06-09
MojitoBlogs 2020-06-09
meilongwhpu 2020-06-08
meilongwhpu 2020-06-08
lypgcs 2020-06-07
MojitoBlogs 2020-06-04
meilongwhpu 2020-05-30