上游文档:
- flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 生成 watermark》学习笔记
- Flink|《Flink 官方文档 - 应用开发 - DataStream API - 事件时间 - 内置 Watermark 生成器》学习笔记
- Flink|《Flink 官方文档 - 概念透析 - 及时流处理》学习笔记
Watermark
Watermark 是在各个算子生成的、用于标记当前数据流事件时间的对象。当 Watermark 到达后,就意味着该数据流原则上将 不会 再到达比 Watermark 的事件时间更小的消息,即在 Watermark 后到达的事件时间更小的消息视作延迟消息。
首先,让我们来看一下 Watermark
类的源码。
源码:org.apache.flink.api.***mon.eventtime.Watermark
【Github】
package org.apache.flink.api.***mon.eventtime;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
@Public
public final class Watermark implements Serializable {
private static final long serialVersionUID = 1L;
/** Thread local formatter for stringifying the timestamps. */
private static final ThreadLocal<SimpleDateFormat> TS_FORMATTER =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"));
// ------------------------------------------------------------------------
/** The watermark that signifies end-of-event-time. */
public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
// ------------------------------------------------------------------------
/** The timestamp of the watermark in milliseconds. */
private final long timestamp;
/** Creates a new watermark with the given timestamp in milliseconds. */
public Watermark(long timestamp) {
this.timestamp = timestamp;
}
/** Returns the timestamp associated with this Watermark. */
public long getTimestamp() {
return timestamp;
}
/**
* Formats the timestamp of this watermark, assuming it is a millisecond timestamp. The returned
* format is "yyyy-MM-dd HH:mm:ss.SSS".
*/
public String getFormattedTimestamp() {
return TS_FORMATTER.get().format(new Date(timestamp));
}
// ------------------------------------------------------------------------
@Override
public boolean equals(Object o) {
return this == o
|| o != null
&& o.getClass() == Watermark.class
&& ((Watermark) o).timestamp == this.timestamp;
}
@Override
public int hashCode() {
return Long.hashCode(timestamp);
}
@Override
public String toString() {
return "Watermark @ " + timestamp + " (" + getFormattedTimestamp() + ')';
}
}
可以看到,Watermark
类主要就是用来存储当前 watermark 的毫秒级时间戳,具体地:
- 使用时间戳构造实例化
Watermark
对象,Watermark
对象在实例化后,不能修改其存储的时间戳 - 提供
long getTimestamp()
和String getFormattedTimestamp()
两种查询Watermark
对象时间戳的方法
WatermarkGenerator
接着,我们来看 watermark 的生成接口 WatermarkGenerator
的源码。
源码:org.apache.flink.api.***mon.eventtime.WatermarkGenerator
【Github】
package org.apache.flink.api.***mon.eventtime;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.***mon.ExecutionConfig;
/**
* The {@code WatermarkGenerator} generates watermarks either based on events or periodically (in a
* fixed interval).
*
* <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the {@code
* AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.
*/
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the event
* timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated depends on {@link
* ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
WatermarkGernator
接口,既可以基于消息,也可以基于周期。WatermarkGenerator
接口有两个方法:
-
void onEvent(T event, long eventTimestamp, WatermarkOutput output)
:这个方法会在每个消息到达时被调用一次,其参数event
为消息本身,参数eventTimestamp
为消息的事件时间,output
为接收生成的 watermark 的对象。 -
void (WatermarkOutput output)
:这个方法会被周期性地调用,其参数output
为接收生成的 watermark 的对象。
在实现 WatermarkGenerator
接口时,既可以在 onEvent
方法中生成 watermark,也可以在 onPeriodicEmit
方法中生成 watermark。因此,基于 WatermarkGenerator
接口,可以实现 标记生成 或 周期性生成 两种 watermark 生成器。
下面,我们来看 Flink 内置的几个 watermark 生成器。
Flink 内置的 watermark 生成器
在这里,我们仅介绍 Flink 的 flink-core
项目中如下内置的 watermark 生成器:
-
NoWatermarksGenerator
:不生成 watermark 的生成器 -
BoundedOutOfOrdernessWatermarks
:固定延迟时间的周期性 watermark 生成器 -
AscendingTimestampsWatermarks
:零延迟时间的周期性 watermark 生成器
不生成 watermark 的生成器:NoWatermarksGenerator
最简单的,不生成任何 watermark 的生成器。在实现上,在 onEvent
方法和 onPeriodicEmit
方法中均不生成 watermark。
源码:org.apache.flink.api.***mon.eventtime.NoWatermarksGenerator
【Github】
package org.apache.flink.api.***mon.eventtime;
import org.apache.flink.annotation.Public;
@Public
public final class NoWatermarksGenerator<E> implements WatermarkGenerator<E> {
@Override
public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {}
@Override
public void onPeriodicEmit(WatermarkOutput output) {}
}
固定延迟时间的周期性 watermark 生成器:BoundedOutOfOrdernessWatermarks
当输入数据流中消息的事件时间不完全有序,但是对于绝大部分元素,滞后时间通常不会超过一个固定的时间长度时,我们可以通过在当前最大事件时间的基础上减去一个固定延迟时间,来生成 watermark。Flink 内置的 watermark 生成器 BoundedOutOfOrdernessWatermarks
实现了这种功能。
源码:org.apache.flink.api.***mon.eventtime.BoundedOutOfOrdernessWatermarks
【Github】
package org.apache.flink.api.***mon.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
可以看到,在 BoundedOutOfOrdernessWatermarks
类中:
- 使用固定的延迟时间
maxOutOfOrderness
来实例化 - 使用示例属性
maxTimestamp
存储当前所有消息的最大事件时间,当每个消息到达时,onEvent
方法被调用,并更新maxTimestamp
属性 - 周期性地生成 watermark,当
onPeriodicEmit
方法被周期性地调用时,会根据当前的最大事件时间以及固定延迟时间来生成 watermark
零延迟时间的周期性 watermark 生成器:AscendingTimestampsWatermarks
当数据源中消息的事件时间单调递增时,当前事件时间(同时也是最大事件时间)就可以充当 watermark,因为后续到达的消息的事件时间一定不会比当前事件时间小。例如,当只读取一个 Kafka 分区,并使用 Kafka 的消息时间戳作为事件时间时,则可以保证事件时间的单调递增。
此时的 watermark 生成规则,就相当于是延迟为 0 的 “固定延迟时间的周期性生成器”。Flink 内置的 watermark 生成器 AscendingTimestampsWatermarks
实现了这个功能。
源码:org.apache.flink.api.***mon.eventtime.AscendingTimestampsWatermarks
【Github】
package org.apache.flink.api.***mon.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/** Creates a new watermark generator with for ascending timestamps. */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
在实现上,AscendingTimestampsWatermarks
继承了 BoundedOutOfOrdernessWatermarks
,并将延迟指定为 0。