A Look at Flink's TimeCharacteristic
Preface
This article takes a look at Flink's TimeCharacteristic.
TimeCharacteristic
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java
    /**
     * The time characteristic defines how the system determines time for time-dependent
     * order and operations that depend on time (such as time windows).
     */
    @PublicEvolving
    public enum TimeCharacteristic {

        /**
         * Processing time for operators means that the operator uses the system clock of the machine
         * to determine the current time of the data stream. Processing-time windows trigger based
         * on wall-clock time and include whatever elements happen to have arrived at the operator at
         * that point in time.
         *
         * <p>Using processing time for window operations results in general in quite non-deterministic
         * results, because the contents of the windows depends on the speed in which elements arrive.
         * It is, however, the cheapest method of forming windows and the method that introduces the
         * least latency.
         */
        ProcessingTime,

        /**
         * Ingestion time means that the time of each individual element in the stream is determined
         * when the element enters the Flink streaming data flow. Operations like windows group the
         * elements based on that time, meaning that processing speed within the streaming dataflow
         * does not affect windowing, but only the speed at which sources receive elements.
         *
         * <p>Ingestion time is often a good compromise between processing time and event time.
         * It does not need and special manual form of watermark generation, and events are typically
         * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
         * only be introduced by streaming shuffles or split/join/union operations. The fact that
         * elements are not very much out-of-order means that the latency increase is moderate,
         * compared to event
         * time.
         */
        IngestionTime,

        /**
         * Event time means that the time of each individual element in the stream (also called event)
         * is determined by the event's individual custom timestamp. These timestamps either exist in
         * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
         * the sources. The big implication of this is that it allows for elements to arrive in the
         * sources and in all operators out of order, meaning that elements with earlier timestamps may
         * arrive after elements with later timestamps.
         *
         * <p>Operators that window or order data with respect to event time must buffer data until they
         * can be sure that all timestamps for a certain time interval have been received. This is
         * handled by the so called "time watermarks".
         *
         * <p>Operations based on event time are very predictable - the result of windowing operations
         * is typically identical no matter when the window is executed and how fast the streams
         * operate. At the same time, the buffering and tracking of event time is also costlier than
         * operating with processing time, and typically also introduces more latency. The amount of
         * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
         * time span between the arrival of early and late elements is. With respect to the
         * "time watermarks", this means that the cost typically depends on how early or late the
         * watermarks can be generated for their timestamp.
         *
         * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
         * event's original time, rather than the time assigned at the data source. Practically, that
         * means that event time has generally more meaning, but also that it takes longer to determine
         * that all elements for a certain time have arrived.
         */
        EventTime
    }
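- ProcessingTime is based on the time at which an operator processes a record; it uses the system clock of the machine running the operator as the time of the data stream
- IngestionTime is based on the time at which a record enters the Flink streaming dataflow
- EventTime is based on the timestamp carried by the record itself; the application must specify how to extract that timestamp from each record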
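To make the three notions concrete, here is a minimal plain-Java sketch (no Flink dependency) that places a single record into a one-minute tumbling window under each notion. The class name `TimeNotionsSketch`, the three timestamps, and the window-start formula are all illustrative assumptions; the formula is a simplification for non-negative timestamps, not Flink's own window assigner.

```java
/** Hypothetical illustration: one record, three candidate timestamps. */
final class TimeNotionsSketch {

    /** Start of the tumbling window that contains the timestamp (non-negative timestamps only). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long windowSize = 60_000L;       // one-minute tumbling windows (assumed)

        long eventTime = 59_000L;        // timestamp carried inside the record
        long ingestionTime = 61_000L;    // clock reading when the record entered the source
        long processingTime = 125_000L;  // clock reading when the window operator saw the record

        // The same record lands in a different window depending on the time characteristic.
        System.out.println("EventTime window start:      " + windowStart(eventTime, windowSize));
        System.out.println("IngestionTime window start:  " + windowStart(ingestionTime, windowSize));
        System.out.println("ProcessingTime window start: " + windowStart(processingTime, windowSize));
    }
}
```

With these assumed values the record falls into the windows starting at 0, 60000 and 120000 ms respectively, which is why only event time gives the same result regardless of how fast the record travels through the pipeline.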
Differences
The figure above shows how the three notions of time differ.
Example
    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            } else {
                return ride.endTime.getMillis();
            }
        }
    }
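- Here, the job that consumes from Kafka calls setStreamTimeCharacteristic with TimeCharacteristic.EventTime and passes a TaxiRideTSExtractor to assignTimestampsAndWatermarks. TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor; its extractTimestamp returns ride.startTime.getMillis() or ride.endTime.getMillis(), depending on whether the ride is a start event, and this defines the event time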
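The extractor's Javadoc says that watermarks stay "a fixed time interval behind the max timestamp". That idea can be sketched in plain Java as follows. `BoundedOutOfOrdernessSketch` is a hypothetical stand-in written for illustration, not Flink's class, and the 60-second delay is an assumed value because `MAX_EVENT_DELAY` is not shown in the excerpt.

```java
/** Hypothetical stand-in illustrating a watermark that trails the max timestamp by a fixed delay. */
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the first subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an element's event timestamp; a late element never lowers the maximum. */
    void onElement(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
    }

    /** Would be called periodically (the example above sets the interval to 1000 ms). */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate; // watermarks only move forward
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        // Assumed delay of 60 s; the real MAX_EVENT_DELAY is not shown in the excerpt.
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(60_000L);

        sketch.onElement(100_000L);
        System.out.println(sketch.currentWatermark()); // 40000

        sketch.onElement(90_000L);                     // out-of-order element
        System.out.println(sketch.currentWatermark()); // still 40000

        sketch.onElement(130_000L);
        System.out.println(sketch.currentWatermark()); // 70000
    }
}
```

A larger delay tolerates more disorder but holds results back longer, which matches the latency trade-off described in the EventTime Javadoc.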
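Summary
- Flink's TimeCharacteristic enum defines three values: ProcessingTime, IngestionTime, and EventTime
- ProcessingTime is based on the time at which an operator processes a record and uses the machine's system clock as the time of the data stream; IngestionTime is based on the time at which a record enters the Flink streaming dataflow; EventTime is based on the timestamp carried by the record itself, and the application must specify how to extract that timestamp from each record
- With EventTime, the source must either assign event timestamps and emit watermarks itself, or have them assigned in the program, outside the source, via assignTimestampsAndWatermarks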
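Once timestamps and watermarks are in place, event-time windows such as the 15-minute window sliding every 5 minutes in the example depend on both. The plain-Java sketch below is a simplified illustration under stated assumptions: `EventTimeWindowSketch` and its methods are hypothetical names, timestamps are assumed non-negative, and the firing rule (a window `[start, end)` is complete once the watermark reaches `end - 1`) is a condensed reading of event-time triggering rather than Flink's actual code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: which sliding windows hold an element, and when one may fire. */
final class EventTimeWindowSketch {

    /** Start timestamps of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // assumes timestamp >= 0
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** A window ending at windowEnd (exclusive) is complete once the watermark reaches end - 1. */
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 15 * 60_000L;  // 15-minute window, as in the example
        long slide = 5 * 60_000L;  // sliding every 5 minutes
        long rideTime = 17 * 60_000L; // assumed event timestamp: minute 17

        for (long start : slidingWindowStarts(rideTime, size, slide)) {
            long end = start + size;
            // Assumed current watermark: one millisecond before minute 20.
            boolean ready = canFire(end, 20 * 60_000L - 1);
            System.out.println("[" + start + ", " + end + ") ready=" + ready);
        }
    }
}
```

With these assumed values the element belongs to three overlapping windows, and only the one ending at minute 20 is ready to fire at that watermark.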