维度数据实时关联的实践(w/ Flink、Vert.x & Guava Cache) - 简书

标签: | 发表时间:2020-06-07 21:30 | 作者:
出处:https://www.jianshu.com

Data Enrichment

在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息。例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联。这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等。

英文语境里习惯将上述操作称为data enrichment。下图展示出了Trackunit公司的实时IoT处理架构,比较有代表性。注意图中的"Enrich"字样。

https://www.ververica.com/blog/trackunit-leverages-flink-industrial-iot

实时关联维度数据的思路主要有如下4种。

  1. 全量预加载+定时刷新:适用于规模较小的缓慢变化维度(SCD),思路最简单,可以参见笔者之前写的 示例

  2. 实时查询+缓存刷新:适用于规模较大的缓慢变化维度(SCD),在数仓维度建模过程中,这种维度最为常见,本文接下来会详细叙述其实现方式。

  3. 纯实时查询:适用于快速变化维度(RCD),或者对关联时效性要求极高的场合,需特别注意频繁请求对外部存储的压力。

  4. 流式化维度:比较特殊且灵活,将维度表的change log转化为流,从而把静态表的关联转化为双流join。从change log解析出的维度数据可以写入状态存储,起到缓存的作用。之后再提。

上述4种思路并没有绝对的好坏之分,而是需要根据业务特点和需求来取舍。

下面介绍用Flink异步I/O、Vert.x JDBC Client和Guava Cache实现的实时查询+缓存刷新方案。

Flink Async I/O

Flink的异步I/O专门用来解决Flink计算过程中与外部系统的交互问题。在默认情况下,算子向外部系统发出请求后即阻塞,等待结果返回才能发送下一个请求,可能会造成较大的延迟,吞吐量下降。有了异步I/O之后,就可以并发地发出请求和接收响应,延迟大大降低。下图来自 官方文档,一看便知。

关于它的细节,看官可以参考之前的 《聊聊Flink异步I/O机制的原理》一文,不再废话。

Vert.x JDBC Client

Vert.x是一个由Eclipse基金会开源的跨语言、事件驱动的异步应用程序框架,运行在JVM平台上,底层依赖于Netty。Vert.x的异步应用场景极为广泛,如Web、数据库访问、响应式编程、微服务、MQTT、认证与鉴权、消息队列、事件总线等等,详情可以参见 官方文档

本文采用的维度表数据源是MySQL,而Java原生的JDBC机制是同步的,要与Flink异步I/O一同使用的话,按传统方式需要自己创建连接池、线程池并实现异步化。我们引入 Vert.x JDBC Client模块来简化之,先加入依赖项。

    <dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.47</version>
</dependency>

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-jdbc-client</artifactId>
  <version>3.8.5</version>
</dependency>

通过VertxOptions指定事件循环线程池和工作线程池的大小,然后指定JDBC连接的各项参数(注意c3p0的连接池大小 max_pool_size),并创建异步的SQL客户端实例。

    Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");

Vertx vertx = Vertx.vertx(
  new VertxOptions()
    .setWorkerPoolSize(10)
    .setEventLoopPoolSize(5)
);

JsonObject config = new JsonObject()
  .put("url", dbProps.getProperty("mysql.sht.url"))
  .put("driver_class", "com.mysql.jdbc.Driver")
  .put("max_pool_size", 20)
  .put("user", dbProps.getProperty("mysql.sht.user"))
  .put("password", dbProps.getProperty("mysql.sht.pass"));

SQLClient sqlClient = JDBCClient.createShared(vertx, config);

按如下的异步风格获取连接、执行查询、处理查询结果,并关闭连接。借助Lambda表达式,可以将回调写得相对优雅一些。

    sqlClient.getConnection(connResult -> {
  if (connResult.failed()) {
    LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
    return;
  }

  SQLConnection conn = connResult.result();
  String sql = "/* SQL statement here */";

  conn.query(sql, queryResult -> {
    if (queryResult.failed()) {
      LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
      return;
    }

    ResultSet resultSet = queryResult.result();
    for (JsonObject row : resultSet.getRows()) {
      // handle result here...
    }

    conn.close();
  });
});

