A Look at Flink's TimeCharacteristic

This article takes a look at Flink's TimeCharacteristic.

TimeCharacteristic

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need and special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
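  • ProcessingTime uses the time at which an operator processes an element: the system clock of the machine running the operator serves as the time of the data stream.
  • IngestionTime uses the time at which an element enters the Flink streaming dataflow.
  • EventTime uses the timestamp carried by the element itself; the application must specify how to extract that timestamp from each record.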
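
The three bullets above can be condensed into a small plain-Java sketch. It does not use the Flink API: the `Record` class, its fields, and `timestampFor` are hypothetical names introduced only to show which clock each characteristic reads.

```java
/** Plain-Java illustration (not Flink code): which clock each time characteristic consults. */
final class TimeCharacteristicSketch {

    // Mirrors the three values of Flink's TimeCharacteristic enum.
    enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    /** Hypothetical record: carries its own timestamp plus the time the source received it. */
    static final class Record {
        final long eventTimestamp;     // set by the producer, embedded in the data
        final long ingestionTimestamp; // stamped once, when the source receives the record

        Record(long eventTimestamp, long ingestionTimestamp) {
            this.eventTimestamp = eventTimestamp;
            this.ingestionTimestamp = ingestionTimestamp;
        }
    }

    /**
     * Returns the timestamp an operator would work with.
     * operatorClock stands in for the system clock of the machine running the operator.
     */
    static long timestampFor(Characteristic c, Record r, long operatorClock) {
        switch (c) {
            case PROCESSING_TIME: return operatorClock;
            case INGESTION_TIME:  return r.ingestionTimestamp;
            case EVENT_TIME:      return r.eventTimestamp;
            default: throw new IllegalArgumentException("unknown characteristic: " + c);
        }
    }

    public static void main(String[] args) {
        Record r = new Record(1_000L, 5_000L);
        for (Characteristic c : Characteristic.values()) {
            System.out.println(c + " -> " + timestampFor(c, r, 9_000L));
        }
    }
}
```

Only the processing-time result changes when the same record is replayed later, which is why processing-time windows are the least reproducible of the three.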

Differences

[Figure: Flink's TimeCharacteristic]

The figure above shows how the three notions of time differ.

Example

public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            }
            else {
                return ride.endTime.getMillis();
            }
        }
    }
  • Here the job that consumes from Kafka calls setStreamTimeCharacteristic with TimeCharacteristic.EventTime and passes a TaxiRideTSExtractor to assignTimestampsAndWatermarks. TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor; its extractTimestamp returns ride.startTime.getMillis() for a start event and ride.endTime.getMillis() otherwise, which is how the event time is defined.
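
The Javadoc of TaxiRideTSExtractor says the watermark trails the maximum timestamp by a fixed interval. A minimal plain-Java sketch of that rule follows. It is not the Flink class: `BoundedOutOfOrdernessSketch`, `onEvent`, and `currentWatermark` are hypothetical names, and the 60-second bound is an assumed sample value, since the source does not show MAX_EVENT_DELAY.

```java
/** Plain-Java sketch (not the Flink class) of a bounded-out-of-orderness watermark. */
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen;
    private long lastWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("bound must not be negative");
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the subtraction in currentWatermark() cannot underflow.
        this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp; a late event never moves the maximum backwards. */
    long onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
        return eventTimestampMillis;
    }

    /** Meant to be called periodically; the returned watermark never decreases. */
    long currentWatermark() {
        lastWatermark = Math.max(lastWatermark, maxTimestampSeen - maxOutOfOrdernessMillis);
        return lastWatermark;
    }

    public static void main(String[] args) {
        // Assumed sample bound of 60 seconds.
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(60_000L);
        long[] timestamps = {100_000L, 130_000L, 110_000L}; // the third event is out of order
        for (long t : timestamps) {
            sketch.onEvent(t);
            System.out.println("event=" + t + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With the 60-second bound, the out-of-order event stamped 110000 ms leaves the watermark at 70000 ms, the value already reached after the event stamped 130000 ms. In the example job, setAutoWatermarkInterval(1000) plays the role of the periodic call.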

Summary

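  • Flink's TimeCharacteristic enum defines three values: ProcessingTime, IngestionTime, and EventTime.
  • ProcessingTime uses the time at which an operator processes an element, taking the machine's system clock as the time of the data stream; IngestionTime uses the time at which an element enters the Flink streaming dataflow; EventTime uses the timestamp carried by the element itself, and the application must specify how to extract it from each record.
  • Under EventTime, either the source itself assigns event timestamps and emits watermarks, or the program assigns them outside the source via assignTimestampsAndWatermarks.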
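
Watermarks matter because, under event time, a window can only produce output once the watermark has passed its end. The sketch below uses the example's 15-minute window sliding every 5 minutes. It is plain Java, not Flink code: the class and method names are hypothetical, and the firing rule "watermark >= window end - 1" is my understanding of Flink's default event-time trigger rather than something the source states.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch (not Flink code) of sliding event-time windows and when they may fire. */
final class EventTimeWindowSketch {

    /** Start times of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is aligned to the slide and not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** A window [start, end) is complete once the watermark reaches its last timestamp, end - 1. */
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 15 * 60_000L;    // 15-minute windows, as in the example job
        long slide = 5 * 60_000L;    // sliding every 5 minutes
        long timestamp = 21 * 60_000L;
        for (long start : windowStartsFor(timestamp, size, slide)) {
            long end = start + size;
            System.out.println("window [" + start + ", " + end + ") fires at watermark " + (end - 1));
        }
    }
}
```

An event at minute 21 falls into three windows, starting at minutes 20, 15, and 10. None of them can emit a result until the watermark catches up with its end, which is why a job with no watermarks produces no event-time window output.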

