唯品会实时计算平台的演进之路
一、实时平台现状
先介绍一下我们整个平台的现状,按计算的话,分为 Storm、Spark、Flink 三个主要的计算引擎,Flink 相应的应用数量目前少一些,不过按照整个计算引擎的发展方式,后续我们还是希望以 Flink 为主做相应的业务推进。
1、核心业务
- 实时推荐引擎:这块是非常核心的业务,对于大数据来说这些都是个性化推荐、实时推荐;
- 实时看板:我们称为给到总裁级别、各个业务线的运营和商务的数据,例如各个业务线上的实时销售和用户访问数据;
实时数据,包括 MySQL binlog 数据和用户行为数据:
我们通过 VDP 将 binlog 解析后同步到 Kafka 中,给到后续业务方;
- 用户行为数据是把 app 或者微信等埋点曝光的数据清洗出来,给后续的实时应用;
- 金融风控与安全业务的风控;
- 比价:大家在购买电商商品的时候会比价,我们平台方面也会做,看同样一件商品在其他电商售价多少,以决定我们自己的价格策略;
- 监控:我们有很多实时的监控系统业务指标,需要利用到实时的指标,比如全站的 PV、UV 等。
2、实时平台的职责
整个实时平台我们可以看两大板块:
- 实时计算平台:提供基础数据的支持,保证应用的监控数据的稳定性,提供相应的开发支持,包括应用的开发方案评审和引导;
- 实时基础数据:很多时候我们需要提供基础数据(埋点、Binlog 数据)的清洗打宽,这方面技术性也很强,会汇集到各种业务方。此外,格式的定义、自然的监控也是我们来做,会提供技术平台和技术数据,这些体现着平台的重要职责。
二、实时平台的发展历程
1、早期
整个实时平台的发展过程中,实时这方面也是从 0 到 1,大规模开始进行实时的业务大概是在 2013、2014 年,期间在不断地产生和迭代。
- Storm 是最早成熟的实时计算框架,最开始也是基于 Storm 做实时应用开发和推广。
- 早期也在 Storm 尝试去做 SQL 相关的事,那时候想法挺美好,但是因为实时的 SQL 能完成的事情,包括框架的成熟度比较低的情况下,SQL 相互碰撞,步子迈得太大了一点,就死在了沙滩上,我们当时能够真正帮到业务用 SQL 完成的事情还太少。现在相对而言,实时框架上的 SQL(Flink SQL)完善了很多,可以推广使用。
- 有段时间也做所有实时应用的对接,对我们来说最大的就是资源瓶颈,人员不足,后面还是聚焦平台为主,包括负责部分核心应用。
2、现在
整个实时平台的发展过程经历了几个阶段,具体的技术发展过程如下:
首先是有了 VRC,用于任务管理、监控、告警、数据质量。实时应用都是线上的应用,没有平台统一管控的话,实时应用是否 OK 或数据是否延迟等情况出现,就都不知道。有了实时平台之后,我们就知道实际应用运行到底怎么样。
Storm 比较完善后,计算框架增加 Spark Streaming。
很多用户会用到我们提供基础的数据,比如流量的数据、Binlog 等,并且我们会提供业务上的聚合和打宽。因为很多情况下我们用流量数据时,希望这个数据能包含的维度比较多,最典型的维度是用户过来以后,基本上有对应的设备 ID 转换成唯品会的 ID。
这个设备 ID 关联的用户的性别、姓名,或者用户的基本会员等级、用户偏好等一些基础的数据我们直接关联好,后续做实时任务就简单了很多。
2018 年我们开始推广 Flink 相关的应用,团队内部做的应用全部切到了 Flink。也是从 2018 年 6 月开始,调度开始切换到 Kubernetes 上。我们的实时平台不仅做实时相关的事,也做很多跟机器学习相关的工作,希望机器资源能更好地去共享。
机器资源对于电商来说,会有一个很明显的特征。在 618、双十一这样的大促时,随着数据量的暴增,资源的消耗量也会迅猛增长。
大促结束之后,这些应用根本就不需要这些资源,那么在平常一些时候,这些资源可以给机器学习更多的资源,通过统一资源管理框架调度系统做的这件事情,这是我们切换到 Kubernetes 上的很重要的原因。
我们一直往后面推进,里面也有很多的技术问题需要解决。
3、平台架构
实时平台来说,各家公司的架构基本上差不多。主要的核心数据就是两方面:一是数据源自用户行为的数据;另外就是线上对接的业务数据库的数据。
实时数据到达后,通过计算引擎完成业务逻辑的计算,以及和其他数据的交互,包括支持实时访问的存储系统。数据的结果可以给到实时报表、推荐引擎,也有给到机器学习平台进行模型训练。
整个数据流的过程会经历很多的环节,通过很多次计算的过程,对稳定性和可靠性都有比较高的要求,核心体系架构如下所示:
实时平台提供一个交互式开发的环境,一些简单的逻辑或者可以用 SQL 表达到的;复杂逻辑会使用计算框架的 API 进行开发。
对于任务来说,版本、数据质量以及元数据系统和离线还不太一样。离线的元数据系统都是表,任务很多都是依赖报表来完成。
实时的元数据系统就会考虑到它是在哪;我怎么可以访问到它;它里面数据的格式是怎样的。因为数据可能是 PB 格式,也可能是 JSON,我们会考虑怎么把这个数据格式对业务方透明,在上面做相应的封装来保证业务的方式比较简洁。
实时计算在从早期的荒野到现在逐步完善,我用痛并快乐来形容:痛是因为它开发的难度、开发的要求比离线和其他应用开发要难;快乐在问题解决以后,以及对业务的增长带来成就。
三、实时计算的难点和挑战
1、开发复杂度
首先说一下实时开发的挑战。比如实时所有的数据,在我们的处理中都是以流形式过来的,没有开始和结束的概念,离线的如果用 Spark SQL 去处理数据就有明确的开始结束时间。
就如我们做订单处理,其实这个订单的状态是一直变化的。如添加的购物车、下单、支付、物流和收货以及售后等。
物流之前,订单还会扭转到物流仓库。在包括售后和退货等环节后,这整个订单的持续过程会很久,它随时随地在变化。因为一个月以后订单发生的质量问题,也需要平台来处理。
整个计算链路也比较长,同时涉及到的系统有很多,对计算引擎、存储这些都会要求。
相应地,做实时开发有几个讨论的核心问题:
- 设置各个组件的并行度是不是合理?是不是可以满足需求?怎么去评估我这是合理的还是说它太大还是太小?
- 面对计算的结果和内部的计算中间结果数据,用什么来存储?中间是没有状态数据的,这状态数据如果量比较大,并且需要跟外部系统密切交互的时候,如果外部系统响应比较慢或者延迟比较高,就会严重影响实时处理的效率;
- 异常处理困难,多个流之间的处理很具有挑战性,现在来说,没有一个非常完美的方案能解决多个流之间的关联关系。离线处理相对比较简单,但是在实时处理的时候,问题就复杂了很多;
- 一致性问题:实时数据要做到 Exactly Once 也是非常有挑战的,对系统有非常高的要求;
- 数据准确性和校验,很多时候做的实时指标都要跟离线进行对数,如何保证实时和离线一致的,需要考虑到的问题就特别多。
2、技术难度
乱序问题
多个流要关联的话,乱序问题是非常麻烦的。因为很多的数据过来的时候会存在乱序问题。有一条数据可能是早发生的,但是后收到了,因为整个数据上报过来的时候,经历了很多很多的重输,经历了各种设备,很难保证数据是严格有序的。
吞吐和延迟
延迟和吞吐 TPS 通常是矛盾的,你希望 TPS 高,怎么办?
通常加大批处理的模式,我希望一次可以处理一批数据,这样吞吐量就上去了;但是这样延迟就会增加。很多时候面临的具体是什么问题,根据具体问题来给出解决方案。
内部的状态数量
比如计算 UV,UV 的维度会非常多,甚至细到商品粒度。
我们在线的商品几百万的时候,对于商品的 UV,以及跟商品关联的的品牌、档期等,这些交叉之后所需要的计算的指标特别大,可能达到千万级的统计量。
怎么去做状态数据的存储,包括这些数据的可用性,就非常重要了。如果任务处理失败了,怎么快速的恢复等。
四、实时平台的发展方向
对于实时平台来说,这些年的发展都是在做一件事情——降低开发门槛。首先,从平台本身来说,我们做了这几件事情:
1、丰富实时基础数据——实时离线融合
提供核心基础数据
我们做的第一件事是提供核心基础数据。这个很多时候可以降低实时业务后续开发难度。因为前面我们刚刚提到了做多种实时宽表以后,业务做实时应用其实就只是处理单一流。
在前置的任务中,我们把很多特别难搞的问题都解决掉,包括关联到其他维度的数据,多个数据源之间的乱序等,关联好的数据输出到一个流中,后续实时的应用处理单一流就简单了很多。
实时和离线统一
因为大家都知道有一种 Labmda 提法,我实时地算一遍,然后用批处理去补数。
但这会面临两个问题,一是数据需要两个计算框架,工作量大了很多;二是数据重复计算,导致更多的资源消耗。
我们现在来做里面的事情,能实时处理的数据就不会再去离线处理。因为实时处理的实时清洗打宽以后,我们以五分钟的粒度写入 HDFS 的准实时表里面,离线任务就可以启动来使用这些数据了。这样,很多数据的的口径就做到了一致性。
业务方有需求要一个他们认为的实时数据指标,首先作为平台来说,就会问你一个直白的问题:你希望这个实时是什么概念?是秒级还是分钟级?
因为对于实时的时间密度大家理解是不一致的,如果在分钟级以上我们会根据这些数据通过准实时或者离线的方案来完成数据的输出。因为离线 SQL 的开发成本是最低的,可能我直接写 SQL 就能完成。
相应的,如果你真的要求数据时间非常短,那要考虑真正实时的应用怎么开发。
2、统一计算资源:基于 Kubernetes 调度实时和 AI 平台资源
前面我们也提到了在管理计算资源的时候,如果做到 Storm、Flink 或者 Spark 等任务通过 Kubernetes 统一资源的调度管理后,能快速地对应用扩收容。实时平台的管理实时应用接近 800 个,所以需要快速地提升自身应用的能力和变更。
3、新的开发模式
实时平台现在推广基于 Streaming SQL 的开发模式,底层的计算引擎以 Flink 和 Spark Structured Streaming 为主。
因为不管是 Flink 还是 Spark Structured Streaming,都支持状态存储,便于错误恢复和可靠性保障。
第二个是相应的,我们整个 API 要更高级一些。相应的,写一些代码的时候,也会有非常大的简化。
整个来说,开发平台也在预研实时的 notebook 开发环境,你可以完成相应的实时应用的开发,整个模式都是在替换 Storm 的节奏。
4、效率提升
在我们的实践中,举一些使用 Flink 带来的显著好处的案例。
UV 计算
在以 Storm 为计算框架的时候,状态存储通常是在 Redis 中,这样会需要大量的 Redis 资源。
现在完全可以用 Flink 来做,它的好处是我们有很多的计算任务,通过这样的转化,它计算的稳定性、可靠性得以提升,计算资源消耗降低 2/3。
以我们自己 UV 中品牌日这个场景为例,同时会有上百个品牌日在线,因此每个品牌日的指标需要全部的预算指标。在 Storm 为计算的时候甚至用到了 240 个以上的 Worker 节点,并且配合了 2T 的 Redis 存储。
切到了 Flink 以后,因为 Flink 所有的状态都是在内存中,这个过程中少了跟 Redis 的交互,效率会有非常大的提升。同时整个的依赖组件越少,系统稳定性就会越高。
埋点数据落地
我们很多实时的在 Kafka 里面的数据通过 Spark Streaming 写入到 HDFS 中,然后通过 Hive、Spark SQL 来访问。
在使用 Flink 里带的 Bucket Sink 模式,只需要十分之一的资源就达到了原来 Spark Streaming 就能达到相应的吞吐量,并且延迟也大幅度的降低。
5、统一数据源
1)UDM 架构
当前实时系统,机器学习平台要处理的数据分布在各种数据存储组件中,如 Kafka、Redis、Tair、HDFS 等等,如何方便高效的访问、处理、共享这些数据是一个很大的挑战,对于当前的数据访问和解析常常需要耗费很多的精力。主要的痛点包括:
- 对于 Kafka,Redis,Tair 中的 binary(PB/Avro 等格式)数据,使用者无法快速直接的了解数据的 schema 与数据内容,采集数据内容及与写入者的沟通成本很高。
- 由于缺少独立的统一数据系统服务,对 Kafka,Redis,Tair 等中的 binary 数据访问需要依赖写入者提供的信息,如 proto 生成类,数据格式 wiki 定义等,维护成本高,容易出错。
- 缺乏 relational schema 使得使用者无法直接基于更高效易用的 SQL 或 LINQ 层 API 开发业务。
- 无法通过一个独立的服务方便的发布和共享数据。
- 实时数据无法直接提供给 Batch SQL 引擎使用。
- 此外,对于当前大部分的数据源的访问也缺少审计,权限管理,访问监控,跟踪等特性。
UDM(统一数据管理系统)包括 Location Manager、 Schema Metastore 以及 Client Proxy 等模块,主要的功能包括:
- 提供从名字到地址的映射服务,使用者通过抽象名字而不是具体地址访问数据。
- 用户可以方便的通过 Web GUI 界面方便的查看数据 Schema,探查数据内容。
- 提供支持审计,监控,溯源等附加功能的 Client API Proxy。
- 在 Spark/Flink/Storm 等框架中,以最适合使用的形式提供这些数据源的封装。
UDM 的体系架构如下图所示:
2)基于 UDM 的开发模式
在基于 UDM 的基础上,实时和离线可以统一开发模式,其示例代码如下:
从示例可以看出,不管 Flink 还是 Spark 都是统一的开发模式,而且代码非常简洁。
Q & A
Q1:我们也是做这个的,你的大数据平台上所有的组件都是容器化的还是混合的?是属于计算类的?
A1:现在大数据的所有组件要容器化很难,存储我们还没有做容器化相关的事,更多的是做偏计算引擎,就是我计算容器化掉,能给到很多更明确的意义。
比如说我很难告诉你一个在 YARN 上的任务到底吃掉了多少 CPU,到底系统有没有问题。那现在容器化后就可以非常好地监控应用的资源消耗,也非常容易判断资源是否使用合理。
第二个,容器化以后,可以开放一些更高的权限给应用的开发者,我可以允许你进到容器里去做一些以前需要高权限才能做的事,比如说要去做一些性能分析问题的时候,在物理机上不太可能给你比较高的权限的,你进去看到的包括其他开发者的应用,操作不当可能把组件都搞挂了。
但是在容器环境下,你登录到容器里面,搞死了就是你自己把自己的应用搞死了,别人的你根本就看不到。
我们目前是计算类的。对于大数据无外乎就计算和存储两大类。存储容器化目前来说,优势不明显,反而会带来更多问题。
现在对于大数据来说,一个趋势是以前我们希望计算和存储在一起,现在由于网络带宽越来越高,我们偏好计算和存储分离。因为大的互联网公司基本上都是从至少单机万兆以上的网络。相对而言,数据的本地性重要性就没有那么高。存储和计算分离以后,计算节点扩展就更加容易。
Q2:我想问下,如果在流计算的统计过程中要维护很多状态,比如有几十亿的要维护,根据您刚刚提到的技术难点,能讲讲是怎样解决的吗?下一个问题是各个节点写入不均衡,怎么去监控,怎么解决?怎么样看数据倾斜呢?
A2:第一个问题,如果以 Storm 来说,可以做两级存储,依赖 Storm 我肯定会在自己 Worker 的 JVM 内部做一级缓存。
达到一定的时间和一定量之后,刷到外部的二级存储里面。计算维度数据的统计指标非常庞大的时候,如果直接往外部输,说得直白一点,再强悍的外部存储的性能都扛不住。你也提到了上亿的时候,每秒输出的数据都是几百万 QPS,基本上没有一个存储的引擎可以在比较合理的规模来支持这个量级(除非依赖更大规模的分布式存储引擎)。
因此一般都是两级存储,到了一定时间或者一定状态数量之后把它刷新到外部的存储引擎。这样就有一个问题,你会丢掉最近未刷新出去的那批数据,这就要求你可以从上一个刷新出去的数据状态支持恢复,或者重复处理。
Storm 完全依赖自己的状态管理,数据出现失败,或者恢复的时候,需要自己处理比较多的事情。Flink 相对而言就好很多,支持从 checkpoint 或者 savepoint 直接恢复。
第二个问题对于数据倾斜来说,基本上所有的数据倾斜的处理都是两阶段处理。即先做一级计算汇总,一级计算之后再做二级处理;或者明确知道哪些数据分布特别不均匀,把这些数据单独处理。
关于怎么样看数据倾斜,观察每一级 Task 往下一级 Task 写出的数据量的分布就可以了。在我看来,数据量最大的比最小的高了十倍就肯定是倾斜,当然也可以计算标准差。
Q3:您之前说唯品会是电商方面的,我们公司也是电商方面的。想问关于订单方面的状态您是怎么维持它的瞬时性的?
A3:对于订单,或者还是其他的业务数据,就是两种场景,一种场景要求我们自己做的工具,我们对应的同一张表,哪怕是分表的数据,同一个表或者分库分表写到同一个 Kafka 的 Topic 里面,并且根据分表做 Partition 映射,这样可以保证局部一致性,即同一个物理表的数据是严格有序的。要保证全局一致性,其实是保证不了的。
但是,对业务来说,局部的一致性就 OK 了,同一个订单只能写到同一个分表上。这样在读取数据的时候,就能做到按相应顺序消费。
第二种,像一些特殊的场景,可能还是会依赖我们自己把秒级数据抽取,即高频的以 SQL 方式去拉取最新的数据写入到 Kafka 中。
很多时候我们做订单统计,也会缓存所有的订单计算过程一些中间状态。最典型的,订单的中间状态的变化是很复杂的一件事。我下单时第一件事是拆单,一个订单能买三件商品,我们有复杂的拆单逻辑,即将这个母单会拆成三个子单,或者两个子单。
因为对应了不同的物流仓库,也可能有下的海淘的商品。在签收之前的过程中你可能又取消了一个子单,或者把三个子单都取消,那这个订单就彻底取消掉了。
因此我们算单数的时候,就要全部减掉。整订单的各种状态我们会把它缓存起来。因为像我们的订单大概有接近 40 个状态,只是有些状态可能不见得会在统计的时候用到。
作者介绍
王新春,唯品会高级经理、数据平台实时团队高级架构师,主要负责实时计算平台、实时数据、实时报表和机器学习平台等业务;曾任美团点评(原大众点评)数据平台高级架构师,负责从零开始搭建实时计算平台及数据平台工具体系开发和建设等工作。