聊聊flink的NetworkEnvironmentConfiguration

本文主要研究一下flink的NetworkEnvironmentConfiguration

NetworkEnvironmentConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java

public class NetworkEnvironmentConfiguration {

    private final float networkBufFraction;

    private final long networkBufMin;

    private final long networkBufMax;

    private final int networkBufferSize;

    private final IOMode ioMode;

    private final int partitionRequestInitialBackoff;

    private final int partitionRequestMaxBackoff;

    private final int networkBuffersPerChannel;

    private final int floatingNetworkBuffersPerGate;

    private final NettyConfig nettyConfig;

    /**
     * Constructor for a setup with purely local communication (no netty).
     */
    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate) {

        this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
                ioMode,
                partitionRequestInitialBackoff, partitionRequestMaxBackoff,
                networkBuffersPerChannel, floatingNetworkBuffersPerGate,
                null);
        
    }

    public NetworkEnvironmentConfiguration(
            float networkBufFraction,
            long networkBufMin,
            long networkBufMax,
            int networkBufferSize,
            IOMode ioMode,
            int partitionRequestInitialBackoff,
            int partitionRequestMaxBackoff,
            int networkBuffersPerChannel,
            int floatingNetworkBuffersPerGate,
            @Nullable NettyConfig nettyConfig) {

        this.networkBufFraction = networkBufFraction;
        this.networkBufMin = networkBufMin;
        this.networkBufMax = networkBufMax;
        this.networkBufferSize = networkBufferSize;
        this.ioMode = ioMode;
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
        this.nettyConfig = nettyConfig;
    }

    // ------------------------------------------------------------------------

    public float networkBufFraction() {
        return networkBufFraction;
    }

    public long networkBufMin() {
        return networkBufMin;
    }

    public long networkBufMax() {
        return networkBufMax;
    }

    public int networkBufferSize() {
        return networkBufferSize;
    }

    public IOMode ioMode() {
        return ioMode;
    }

    public int partitionRequestInitialBackoff() {
        return partitionRequestInitialBackoff;
    }

    public int partitionRequestMaxBackoff() {
        return partitionRequestMaxBackoff;
    }

    public int networkBuffersPerChannel() {
        return networkBuffersPerChannel;
    }

    public int floatingNetworkBuffersPerGate() {
        return floatingNetworkBuffersPerGate;
    }

    public NettyConfig nettyConfig() {
        return nettyConfig;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        int result = 1;
        result = 31 * result + networkBufferSize;
        result = 31 * result + ioMode.hashCode();
        result = 31 * result + partitionRequestInitialBackoff;
        result = 31 * result + partitionRequestMaxBackoff;
        result = 31 * result + networkBuffersPerChannel;
        result = 31 * result + floatingNetworkBuffersPerGate;
        result = 31 * result + (nettyConfig != null ? nettyConfig.hashCode() : 0);
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        else if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        else {
            final NetworkEnvironmentConfiguration that = (NetworkEnvironmentConfiguration) obj;

            return this.networkBufFraction == that.networkBufFraction &&
                    this.networkBufMin == that.networkBufMin &&
                    this.networkBufMax == that.networkBufMax &&
                    this.networkBufferSize == that.networkBufferSize &&
                    this.partitionRequestInitialBackoff == that.partitionRequestInitialBackoff &&
                    this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
                    this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
                    this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
                    this.ioMode == that.ioMode && 
                    (nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
        }
    }

    @Override
    public String toString() {
        return "NetworkEnvironmentConfiguration{" +
                "networkBufFraction=" + networkBufFraction +
                ", networkBufMin=" + networkBufMin +
                ", networkBufMax=" + networkBufMax +
                ", networkBufferSize=" + networkBufferSize +
                ", ioMode=" + ioMode +
                ", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
                ", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +
                ", networkBuffersPerChannel=" + networkBuffersPerChannel +
                ", floatingNetworkBuffersPerGate=" + floatingNetworkBuffersPerGate +
                ", nettyConfig=" + nettyConfig +
                '}';
    }
}
  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性

TaskManagerServicesConfiguration

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java

public class TaskManagerServicesConfiguration {

    //......