千万别忘记在处理结束后调用SQLConnection.close()方法,否则连接池会被很快耗尽。

Guava Cache

显而易见,data enrichment过程中对维度数据的访问是非常频繁的,并且维度表往往也比较大,全量加载的成本可能不低。为了避免对维度数据库造成压力,并且同时加快关联的速度,在维度不太经常变动、对精确度要求不很高的情况下,就可以用缓存暂时将一部分维度数据保留在内存中,并设定合理的过期策略。缓存是典型的空间换时间思想的体现。

Google Guava专门提供了集中式、线程安全的Cache组件满足这类需求,我们可以将它近似理解成带有缓存特性的ConcurrentMap。按以下方法创建一个维度缓存。

    Cache<String, String> dimCache = CacheBuilder.newBuilder()
  .initialCapacity(10_000)
  .maximumSize(20_000)
  .expireAfterAccess(1, TimeUnit.HOURS)
  .build();

initialCapacity()方法和maximumSize()方法分别指定该缓存的初始容量和最大容量,推荐对它们有一个预估。Guava Cache的过期/刷新策略有3种,根据需求选用即可:

  • expireAfterWrite():指定数据被写入缓存之后多久过期;
  • expireAfterAccess():指定数据多久没有被访问过之后过期;
  • refreshAfterWrite():指定数据被写入缓存之后多久刷新其值(不删除)。

简单的用法如下。

    String key = /* ... */;
String value = dimCache.getIfPresent(key);
if (value == null) {
  value = getFromDatabase(key);
  dimCache.put(key, value);
}

也可以直接用get()方法一步实现“若无则计算”(compute-if-absent)的逻辑,第二个参数是一个Callable。

    String value = dimCache.get(key, () -> {
  return getFromDatabase(key);
});

关于它的详细用法(比如带自动加载的LoadingCache、基于弱/软引用的清除策略等),可以参见GitHub上的 Wiki页

Integration

