Spark Streaming+kafka订单实时统计实现

标签: spark streaming kafka | 发表时间:2017-06-01 04:51 | 作者:a123demi
出处:http://blog.csdn.net
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解。

一.前期准备

开发环境:window7+eclipse+jdk1.7
部署环境:linux+zookeeper+kafka+hadoop+spark
本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客。

二.概念理解

Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafka、Flume、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到HDFS、Databases和Dashboards等。实际上,你可以将流数据应用于Spark的机器学习和图形处理的算法上。

Spark Streaming处理的数据流图
Spark Streaming内部工作原理,其接收实时输入数据流,同时将数据划分成批次,然后通过Spark引擎处理生成按照批次的结果流。

Spark Streaming内部工作原理
Spark Streaming提供了表示连续数据流的、高度抽象的被称为离散流的DStream。DStream本质上表示RDD的序列。任何对DStream的操作都会转变为对底层RDD的操作。 

三.实例需求

通过Spark Streaming+kafka,实时统计订单的订单总数,所有订单价格数。

四.实例实现

4.1 订单实体order

package com.lm.sparkLearning.orderexmaple;

import java.io.Serializable;

/**
 * 简单订单
 * @author liangming.deng
 *
 */
public class Order implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	//订单商品名称
	private String name;
	//订单价格
    private Float price;
    

	public Order() {
		super();
	}
    
	public Order(String name, Float price) {
		super();
		this.name = name;
		this.price = price;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public Float getPrice() {
		return price;
	}
	public void setPrice(Float price) {
		this.price = price;
	}
	@Override
	public String toString() {
		return "Order [name=" + name + ", price=" + price + "]";
	}
    
}


4.2 kafka订单生产者orderProducer

kafka生产者定时发送随机数量订单
package com.lm.sparkLearning.orderexmaple;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.RandomUtils;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * 订单 kafka消息生产者
 * 
 * @author liangming.deng
 *
 */
public class OrderProducer {
	private static Logger logger = LoggerFactory.getLogger(OrderProducer.class);

	public static void main(String[] args) throws IOException {
		// set up the producer
		Producer<String, String> producer = null;
		ObjectMapper mapper = new ObjectMapper();

		try {

			Properties props = new Properties();
			// kafka集群
			props.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);

			// 配置value的序列化类
			props.put("serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);
			// 配置key的序列化类
			props.put("key.serializer.class", ConstantUtils.SERIALIZER_CLASS_VALUE);

			ProducerConfig config = new ProducerConfig(props);
			producer = new Producer<String, String>(config);
			// 定义发布消息体
			List<KeyedMessage<String, String>> messages = new ArrayList<>();
			// 每隔3秒生产随机个订单消息
			while (true) {
				int random = RandomUtils.getRandomNum(20);
				if (random == 0) {
					continue;
				}
				messages.clear();
				for (int i = 0; i < random; i++) {
					int orderRandom = RandomUtils.getRandomNum(random * 10);
					Order order = new Order("name" + orderRandom, Float.valueOf("" + orderRandom));
					// 订单消息体:topic和消息
					KeyedMessage<String, String> message = new KeyedMessage<String, String>(
							ConstantUtils.ORDER_TOPIC, mapper.writeValueAsString(order));
					messages.add(message);
				}

				producer.send(messages);
				logger.warn("orderNum:" + random + ",message:" + messages.toString());
				Thread.sleep(10000);

			}

		} catch (Exception e) {
			e.printStackTrace();
			logger.error("-------------:" + e.getStackTrace());
		} finally {
			producer.close();
		}

	}
}


4.3 Spark Streaming+kafka订单实时统计OrderSparkStreaming

package com.lm.sparkLearning.orderexmaple;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.AtomicDouble;
import com.lm.sparkLearning.utils.ConstantUtils;
import com.lm.sparkLearning.utils.SparkUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

/**
 * spark streaming统计订单量和订单总值
 * 
 * @author liangming.deng
 *
 */
public class OrderSparkStreaming {
	private static Logger logger = LoggerFactory.getLogger(OrderSparkStreaming.class);
	private static AtomicLong orderCount = new AtomicLong(0);
	private static AtomicDouble totalPrice = new AtomicDouble(0);

	public static void main(String[] args) {

		// Create context with a 2 seconds batch interval
		JavaStreamingContext jssc = SparkUtils.getJavaStreamingContext("JavaDirectKafkaWordCount",
				"local[2]", null, Durations.seconds(20));

		Set<String> topicsSet = new HashSet<>(Arrays.asList(ConstantUtils.ORDER_TOPIC.split(",")));
		Map<String, String> kafkaParams = new HashMap<>();
		kafkaParams.put("metadata.broker.list", ConstantUtils.METADATA_BROKER_LIST_VALUE);
		kafkaParams.put("auto.offset.reset", ConstantUtils.AUTO_OFFSET_RESET_VALUE);

		// Create direct kafka stream with brokers and topics
		JavaPairInputDStream<String, String> orderMsgStream = KafkaUtils.createDirectStream(jssc,
				String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams,
				topicsSet);

		// json与对象映射对象
		final ObjectMapper mapper = new ObjectMapper();
		JavaDStream<Order> orderDStream = orderMsgStream
				.map(new Function<Tuple2<String, String>, Order>() {
					/**
					 * 
					 */
					private static final long serialVersionUID = 1L;

					@Override
					public Order call(Tuple2<String, String> t2) throws Exception {
						Order order = mapper.readValue(t2._2, Order.class);
						return order;
					}
				}).cache();

		// 对DStream中的每一个RDD进行操作
		orderDStream.foreachRDD(new VoidFunction<JavaRDD<Order>>() {
			/**
			 * 
			 */
			private static final long serialVersionUID = 1L;

			@Override
			public void call(JavaRDD<Order> orderJavaRDD) throws Exception {
				long count = orderJavaRDD.count();
				if (count > 0) {
					// 累加订单总数
					orderCount.addAndGet(count);
					// 对RDD中的每一个订单,首先进行一次Map操作,产生一个包含了每笔订单的价格的新的RDD
					// 然后对新的RDD进行一次Reduce操作,计算出这个RDD中所有订单的价格众合
					Float sumPrice = orderJavaRDD.map(new Function<Order, Float>() {
						/**
						 * 
						 */
						private static final long serialVersionUID = 1L;

						@Override
						public Float call(Order order) throws Exception {
							return order.getPrice();
						}
					}).reduce(new Function2<Float, Float, Float>() {
						/**
						 * 
						 */
						private static final long serialVersionUID = 1L;

						@Override
						public Float call(Float a, Float b) throws Exception {
							return a + b;
						}
					});
					// 然后把本次RDD中所有订单的价格总和累加到之前所有订单的价格总和中。
					totalPrice.getAndAdd(sumPrice);

					// 数据订单总数和价格总和,生产环境中可以写入数据库
					logger.warn("-------Total order count : " + orderCount.get()
							+ " with total price : " + totalPrice.get());
				}
			}
		});
		orderDStream.print();

		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
	}
}


