flink-watermark
- - ITeye博客 当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计. 模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.
一.背景
当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:一种基于系统时间,一种基于数据自身的时间。
模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望这条数据还是能被统计到。
二.基本代码
/**
 * Serializable value object that pairs a user id with a timestamp.
 *
 * <p>The {@code @Data} annotation (presumably Lombok's) is expected to supply the
 * accessors, e.g. {@code getPTime()}.
 */
@Data
public class UserTimeInfo implements Serializable {

    /** Identifier of the user the record belongs to. */
    private String userId;

    /** Actual time minus the offset, i.e. the time after the shift has been applied. */
    private Timestamp pTime;

    /**
     * Creates a fully populated record.
     *
     * @param userId identifier of the user
     * @param pTime  the (already shifted) time of the record
     */
    public UserTimeInfo(String userId, Timestamp pTime) {
        this.pTime = pTime;
        this.userId = userId;
    }

    /** Creates an empty record; both fields keep their default {@code null} value. */
    public UserTimeInfo() {
        super();
    }
}
/**
 * Demo source that emits one {@link UserTimeInfo} per second.
 *
 * <p>Each record is stamped with the current time shifted back by a random 0-4 seconds,
 * which simulates events that arrive late. The source stops on its own after
 * {@link #times} records, or earlier once {@link #cancel()} has been called.
 */
public class UserTimeSource implements SourceFunction<UserTimeInfo> {
    /**
     * Only a single id is kept so that the per-id counts are easy to follow.
     */
    static String[] userIds = {"id->"};
    Random random = new Random();
    /**
     * Number of records still to be sent; the simulation sends 20 in total.
     */
    int times = 20;
    /**
     * Cleared by {@link #cancel()} so that the emit loop in {@link #run} can exit.
     * Volatile because cancel() may be invoked from a different thread than run().
     */
    private volatile boolean running = true;

    /**
     * Emits the simulated records until the budget is used up or the source is cancelled.
     *
     * @param sc context used to hand records to the stream
     * @throws Exception if the one-second pause between records is interrupted
     */
    @Override
    public void run(SourceContext<UserTimeInfo> sc) throws Exception {
        while (running) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % userIds.length);
            // Random delay of 0-4 seconds.
            int defTime = random.nextInt(5);
            // Moment at which the record is actually sent.
            DateTime sendTime = new DateTime();
            // Time carried by the record: send time shifted back by the delay.
            DateTime delayedTime = sendTime.plusSeconds(-defTime);
            System.out.println("实际时间:" + print(sendTime) + ",延迟:" + defTime + ":-->" + print(delayedTime));
            sc.collect(new UserTimeInfo(userIds[m], new Timestamp(delayedTime.getMillis())));
            // Run for a fixed number of records only, to keep the output easy to observe.
            if (--times == 0) {
                break;
            }
        }
    }

    /**
     * Asks the emit loop to stop after the current iteration.
     */
    @Override
    public void cancel() {
        running = false;
        System.out.println("cancel to do ...");
    }

    /**
     * Formats a time for the console output.
     *
     * <p>Uses {@code HH} (hour of day, 0-23); the 12-hour {@code hh} without an AM/PM
     * marker would print morning and afternoon times identically.
     */
    private static String print(DateTime dateTime) {
        return dateTime.toString("yyyy-MM-dd HH:mm:ss");
    }
}
三.定义我们的两种watermark
a. 基于系统时间
/**
 * Watermark assigner that simulates counting by system time: the watermark depends
 * only on the machine's own clock, not on the data.
 */
public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> {

    /**
     * Maximum tolerated delay in milliseconds; 5 seconds by default.
     */
    long maxDelayTime = 5000;

    /**
     * Uses the record's {@code pTime}, in epoch milliseconds, as its timestamp.
     *
     * @param element                  the record being stamped
     * @param previousElementTimestamp ignored
     * @return {@code element.getPTime().getTime()}
     */
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        return element.getPTime().getTime();
    }

    /**
     * Produces a watermark that trails the current system time by {@link #maxDelayTime}.
     *
     * <p>Example from the original notes: a click made at 10:00 11:10 that is received at
     * 10:00 11:15 is still counted, because the difference to the system time is within
     * 5 seconds. Caveat from the same notes: if the program goes down and the record is
     * only consumed after a restart at 12:00, it is no longer counted.
     *
     * @return watermark at "now minus the allowed delay"
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        final long now = System.currentTimeMillis();
        return new Watermark(now - maxDelayTime);
    }
}
b.根据数据自身时间进行延迟判断
/**
 * Watermark assigner driven by the timestamps carried in the data itself: the watermark
 * trails the largest record timestamp seen so far.
 */
public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> {

    /**
     * Maximum tolerated delay in milliseconds; 5 seconds by default.
     */
    long maxDelayTime = 5000;

    /** Largest record timestamp (epoch milliseconds) observed by {@link #extractTimestamp}. */
    private long currentMaxTimestamp;

    /**
     * Uses the record's {@code pTime} as its timestamp and remembers the maximum seen.
     *
     * @param element                  the record being stamped
     * @param previousElementTimestamp ignored
     * @return {@code element.getPTime().getTime()}
     */
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        final long eventMillis = element.getPTime().getTime();
        if (eventMillis > currentMaxTimestamp) {
            currentMaxTimestamp = eventMillis;
        }
        return eventMillis;
    }

    /**
     * Produces a watermark that trails the largest record timestamp by {@link #maxDelayTime}.
     *
     * <p>Example from the original notes: a click made at 10:00 11:10 that is received at
     * 10:00 11:15 is still counted, because a record only has to be less than 5 seconds
     * behind the newest data time. The notes consider this the better fit for click events.
     *
     * @return watermark at "max record timestamp minus the allowed delay"
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        final long lagged = currentMaxTimestamp - maxDelayTime;
        return new Watermark(lagged);
    }
}
四.source 类,和以前一样
/**
 * Demo job: reads {@link UserTimeInfo} records from {@link UserTimeSource}, assigns
 * timestamps and watermarks, and prints a per-user count for every 5-second tumbling window.
 */
public class UserTimeWaterMarkApp {
    /**
     * Builds and runs the job with parallelism 1 and event-time semantics.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
        // The assigner used here (UserTimeWaterMarkByRowTime) can be swapped for another one.
        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        // Expose the stream as table "test"; pTime is declared as the rowtime attribute.
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // Unwrap each (flag, row) pair and forward only the row; the boolean flag f0 is discarded.
        SingleOutputStreamOperator<Row> allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // Add a sink here, or simply print.
        allClick.print();
        env.execute("test");
    }
}
public class UserTimeWaterMarkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" +
" GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
// deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
.flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
out.collect(value.f1);
}).returns(Row.class);
// add sink or print
allClick.print();
env.execute("test");
}
小结:
1.这个是基于flink 1.7 跑的
2.代码比较简单,也好理解,有问题直接私信我