实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

标签: Spark 大数据平台 kafka spark 实时计算 | 发表时间:2018-02-22 14:32 | 作者:lxw1234@qq.com
出处:http://lxw1234.com

本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了。

在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once。本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的。

spark streaming

1. 关于数据源

数据源是文本格式的日志,由Nginx产生,存放于日志服务器上。在日志服务器上部署Flume Agent,使用TAILDIR Source和Kafka Sink,将日志采集到Kafka进行临时存储。日志格式如下:

2018-02-22T00:00:00+08:00|~|200|~|/test? pcid=DEIBAH& siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=GLLIEG&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJMEC&siteid=8

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HMGBDE&siteid=3

2018-02-22T00:00:00+08:00|~|200|~|/test?pcid=HIJFLA&siteid=4

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=JCEBBC&siteid=9

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=KJLAKG&siteid=8

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=FHEIKI&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IGIDLB&siteid=3

2018-02-22T00:00:01+08:00|~|200|~|/test?pcid=IIIJCD&siteid=5

日志是由测试程序模拟产生的,字段之间由|~|分隔。

2. 实时计算需求

分天、分小时PV;

分天、分小时、分网站(siteid)PV;

分天 UV;

3. Spark Streaming消费Kafka数据

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

在Spark Streaming中消费Kafka数据,保证Exactly-once的核心有三点:

使用Direct方式连接Kafka;自己保存和维护Offset;更新Offset和计算在同一事务中完成;

后面的Spark Streaming程序(文章结尾),主要有以下步骤:

  1. 启动后,先从Redis中获取上次保存的Offset,Redis中的key为”topic_partition”,即每个分区维护一个Offset;
  2. 使用获取到的Offset,创建DirectStream;
  3. 在处理每批次的消息时,利用Redis的事务机制,确保在Redis中指标的计算和Offset的更新维护,在同一事务中完成。只有这两者同步,才能真正保证消息的Exactly-once。
./spark-submit \
--class com.lxw1234.spark.TestSparkStreaming \
--master local[2] \
--conf spark.streaming.kafka.maxRatePerPartition=20000 \
--jars /data1/home/dmp/lxw/realtime/commons-pool2-2.3.jar,\
/data1/home/dmp/lxw/realtime/jedis-2.9.0.jar,\
/data1/home/dmp/lxw/realtime/kafka-clients-0.11.0.1.jar,\
/data1/home/dmp/lxw/realtime/spark-streaming-kafka-0-10_2.11-2.2.1.jar \
/data1/home/dmp/lxw/realtime/testsparkstreaming.jar \
--executor-memory 4G \
--num-executors 1

在启动Spark Streaming程序时候,有个参数最好指定:

spark.streaming.kafka.maxRatePerPartition=20000(每秒钟从topic的每个partition最多消费的消息条数)

如果程序第一次运行,或者因为某种原因暂停了很久重新启动时候,会积累很多消息,如果这些消息同时被消费,很有可能会因为内存不够而挂掉,因此,需要根据实际的数据量大小,以及批次的间隔时间来设置该参数,以限定批次的消息量。

如果该参数设置20000,而批次间隔时间未10秒,那么每个批次最多从Kafka中消费20万消息。

4. Redis中的数据模型

  • 分小时、分网站PV

普通K-V结构,计算时候使用incr命令递增,

Key为 “site_pv_网站ID_小时”,

如:site_pv_9_2018-02-21-00、site_pv_10_2018-02-21-01

该数据模型用于计算分网站的按小时及按天PV。

  • 分小时PV

普通K-V结构,计算时候使用incr命令递增,

Key为“pv_小时”,如:pv_2018-02-21-14、pv_2018-02-22-03

该数据模型用于计算按小时及按天总PV。

  • 分天UV

Set结构,计算时候使用sadd命令添加,

Key为”uv_天”,如:uv_2018-02-21、uv_2018-02-20

该数据模型用户计算按天UV(获取时候使用SCARD命令获取Set元素个数)

 

注:这些Key对应的时间,均由实际消息中的第一个字段(时间)而定。

5. 故障恢复

如果Spark Streaming程序因为停电、网络等意外情况终止而需要恢复,则直接重启即可;

如果因为其他原因需要重新计算某一时间段的消息,可以先删除Redis中对应时间段内的Key,然后从原始日志中截取该时间段内的消息,当做新消息添加至Kafka,由Spark Streaming程序重新消费并进行计算;