    /**
     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
     *
     * @param configuration to create the network environment configuration from
     * @param localTaskManagerCommunication true if task manager communication is local
     * @param taskManagerAddress address of the task manager
     * @param slots to start the task manager with
     * @return Network environment configuration
     */
    @SuppressWarnings("deprecation")
    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
        Configuration configuration,
        boolean localTaskManagerCommunication,
        InetAddress taskManagerAddress,
        int slots) throws Exception {

        // ----> hosts / ports for communication and data exchange

        int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);

        checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
            "Leave config parameter empty or use 0 to let the system choose a port automatically.");

        checkConfigParameter(slots >= 1, slots, TaskManagerOptions.NUM_TASK_SLOTS.key(),
            "Number of task slots must be at least one.");

        final int pageSize = checkedDownCast(MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());

        // check page size of for minimum size
        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);

        // check page size for power of two
        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
            TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
            "Memory segment size must be a power of 2.");

        // network buffer memory fraction

        float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
        long networkBufMin = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
        long networkBufMax = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
        checkNetworkBufferConfig(pageSize, networkBufFraction, networkBufMin, networkBufMax);

        // fallback: number of network buffers
        final int numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
        checkNetworkConfigOld(numNetworkBuffers);

        if (!hasNewNetworkBufConf(configuration)) {
            // map old config to new one:
            networkBufMin = networkBufMax = ((long) numNetworkBuffers) * pageSize;
        } else {
            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
            }
        }

        final NettyConfig nettyConfig;
        if (!localTaskManagerCommunication) {
            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
        } else {
            nettyConfig = null;
        }

        // Default spill I/O mode for intermediate results
        final String syncOrAsync = configuration.getString(
            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

        final IOManager.IOMode ioMode;
        if (syncOrAsync.equals("async")) {
            ioMode = IOManager.IOMode.ASYNC;
        } else {
            ioMode = IOManager.IOMode.SYNC;
        }

        int initialRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
        int maxRequestBackoff = configuration.getInteger(
            TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);

        int buffersPerChannel = configuration.getInteger(
            TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
        int extraBuffersPerGate = configuration.getInteger(
            TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

        return new NetworkEnvironmentConfiguration(
            networkBufFraction,
            networkBufMin,
            networkBufMax,
            pageSize,
            ioMode,
            initialRequestBackoff,
            maxRequestBackoff,
            buffersPerChannel,
            extraBuffersPerGate,
            nettyConfig);
    }

    //......
}
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置

TaskManagerOptions

flink-1.7.2/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java

@PublicEvolving
public class TaskManagerOptions {
    //......

    /**
     * Size of memory buffers used by the network stack and the memory manager.
     */
    public static final ConfigOption<String> MEMORY_SEGMENT_SIZE =
            key("taskmanager.memory.segment-size")
            .defaultValue("32kb")
            .withDescription("Size of memory buffers used by the network stack and the memory manager.");

    /**
     * Fraction of JVM memory to use for network buffers.
     */
    public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
            key("taskmanager.network.memory.fraction")
            .defaultValue(0.1f)
            .withDescription("Fraction of JVM memory to use for network buffers. This determines how many streaming" +
                " data exchange channels a TaskManager can have at the same time and how well buffered the channels" +
                " are. If a job is rejected or you get a warning that the system has not enough buffers available," +
                " increase this value or the min/max values below. Also note, that \"taskmanager.network.memory.min\"" +
                "` and \"taskmanager.network.memory.max\" may override this fraction.");

    /**
     * Minimum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MIN =
            key("taskmanager.network.memory.min")
            .defaultValue("64mb")
            .withDescription("Minimum memory size for network buffers.");

    /**
     * Maximum memory size for network buffers.
     */
    public static final ConfigOption<String> NETWORK_BUFFERS_MEMORY_MAX =
            key("taskmanager.network.memory.max")
            .defaultValue("1gb")
            .withDescription("Maximum memory size for network buffers.");

    /**
     * Number of buffers used in the network stack. This defines the number of possible tasks and
     * shuffles.
     *
     * @deprecated use {@link #NETWORK_BUFFERS_MEMORY_FRACTION}, {@link #NETWORK_BUFFERS_MEMORY_MIN},
     * and {@link #NETWORK_BUFFERS_MEMORY_MAX} instead
     */
    @Deprecated
    public static final ConfigOption<Integer> NETWORK_NUM_BUFFERS =
            key("taskmanager.network.numberOfBuffers")
            .defaultValue(2048);

    /**
     * Minimum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
            key("taskmanager.network.request-backoff.initial")
            .defaultValue(100)
            .withDeprecatedKeys("taskmanager.net.request-backoff.initial")
            .withDescription("Minimum backoff in milliseconds for partition requests of input channels.");

    /**
     * Maximum backoff for partition requests of input channels.
     */
    public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
            key("taskmanager.network.request-backoff.max")
            .defaultValue(10000)
            .withDeprecatedKeys("taskmanager.net.request-backoff.max")
            .withDescription("Maximum backoff in milliseconds for partition requests of input channels.");

    /**
     * Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel).
     *
     * <p>Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization.
     */
    public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
            key("taskmanager.network.memory.buffers-per-channel")
            .defaultValue(2)
            .withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
                "In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
                " configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
                " for parallel serialization.");

    /**
     * Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate).
     */
    public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
            key("taskmanager.network.memory.floating-buffers-per-gate")
            .defaultValue(8)
            .withDescription("Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate)." +
                " In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels." +
                " The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can" +
                " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" +
                " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.");                                                                                                        
    //......
}
  • taskmanager.memory.segment-size指定memory segment的大小,默认为32kb;taskmanager.network.memory.fraction指定network buffers使用的memory的比例,默认为0.1;taskmanager.network.memory.min指定network buffers使用的最小内存,默认为64mb;taskmanager.network.memory.max指定network buffers使用的最大内存,默认为1gb;taskmanager.network.numberOfBuffers指定network使用的buffers数量,默认为2048,该配置已经被废弃,使用taskmanager.network.memory.fraction、taskmanager.network.memory.min、taskmanager.network.memory.max这几个配置来替代
  • taskmanager.network.request-backoff.initial指定input channels的partition requests的最小backoff时间(毫秒),默认为100;taskmanager.network.request-backoff.max指定input channels的partition requests的最大backoff时间(毫秒),默认为10000
  • taskmanager.network.memory.buffers-per-channel指定每个outgoing/incoming channel使用buffers数量,默认为2;taskmanager.network.memory.floating-buffers-per-gate指定每个outgoing/incoming gate使用buffers数量,默认为8

