A Look at Flink's ConnectionManager
Preface
This article takes a look at Flink's ConnectionManager.
ConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java

    public interface ConnectionManager {

        void start(ResultPartitionProvider partitionProvider,
                    TaskEventDispatcher taskEventDispatcher) throws IOException;

        /**
         * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
         */
        PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
                throws IOException, InterruptedException;

        /**
         * Closes opened ChannelConnections in case of a resource release.
         */
        void closeOpenChannelConnections(ConnectionID connectionId);

        int getNumberOfActiveConnections();

        int getDataPort();

        void shutdown() throws IOException;
    }

- ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections; it has two implementations, LocalConnectionManager and NettyConnectionManager
LocalConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java

    public class LocalConnectionManager implements ConnectionManager {

        @Override
        public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
        }

        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
            return null;
        }

        @Override
        public void closeOpenChannelConnections(ConnectionID connectionId) {}

        @Override
        public int getNumberOfActiveConnections() {
            return 0;
        }

        @Override
        public int getDataPort() {
            return -1;
        }

        @Override
        public void shutdown() {}
    }

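- LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops: createPartitionRequestClient returns null, getNumberOfActiveConnections returns 0, and getDataPort returns -1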
NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java

    public class NettyConnectionManager implements ConnectionManager {

        private final NettyServer server;

        private final NettyClient client;

        private final NettyBufferPool bufferPool;

        private final PartitionRequestClientFactory partitionRequestClientFactory;

        public NettyConnectionManager(NettyConfig nettyConfig) {
            this.server = new NettyServer(nettyConfig);
            this.client = new NettyClient(nettyConfig);
            this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

            this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
        }

        @Override
        public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
            NettyProtocol partitionRequestProtocol = new NettyProtocol(
                partitionProvider,
                taskEventDispatcher,
                client.getConfig().isCreditBasedEnabled());

            client.init(partitionRequestProtocol, bufferPool);
            server.init(partitionRequestProtocol, bufferPool);
        }

        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
                throws IOException, InterruptedException {
            return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
        }

        @Override
        public void closeOpenChannelConnections(ConnectionID connectionId) {
            partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
        }

        @Override
        public int getNumberOfActiveConnections() {
            return partitionRequestClientFactory.getNumberOfActiveClients();
        }

        @Override
        public int getDataPort() {
            if (server != null && server.getLocalAddress() != null) {
                return server.getLocalAddress().getPort();
            } else {
                return -1;
            }
        }

        @Override
        public void shutdown() {
            client.shutdown();
            server.shutdown();
        }

        NettyClient getClient() {
            return client;
        }

        NettyServer getServer() {
            return server;
        }

        NettyBufferPool getBufferPool() {
            return bufferPool;
        }
    }

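- NettyConnectionManager implements the ConnectionManager interface; its constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory
- The start method creates a NettyProtocol and initializes the NettyClient and the NettyServer with it; shutdown shuts down the NettyClient and then the NettyServer; closeOpenChannelConnections delegates to partitionRequestClientFactory.closeOpenChannelConnections for the given connectionId; getDataPort returns the port of the server's local address, or -1 if the server has no local address yet
- createPartitionRequestClient delegates to partitionRequestClientFactory.createPartitionRequestClient to create the PartitionRequestClient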
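The delegation described above can be illustrated with a small stand-alone sketch. It is not Flink code: `StubServer`, `StubClient`, and `Manager` are hypothetical stand-ins invented for this example, and the stub server only records an address instead of binding a socket. The sketch only mirrors two points from the listing: start initializes the client before the server, and getDataPort falls back to -1 while the server has no local address.

```java
import java.net.InetSocketAddress;

public class ConnectionFacadeSketch {

    /** Hypothetical stand-in for NettyServer; it has no local address until init() runs. */
    static final class StubServer {
        private InetSocketAddress localAddress;

        void init(int port) {
            // No socket is opened here; the stub only records the address.
            localAddress = new InetSocketAddress(port);
        }

        InetSocketAddress getLocalAddress() {
            return localAddress;
        }
    }

    /** Hypothetical stand-in for NettyClient. */
    static final class StubClient {
        private boolean initialized;

        void init() {
            initialized = true;
        }

        boolean isInitialized() {
            return initialized;
        }
    }

    /** Facade that owns both endpoints and forwards lifecycle calls to them. */
    static final class Manager {
        private final StubServer server = new StubServer();
        private final StubClient client = new StubClient();
        private final int configuredPort;

        Manager(int configuredPort) {
            this.configuredPort = configuredPort;
        }

        void start() {
            // Same order as in the listing: client first, then server.
            client.init();
            server.init(configuredPort);
        }

        int getDataPort() {
            // -1 signals "no data port available", as in the listing.
            InetSocketAddress address = server.getLocalAddress();
            return address != null ? address.getPort() : -1;
        }

        boolean isClientInitialized() {
            return client.isInitialized();
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager(6121);
        System.out.println(manager.getDataPort()); // prints -1
        manager.start();
        System.out.println(manager.getDataPort()); // prints 6121
    }
}
```

The port 6121 is an arbitrary value chosen for the example.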
PartitionRequestClientFactory
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

    class PartitionRequestClientFactory {

        private final NettyClient nettyClient;

        private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

        PartitionRequestClientFactory(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }

        /**
         * Atomically establishes a TCP connection to the given remote address and
         * creates a {@link PartitionRequestClient} instance for this connection.
         */
        PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
            Object entry;
            PartitionRequestClient client = null;

            while (client == null) {
                entry = clients.get(connectionId);

                if (entry != null) {
                    // Existing channel or connecting channel
                    if (entry instanceof PartitionRequestClient) {
                        client = (PartitionRequestClient) entry;
                    }
                    else {
                        ConnectingChannel future = (ConnectingChannel) entry;
                        client = future.waitForChannel();

                        clients.replace(connectionId, future, client);
                    }
                }
                else {
                    // No channel yet. Create one, but watch out for a race.
                    // We create a "connecting future" and atomically add it to the map.
                    // Only the thread that really added it establishes the channel.
                    // The others need to wait on that original establisher's future.
                    ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                    Object old = clients.putIfAbsent(connectionId, connectingChannel);

                    if (old == null) {
                        nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                        client = connectingChannel.waitForChannel();

                        clients.replace(connectionId, connectingChannel, client);
                    }
                    else if (old instanceof ConnectingChannel) {
                        client = ((ConnectingChannel) old).waitForChannel();

                        clients.replace(connectionId, old, client);
                    }
                    else {
                        client = (PartitionRequestClient) old;
                    }
                }

                // Make sure to increment the reference count before handing a client
                // out to ensure correct bookkeeping for channel closing.
                if (!client.incrementReferenceCounter()) {
                    destroyPartitionRequestClient(connectionId, client);
                    client = null;
                }
            }

            return client;
        }

        public void closeOpenChannelConnections(ConnectionID connectionId) {
            Object entry = clients.get(connectionId);

            if (entry instanceof ConnectingChannel) {
                ConnectingChannel channel = (ConnectingChannel) entry;

                if (channel.dispose()) {
                    clients.remove(connectionId, channel);
                }
            }
        }

        int getNumberOfActiveClients() {
            return clients.size();
        }

        /**
         * Removes the client for the given {@link ConnectionID}.
         */
        void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
            clients.remove(connectionId, client);
        }

        //......
    }

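- The PartitionRequestClientFactory constructor takes a NettyClient; the factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID to either a PartitionRequestClient (connected) or a ConnectingChannel (connection in progress)
- createPartitionRequestClient first looks up the ConnectionID in the map:
  - if the entry is a PartitionRequestClient, it is used directly
  - if the entry is a ConnectingChannel, the method calls ConnectingChannel.waitForChannel to wait for the PartitionRequestClient and then replaces the map entry with that client
  - if there is no entry, it creates a ConnectingChannel and adds it with putIfAbsent, which returns the previous value (old). If old is null, this thread won the race: it calls nettyClient.connect, registers the ConnectingChannel as listener, waits for the PartitionRequestClient, and replaces the map entry. If old is a ConnectingChannel, it waits on that one and replaces the map entry. Otherwise old is already a PartitionRequestClient and is used directly
  - before handing out the client, it calls client.incrementReferenceCounter(); if the increment fails, it calls destroyPartitionRequestClient to remove the entry, resets client to null, and the while loop tries again. The method therefore never returns null; it returns only once the increment succeeds
- closeOpenChannelConnections only acts when the entry is a ConnectingChannel: it calls ConnectingChannel.dispose and, if that returns true, removes the entry from the map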
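The "connecting future" idea can be shown in isolation. The following is a minimal sketch, not Flink's implementation: `Client` and `connectAsync` are hypothetical stand-ins, a `CompletableFuture` takes the place of ConnectingChannel, connection keys are plain strings, and the reference-counting retry loop is left out. What it keeps is the core of the pattern: putIfAbsent decides which caller establishes the connection, everyone else waits on the same future, and the map entry is swapped to the finished client afterwards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectOnceSketch {

    /** Hypothetical stand-in for PartitionRequestClient. */
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    // Each value is either a CompletableFuture<Client> (still connecting) or a Client (connected).
    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();

    // Counts how often a connection was actually started (for demonstration only).
    final AtomicInteger connectAttempts = new AtomicInteger();

    Client getOrConnect(String address) {
        Object entry = clients.get(address);
        if (entry instanceof Client) {
            return (Client) entry;
        }

        CompletableFuture<Client> pending;
        if (entry != null) {
            // Somebody else is already connecting; wait on their future.
            pending = asFuture(entry);
        } else {
            CompletableFuture<Client> mine = new CompletableFuture<>();
            Object old = clients.putIfAbsent(address, mine);
            if (old == null) {
                // This caller won the race and is the only one that connects.
                connectAsync(address, mine);
                pending = mine;
            } else if (old instanceof Client) {
                return (Client) old;
            } else {
                pending = asFuture(old);
            }
        }

        Client client = pending.join();
        // Swap the future for the client; a no-op if another waiter already did it.
        clients.replace(address, pending, client);
        return client;
    }

    /** Hypothetical asynchronous connect; a real one would open a network channel. */
    private void connectAsync(String address, CompletableFuture<Client> promise) {
        connectAttempts.incrementAndGet();
        CompletableFuture.runAsync(() -> promise.complete(new Client(address)));
    }

    @SuppressWarnings("unchecked")
    private static CompletableFuture<Client> asFuture(Object entry) {
        return (CompletableFuture<Client>) entry;
    }

    public static void main(String[] args) throws InterruptedException {
        ConnectOnceSketch factory = new ConnectOnceSketch();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> factory.getOrConnect("tm-1:6121"));
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        System.out.println("connect attempts: " + factory.connectAttempts.get()); // prints "connect attempts: 1"
    }
}
```

Storing two different types under one `Object` value, as the listing does, avoids a second map for in-flight connections at the cost of instanceof checks on every lookup.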
ConnectingChannel
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java

    private static final class ConnectingChannel implements ChannelFutureListener {

        private final Object connectLock = new Object();

        private final ConnectionID connectionId;

        private final PartitionRequestClientFactory clientFactory;

        private boolean disposeRequestClient = false;

        public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
            this.connectionId = connectionId;
            this.clientFactory = clientFactory;
        }

        private boolean dispose() {
            boolean result;
            synchronized (connectLock) {
                if (partitionRequestClient != null) {
                    result = partitionRequestClient.disposeIfNotUsed();
                }
                else {
                    disposeRequestClient = true;
                    result = true;
                }

                connectLock.notifyAll();
            }

            return result;
        }

        private void handInChannel(Channel channel) {
            synchronized (connectLock) {
                try {
                    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                    partitionRequestClient = new PartitionRequestClient(
                        channel, clientHandler, connectionId, clientFactory);

                    if (disposeRequestClient) {
                        partitionRequestClient.disposeIfNotUsed();
                    }

                    connectLock.notifyAll();
                }
                catch (Throwable t) {
                    notifyOfError(t);
                }
            }
        }

        private volatile PartitionRequestClient partitionRequestClient;

        private volatile Throwable error;

        private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            synchronized (connectLock) {
                while (error == null && partitionRequestClient == null) {
                    connectLock.wait(2000);
                }
            }

            if (error != null) {
                throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
            }

            return partitionRequestClient;
        }

        private void notifyOfError(Throwable error) {
            synchronized (connectLock) {
                this.error = error;
                connectLock.notifyAll();
            }
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                handInChannel(future.channel());
            }
            else if (future.cause() != null) {
                notifyOfError(new RemoteTransportException(
                    "Connecting to remote task manager + '" + connectionId.getAddress() +
                        "' has failed. This might indicate that the remote task " +
                        "manager has been lost.",
                    connectionId.getAddress(), future.cause()));
            }
            else {
                notifyOfError(new LocalTransportException(
                    String.format(
                        "Connecting to remote task manager '%s' has been cancelled.",
                        connectionId.getAddress()),
                    null));
            }
        }
    }

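- ConnectingChannel implements Netty's ChannelFutureListener interface. When the ChannelFuture succeeds, operationComplete calls handInChannel, which creates the PartitionRequestClient and wakes up the waiting threads; otherwise it calls notifyOfError with a RemoteTransportException (the connect failed) or a LocalTransportException (the connect was cancelled). waitForChannel blocks until either the partitionRequestClient or an error is set, then returns the client or throws an IOException wrapping the error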
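The hand-off between the Netty callback thread and the waiting caller is a guarded wait on a shared lock. Below is a minimal sketch of just that mechanism, assuming a generic result type; `PendingResult`, `handIn`, and `fail` are hypothetical names for this example, and the dispose handling from the listing is omitted. As in the listing, the waiter re-checks its condition in a loop with a 2-second timed wait, and an error is surfaced as an IOException.

```java
import java.io.IOException;

public class HandoffSketch {

    /** Hypothetical holder that one thread fills and other threads wait on. */
    static final class PendingResult<T> {
        private final Object lock = new Object();
        private T value;
        private Throwable error;

        /** Called by the producing thread on success. */
        void handIn(T newValue) {
            synchronized (lock) {
                value = newValue;
                lock.notifyAll();
            }
        }

        /** Called by the producing thread on failure. */
        void fail(Throwable cause) {
            synchronized (lock) {
                error = cause;
                lock.notifyAll();
            }
        }

        /** Blocks until a value or an error is available. */
        T await() throws IOException, InterruptedException {
            synchronized (lock) {
                // Loop guards against spurious wake-ups and timed-out waits.
                while (error == null && value == null) {
                    lock.wait(2000);
                }
                if (error != null) {
                    throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
                }
                return value;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        PendingResult<String> pending = new PendingResult<>();
        // Simulates the callback thread that completes the connection.
        new Thread(() -> pending.handIn("channel-1")).start();
        System.out.println(pending.await()); // prints "channel-1"
    }
}
```

Because await checks the fields before waiting, it returns immediately when the result was handed in before the first waiter arrived.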
Summary
- ConnectionManager defines methods such as start, shutdown, and closeOpenChannelConnections for managing physical connections; it has two implementations, LocalConnectionManager and NettyConnectionManager
- LocalConnectionManager implements the interface with essentially no-op methods. NettyConnectionManager's constructor uses NettyConfig to create a NettyServer, a NettyClient, and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory; start creates a NettyProtocol and initializes the NettyClient and NettyServer; shutdown shuts both down; closeOpenChannelConnections and createPartitionRequestClient delegate to the PartitionRequestClientFactory
- PartitionRequestClientFactory takes a NettyClient and keeps a ConcurrentHashMap from ConnectionID to either a PartitionRequestClient or a ConnectingChannel. ConnectingChannel implements Netty's ChannelFutureListener: on success, operationComplete calls handInChannel, which creates the PartitionRequestClient; waitForChannel blocks until the client is available and returns it, or throws an IOException if the connection attempt failed