扯了这么多,把三者结合起来写个示例吧。下面的AsyncFunction实现了从MySQL异步加载城市名、商品名和分类名3个维度的数据。AnalyticsAccessLogRecord是事件的POJO类。

    public static final class MySQLDimensionAsyncFunc
  extends RichAsyncFunction<AnalyticsAccessLogRecord, AnalyticsAccessLogRecord> {
  private static final long serialVersionUID = 1L;
  private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDimensionAsyncFunc.class);

  private transient SQLClient sqlClient;
  private transient volatile Cache<String, String> dimCache;

  @Override
  public void open(Configuration parameters) throws Exception {
    Properties dbProps = ParameterUtil.getFromResourceFile("mysql.properties");

    Vertx vertx = Vertx.vertx(
      new VertxOptions()
        .setWorkerPoolSize(10)
        .setEventLoopPoolSize(5)
    );

    JsonObject config = new JsonObject()
      .put("url", dbProps.getProperty("mysql.sht.url"))
      .put("driver_class", "com.mysql.jdbc.Driver")
      .put("max_pool_size", 20)
      .put("user", dbProps.getProperty("mysql.sht.user"))
      .put("password", dbProps.getProperty("mysql.sht.pass"));

    sqlClient = JDBCClient.createShared(vertx, config);

    dimCache = CacheBuilder.newBuilder()
      .initialCapacity(10_000)
      .maximumSize(20_000)
      .expireAfterAccess(1, TimeUnit.HOURS)
      .build();
  }

  @Override
  public void close() throws Exception {
    sqlClient.close();
    dimCache.invalidateAll();
  }

  @Override
  public void asyncInvoke(AnalyticsAccessLogRecord record, ResultFuture<AnalyticsAccessLogRecord> resultFuture) throws Exception {
    boolean needEnriching = false;
    long siteId = record.getSiteId();
    long categoryId = record.getCategoryId();
    long merchandiseId = record.getMerchandiseId();

    String siteCacheKey = "s" + siteId;
    String categoryCacheKey = "c" + categoryId;
    String merchandiseCacheKey = "m" + merchandiseId;

    List<String> selectSql = new ArrayList<>();

    if (siteId >= 0) {
      String name = dimCache.getIfPresent(siteCacheKey);
      if (name == null) {
        selectSql.add("SELECT 's' AS t,name AS n FROM xx_db_new.site WHERE id = " + siteId);
        needEnriching = true;
      } else {
        record.setSiteName(name);
      }
    }
    if (categoryId >= 0) {
      String name = dimCache.getIfPresent(categoryCacheKey);
      if (name == null) {
        selectSql.add("SELECT 'c' AS t,name AS n FROM xx_db_new.category WHERE id = " + categoryId);
        needEnriching = true;
      } else {
        record.setCategoryName(name);
      }
    }
    if (merchandiseId >= 0) {
      String name = dimCache.getIfPresent(merchandiseCacheKey);
      if (name == null) {
        selectSql.add("SELECT 'm' AS t,title AS n FROM xx_db_new.merchandise WHERE id = " + merchandiseId);
        needEnriching = true;
      } else {
        record.setMerchandiseName(name);
      }
    }
   
    if (needEnriching) {
      sqlClient.getConnection(connResult -> {
        if (connResult.failed()) {
          LOGGER.error("Cannot get MySQL connection via Vertx JDBC client ", connResult.cause());
          return;
        }

        SQLConnection conn = connResult.result();
        String sql = StringUtils.join(selectSql, " UNION ALL ");

        conn.query(sql, queryResult -> {
          if (queryResult.failed()) {
            LOGGER.error("Error executing SQL query: {}", sql, queryResult.cause());
            return;
          }

          ResultSet resultSet = queryResult.result();
          for (JsonObject row : resultSet.getRows()) {
            String tag = row.getString("t");
            String name = row.getString("n");

            switch (tag) {
              case "s":
                record.setSiteName(name);
                dimCache.put(siteCacheKey, name);
                break;
              case "c":
                record.setCategoryName(name);
                dimCache.put(categoryCacheKey, name);
                break;
              case "m":
                record.setMerchandiseName(name);
                dimCache.put(merchandiseCacheKey, name);
                break;
              default: break;
            }
          }

          resultFuture.complete(Collections.singletonList(record));
          conn.close();
        });
      });
    } else {
      resultFuture.complete(Collections.singletonList(record));
    }
  }
}

最后通过AsyncDataStream.(un)orderedWait()方法调用之,注意设定超时时间与异步请求的数量限制。

    DataStream<AnalyticsAccessLogRecord> recordStream = /* ... */;
DataStream<AnalyticsAccessLogRecord> enrichedRecordStream = AsyncDataStream.unorderedWait(
  recordStream,
  new MySQLDimensionAsyncFunc(),
  3, TimeUnit.SECONDS,
  100
).name("async_dimension_enrich").uid("async_dimension_enrich");

大功告成。

民那晚安晚安。

相关 [维度 数据 实时] 推荐:

维度数据实时关联的实践(w/ Flink、Vert.x & Guava Cache) - 简书

- -
在流式处理作业(特别是实时数仓ETL作业)中,我们的数据流可以视为无界事实表,其中往往缺乏一些维度信息. 例如,对于埋点日志流而言,为了减少传输冗余,可能只会带有城市ID、商品ID等,如果要映射到对应的名称,就需要与外部存储中的维度表进行关联. 这里的外部存储一般是指适合OLTP场景的数据库,如MySQL、Redis、HBase等.

谈大数据-架构维度