NettyConfig

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java

public class NettyConfig {

    private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

    // - Config keys ----------------------------------------------------------

    public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions
            .key("taskmanager.network.netty.num-arenas")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.num-arenas")
            .withDescription("The number of Netty arenas.");

    public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
            .key("taskmanager.network.netty.server.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.server.numThreads")
            .withDescription("The number of Netty server threads.");

    public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions
            .key("taskmanager.network.netty.client.numThreads")
            .defaultValue(-1)
            .withDeprecatedKeys("taskmanager.net.client.numThreads")
            .withDescription("The number of Netty client threads.");

    public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
            .key("taskmanager.network.netty.server.backlog")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.server.backlog")
            .withDescription("The netty server connection backlog.");

    public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
            .key("taskmanager.network.netty.client.connectTimeoutSec")
            .defaultValue(120) // default: 120s = 2min
            .withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
            .withDescription("The Netty client connection timeout.");

    public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
            .key("taskmanager.network.netty.sendReceiveBufferSize")
            .defaultValue(0) // default: 0 => Netty's default
            .withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize")
            .withDescription("The Netty send and receive buffer size. This defaults to the system buffer size" +
                " (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.");

    public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
            .key("taskmanager.network.netty.transport")
            .defaultValue("nio")
            .withDeprecatedKeys("taskmanager.net.transport")
            .withDescription("The Netty transport type, either \"nio\" or \"epoll\"");

    // ------------------------------------------------------------------------

    enum TransportType {
        NIO, EPOLL, AUTO
    }

    static final String SERVER_THREAD_GROUP_NAME = "Flink Netty Server";

    static final String CLIENT_THREAD_GROUP_NAME = "Flink Netty Client";

    private final InetAddress serverAddress;

    private final int serverPort;

    private final int memorySegmentSize;

    private final int numberOfSlots;

    private final Configuration config; // optional configuration

    public NettyConfig(
            InetAddress serverAddress,
            int serverPort,
            int memorySegmentSize,
            int numberOfSlots,
            Configuration config) {

        this.serverAddress = checkNotNull(serverAddress);

        checkArgument(serverPort >= 0 && serverPort <= 65536, "Invalid port number.");
        this.serverPort = serverPort;

        checkArgument(memorySegmentSize > 0, "Invalid memory segment size.");
        this.memorySegmentSize = memorySegmentSize;

        checkArgument(numberOfSlots > 0, "Number of slots");
        this.numberOfSlots = numberOfSlots;

        this.config = checkNotNull(config);

        LOG.info(this.toString());
    }

    InetAddress getServerAddress() {
        return serverAddress;
    }

    int getServerPort() {
        return serverPort;
    }

    int getMemorySegmentSize() {
        return memorySegmentSize;
    }