6. 附程序

依赖jar包:

commons-pool2-2.3.jar

jedis-2.9.0.jar

kafka-clients-0.11.0.1.jar

spark-streaming-kafka-0-10_2.11-2.2.1.jar

InternalRedisClient (Redis链接池)

package com.lxw1234.spark

import redis.clients.jedis.JedisPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig

/**
 * @author lxw1234
 */
/**
 * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
 */
object InternalRedisClient extends Serializable {
  
  @transient private var pool: JedisPool = null
  
  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
      maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)   
  }
  
  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
      maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
      testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
    if(pool == null) {
         val poolConfig = new GenericObjectPoolConfig()
         poolConfig.setMaxTotal(maxTotal)
         poolConfig.setMaxIdle(maxIdle)
         poolConfig.setMinIdle(minIdle)
         poolConfig.setTestOnBorrow(testOnBorrow)
         poolConfig.setTestOnReturn(testOnReturn)
         poolConfig.setMaxWaitMillis(maxWaitMillis)
         pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)
         
         val hook = new Thread{
             override def run = pool.destroy()
         }
         sys.addShutdownHook(hook.run)
    }
  }
  
  def getPool: JedisPool = {
    assert(pool != null)
    pool
  }
}

TestSparkStreaming

package com.lxw1234.spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies
import org.apache.spark.streaming.kafka010.HasOffsetRanges
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies

import redis.clients.jedis.Pipeline


/**
 * @author lxw1234
 * 获取topic最小的offset
 * ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list datadev1:9092 --topic lxw1234 --time -2
 */
object TestSparkStreaming {
  
  def main(args : Array[String]) : Unit = {
    val brokers = "datadev1:9092"
    val topic = "lxw1234"
    val partition : Int = 0 //测试topic只有一个分区
    val start_offset : Long = 0l
    
    //Kafka参数
    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> brokers,
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "exactly-once",
        "enable.auto.commit" -> (false: java.lang.Boolean),
        "auto.offset.reset" -> "none"
    )
    
    // Redis configurations
    val maxTotal = 10
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "172.16.213.79"
    val redisPort = 6379
    val redisTimeout = 30000
    //默认db,用户存放Offset和pv数据
    val dbDefaultIndex = 8
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
    
        
    val conf = new SparkConf().setAppName("TestSparkStreaming").setIfMissing("spark.master", "local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    
    //从Redis获取上一次存的Offset
    val jedis = InternalRedisClient.getPool.getResource
    jedis.select(dbDefaultIndex)
    val topic_partition_key = topic + "_" + partition
    var lastOffset = 0l 
    val lastSavedOffset = jedis.get(topic_partition_key)
    
    if(null != lastSavedOffset) {
      try {
        lastOffset = lastSavedOffset.toLong
      } catch {
        case ex : Exception => println(ex.getMessage)
        println("get lastSavedOffset error, lastSavedOffset from redis [" + lastSavedOffset + "] ")
        System.exit(1)
      }
    }
    InternalRedisClient.getPool.returnResource(jedis)
    
    println("lastOffset from redis -> " + lastOffset)
    
    //设置每个分区起始的Offset
    val fromOffsets = Map{new TopicPartition(topic, partition) -> lastOffset}
    
    //使用Direct API 创建Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
    
    //开始处理批次消息
    stream.foreachRDD { 
      rdd => 
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        
        val result = processLogs(rdd)
        println("=============== Total " + result.length + " events in this batch ..")
        
        val jedis = InternalRedisClient.getPool.getResource
        val p1 : Pipeline = jedis.pipelined();
        p1.select(dbDefaultIndex)
        p1.multi() //开启事务
        
        
        //逐条处理消息
        result.foreach { 
          record => 
              //增加小时总pv
              val pv_by_hour_key = "pv_" + record.hour
              p1.incr(pv_by_hour_key)
              
              //增加网站小时pv
              val site_pv_by_hour_key = "site_pv_" + record.site_id + "_" + record.hour
              p1.incr(site_pv_by_hour_key)
              
              //使用set保存当天的uv
              val uv_by_day_key = "uv_" + record.hour.substring(0, 10)
              p1.sadd(uv_by_day_key, record.user_id)
        }
        
        //更新Offset
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p1.set(topic_partition_key, offsetRange.untilOffset + "")
        }
        
        p1.exec();//提交事务  
        p1.sync();//关闭pipeline 
        
        InternalRedisClient.getPool.returnResource(jedis)

    }
    
