Uber 的实时数据分析系统架构 - 网站架构札记
Uber 实时系统的 Use case:
- 把乘客匹配给司机
- 计算乘车费
- 预计乘车时间,司机到达时间等
举一个更详细些的例子,UberEATS 是 Uber 的外卖服务。实时系统也为这个功能估算送餐时间。其中需要考虑的因素有:
- 餐厅此时的繁忙程度
- 做这份菜需要的时间
- 路上的交通状况
- 大致有多少可以供派遣的送餐车
所有来自乘客和司机的事件 event ,由 Kafka 收集 。Kafka 使用 Pub-sub 的订阅发布模式。Uber 整个系统中各个 microservice 之间的通信也通过了 Kafka。 Uber 使用 Samza 或 Flink 之类的工具做流处理。Kafka 也被用来记录数据库 schema(结构)的变化。
规模
Uber 处理 events 的规模在每天万亿条的级别,大约是百万 events 每秒。数据量在 PB 级。
上图是 Uber real time system 的大体架构。Kafka events 的来源主要是客户端 app,web API / Service 以及数据库。然后 event message 被 Kafka 分发给其他组件。Surge 负责计算乘车费用。ELK 负责做 logging,为 debug 提供数据。 Samza / Flink 处理实时数据,为数据分析和警报功能提供依据。AWS S3 和 Hadoop 也记录 message,供线下数据分析。
Uber 对于这个系统的要求是:
- 低延迟 Low latancy:API 延迟在 5ms 以内
- 高可用性 High availability:可用性在 99.99% 以上,意思就是全年宕机大约不超过1小时
- 多个数据中心之间要可以做数据复制
- 支持做 message 的数据审计,auditing
Kafka 的架构和流程
Uber 的 Kafka 架构,是由每个地区的 regional Kafka 将 message 向中心汇总。这其中,Kafka REST proxy 拥有候补服务器 secondary Kafka。因为在 uber 的 Kafka use case 中,有些需求对于高可用性要求非常高,比如用户订车。而有些数据允许丢失,但是要求很低的延迟,比如 logging data。
为了实现 low latancy 以及 high availability,最重要的两点是:在每个步骤都采用大量批处理(batching)和异步处理(async processing),一定不要阻塞服务器。
举个例子说,从 uber 的手机app上将一条 queue message 发送给 Kafka,是通过 app 里面的 proxy client library。而 proxy client 收到 message 后,立刻 acknowledge,然后将 message 放入一个缓存 buffer 中。当 buffer 填满时,所有 buffer 里面的 message 被打包成一个 batch,一起发送给 Kafka proxy server。这样可以实现 high throughput 和 low latancy 。而在后面, Kafka proxy server 也是一样,收到这个 batch 之后,立刻 acknowledge,然后根据 message 的 broker 不同,重新分类 message,缓存并打包成 batch ,发送给 Regional Kafka。
Uber 的 Proxy client library是自己实现的。它的特点是:
- 支持高吞吐量 high throughput:非阻塞,异步,批处理
- 当 kafka server 出现故障时,能缓存 message 在本地。等 server 恢复之后,再逐步发给 server
- Topic Discovery:类似于 service discovery 的思想。Topic Discovery 负责自动发现哪个 topic 在哪个 kafka cluster 上
uReplicator是为了改进 Kafka 的 mirror maker功能。当一个 broker 被删除时, Kafka 会对 partition 进行 rebalance。从一个 cluster 复制 partition 到另一个 cluster 时,如果一个 cluster 已经有很多 partition 了(比如 500 个以上),mirror maker 功能会变得很慢。
uReplicator 使用了 Apache Helix技术改进了 re-balancing 的过程。
uReplicator 的 github 地址: https://github.com/uber/uReplicator
Uber 开源项目主页: https://uber.github.io/
关于缓存和批处理
有些特别的 use case ,并不适合使用 buffer 加 batch 的机制,比如支付。在前面看到的流程中,如果 app 或者 proxy server 崩溃了,都会造成缓存里的数据丢失。
因此,Uber 的 Kafka 也是要支持同步机制的。类似支付这样的 message,会被 proxy client 同步发给 proxy server ,但不会放进buffer ,proxy client 也不会立刻 acknowledge。Proxy server 发送 message 给 regional Kafka 时,需要至少三台服务器返回 ack ,proxy server 才向 proxy client 返回 ack。此时为了保证高一致性,而增加了延迟。Uber 的系统支持对各个步骤的机制做微调,以找到最适合自己的配置。
审计工具 Chaperone
Chaperone是 Uber 开发的审计工具,主要负责统计和分析 message 的信息,包括 message 的数量,以及延迟。Uber 在这个 pipeline 的每个阶段几乎都插入了 Chaperone。app 客户端会发送数据给 Chaperone web service,Kafka cluster 会发送数据给 Chaperone kafka service。Uber 以每10分钟作为一个时间片,将数据统计给 Chaperone report service,以供数据分析。Report service 使用跨地区的 Cassandra 数据库。根据这些 message 数量的统计信息,如果来自客户端的统计和服务器端的统计有较大不匹配,也可以看出服务器是否出现故障。
Chaperone 服务于2015年在 Uber 上线使用,审计超过2万个 Kafka topic。
Clsuter Balancing 服务器集群平衡工具
Kafka 的数据是采用 consistent hashing 的方式存储的。上图展示了 consistent hashing 的结构。当一个 partition 的数据过多时,Kafka 并没有一个自动的 balacing 解决方案。 Uber 开发了一个工具,当一个 cluster 的数据过多时,会自动生成一个后台任务,创建一个新的 partition,并且做数据迁移。
原演讲地址:How Uber scaled its Real Time Infrastructure to Trillion events per day