    public int getNumberOfSlots() {
        return numberOfSlots;
    }

    // ------------------------------------------------------------------------
    // Getters
    // ------------------------------------------------------------------------

    public int getServerConnectBacklog() {
        return config.getInteger(CONNECT_BACKLOG);
    }

    public int getNumberOfArenas() {
        // default: number of slots
        final int configValue = config.getInteger(NUM_ARENAS);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getServerNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_SERVER);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientNumThreads() {
        // default: number of task slots
        final int configValue = config.getInteger(NUM_THREADS_CLIENT);
        return configValue == -1 ? numberOfSlots : configValue;
    }

    public int getClientConnectTimeoutSeconds() {
        return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
    }

    public int getSendAndReceiveBufferSize() {
        return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
    }

    public TransportType getTransportType() {
        String transport = config.getString(TRANSPORT_TYPE);

        switch (transport) {
            case "nio":
                return TransportType.NIO;
            case "epoll":
                return TransportType.EPOLL;
            default:
                return TransportType.AUTO;
        }
    }

    @Nullable
    public SSLHandlerFactory createClientSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalClientSSLEngineFactory(config) :
                null;
    }

    @Nullable
    public SSLHandlerFactory createServerSSLEngineFactory() throws Exception {
        return getSSLEnabled() ?
                SSLUtils.createInternalServerSSLEngineFactory(config) :
                null;
    }

    public boolean getSSLEnabled() {
        return config.getBoolean(TaskManagerOptions.DATA_SSL_ENABLED)
            && SSLUtils.isInternalSSLEnabled(config);
    }

    public boolean isCreditBasedEnabled() {
        return config.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
    }

    public Configuration getConfig() {
        return config;
    }

    @Override
    public String toString() {
        String format = "NettyConfig [" +
                "server address: %s, " +
                "server port: %d, " +
                "ssl enabled: %s, " +
                "memory segment size (bytes): %d, " +
                "transport type: %s, " +
                "number of server threads: %d (%s), " +
                "number of client threads: %d (%s), " +
                "server connect backlog: %d (%s), " +
                "client connect timeout (sec): %d, " +
                "send/receive buffer size (bytes): %d (%s)]";

        String def = "use Netty's default";
        String man = "manual";

        return String.format(format, serverAddress, serverPort, getSSLEnabled() ? "true" : "false",
                memorySegmentSize, getTransportType(), getServerNumThreads(),
                getServerNumThreads() == 0 ? def : man,
                getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
                getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
                getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
                getSendAndReceiveBufferSize() == 0 ? def : man);
    }
}
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置
  • taskmanager.network.netty.server.backlog用于指定netty server的connection backlog,默认值为0即使用netty默认的配置;taskmanager.network.netty.client.connectTimeoutSec指定netty client的connection timeout,默认为120(单位秒);taskmanager.network.netty.sendReceiveBufferSize指定netty send/receive buffer大小,默认为0即使用netty的默认配置,默认是使用system buffer size,即/proc/sys/net/ipv4/tcp_[rw]mem的配置;taskmanager.network.netty.transport指定的是netty transport的类型,默认是nio
  • taskmanager.network.netty.num-arenas指定的是netty arenas的数量,默认为-1;taskmanager.network.netty.server.numThreads指定的是netty server的threads数量,默认为-1;taskmanager.network.netty.client.numThreads指定的是netty client的threads数量,默认为-1;这几个配置当配置值为-1的时候,对应get方法返回的是numberOfSlots值

小结

  • NetworkEnvironmentConfiguration主要是flink network的相关配置,里头有networkBufFraction、networkBufMin、networkBufMax、networkBufferSize、ioMode、partitionRequestInitialBackoff、partitionRequestMaxBackoff、networkBuffersPerChannel、floatingNetworkBuffersPerGate、nettyConfig属性
  • TaskManagerServicesConfiguration有个私有方法parseNetworkEnvironmentConfiguration,用于创建NetworkEnvironmentConfiguration;它会读取TaskManagerOptions.MEMORY_SEGMENT_SIZE、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN、TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX、TaskManagerOptions.NETWORK_NUM_BUFFERS、ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL、TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX、TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL、TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE等配置
  • NettyConfig的构造器接收serverAddress、serverPort、memorySegmentSize、numberOfSlots、config这几个参数;它还提供了getServerConnectBacklog、getNumberOfArenas、getServerNumThreads、getClientNumThreads、getClientConnectTimeoutSeconds、getSendAndReceiveBufferSize、getTransportType等方法用于从config读取配置

doc

相关推荐