A Look at Flink's FencedAkkaInvocationHandler
Preface
This article takes a look at Flink's FencedAkkaInvocationHandler.
FencedRpcGateway
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedRpcGateway.java
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {

    /**
     * Get the current fencing token.
     *
     * @return current fencing token
     */
    F getFencingToken();
}
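- The FencedRpcGateway interface extends RpcGateway. It declares a type parameter F, the type of the fencing token (which must be Serializable), and a single method, getFencingToken, that returns the current token.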
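Because F is bounded by Serializable, a token can travel with a message to another process and be compared there. The following is a minimal sketch of that idea, not Flink code: DemoFencingToken, DemoFencedGateway, and TokenRoundTrip are hypothetical names, and the gateway omits the RpcGateway parent. It assumes the token type defines value-based equals and hashCode, so that a deserialized copy still matches the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.UUID;

// Hypothetical token type standing in for a concrete F.
final class DemoFencingToken implements Serializable {
    private static final long serialVersionUID = 1L;
    private final UUID id;

    DemoFencingToken(UUID id) {
        this.id = id;
    }

    // Value equality, so a deserialized copy compares equal to the original.
    @Override
    public boolean equals(Object o) {
        return o instanceof DemoFencingToken && id.equals(((DemoFencingToken) o).id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Simplified stand-in for the gateway interface (RpcGateway parent omitted).
interface DemoFencedGateway<F extends Serializable> {
    F getFencingToken();
}

final class TokenRoundTrip {
    // Serializes and deserializes a token, as would happen across a process boundary.
    static <F extends Serializable> F copyViaSerialization(F token) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(token);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                F copy = (F) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

With these definitions, a token obtained from a gateway and passed through copyViaSerialization is a different object that is still equal to the original, which is the property a token comparison on the receiving side would rely on.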
FencedMainThreadExecutable
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FencedMainThreadExecutable.java
public interface FencedMainThreadExecutable extends MainThreadExecutable {

    /**
     * Run the given runnable in the main thread without attaching a fencing token.
     *
     * @param runnable to run in the main thread without validating the fencing token.
     */
    void runAsyncWithoutFencing(Runnable runnable);

    /**
     * Run the given callable in the main thread without attaching a fencing token.
     *
     * @param callable to run in the main thread without validating the fencing token.
     * @param timeout for the operation
     * @param <V> type of the callable result
     * @return Future containing the callable result
     */
    <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout);
}
- The FencedMainThreadExecutable interface extends MainThreadExecutable and adds runAsyncWithoutFencing and callAsyncWithoutFencing for running an unfenced runnable or an unfenced callable. The explicit unfenced variants are needed because, in a fenced implementation, the runAsync, callAsync, and scheduleRunAsync methods inherited from MainThreadExecutable take on fenced semantics.
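The difference between the fenced and unfenced variants is easiest to see from the receiving side, which this article does not show. The sketch below is an assumption about how such a receiver could behave, not Flink's implementation: DemoFencedEndpoint is a hypothetical class, the token is a plain String, and runnables are executed inline rather than queued onto a main thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical endpoint used only to illustrate fenced vs. unfenced execution.
final class DemoFencedEndpoint {
    private String currentToken;                      // null means "no token set yet"
    final List<String> rejections = new ArrayList<>();

    void setToken(String token) {
        this.currentToken = token;
    }

    // Fenced variant: the runnable runs only if the sender's token matches.
    // (Executed inline for brevity; a real endpoint would enqueue it.)
    void runAsync(String senderToken, Runnable runnable) {
        if (Objects.equals(currentToken, senderToken)) {
            runnable.run();
        } else {
            rejections.add("expected " + currentToken + " but got " + senderToken);
        }
    }

    // Unfenced variant: no token is attached, so nothing is validated.
    void runAsyncWithoutFencing(Runnable runnable) {
        runnable.run();
    }
}
```

In this sketch a runnable sent with a stale token is recorded as rejected, while an unfenced runnable always runs. That is one plausible reason to keep an unfenced path available, for example for work that must happen regardless of which token is current.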
FencedAkkaInvocationHandler
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/FencedAkkaInvocationHandler.java
public class FencedAkkaInvocationHandler<F extends Serializable> extends AkkaInvocationHandler implements FencedMainThreadExecutable, FencedRpcGateway<F> {

    private final Supplier<F> fencingTokenSupplier;

    public FencedAkkaInvocationHandler(
            String address,
            String hostname,
            ActorRef rpcEndpoint,
            Time timeout,
            long maximumFramesize,
            @Nullable CompletableFuture<Void> terminationFuture,
            Supplier<F> fencingTokenSupplier) {
        super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);

        this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        if (declaringClass.equals(FencedMainThreadExecutable.class) ||
            declaringClass.equals(FencedRpcGateway.class)) {
            return method.invoke(this, args);
        } else {
            return super.invoke(proxy, method, args);
        }
    }

    @Override
    public void runAsyncWithoutFencing(Runnable runnable) {
        checkNotNull(runnable, "runnable");

        if (isLocal) {
            getActorRef().tell(
                new UnfencedMessage<>(new RunAsync(runnable, 0L)), ActorRef.noSender());
        } else {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
                getActorRef().path() + ". This is not supported.");
        }
    }

    @Override
    public <V> CompletableFuture<V> callAsyncWithoutFencing(Callable<V> callable, Time timeout) {
        checkNotNull(callable, "callable");
        checkNotNull(timeout, "timeout");

        if (isLocal) {
            @SuppressWarnings("unchecked")
            CompletableFuture<V> resultFuture = (CompletableFuture<V>) FutureUtils.toJava(
                Patterns.ask(
                    getActorRef(),
                    new UnfencedMessage<>(new CallAsync(callable)),
                    timeout.toMilliseconds()));

            return resultFuture;
        } else {
            throw new RuntimeException("Trying to send a Runnable to a remote actor at " +
                getActorRef().path() + ". This is not supported.");
        }
    }

    @Override
    public void tell(Object message) {
        super.tell(fenceMessage(message));
    }

    @Override
    public CompletableFuture<?> ask(Object message, Time timeout) {
        return super.ask(fenceMessage(message), timeout);
    }

    @Override
    public F getFencingToken() {
        return fencingTokenSupplier.get();
    }

    private <P> FencedMessage<F, P> fenceMessage(P message) {
        if (isLocal) {
            return new LocalFencedMessage<>(fencingTokenSupplier.get(), message);
        } else {
            if (message instanceof Serializable) {
                @SuppressWarnings("unchecked")
                FencedMessage<F, P> result = (FencedMessage<F, P>) new RemoteFencedMessage<>(
                    fencingTokenSupplier.get(), (Serializable) message);

                return result;
            } else {
                throw new RuntimeException("Trying to send a non-serializable message " + message + " to a remote " +
                    "RpcEndpoint. Please make sure that the message implements java.io.Serializable.");
            }
        }
    }
}
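- FencedAkkaInvocationHandler extends AkkaInvocationHandler and implements the FencedMainThreadExecutable and FencedRpcGateway interfaces. Both runAsyncWithoutFencing and callAsyncWithoutFencing send an UnfencedMessage, and both only work when the target actor is local; for a remote actor they throw a RuntimeException.
- The invoke method calls methods declared in FencedRpcGateway or FencedMainThreadExecutable directly on the handler itself; every other method is delegated to the parent's invoke.
- The parent's runAsync, scheduleRunAsync, and callAsync end up calling tell or ask. FencedAkkaInvocationHandler overrides both, which is what gives runAsync, scheduleRunAsync, and callAsync their fenced semantics: the overridden tell and ask wrap the message in a FencedMessage via fenceMessage. fenceMessage reads the token from fencingTokenSupplier.get(), the same call that backs getFencingToken; the supplier is passed in through the constructor.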
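The two mechanisms described above, dispatching on the declaring class inside invoke and overriding tell so that every outgoing message is wrapped, can be reproduced with a plain JDK dynamic proxy. This is a self-contained sketch under simplifying assumptions, not the Flink classes: all Demo* names are hypothetical, messages are plain strings, and a list stands in for the actor mailbox.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for FencedRpcGateway.
interface DemoTokenAccess {
    String getFencingToken();
}

// Hypothetical user-facing gateway.
interface DemoGreeterGateway extends DemoTokenAccess {
    void greet(String name);
}

// Base handler: turns every gateway call into a "message" handed to tell().
class DemoBaseHandler implements InvocationHandler {
    final List<Object> sent = new ArrayList<>();   // stands in for the actor mailbox

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        tell(method.getName() + Arrays.toString(args));
        return null;
    }

    protected void tell(Object message) {
        sent.add(message);
    }
}

// Fenced handler: answers token queries itself and wraps everything else.
class DemoFencedHandler extends DemoBaseHandler implements DemoTokenAccess {
    private final Supplier<String> tokenSupplier;

    DemoFencedHandler(Supplier<String> tokenSupplier) {
        this.tokenSupplier = tokenSupplier;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Methods declared on the token interface are invoked on the handler itself.
        if (method.getDeclaringClass().equals(DemoTokenAccess.class)) {
            try {
                return method.invoke(this, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        }
        return super.invoke(proxy, method, args);
    }

    // Every message sent by the base class passes through here and gets the token attached.
    @Override
    protected void tell(Object message) {
        super.tell("Fenced(" + tokenSupplier.get() + ", " + message + ")");
    }

    @Override
    public String getFencingToken() {
        return tokenSupplier.get();
    }
}

final class DemoProxies {
    static DemoGreeterGateway create(DemoFencedHandler handler) {
        return (DemoGreeterGateway) Proxy.newProxyInstance(
            DemoGreeterGateway.class.getClassLoader(),
            new Class<?>[] {DemoGreeterGateway.class},
            handler);
    }
}
```

Calling greet on the proxy goes through the base invoke and the overridden tell, so the recorded message carries the token, while getFencingToken never reaches the mailbox. Because the base class only ever sends through tell, overriding that one method is enough to change the semantics of every call built on top of it.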
UnfencedMessage
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/UnfencedMessage.java
public class UnfencedMessage<P> {

    private final P payload;

    public UnfencedMessage(P payload) {
        this.payload = Preconditions.checkNotNull(payload);
    }

    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "UnfencedMessage(" + payload + ')';
    }
}
- UnfencedMessage is a message that does not need a fencing token; it simply wraps a non-null payload.
FencedMessage
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/FencedMessage.java
public interface FencedMessage<F extends Serializable, P> {

    F getFencingToken();

    P getPayload();
}
- The FencedMessage interface defines getFencingToken and getPayload. It has two implementations: LocalFencedMessage and RemoteFencedMessage.
LocalFencedMessage
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/LocalFencedMessage.java
public class LocalFencedMessage<F extends Serializable, P> implements FencedMessage<F, P> {

    private final F fencingToken;
    private final P payload;

    public LocalFencedMessage(@Nullable F fencingToken, P payload) {
        this.fencingToken = fencingToken;
        this.payload = Preconditions.checkNotNull(payload);
    }

    @Override
    public F getFencingToken() {
        return fencingToken;
    }

    @Override
    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "LocalFencedMessage(" + fencingToken + ", " + payload + ')';
    }
}
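- LocalFencedMessage implements FencedMessage. The fencing token type must implement Serializable, while the payload type is unconstrained. It has two fields, fencingToken (nullable) and payload (non-null).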
RemoteFencedMessage
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/messages/RemoteFencedMessage.java
public class RemoteFencedMessage<F extends Serializable, P extends Serializable> implements FencedMessage<F, P>, Serializable {

    private static final long serialVersionUID = 4043136067468477742L;

    private final F fencingToken;
    private final P payload;

    public RemoteFencedMessage(@Nullable F fencingToken, P payload) {
        this.fencingToken = fencingToken;
        this.payload = Preconditions.checkNotNull(payload);
    }

    @Override
    public F getFencingToken() {
        return fencingToken;
    }

    @Override
    public P getPayload() {
        return payload;
    }

    @Override
    public String toString() {
        return "RemoteFencedMessage(" + fencingToken + ", " + payload + ')';
    }
}
- RemoteFencedMessage implements both FencedMessage and Serializable, and additionally requires the payload type to implement Serializable. It has the same two fields, fencingToken and payload.
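The choice between the two envelope types follows the fenceMessage method shown earlier: locality is checked first, and only a remote target requires a serializable payload. The sketch below mirrors that decision with hypothetical stand-ins (DemoLocalEnvelope, DemoRemoteEnvelope, DemoFencer); it is an illustration of the branching, not the Flink classes, and it throws IllegalArgumentException where the original throws RuntimeException.

```java
import java.io.Serializable;

// Hypothetical local envelope: the payload may be any object.
final class DemoLocalEnvelope {
    final Serializable token;
    final Object payload;

    DemoLocalEnvelope(Serializable token, Object payload) {
        this.token = token;
        this.payload = payload;
    }
}

// Hypothetical remote envelope: the envelope and its payload must be serializable.
final class DemoRemoteEnvelope implements Serializable {
    private static final long serialVersionUID = 1L;
    final Serializable token;
    final Serializable payload;

    DemoRemoteEnvelope(Serializable token, Serializable payload) {
        this.token = token;
        this.payload = payload;
    }
}

final class DemoFencer {
    // Picks the envelope by locality first, then by whether the payload can be serialized.
    static Object fence(boolean isLocal, Serializable token, Object payload) {
        if (isLocal) {
            return new DemoLocalEnvelope(token, payload);
        }
        if (payload instanceof Serializable) {
            return new DemoRemoteEnvelope(token, (Serializable) payload);
        }
        throw new IllegalArgumentException(
            "payload of type " + payload.getClass().getName() + " is not Serializable");
    }
}
```

A local target accepts any payload, including objects such as a Runnable that usually cannot be serialized, whereas a remote target rejects them up front instead of failing later during serialization.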
Summary
- The FencedRpcGateway interface extends RpcGateway and declares a type parameter F, the type of the fencing token. The FencedMainThreadExecutable interface extends MainThreadExecutable and adds runAsyncWithoutFencing and callAsyncWithoutFencing for running an unfenced runnable or an unfenced callable.
- FencedAkkaInvocationHandler extends AkkaInvocationHandler and implements the FencedMainThreadExecutable and FencedRpcGateway interfaces. runAsyncWithoutFencing and callAsyncWithoutFencing both send an UnfencedMessage. The parent's runAsync, scheduleRunAsync, and callAsync end up calling tell or ask; FencedAkkaInvocationHandler overrides both so that these methods take on fenced semantics, with tell and ask building a FencedMessage through fenceMessage.
- UnfencedMessage is a message that does not need a fencing token. The FencedMessage interface defines getFencingToken and getPayload and has two implementations, LocalFencedMessage and RemoteFencedMessage. They differ in that RemoteFencedMessage implements Serializable and also requires its payload type to implement Serializable.