A Small Spark + Kafka Project
A hands-on Spark Streaming + Kafka integration project, with all code and detailed comments.
Overall flow: a homemade log generator produces logs containing data and ships them straight to Kafka via a KafkaLog4jAppender; Spark Streaming consumes the logs from Kafka, processes them as a stream, and sends the results to another Kafka topic; a Java backend consumes the analysis results from that topic, giving second-level real-time big-data analysis and display.
Versions
kafka_2.11-0.11.0.1
spark-2.1.1-bin-hadoop2.7
scala-2.11.11
JDK 1.8
The Spark code was developed in IntelliJ IDEA;
everything else in Eclipse.
Step 1
The log generator writes logs to Kafka
Key jars:
kafka-log4j-appender-0.11.0.1.jar // used for logging to Kafka
kafka_2.11-0.11.0.1.jar // add it if you hit errors
kafka-clients-0.11.0.1.jar // add it if you hit errors
slf4j-api-1.7.25.jar // any other logging framework would also do
slf4j-log4j12-1.7.25.jar
Configuration files and notes
File name: log4j.properties
Contents:
log4j.rootLogger=DEBUG,stdout,KAFKA
## appender Console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l (message:%m)%n
## appender KAFKA
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.topic=log-topic
log4j.appender.KAFKA.brokerList=master:9090
log4j.appender.KAFKA.compressionType=none
log4j.appender.KAFKA.syncSend=true
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %5p %x-%t %l (message:%m)
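Two additions worth making at this point. First, with the root logger at DEBUG, the Kafka producer inside the appender logs through that same root logger, which can cause recursive logging; a guard I would add to the same file (my addition, not in the original setup):
## keep the Kafka client's own logs away from the KAFKA appender to avoid recursion
log4j.logger.org.apache.kafka=WARN, stdout
log4j.additivity.org.apache.kafka=false
Second, a minimal smoke test for the appender, assuming log4j.properties is on the classpath and the broker at master:9090 is up; the class name is my own:
import org.apache.log4j.Logger;

public class AppenderSmokeTest {
    private static final Logger logger = Logger.getLogger(AppenderSmokeTest.class);
    public static void main(String[] args) {
        // written to stdout by the Console appender and to log-topic by the KAFKA appender
        logger.info("hello kafka appender");
    }
}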
File name: my.properties
# time interval between batches, in ms; default 100ms
timeinterval=1000
# number of log lines per batch; default 1000
frequency=298
# total running time, in ms; default 60000ms
runtime=6000000
Code walkthrough:
LogWriterExcutor.java
import org.apache.log4j.Logger;
class LogWriterExcutor implements Runnable{
    Logger logger = Logger.getLogger(this.getClass().getName());
    private String[] message;
    public LogWriterExcutor(String[] message){
        this.message = message;
    }
    @Override
    public void run() {
        // each info() call goes through log4j to the KAFKA appender, i.e. one Kafka record per line
        for(String e : message)
            logger.info(e);
    }
}
LogCreater.java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
class LogCreater extends Constant{
    Logger logger = Logger.getLogger(this.getClass().getName());
    ExecutorService executor = null;
    private int timeinterval = TIME_INTERVAL; // interval between log batches, in ms
    private int frequency = FREQUENCY; // number of log lines per batch
    private int sumOfChinese = SUM_CHINESE; // size of the custom Chinese character set
    private int runtime = RUNTIME; // total running time of the program, in ms
    private long startTime = 0;
    private long endTime = 0;
    private long logCount = 0; // number of log lines sent so far
    private boolean stop = true; // true while the generator should keep running
    LogCreater(){
        init();
    }
    public void init(){
        Properties properties = new Properties();
        // try-with-resources closes the stream even if loading fails
        try (FileInputStream in = new FileInputStream("src\\source\\my.properties")) {
            properties.load(in);
            timeinterval = Integer.parseInt(properties.getProperty("timeinterval"));
            frequency = Integer.parseInt(properties.getProperty("frequency"));
            runtime = Integer.parseInt(properties.getProperty("runtime"));
        } catch (IOException e) {
            logger.error("failed to read the configuration file");
            e.printStackTrace();
        }
        executor = Executors.newCachedThreadPool();
        startTime = System.currentTimeMillis();
        printHint();
    }
    public void startCreate() {
        System.out.println("Generating logs.....");
        if(executor == null){
            logger.error("failed to obtain a thread pool; the log generator cannot run. Exiting.");
            return;
        }
        while(stop){
            String[] messages = getMessages(frequency);
            create(messages);
            try {
                Thread.sleep(timeinterval);
            } catch (InterruptedException e) {
                logger.error("error while sleeping between batches");
                e.printStackTrace();
            }
            endTime = System.currentTimeMillis();
            if((endTime-startTime)>runtime)
                stop = false;
        }
        executor.shutdown(); // stop accepting new batches
        try {
            // give in-flight batches a chance to flush to Kafka before the JVM exits
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Generated "+logCount+" log lines in total.");
    }
    private void create(String[] messages) {
        // submit the Runnable directly; wrapping it in an extra Thread object is unnecessary
        executor.execute(new LogWriterExcutor(messages));
        logCount += messages.length;
    }
    private String[] getMessages(Integer frequency) {
        Random rand = new Random();
        String[] messages = new String[frequency];
        for(int i=0;i<frequency;i++){
            // pick one random character from the REGRET character set
            messages[i] = REGRET[rand.nextInt(sumOfChinese)];
        }
        return messages;
    }
    private void printHint(){
        System.out.println("interval per batch\t"+timeinterval+"ms");
        System.out.println("log lines per batch\t"+frequency);
        System.out.println("expected running time\t"+runtime/1000+"s");
    }
}
Constant.java
public class Constant {
    /*
     * This class holds nothing but constants.
     */
    /*
     * Default interval between the log generator's batches, in ms.
     */
    public static Integer TIME_INTERVAL = 100;
    /*
     * Default number of log lines the generator writes per batch.
     */
    public static Integer FREQUENCY = 100;
    /*
     * Default total running time, one minute, in ms.
     */
    public static Integer RUNTIME = 60000;
    /*
     * 298 Chinese characters, taken from "Xi Shi" (惜誓) in the Chu Ci.
     */
    public static String[] REGRET = {"一","言","老","调","清","者","舆","昆","合","渊","下","而","同","不","明","与",
"昏","谏","小","騑","少","我","气","谔","世","或","尚","丝","鸟","逢","瀣","中","是","鸱","就","水","临","制",
"举","砾","鸾","所","乃","鹄","久","居","陆","之","虎","乎","乐","虑","乔","虖","剖","遗","虚","聚","江","吸",
"瑟","象","乡","衡","周","息","虯","衰","驰","山","驱","乱","干","年","并","恶","穷","偷","顺","登","白","幽",
"驾","岁","蚁","节","梅","沆","皆","皇","骋","二","于","隐","源","麒","骖","骛","墟","功","麟","纡","纫","被",
"身","犬","躯","悲","河","蚴","犹","人","难","裁","仁","狂","黄","集","哉","背","苍","从","风","仑","黑","盖",
"高","飙","仙","四","盛","惜","飞","回","苟","因","以","拥","苦","独","竭","曲","直","相","建","固","国","攀",
"异","儃","处","茅","月","夏","霑","休","众","北","圜","生","索","謣","圣","贤","伤","大","在","用","木","天",
"眩","太","夫","伯","地","朱","失","贵","然","贼","放","愿","流","权","充","故","商","均","先","浊","子","何",
"余","神","非","止","赤","此","来","车","革","兮","佯","数","女","杳","海","睹","蝼","彼","载","松","使","长",
"极","羁","如","概","历","玉","涉","冉","枉","羊","王","後","厌","再","美","箕","得","龙","原","龟","审","醢",
"群","冥","推","循","讬","枭","况","德","容","方","澹","离","去","旁","见","观","係","心","寄","又","反","重",
"野","藏","量","发","翔","比","俗","志","诚","进","远","川","察","忠","无","濡","矣","凤","日","知","左","自",
"矫","可","称","翱","深","已","右","至","石","念","时","迻","忽","寿","丹","根","为","尽",};
    /*
     * Number of characters in the set, used as the bound for the random index;
     * derived from the array so the two cannot drift apart (here: 298).
     */
    public static Integer SUM_CHINESE = REGRET.length;
}
MyUtil.java
import java.util.Random;
// Small utility, not referenced by the code above but kept from the original project.
public class MyUtil {
    // returns an array of n random ints, each in [0, range)
    public static int[] getRand(int n, int range){
        Random ran = new Random();
        int[] arr = new int[n];
        while(n-- > 0){
            arr[n] = ran.nextInt(range);
        }
        return arr;
    }
}
Demo.java
/*
 * Log generator entry point.
 */
public class Demo{
public static void main(String[] args){
new LogCreater().startCreate();
        System.exit(0); // exit once generation finishes; safe now that startCreate() waits for the pool to drain
}
}
Directory layout: just an ordinary Java project.
Step 2
Create the Kafka topic
Installation is skipped here.
Configure $KAFKA_HOME/config/server.properties:
there are plenty of tutorials online, so this is not repeated here.
Start Kafka:
kafka-server-start.sh config/server.properties &
Create the topic:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic log-topic
Describe the topic:
kafka-topics.sh --describe --zookeeper master:2181 --topic log-topic
Start a console consumer:
kafka-console-consumer.sh --bootstrap-server master:9090 --from-beginning --topic log-topic
Startup order:
1. start the Kafka server; 2. create the topic; 3. describe the topic (optional); 4. start the console consumer; 5. run the log generator program.
Note: the terminal running the console consumer prints every log line it receives; appending & to the command pushes the process to the background. Stop the consumer with Ctrl+C.
Step 3
Spark consumes the logs from Kafka
Key jars:
kafka_2.11-0.11.0.1.jar
kafka-clients-0.11.0.1.jar
spark-streaming-kafka_2.11-1.6.3.jar
all of Spark's bundled jars
the Scala SDK
Possible exception:
If the job dies with java.lang.NoClassDefFoundError: org/apache/spark/Logging:
this Logging class only ships in older spark-core jars, e.g. spark-core_2.11-1.5.2.
Spark 2.1.1 no longer contains the class file; it was replaced by org.apache.spark.internal.Logging.
Workaround
Pull the Logging class files out of the 1.5.2 jar with jar -xf (note: jar, not java; there are several files, including Logging$class.class), repackage them with jar -cf new_name.jar org, and use the result as an ordinary utility jar. Alternatively, the Spark-2.x-matched connector spark-streaming-kafka-0-8_2.11 (version 2.1.1) avoids the mismatch altogether.
Process overview:
Spark creates a receiver that consumes the log data from Kafka.
Code walkthrough: Kafka.scala
import java.util.Properties
import org.apache.log4j.{Level, Logger} // log4j, not java.util.logging, so setLevel below actually quiets Spark
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.Logging // the repackaged class from the workaround above
object Kafka extends Logging{
private var producer: KafkaProducer[String, String] = _
private var props : Properties = _
def main(args: Array[String]) {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sparkConf = new SparkConf().setAppName("LauncherStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
/*
 * Producer parameters.
 */
val brokerAddress = "master:9090"
val topic = "pro-topic"
props = new Properties()
props.put("bootstrap.servers", brokerAddress)
props.put("value.serializer", classOf[StringSerializer].getName)
// Key serializer is required.
props.put("key.serializer", classOf[StringSerializer].getName)
// wait for all in-sync replicas to ack sends
props.put("acks", "all")
// create the Kafka producer; it is reused below to send every window's result
producer = new KafkaProducer[String, String](props)
if(producer == null) {
println("producer为空")
ssc.stop()
}
/*
 * Consumer parameters.
 */
val zkQuorum = "master:2181,slave1:2181,slave2:2181"
// The group name can be anything, but reusing one that has consumed before may yield
// no data (most likely stale committed offsets for that group lingering in ZooKeeper).
// Change it on every run, or make it a parameter, e.g. "log-group-" + System.currentTimeMillis.
// I have not fully resolved this, but it does not block the walkthrough.
val group = "log-group21"
val topicMap = Map[String, Int]("log-topic" -> 1)
// create the Kafka stream; without a window, one RDD arrives per batch interval (the StreamingContext's second argument)
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
kafkaStream.window(Seconds(12), Seconds(6)).foreachRDD((rdd: RDD[String], time: Time) => {
  // The window processes the previous 12 seconds of data every 6 seconds; both durations
  // must be multiples of the StreamingContext batch interval.
  // The whole 12-second slice sits in this one RDD, so count occurrences directly and send
  // the result to the other Kafka topic. Each log line ends in "(message:X)", so the
  // second-to-last character (reverse.charAt(1)) is the embedded Chinese character.
  val re = rdd.map(t => (t.reverse.charAt(1).toString, 1L)).reduceByKey(_ + _)
  val a = re.collect().toMap
  producer.send(new ProducerRecord[String, String](topic, a.mkString(",")))
})
/*
// This variant also works:
kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {
  // plain pass-through:
  //rdd.collect().foreach(t => producer.send(new ProducerRecord[String, String](topic, t)))
  // light processing, then send:
  rdd.collect().foreach(t => {
    println("sending: " + t)
    val s = t.reverse.charAt(1).toString // extract the one Chinese character embedded in the log line
    producer.send(new ProducerRecord[String, String](topic, s))
  })
})
*/
ssc.start()
// block until the streaming context terminates
ssc.awaitTermination()
// recommended, though this line is only reached once the context actually stops
producer.close()
println("it happened")
}
}
Run command and notes
spark-submit --master spark://master:7077 --class streaming.Kafka libra.jar
Missing jars can be added with --jars or other dependency flags.
Important:
Change the Scala consumer's group name on every run, otherwise you may receive no data (see the committed-offsets note in the code above); I have not fully solved this.
Step 4
Spark sends its processing results to Kafka
Jars:
same as step 3
Create a new topic:
see step 2 for the create command; configure the new topic name in the Spark job's producer,
and start a console consumer on it (concrete commands below).
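For reference, mirroring the step-2 commands with the pro-topic name used in Kafka.scala (adjust hosts and ports to your cluster):
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --replication-factor 1 --partitions 1 --topic pro-topic
kafka-console-consumer.sh --bootstrap-server master:9090 --topic pro-topic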
Step 5
The Java backend consumes the analysis results from Kafka
Key jars:
kafka-clients-0.11.0.1.jar
kafka_2.11-0.11.0.1.jar
slf4j-api-1.7.25.jar
slf4j-log4j12-1.7.25.jar
log4j-1.2.17.jar
A plain Java project.
Code walkthrough:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;
public class Consumer{
//use the new KafkaConsumer API; ConsumerConnector is the old, pre-new-consumer interface
private final KafkaConsumer<Integer, String> consumer;
private String topic;
public Consumer(String topic) {
Properties props = new Properties();
//KafkaProperties is a custom interface holding static parameters; a sketch is given after this class
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
//the group name here also seems prone to the same reuse issue; change it on every run
props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-group101");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
this.topic = topic;
}
public void doWork() {
    // subscribe to the topic
    consumer.subscribe(Collections.singletonList(topic));
    ConsumerRecords<Integer, String> records = null;
    // poll in a loop; each poll returns the records not yet consumed
    while(true) {
        // 7s is the maximum wait per poll; the Spark job emits a result every 6s,
        // and the extra second allows for Kafka forwarding delay
        records = consumer.poll(7000);
        System.out.println("===========================");
        System.out.println("records received: " + records.count());
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println(record.value() + "==" + record.offset());
        }
        System.out.println("===========================");
    }
}
}
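The post never shows KafkaProperties or a main method. Below is a minimal sketch of both; the field values are assumptions taken from the broker address used elsewhere in this walkthrough, and ConsumerDemo is a name of my own:
public interface KafkaProperties {
    String KAFKA_SERVER_URL = "master"; // assumed broker host
    int KAFKA_SERVER_PORT = 9090;       // assumed broker port, matching the rest of the walkthrough
}

public class ConsumerDemo {
    public static void main(String[] args) {
        new Consumer("pro-topic").doWork(); // the topic the Spark job produces to
    }
}
Each record value is one window's counts joined by Scala's Map#mkString(",") in Kafka.scala, i.e. comma-separated pairs of the form 字 -> 12. A sketch of turning that back into a map on the Java side, assuming exactly that format (ResultParser is hypothetical):
import java.util.HashMap;
import java.util.Map;

public class ResultParser {
    // parse "a -> 1,b -> 2" (Scala Map#mkString output) back into a Java map
    public static Map<String, Long> parse(String value) {
        Map<String, Long> counts = new HashMap<>();
        for (String pair : value.split(",")) {
            String[] kv = pair.split(" -> ");
            if (kv.length == 2) {
                counts.put(kv[0].trim(), Long.parseLong(kv[1].trim()));
            }
        }
        return counts;
    }
}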