聊聊Elasticsearch的MonitorService
序
本文主要研究一下Elasticsearch的MonitorService
MonitorService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/MonitorService.java
public class MonitorService extends AbstractLifecycleComponent { private final JvmGcMonitorService jvmGcMonitorService; private final OsService osService; private final ProcessService processService; private final JvmService jvmService; private final FsService fsService; public MonitorService(Settings settings, NodeEnvironment nodeEnvironment, ThreadPool threadPool, ClusterInfoService clusterInfoService) throws IOException { this.jvmGcMonitorService = new JvmGcMonitorService(settings, threadPool); this.osService = new OsService(settings); this.processService = new ProcessService(settings); this.jvmService = new JvmService(settings); this.fsService = new FsService(settings, nodeEnvironment, clusterInfoService); } public OsService osService() { return this.osService; } public ProcessService processService() { return this.processService; } public JvmService jvmService() { return this.jvmService; } public FsService fsService() { return this.fsService; } @Override protected void doStart() { jvmGcMonitorService.start(); } @Override protected void doStop() { jvmGcMonitorService.stop(); } @Override protected void doClose() { jvmGcMonitorService.close(); } }
- MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法
JvmGcMonitorService
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java
public class JvmGcMonitorService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(JvmGcMonitorService.class); private final ThreadPool threadPool; private final boolean enabled; private final TimeValue interval; private final Map<String, GcThreshold> gcThresholds; private final GcOverheadThreshold gcOverheadThreshold; private volatile Cancellable scheduledFuture; public static final Setting<Boolean> ENABLED_SETTING = Setting.boolSetting("monitor.jvm.gc.enabled", true, Property.NodeScope); public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING = Setting.timeSetting("monitor.jvm.gc.refresh_interval", TimeValue.timeValueSeconds(1), TimeValue.timeValueSeconds(1), Property.NodeScope); private static String GC_COLLECTOR_PREFIX = "monitor.jvm.gc.collector."; public static final Setting<Settings> GC_SETTING = Setting.groupSetting(GC_COLLECTOR_PREFIX, Property.NodeScope); public static final Setting<Integer> GC_OVERHEAD_WARN_SETTING = Setting.intSetting("monitor.jvm.gc.overhead.warn", 50, 0, 100, Property.NodeScope); public static final Setting<Integer> GC_OVERHEAD_INFO_SETTING = Setting.intSetting("monitor.jvm.gc.overhead.info", 25, 0, 100, Property.NodeScope); public static final Setting<Integer> GC_OVERHEAD_DEBUG_SETTING = Setting.intSetting("monitor.jvm.gc.overhead.debug", 10, 0, 100, Property.NodeScope); //...... 
public JvmGcMonitorService(Settings settings, ThreadPool threadPool) { this.threadPool = threadPool; this.enabled = ENABLED_SETTING.get(settings); this.interval = REFRESH_INTERVAL_SETTING.get(settings); Map<String, GcThreshold> gcThresholds = new HashMap<>(); Map<String, Settings> gcThresholdGroups = GC_SETTING.get(settings).getAsGroups(); for (Map.Entry<String, Settings> entry : gcThresholdGroups.entrySet()) { String name = entry.getKey(); TimeValue warn = getValidThreshold(entry.getValue(), entry.getKey(), "warn"); TimeValue info = getValidThreshold(entry.getValue(), entry.getKey(), "info"); TimeValue debug = getValidThreshold(entry.getValue(), entry.getKey(), "debug"); gcThresholds.put(name, new GcThreshold(name, warn.millis(), info.millis(), debug.millis())); } gcThresholds.putIfAbsent(GcNames.YOUNG, new GcThreshold(GcNames.YOUNG, 1000, 700, 400)); gcThresholds.putIfAbsent(GcNames.OLD, new GcThreshold(GcNames.OLD, 10000, 5000, 2000)); gcThresholds.putIfAbsent("default", new GcThreshold("default", 10000, 5000, 2000)); this.gcThresholds = unmodifiableMap(gcThresholds); if (GC_OVERHEAD_WARN_SETTING.get(settings) <= GC_OVERHEAD_INFO_SETTING.get(settings)) { final String message = String.format( Locale.ROOT, "[%s] must be greater than [%s] [%d] but was [%d]", GC_OVERHEAD_WARN_SETTING.getKey(), GC_OVERHEAD_INFO_SETTING.getKey(), GC_OVERHEAD_INFO_SETTING.get(settings), GC_OVERHEAD_WARN_SETTING.get(settings)); throw new IllegalArgumentException(message); } if (GC_OVERHEAD_INFO_SETTING.get(settings) <= GC_OVERHEAD_DEBUG_SETTING.get(settings)) { final String message = String.format( Locale.ROOT, "[%s] must be greater than [%s] [%d] but was [%d]", GC_OVERHEAD_INFO_SETTING.getKey(), GC_OVERHEAD_DEBUG_SETTING.getKey(), GC_OVERHEAD_DEBUG_SETTING.get(settings), GC_OVERHEAD_INFO_SETTING.get(settings)); throw new IllegalArgumentException(message); } this.gcOverheadThreshold = new GcOverheadThreshold( GC_OVERHEAD_WARN_SETTING.get(settings), 
GC_OVERHEAD_INFO_SETTING.get(settings), GC_OVERHEAD_DEBUG_SETTING.get(settings)); logger.debug( "enabled [{}], interval [{}], gc_threshold [{}], overhead [{}, {}, {}]", this.enabled, this.interval, this.gcThresholds, this.gcOverheadThreshold.warnThreshold, this.gcOverheadThreshold.infoThreshold, this.gcOverheadThreshold.debugThreshold); } @Override protected void doStart() { if (!enabled) { return; } scheduledFuture = threadPool.scheduleWithFixedDelay(new JvmMonitor(gcThresholds, gcOverheadThreshold) { @Override void onMonitorFailure(Exception e) { logger.debug("failed to monitor", e); } @Override void onSlowGc(final Threshold threshold, final long seq, final SlowGcEvent slowGcEvent) { logSlowGc(logger, threshold, seq, slowGcEvent, JvmGcMonitorService::buildPools); } @Override void onGcOverhead(final Threshold threshold, final long current, final long elapsed, final long seq) { logGcOverhead(logger, threshold, current, elapsed, seq); } }, interval, Names.SAME); } @Override protected void doStop() { if (!enabled) { return; } scheduledFuture.cancel(); } @Override protected void doClose() { } //...... }
- JvmGcMonitorService的doStart方法通过scheduleWithFixedDelay注册了JvmMonitor的定时任务;其doStop方法则是cancel掉该定时任务
JvmMonitor
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java
abstract static class JvmMonitor implements Runnable { enum Threshold { DEBUG, INFO, WARN } static class SlowGcEvent { final GarbageCollector currentGc; final long collectionCount; final TimeValue collectionTime; final long elapsed; final JvmStats lastJvmStats; final JvmStats currentJvmStats; final ByteSizeValue maxHeapUsed; SlowGcEvent( final GarbageCollector currentGc, final long collectionCount, final TimeValue collectionTime, final long elapsed, final JvmStats lastJvmStats, final JvmStats currentJvmStats, final ByteSizeValue maxHeapUsed) { this.currentGc = currentGc; this.collectionCount = collectionCount; this.collectionTime = collectionTime; this.elapsed = elapsed; this.lastJvmStats = lastJvmStats; this.currentJvmStats = currentJvmStats; this.maxHeapUsed = maxHeapUsed; } } private long lastTime = now(); private JvmStats lastJvmStats = jvmStats(); private long seq = 0; private final Map<String, JvmGcMonitorService.GcThreshold> gcThresholds; final GcOverheadThreshold gcOverheadThreshold; JvmMonitor(final Map<String, GcThreshold> gcThresholds, final GcOverheadThreshold gcOverheadThreshold) { this.gcThresholds = Objects.requireNonNull(gcThresholds); this.gcOverheadThreshold = Objects.requireNonNull(gcOverheadThreshold); } @Override public void run() { try { monitorGc(); } catch (Exception e) { onMonitorFailure(e); } } abstract void onMonitorFailure(Exception e); synchronized void monitorGc() { seq++; final long currentTime = now(); JvmStats currentJvmStats = jvmStats(); final long elapsed = TimeUnit.NANOSECONDS.toMillis(currentTime - lastTime); monitorSlowGc(currentJvmStats, elapsed); monitorGcOverhead(currentJvmStats, elapsed); lastTime = currentTime; lastJvmStats = currentJvmStats; } final void monitorSlowGc(JvmStats currentJvmStats, long elapsed) { for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) { GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i]; GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i]; // no 
collection has happened long collections = gc.getCollectionCount() - prevGc.getCollectionCount(); if (collections == 0) { continue; } long collectionTime = gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis(); if (collectionTime == 0) { continue; } GcThreshold gcThreshold = gcThresholds.get(gc.getName()); if (gcThreshold == null) { gcThreshold = gcThresholds.get("default"); } long avgCollectionTime = collectionTime / collections; Threshold threshold = null; if (avgCollectionTime > gcThreshold.warnThreshold) { threshold = Threshold.WARN; } else if (avgCollectionTime > gcThreshold.infoThreshold) { threshold = Threshold.INFO; } else if (avgCollectionTime > gcThreshold.debugThreshold) { threshold = Threshold.DEBUG; } if (threshold != null) { onSlowGc(threshold, seq, new SlowGcEvent( gc, collections, TimeValue.timeValueMillis(collectionTime), elapsed, lastJvmStats, currentJvmStats, JvmInfo.jvmInfo().getMem().getHeapMax())); } } } final void monitorGcOverhead(final JvmStats currentJvmStats, final long elapsed) { long current = 0; for (int i = 0; i < currentJvmStats.getGc().getCollectors().length; i++) { GarbageCollector gc = currentJvmStats.getGc().getCollectors()[i]; GarbageCollector prevGc = lastJvmStats.getGc().getCollectors()[i]; current += gc.getCollectionTime().millis() - prevGc.getCollectionTime().millis(); } checkGcOverhead(current, elapsed, seq); } void checkGcOverhead(final long current, final long elapsed, final long seq) { final int fraction = (int) ((100 * current) / (double) elapsed); Threshold overheadThreshold = null; if (fraction >= gcOverheadThreshold.warnThreshold) { overheadThreshold = Threshold.WARN; } else if (fraction >= gcOverheadThreshold.infoThreshold) { overheadThreshold = Threshold.INFO; } else if (fraction >= gcOverheadThreshold.debugThreshold) { overheadThreshold = Threshold.DEBUG; } if (overheadThreshold != null) { onGcOverhead(overheadThreshold, current, elapsed, seq); } } JvmStats jvmStats() { return JvmStats.jvmStats(); 
} long now() { return System.nanoTime(); } abstract void onSlowGc(Threshold threshold, long seq, SlowGcEvent slowGcEvent); abstract void onGcOverhead(Threshold threshold, long total, long elapsed, long seq); }
- JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现
- monitorGc方法首先获取currentJvmStats,然后执行monitorSlowGc及monitorGcOverhead
- monitorSlowGc方法主要是计算avgCollectionTime,然后判断是否超出指定level的阈值,超出则回调onSlowGc方法;monitorGcOverhead方法主要是计算gc耗时占比,然后判断是否超过指定level的阈值,超出则回调onGcOverhead方法
小结
- MonitorService的构造器创建了jvmGcMonitorService、osService、processService、jvmService、fsService;其doStart、doStop、doClose分别调用了jvmGcMonitorService的start、stop、close方法
- JvmGcMonitorService的doStart方法通过scheduleWithFixedDelay注册了JvmMonitor的定时任务;其doStop方法则是cancel掉该定时任务
- JvmMonitor实现了Runnable接口,其run方法执行monitorGc方法,异常时执行onMonitorFailure方法;JvmMonitor还定义了onMonitorFailure、onSlowGc、onGcOverhead抽象方法需要子类去实现
doc
相关推荐
newbornzhao 2020-09-14
做对一件事很重要 2020-09-07
renjinlong 2020-09-03
明瞳 2020-08-19
李玉志 2020-08-19
mengyue 2020-08-07
molong0 2020-08-06
AFei00 2020-08-03
molong0 2020-08-03
wenwentana 2020-08-03
YYDU 2020-08-03
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。
sifeimeng 2020-08-03
心丨悦 2020-08-03
liangwenrong 2020-07-31
sifeimeng 2020-08-01
mengyue 2020-07-30
tigercn 2020-07-29
IceStreamLab 2020-07-29