flink-watermark

标签: flink watermark | 发表时间:2019-03-08 17:33 | 作者:
出处:https://www.iteye.com

一.背景

     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计。这里的延迟统计分为两种:

       模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计。我们希望还是能统计到

       

二.基本代码

@Data
public class UserTimeInfo implements Serializable {

    private String userId;
    /** 实际时间-偏移量 偏移后的时间*/
    private Timestamp pTime;
    public UserTimeInfo() {
    }
    public UserTimeInfo(String userId, Timestamp pTime) {
        this.userId = userId;
        this.pTime = pTime;
    }
}

 

public class UserTimeSource implements SourceFunction<UserTimeInfo> {


    /**
     * 为了id 统计方便,我们只留一个id
     */
    static String[] userIds = {"id->"};
    Random random = new Random();
    /**
     * 模拟发送20次
     */
    int times = 20;

    @Override
    public void run(SourceContext sc) throws Exception {
        while (true) {
            TimeUnit.SECONDS.sleep(1);
            int m = (int) (System.currentTimeMillis() % userIds.length);
            // 随机延迟几秒
            int defTime = random.nextInt(5);
            // 发送时间
            DateTime dateTime = new DateTime();
            // 计算延迟后的时间,并且打印时间
            DateTime dateTimePrint = dateTime.plusSeconds(-defTime);
            System.out.println("实际时间:" + print(dateTime) + ",延迟:" + defTime + ":-->" + print(dateTimePrint));
            // 发送延迟时间
            dateTime = dateTime.plusSeconds(-defTime);
            sc.collect(new UserTimeInfo(userIds[m], new Timestamp(dateTime.getMillis())));
            // 只持续固定时间方便观察
            if (--times == 0) {
                break;
            }
        }
    }

    @Override
    public void cancel() {
        System.out.println("cancel to do ...");
    }

    private static String print(DateTime dateTime) {
        return dateTime.toString("yyyy-MM-dd hh:mm:ss");
    }
}

 

三.定义我们的两种watermark

    a. 基于系统时间 

   

/**
 * 这里逻辑,模拟按系统时间进行统计
 * 所有数据和系统自身时间有关
 */
public class UserTimeWaterMarkBySystem implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;
    /**
     * 该时间由于基于系统时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15  
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.注意,如果程序挂了,12点重启消费这个数据,就统计不到了
     * @return
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(System.currentTimeMillis() - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        return timestamp;
    }
}

 

  b.根据数据自生时间进行做延迟判断

   

public class UserTimeWaterMarkByRowTime implements AssignerWithPeriodicWatermarks<UserTimeInfo> {
    /**
     * 默认允许 5秒延迟
     */
    long maxDelayTime = 5000;

    /**
     * 该时间由于基于数据时间来做,
     * 如果10:00 11:10 秒用户点击的数据,然后延迟,实际收到的时间是10.00 11:15
     * a.根据系统时间 想减,小于5秒就会统计到
     * b.只要消息 时间延迟小于5 就能被统计。 
     * 这种对点击事件来说,更符合要求
     * @return
     */
    private long currentMaxTimestamp;
    
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxDelayTime);
    }
    @Override
    public long extractTimestamp(UserTimeInfo element, long previousElementTimestamp) {
        long timestamp = element.getPTime().getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}

 

四.source 类,和以前一样

  

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());
        //  UserTimeWaterMarkByRowTime 这个时间可以替换
        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

}

 

 

public class UserTimeWaterMarkApp {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<UserTimeInfo> userInfoDataStream = env.addSource(new UserTimeSource());

        DataStream<UserTimeInfo> timedData = userInfoDataStream.assignTimestampsAndWatermarks(new UserTimeWaterMarkByRowTime());
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("test", timedData, "userId,pTime.rowtime");
        Table result = tableEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM  test" +
                " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId ");
        // deal with (Tuple2<Boolean, Row> value) -> out.collect(row)
        SingleOutputStreamOperator allClick = tableEnv.toRetractStream(result, Row.class)
                .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> {
                    out.collect(value.f1);
                }).returns(Row.class);
        // add sink or print
        allClick.print();
        env.execute("test");
    }

 

小结:

   1.这个是基于flink 1.7 跑的

   2.代码比较简单,也好理解,有问题直接私信我 

 

 

 



已有 0 人发表留言,猛击->> 这里<<-参与讨论


ITeye推荐



相关 [flink watermark] 推荐:

flink-watermark

- - ITeye博客
     当我们统计用户点击的时候,有时候会因为各种情况数据延迟,我们需要一个允许最大的延迟范围进行统计.        模拟初始数据:早上10:00 11.10 用户点击了一次,但是延迟到10:00 11.15 才发送过来,允许最大延迟5秒, 5秒窗口统计. /** 实际时间-偏移量 偏移后的时间*/.

一文精通 Flink on YARN

- - IT瘾-dev
本文主要是讲解flink on yarn的部署过程,然后yarn-session的基本原理,如何启动多个yarn-session的话如何部署应用到指定的yarn-session上,然后是用户jar的管理配置及故障恢复相关的参数. flink on yarn的整个交互过程图,如下:. 要使得flink运行于yarn上,flink要能找到hadoop配置,因为要连接到yarn的resourcemanager和hdfs.

Flink SQL 编程实践

- - Jark's Blog
注: 本教程实践基于 Ververica 开源的. sql-training 项目. 基于 Flink 1.7.2. 本文将通过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖以下几个方面的内容. 如何使用 SQL CLI 客户端. 如何在流上运行 SQL 查询. 运行 window aggregate 与 non-window aggregate,理解其区别.

谈谈 Flink Shuffle 演进

- - 时间与精神的小屋
在分布式计算中,Shuffle 是非常关键但常常容易被忽视的一环. 比如著名的 MapReduce 的命名跳过 Shuffle ,只包含其前后的 Map 跟 Reduce. 背后原因一方面是 Shuffle 是底层框架在做的事情,用户基本不会感知到其存在,另一方面是 Shuffle 听起来似乎是比较边缘的基础服务.

Flink 1.16:Hive SQL 如何平迁到 Flink SQL

- - Jark's Blog
Hive SQL 迁移的动机. Flink 已经是流计算的事实标准,当前国内外做实时计算或流计算一般都会选择 Flink 和 Flink SQL. 另外,Flink 也是是家喻户晓的流批一体大数据计算引擎. 然而,目前 Flink 也面临着挑战. 比如虽然现在大规模应用都以流计算为主,但 Flink 批计算的应用并不广泛,想要进一步推动真正意义上的流批一体落地,需要推动业界更多地落地 Flink 批计算,需要更积极地拥抱现有的离线生态.

Flink Kafka Connector与Exactly Once剖析

- - SegmentFault 最新的文章
Flink Kafa Connector是Flink内置的Kafka连接器,它包含了从Kafka Topic读入数据的 Flink Kafka Consumer以及向Kafka Topic写出数据的 Flink Kafka Producer,除此之外Flink Kafa Connector基于Flink Checkpoint机制提供了完善的容错能力.

Flink在唯品会的实践

- - DockOne.io
唯品会自2017年开始基于Kubernetes深入打造高性能、稳定、可靠、易用的实时计算平台,支持唯品会内部业务在平时以及大促的平稳运行. 现平台支持Flink、Spark、Storm等主流框架. 本文主要分享Flink的容器化实践应用以及产品化经验. 平台支持公司内部所有部门的实时计算应用. 主要的业务包括实时大屏,推荐,实验平台,实时监控和实时数据清洗等.

使用 Kubernetes 部署 Flink 应用

- - 张吉的博客
Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用. 这些应用被打包在一个个非常轻量的容器中,我们通过声明的方式来告知 Kubernetes 要如何部署和扩容这些程序,并对外提供服务. Flink 同样是非常流行的分布式处理框架,它也可以运行在 Kubernetes 之上.

Flink CDC 核心:Debezium 1.9.0.Beta1 发布!

- - IT瘾-dev
我很高兴地宣布 Debezium  1.9.0.Beta1的发布. 此版本包括 Debezium Server 的许多新功能,包括 Knative Eventing 支持和使用 Redis 接收器的偏移存储管理、SQL Server 连接器的多分区缩放以及各种错误修复和改进. 总体而言,此版本已修复56 个问题.

Flink CDC 高频面试 13 问

- - IT瘾-dev
大家好,今天分享一篇土哥的文章. Flink cdc 2.1.1 发布后,更新了很多新功能以及知识点,今天为大家全面总结了 CDC 的知识点如 无锁算法及面试高频考点. 2 Flink cdc 2.1.1 新增内容. 获取本文文档,直接在公众号后台回复: CDC,加土哥微信,领取 Flink CDC 2.2.1 总结文档.