- - 人月神话的BLOG
本篇作为在构思大数据平台架构时候维度方面的简单点滴思考记录. 前面关于大数据平台架构的核心功能的时候谈到过,基本应该包括数据采集和集成,数据存储,数据处理,数据分析这些核心层面. 我在前面谈大数据平台的时候也谈到过平台不仅仅是云和分布式相关技术的引入,其架构一方面和传统的BI相似,但是更加重要的则是对外部应用涉及到大数据的应用场景的支撑和大数据平台本身的大数据服务能力的开放问题.

sina获取实时股票数据

- - 互联网 - ITeye博客
* 从请求的URL中获取返回的数据流. // setConnectTimeout:设置连接主机超时(单位:毫秒). // setReadTimeout:设置从主机读取数据超时(单位:毫秒). // 设置是否向httpUrlConnection输出,因为这个是post请求,参数要放在 http正文内,因此需要设为true,.

实时数据聚合怎么破

- -
实时数据分析一直是个热门话题,需要实时数据分析的场景也越来越多,如金融支付中的风控,基础运维中的监控告警,实时大盘之外,AI模型也需要消费更为实时的聚合结果来达到很好的预测效果. 实时数据分析如果讲的更加具体些,基本上会牵涉到数据聚合分析. 数据聚合分析在实时场景下,面临的新问题是什么,要解决的很好,大致有哪些方面的思路和框架可供使用,本文尝试做一下分析和厘清.

Flink 如何实时分析 Iceberg 数据湖的 CDC 数据

- - 掘金 后端
本文由李劲松、胡争分享,社区志愿者杨伟海、李培殿整理. 主要介绍在数据湖的架构中,CDC 数据实时读写的方案和原理. 文章主要分为 4 个部分内容:. 常见的 CDC 分析方案. 为何选择 Flink + Iceberg. 一、常见的 CDC 分析方案. 我们先看一下今天的 topic 需要设计的是什么.

销售行业ERP数据统计分析都有哪些维度?

- - CSDN博客综合推荐文章
当前的企业信息化建设主要包括ERP系统、OA系统等. 企业希望实现信息系统数据的整合,对企业资源进行分析汇总,方便对企业相关数据的掌控从而便于对业务流程进行及时调整监控. 但是由于系统间数据的组合众多,对于数据的分析维度也有太多选择,由于人力和系统的支持度问题,对于这些可能性的维度都进行数据分析显然并不现实,那么一个销售行业如何针对行业特点、选定维度分析数据呢就成了一个需要思考的问题.

Twitter的海量数据实时系统实现

- vento - NoSQLFan
作为世界上最大的微博客网站,Twitter也有着世界上最大的数据压力,在七月份的一份数据显示,Twitter的日发送量已经突破2亿条,其日独立访问用户早在5月份就达到1.39亿. 下面是Twitter在Qcon London的一个演讲PPT,描述了Twitter最重要的四块实时数据(Tweets,Timelines,Social graphs,实时搜索)的存储实现及架构变迁.

Storm :twitter的实时数据处理工具

- d0ngd0ng - yiihsia[互联网后端技术]_yiihsia[互联网后端技术]
昨天在家里一直发不出文章,于是干脆先发到了iteye上. Twitter在9月19日的Strange Loop大会上公布Storm的代码. 这个类似于Hadoop的即时数据处理工具是BackType开发的,后来被Twitter收购用于Twitter. Twitter列举了Storm的三大类应用:. 1. 信息流处理{Stream processing}.

开放实时数据处理平台 Twitter Storm

- We_Get - 开源中国社区最新软件
Storm 代码来自于Twitter上月收购的BackType,似乎是Twitter为方便用户解析数据的努力. 现在Storm的势头相当强劲,Twitter开发的使其完美的工具,已经变得非常强大. 类似于Hadoop,另一个开源数据操作平台,Storm也可能成为一项大业务. 据报道,雅虎正在考虑分拆Hadoop,打造一个规模达数十亿美元的业务.