Kafka实战-数据持久化 - 哥不是小萝莉

标签: kafka 数据 小萝莉 | 发表时间:2015-07-15 14:32 | 作者:哥不是小萝莉
出处:

1.概述

  经过前面Kafka实战系列的学习,我们通过学习《 Kafka实战-入门》了解Kafka的应用场景和基本原理,《 Kafka实战-Kafka Cluster》一文给大家分享了Kafka集群的搭建部署,让大家掌握了集群的搭建步骤,《 Kafka实战-实时日志统计流程》一文给大家讲解一个项目(或者说是系统)的整体流程,《 Kafka实战-Flume到Kafka》一文给大家介绍了Kafka的数据生产过程,《 Kafka实战-Kafka到Storm》一文给大家介绍了Kafka的数据消费,通过Storm来实时计算处理。今天进入Kafka实战的最后一个环节,那就是Kafka实战的结果的数据持久化。下面是今天要分享的内容目录:

  • 结果持久化
  • 实现过程
  • 结果预览

  下面开始今天的分享内容。

2.结果持久化

  一般,我们在进行实时计算,将结果统计处理后,需要将结果进行输出,供前端工程师去展示我们统计的结果(所说的报表)。结果的存储,这里我们选择的是Redis+MySQL进行存储,下面用一张图来展示这个持久化的流程,如下图所示:

  从途中可以看出,实时计算的部分由Storm集群去完成,然后将计算的结果输出到Redis和MySQL库中进行持久化,给前端展示提供数据源。接下来,我给大家介绍如何实现这部分流程。

3.实现过程

  首先,我们去实现Storm的计算结果输出到Redis库中,代码如下所示:

package cn.hadoop.hdfs.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import redis.clients.jedis.Jedis;
import cn.hadoop.hdfs.util.JedisFactory;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
* @Date Jun 10, 2015
*
* @Author dengjie
*
* @Note Calc WordsCount eg.
*/
public class WordsCounterBlots implements IRichBolt {

/**
*
*/
private static final long serialVersionUID = -619395076356762569L;

OutputCollector collector;
Map<String, Integer> counter;

@SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.counter = new HashMap<String, Integer>();
}

public void execute(Tuple input) {
String word = input.getString(0);
Integer integer = this.counter.get(word);
if (integer != null) {
integer += 1;
this.counter.put(word, integer);
} else {
this.counter.put(word, 1);
}
for (Entry<String, Integer> entry : this.counter.entrySet()) {
// write result to redis
Jedis jedis = JedisFactory.getJedisInstance("real-time");
jedis.set(entry.getKey(), entry.getValue().toString());

// write result to mysql
// ...
}
this.collector.ack(input);
}

public void cleanup() {
// TODO Auto-generated method stub

}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub

}

public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

}

   注:这里关于输出到MySQL就不赘述了,大家可以按需处理即可。

4.结果预览

  在实现持久化到Redis的代码实现后,接下来,我们通过提交Storm作业,来观察是否将计算后的结果持久化到了Redis集群中。结果如下图所示:

  通过Redis的Client来浏览存储的Key值,可以观察统计的结果持久化到来Redis中。

5.总结

  我们在提交作业到Storm集群的时候需要观察作业运行状况,有可能会出现异常,我们可以通过Storm UI界面来观察,会有提示异常信息的详细描述。若是出错,大家可以通过Storm UI的错误信息和Log日志打印的错误信息来定位出原因,从而找到对应的解决办法。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

 


本文链接: Kafka实战-数据持久化,转载请注明。

相关 [kafka 数据 小萝莉] 推荐:

Kafka实战-数据持久化 - 哥不是小萝莉

- - 博客园_首页
今天进入Kafka实战的最后一个环节,那就是Kafka实战的结果的数据持久化.   一般,我们在进行实时计算,将结果统计处理后,需要将结果进行输出,供前端工程师去展示我们统计的结果(所说的报表). 结果的存储,这里我们选择的是Redis+MySQL进行存储,下面用一张图来展示这个持久化的流程,如下图所示:.

Kafka实战-Flume到Kafka - 哥不是小萝莉

- - 博客园_首页
  前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.   Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到Kafka(供实时计算处理)和HDFS(离线计算处理).

Kafka实战-实时日志统计流程 - 哥不是小萝莉

- - 博客园_首页
  在《 Kafka实战-简单示例》一文中给大家介绍来Kafka的简单示例,演示了如何编写Kafka的代码去生产数据和消费数据,今天给大家介绍如何去整合一个完整的项目,本篇博客我打算为大家介绍Flume+Kafka+Storm的实时日志统计,由于涉及的内容较多,这里先给大家梳理一个项目的运用这些技术的流程.

Kafka幂等性原理及实现剖析 - 哥不是小萝莉 - 博客园

- -
最近和一些同学交流的时候反馈说,在面试Kafka时,被问到Kafka组件组成部分、API使用、Consumer和Producer原理及作用等问题都能详细作答. 但是,问到一个平时不注意的问题,就是Kafka的幂等性,被卡主了. 那么,今天笔者就为大家来剖析一下Kafka的幂等性原理及实现. 2.1 Kafka为啥需要幂等性.

HBase BulkLoad批量写入数据实战 - 哥不是小萝莉 - 博客园

- -
在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等. 这些方式,在导入数据的过程中,如果数据量过大,可能耗时会比较严重或者占用HBase集群资源较多(如磁盘IO、HBase Handler数等).

高速数据总线kafka介绍

- - 数据库 - ITeye博客
在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转. 有没有一个系统可以同时搞定在线应用(消息)和离线应用(数据文件,日志). 2、降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用.

kafka数据可靠性深度解读

- - CSDN博客推荐文章
Kakfa起初是由LinkedIn公司开发的一个分布式的消息系统,后成为Apache的一部分,它使用Scala编写,以可水平扩展和高吞吐率而被广泛使用. 目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark等都支持与Kafka集成. Kafka凭借着自身的优势,越来越受到互联网企业的青睐,唯品会也采用Kafka作为其内部核心消息引擎之一.

Avoiding Data Loss - 避免Kafka数据丢失

- -
If for some reason the producer cannot deliver messages that have been consumed and committed by the consumer, it is possible for a MirrorMaker process to lose data..

大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合

- - 行业应用 - ITeye博客
大数据我们都知道hadoop,但并不都是hadoop.我们该如何构建大数据库项目. 对于离线处理,hadoop还是比较适合的,但是对于实时性比较强的,数据量比较大的,我们可以采用Storm,那么Storm和什么技术搭配,才能够做一个适合自己的项目. 可以带着下面问题来阅读本文章:. 1.一个好的项目架构应该具备什么特点.

实用 | 从Apache Kafka到Apache Spark安全读取数据

- - IT瘾-bigdata
随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要. 本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两个关键组件进行了说明. Cloudera Distribution of Apache Kafka 2.0.0版本(基于Apache Kafka 0.9.0)引入了一种新型的Kafka消费者API,可以允许消费者从安全的Kafka集群中读取数据.