4.4 实例实时结果




OrderProducer消息生产者

OrderSparkStreaming实时计算

五 代码地址


作者:a123demi 发表于2017/6/1 8:50:29 原文链接
阅读:48 评论:0 查看评论

相关 [spark streaming kafka] 推荐:

Kafka+Spark Streaming+Redis实时计算整合实践

- - 简单之美
基于Spark通用计算平台,可以很好地扩展各种计算类型的应用,尤其是Spark提供了内建的计算库支持,像Spark Streaming、Spark SQL、MLlib、GraphX,这些内建库都提供了高级抽象,可以用非常简洁的代码实现复杂的计算逻辑、这也得益于Scala编程语言的简洁性. 这里,我们基于1.3.0版本的Spark搭建了计算平台,实现基于Spark Streaming的实时计算.

Spark Streaming+kafka订单实时统计实现

- - CSDN博客推荐文章
前几篇文章我们分别学习Spark RDD和PairRDD编程,本文小编将通过简单实例来加深对RDD的理解. 开发环境:window7+eclipse+jdk1.7. 部署环境:linux+zookeeper+kafka+hadoop+spark. 本实例开发之前,默认已搭好了开发环境和部署环境,如果未搭建,可以参考本人相关大数据开发搭建博客.

Spark Streaming vs. Kafka Stream 哪个更适合你

- - IT瘾-bigdata
作者:Mahesh Chand Kandpal. 译者注:本文介绍了两大常用的流式处理框架,Spark Streaming和Kafka Stream,并对他们各自的特点做了详细说明,以帮助读者在不同的场景下对框架进行选择. 流式处理的需求每天都在增加,仅仅对大量的数据进行处理是不够的. 数据必须快速地得到处理,以便企业能够实时地对不断变化的业务环境做出反应.

Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统

- -
Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统. 2015 年 7 月 27 日发布. 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.

[原]Spark Streaming原理简析

- - 张包峰的博客
StreamingContext实例化的时候,需要传入一个 SparkContext,然后指定要连接的 spark matser url,即连接一个 spark engine,用于获得executor. 实例化之后,首先,要指定一个接收数据的方式,如. 这样从socket接收文本数据. 这个步骤返回的是一个 ReceiverInputDStream的实现,内含 Receiver,可接收数据并转化为RDD放内存里.

Spark Streaming 自定义接收器

- - zzm
Spark Streaming可以从任意数据源接受流数据,而不仅仅是那些内置支持的数据源(如Flume、kafka等). 这就要求开发人员实现一个接收器(recevier),用于接收来自有关数据源的数据. 本篇手册以一个自定义的接收器(recevier)实现和其在spark streaming中的应为为主线进行讲解.

Spark整合Kafka小项目

- - 互联网 - ITeye博客
SparkStreaming与kafka整合小项目实践含所有代码带详细注释. 总流程:自制日志生成器生成含数据日志,使用kafkaAppender直接发送到kafka,SparkStreaming从kafka消费日志,并流式处理将结果发送到kafka另一个topic,Java后台从kafka消费日志分析结果,实现秒级大数据实时分析展示.

Spark Streaming + Elasticsearch构建App异常监控平台

- - 美团点评技术团队
本文已发表在《程序员》杂志2016年10月期. 如果在使用App时遇到闪退,你可能会选择卸载App、到应用商店怒斥开发者等方式来表达不满. 但开发者也同样感到头疼,因为崩溃可能意味着用户流失、营收下滑. 为了降低崩溃率,进而提升App质量,App开发团队需要实时地监控App异常. 一旦发现严重问题,及时进行热修复,从而把损失降到最低.

实用 | 从Apache Kafka到Apache Spark安全读取数据

- - IT瘾-bigdata
随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要. 本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两个关键组件进行了说明. Cloudera Distribution of Apache Kafka 2.0.0版本(基于Apache Kafka 0.9.0)引入了一种新型的Kafka消费者API,可以允许消费者从安全的Kafka集群中读取数据.

Hadoop Streaming 编程

- - 学着站在巨人的肩膀上
Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如:. 采用shell脚本语言中的一些命令作为mapper和reducer(cat作为mapper,wc作为reducer). 本文安排如下,第二节介绍Hadoop Streaming的原理,第三节介绍Hadoop Streaming的使用方法,第四节介绍Hadoop Streaming的程序编写方法,在这一节中,用C++、C、shell脚本 和python实现了WordCount作业,第五节总结了常见的问题.