    case class MyRecord(hour: String, user_id: String, site_id: String)
    
    def processLogs(messages: RDD[ConsumerRecord[String, String]]) : Array[MyRecord] = {
      messages.map(_.value()).flatMap(parseLog).collect()
    }
    
    //解析每条日志,生成MyRecord
    def parseLog(line: String): Option[MyRecord] = {
      val ary : Array[String] = line.split("\\|~\\|", -1);
      try {
        val hour = ary(0).substring(0, 13).replace("T", "-")
        val uri = ary(2).split("[=|&]",-1)
        val user_id = uri(1)
        val site_id = uri(3)
        return Some(MyRecord(hour,user_id,site_id))
        
      } catch {
        case ex : Exception => println(ex.getMessage)
      }
      
      return None
      
    }
    
    
    
    
    ssc.start()
    ssc.awaitTermination()
  }
  

}

 

spark streaming

 

spark streaming

如果觉得本博客对您有帮助,请 赞助作者

转载请注明: lxw的大数据田地 » 实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

相关 [实时 计算 spark] 推荐:

实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

- - lxw的大数据田地
本文想记录和表达的东西挺多的,一时想不到什么好的标题,所以就用上面的关键字作为标题了. 在实时流式计算中,最重要的是在任何情况下,消息不重复、不丢失,即Exactly-once. 本文以Kafka–>Spark Streaming–>Redis为例,一方面说明一下如何做到Exactly-once,另一方面说明一下我是如何计算实时去重指标的.

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark:一个高效的分布式计算系统

- - IT技术博客大学习
标签:   Spark   分布式. Spark与Hadoop的对比. Spark的中间数据放到内存中,对于迭代运算效率更高. Spark更适合于迭代运算比较多的ML和DM运算. 因为在Spark里面,有RDD的抽象概念. Spark比Hadoop更通用. Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作.

分布式计算框架-Spark初步理解

- - 互联网 - ITeye博客
    最开始关注Spark,是在csdn首页上看到一篇文件《Spark核心开发者:性能超Hadoop百倍,算法实现仅有其1/10或1/100》的,看着标题确实感觉比较年逼的. 后来稍微研究了一下,其实发现,这个描述有点问题. Spark是一个基于内存的纯计算框架,而hadoop是包括计算框架的mapreduce和分布式存储hdfs,所以应该描述为Spark性能超Hadoop的mapreduce计算性能百倍.

分布式计算系统 Spark 成为 Apache 顶级项目

- - 博客园_新闻
Apache 软件基金会今天宣布,Spark 项目已从孵化器毕业,成为 Apache 软件基金会的一个顶级项目. Spark 是一个高效的分布式计算系统,发源于美国加州大学伯克利分校 AMPLab 的集群计算平台. Spark 被称为“Hadoop 的瑞士军刀”,拥有非凡的速度和易用性. Spark 立足于内存计算,相比 Hadoop MapReduce,Spark 在性能上要高 100 倍,而且 Spark 提供了比 Hadoop 更上层的 API,同样的算法在 Spark 中实现往往只有 Hadoop 的1/10 或者1/100 的长度.

Spark:比Hadoop更强大的分布式数据计算项目

- - 标点符
Spark是一个由加州大学伯克利分校(UC Berkeley AMP)开发的一个分布式数据快速分析项目. 它的核心技术是弹性分布式数据集(Resilient distributed datasets),提供了比Hadoop更加丰富的MapReduce模型,可以快速在内存中对数据集进行多次迭代,来支持复杂的数据挖掘算法和图计算算法.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

Spark的速度快是以丧失计算结果正确性为代价的

- - Changming
但是它不保证它算出的值是对的,哪怕你要做的只是简单的整数累加. Spark最著名的一篇论文是:《Spark: Cluster Computing with Working Sets》. 当你读它的时候你需要明白:文中代码不保证计算结果是正确的. 具体来说,它的Logistic Regression的代码在map阶段用到了accumulator.

Spark概览

- - 简单文本
Spark具有先进的DAG执行引擎,支持cyclic data flow和内存计算. 因此,它的运行速度,在内存中是Hadoop MapReduce的100倍,在磁盘中是10倍. 这样的性能指标,真的让人心动啊. Spark的API更为简单,提供了80个High Level的操作,可以很好地支持并行应用.