聊聊Elasticsearch的RunOnce
序
本文主要研究一下Elasticsearch的RunOnce
RunOnce
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/RunOnce.java
public class RunOnce implements Runnable { private final Runnable delegate; private final AtomicBoolean hasRun; public RunOnce(final Runnable delegate) { this.delegate = Objects.requireNonNull(delegate); this.hasRun = new AtomicBoolean(false); } @Override public void run() { if (hasRun.compareAndSet(false, true)) { delegate.run(); } } /** * {@code true} if the {@link RunOnce} has been executed once. */ public boolean hasRun() { return hasRun.get(); } }
- RunOnce实现了Runnable接口,它的构造器要求输入Runnable,同时构造了hasRun变量;run方法会先使用compareAndSet将hasRun由false设置为true,如果成功则执行代理的Runnable的run方法;hasRun方法则返回hasRun值
实例
elasticsearch-7.0.1/server/src/test/java/org/elasticsearch/common/util/concurrent/RunOnceTests.java
public class RunOnceTests extends ESTestCase { public void testRunOnce() { final AtomicInteger counter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(counter::incrementAndGet); assertFalse(runOnce.hasRun()); runOnce.run(); assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); runOnce.run(); assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); } public void testRunOnceConcurrently() throws InterruptedException { final AtomicInteger counter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(counter::incrementAndGet); final Thread[] threads = new Thread[between(3, 10)]; final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < threads.length; i++) { threads[i] = new Thread(() -> { try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } runOnce.run(); }); threads[i].start(); } latch.countDown(); for (Thread thread : threads) { thread.join(); } assertTrue(runOnce.hasRun()); assertEquals(1, counter.get()); } public void testRunOnceWithAbstractRunnable() { final AtomicInteger onRun = new AtomicInteger(0); final AtomicInteger onFailure = new AtomicInteger(0); final AtomicInteger onAfter = new AtomicInteger(0); final RunOnce runOnce = new RunOnce(new AbstractRunnable() { @Override protected void doRun() throws Exception { onRun.incrementAndGet(); throw new RuntimeException("failure"); } @Override public void onFailure(Exception e) { onFailure.incrementAndGet(); } @Override public void onAfter() { onAfter.incrementAndGet(); } }); final int iterations = randomIntBetween(1, 10); for (int i = 0; i < iterations; i++) { runOnce.run(); assertEquals(1, onRun.get()); assertEquals(1, onFailure.get()); assertEquals(1, onAfter.get()); assertTrue(runOnce.hasRun()); } } }
- testRunOnce方法验证了顺序多次执行runOnce的场景;testRunOnceConcurrently方法则验证了并发多次执行runOnce的场景;testRunOnceWithAbstractRunnable则验证了使用AbstractRunnable作为runnable的场景
小结
RunOnce实现了Runnable接口,它的构造器要求输入Runnable,同时构造了hasRun变量;run方法会先使用compareAndSet将hasRun由false设置为true,如果成功则执行代理的Runnable的run方法;hasRun方法则返回hasRun值
doc
相关推荐
newbornzhao 2020-09-14
做对一件事很重要 2020-09-07
renjinlong 2020-09-03
明瞳 2020-08-19
李玉志 2020-08-19
mengyue 2020-08-07
molong0 2020-08-06
AFei00 2020-08-03
molong0 2020-08-03
wenwentana 2020-08-03
YYDU 2020-08-03
另外一部分,则需要先做聚类、分类处理,将聚合出的分类结果存入ES集群的聚类索引中。数据处理层的聚合结果存入ES中的指定索引,同时将每个聚合主题相关的数据存入每个document下面的某个field下。
sifeimeng 2020-08-03
心丨悦 2020-08-03
liangwenrong 2020-07-31
sifeimeng 2020-08-01
mengyue 2020-07-30
tigercn 2020-07-29
IceStreamLab 2020-07-29