大数据流处理框架介绍
实时流处理简单概述:实时是说整个流处理相应时间较短,流式计算是说数据是源源不断的,没有尽头的。实时流处理一般是将业务系统产生的数据进行实时收集,交由流处理框架进行数据清洗,统计,入库,并可以通过可视化的方式对统计结果进行实时的展示。本文涉及到的框架或技术有 Flume,Logstash,kafka,Storm, SparkStreaming等。
实时流处理的的流程与技术选型 :
一、日志收集
由于业务系统一般是游离与流处理集群如SparkStreaming、Storm之外的,所以我们需要对业务系统的数据进行实时收集。这就用到了日志收集框架,日志收集框架主要需要解决三个问题:数据从哪儿来,数据到哪儿去,实时收集。因为在流处理中为了防止突发或激增流量压垮流处理集群,通常将收集过后的数据输出到kafka分布式消息系统,然后流处理集群去消费kafka中的数据,下面介绍两种常用的日志收集框架以及他们如何对接kafka.
1).Apache Flume
这是一个apache的顶级项目,所以域名为flume.apache.org, 下面是官网上的原理图,Flume框架把每个收集任务都定义为一个Agent(这是一个JAVA进程),他有三个基本组件Source、Channel、Sink。
source:收集数据,可以对接各种常用数据源,如文件(exec source),kafka(kafka source),jms(java消息系统)等。
channel:source组件把数据收集来以后,临时存放在channel(管道)中,即channel组件在agent中是专门用来存放临时数据的,并起到数据缓冲的作用。常用的channel有memory chanel 、jdbc chanel 、file channel 等等。
sink:sink组件是用于从channel中取数据并送到目的地的组件,目的地包括hdfs、logger、avro、thrift、file、hbase等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
其实flume的使用就是编写配置文件,下面是使用flume将Nginx的日志对接kafka的配置文件,我们将该收集任务命名为
exec-memory-kafka,只需如下编写:
#配置source、sink、channel
exec-memory-kafka.sources = exec-source #指定source (数据从哪儿来),可以指定多个数据源,用逗号分隔。
exec-memory-kafka.sinks = kafka-sink #指定sink(数据到哪儿去)
exec-memory-kafka.channels = memory-channel #指定channel
#source详细配置
exec-memory-kafka.sources.exec-source.type = exec 执行操作系统命令
exec-memory-kafka.sources.exec-source.command = sudo tail -F /var/log/nginx/access.log #监控Nginx日志文件
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c #shell命令的前缀
#channel 详细配置
exec-memory-kafka.channels.memory-channel.type = memory #内存channel
#sink详细配置
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink #类型 为kafka sink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092 #kafaka服务的地址,多个用逗号分隔
exec-memory-kafka.sinks.kafka-sink.topic = test1 #指定主题
exec-memory-kafka.sinks.kafka-sink.batchSize = 5 #指定每多少条收集一次,这里是每5条发送一次。
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1 #使kafka对是否收到数据进行确认,确保数据不会丢失
#为sink和source指定channel
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
编写好配置文件后切换到flume的bin目录下执行:
flume-ng agent --conf 配置文件的目录–conf-file 配置文件的全路径–name exec-memory-kafka -Dflume.root.logger=INFO,console
即可开启收集任务(进程的方式)
2).ELK技术栈的Logstash
Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。Logstash使用时也是编写配置文件,下面是如何使用配置文件的方式将Nginx日志输出到Kafka。
1
#定义数据源
input{
#这里是Nginx日志文件
file{
path =>"/var/log/nginx/access.log"
}
}
#数据发到哪,这里是kafka
output{
kafka{
topic_id => “test1” #指定topic
codec=>plain{
format=>"%{message}" #输出的格式,这里表示只输出消息,不输出其他信息,如版本信息等。
}
bootstrap_servers=>“hadoop000:9092” #kafka服务的地址
batch_size=>1 #每几条数据发送一次
}
}
切换到logstash的bin目录,执行以下命令即可开始收集任务:
logstash -f 你的配置文件的位置。
二、kafka
kafka是一个分布式的流处理平台,在流处理中,我们通常使用他作为一个消息系统来使用,他是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。
kafka作为消息系统时相比其他消息中间件主要有4大优势:
可扩展性:可以通过增加broker的方式水平扩展kafka集群
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:最大限度的容灾,允许集群中节点失败,包括主节点,是高可用的。
高并发
几个重要的角色:
Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群,一台机器可以启动多个broker在不同的端口上。
Topic:消息系统中的主题,生产者和消费者共同关注的部分。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
Segment:partition物理上由多个segment组成,每个Segment存着message信息,以文件的形式
Producer :生产者, 生产message发送到topic
Consumer : 消费者,订阅topic消费message, consumer作为一个线程来消费
具体的使用可参照官网,有详细的介绍:
http://kafka.apache.org/quickstart
三、流处理框架
日志信息输出到kafka后,需要使用流处理框架作为消费者去消费kafka中的数据,下面是Storm和Spark的基本原理及其如何使用。
1
1 .Storm
apache的顶级项目,官网是storm.apache.org ,他是一个免费的,开源的,分布式的实时计算系统。
Storm有很多用处:如实时计算分析,在线机器学习,分布式RPC即DRPC,作为ETL工具等,
Storm特点:处理速度快、可扩展 、容灾与高可用的,能够实现高频数据和大规模数据的实时处理。
Storm中几个核心的概念:
Topologies:拓扑,将整个流处理流程串起来,每个storm应用程序都需要定义Toplogies,由spout和bolt组成的。
Streams:消息流,抽象概念,由没有边界的Tuple构成
Spouts:消息流的源头,Topology的消息生产者。产生数据的组件,比如我们要对接kafka,我们就要定义一个kafka Spout
1
2
3
4
5
Bolts:消息处理单元,可以做过滤、聚合、查询/写数据库等操作。
Tuple:具体的数据,传递的基本单元。
1
Storm架构:
类似于Hadoop的架构,也是主从架构(Master/Slave),所有节点都是无状态的,在他们上面的信息(元数据)会存储在zookeeper中
Nimbus: 集群的主节点,负责任务(task)的指派和分发、资源的分配
1
Supervisor: 从节点,可以启动多个Worker,可以通过配置来指定一个Topo运行在多个Worker之上,也可以通过配置来指定集群的从节点(负责干活的),Supervisor节点负责执行任务的具体部分,启动和停止自己管理的Worker进程等,一个Supervisor默认启动4个Worker进程
Worker: 运行具体组件逻辑(Spout/Bolt)的进程,这是一个进程,一个Work进程只为一个Topology服务。
Task: Worker中每一个Spout和Bolt的线程称为一个Task,他是最终运行spout或者bolt代码的最小执行单元
executor:是一个被worker进程启动的单独线程,Spout和bolt和共享一个executor,而且一个executor可以运行多个Task。
下面是各个组件职责的示意图:
1
编码时几个核心的角色:
1). ISpout:核心接口(interface),负责将数据发送到topology中去处理,Storm会跟踪Spout发出去的tuple的,通过ack/fail机制,对Spout发送成功或失败时做处理,没条数据即Tuple都有自己的message id,而且ack/fail/nextTuple是在同一个线程中执行的,所以不用考虑线程安全方面。
核心方法
open: 初始化操作
1
2
3
4
5
close: 资源释放操作
nextTuple: 发送数据
ack: tuple处理成功,storm会反馈给spout一个成功消息
fail:tuple处理失败,storm会发送一个消息给spout,处理失败
实现类:
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public interface IRichSpout extends ISpout, IComponent {}
我们定义Spout时只需要继承BaseRichSpout这个类,并实现其中的方法即可。
1
2).IComponent接口
概述:public interface IComponent extends Serializable
他为topology中所有可能的组件提供公用的方法
如 void declareOutputFields(OutputFieldsDeclarer declarer);
此方法用于声明当前Spout/Bolt发送的tuple的名称,使用OutputFieldsDeclarer配合使用
实现类:
public abstract class BaseComponent implements IComponent
IBolt接口:
概述职责:接收tuple处理,并进行相应的处理(filter/join/…),IBolt会在一个运行的机器上创建,使用Java序列化它,然后提交到主节点(nimbus)上去执行,nimbus会启动worker来反序列化,调用prepare方法,然后才开始处理tuple处理
方法:
prepare:初始化
execute:处理一个tuple数据,tuple对象中包含了元数据信息
cleanup:shutdown之前的资源清理操作
实现类:
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public interface IRichBolt extends IBolt, IComponent
RichShellBolt
我们定义Bolt时只需继承BaseRichBolt并实现其中的方法即可。
1
以下是Storm对kafka的消息进行实时打印的代码实现。Storm官网有许多对接主流框架的介绍,引入所需jar包,就可以使用写好的KafkaSpout,而无需自己定义KafkaSpout类了。
org.apache.storm storm-kafka ${storm.version}
public class StormKafkaTopology {
public static class LogBolt extends BaseRichBolt {
private OutputCollector outputCollector;
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
public void execute(Tuple tuple) {
try {
byte[] bytes = tuple.getBinaryByField("bytes");
String value = new String(bytes);
System.out.println("value :" + value);
this.outputCollector.ack(tuple);
} catch (Exception e) {
this.outputCollector.fail(tuple);
}
}
//无后续bolt,无需声明
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
//kafka的配置
String topicName = "project_topic";
BrokerHosts hosts = new ZkHosts("hadoop000:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
//从上次收集的位置开始,而不是从头开始
spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
//创建kafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("KafkaSpout", kafkaSpout);
//设置Bolt
builder.setBolt("LogBolt", new LogBolt()).shuffleGrouping("KafkaSpout");
//本地运行Storm任务
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
}}
2.SparkStreaming
官网上的介绍如下:
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
1
即:Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。
Spark严格意义上来说并不能算实时流处理,他粗粒度的工作原理为:将实时接收的数据,根据一定的时间间隔拆成一批批的数据,具体来说是一批批RDD(分布式弹性数据集,Spark中的核心概念),然后通过SparkEngine来处理这些数据,可能是一些transformation和action操作,最后得到一批批的处理结果。
Strom和SparkStreaming的对比:
1).Strom是真正意义上的的流处理,时延相比SparkStreaming较低,而SparkStremming是将接受的实时流数据,按照指定的时间间隔拆成一个个RDD,在每个RDD中以批处理的形式处理数据。本质上还是批处理。
2).Storm会通过messageId的方式全局追踪和记录每一条记录,并通过ack/fail机制确保每条数据至少被处理一次(也可能是多次),而SparkStream应用程序只需要批处理级别对记录进行追踪,他能保证每个批处理记录仅仅被处理一次。
3).由于SparkStreming是运行在Spark平台上的无需单独安装,可以和批处理SparkSql,机器学习等其他其框架结合起来使用。
下面使用scala语言将SparkStreming对接kafka并对图书点击量进行实时统计的应用代码:将kafka中收集到的日志进行清洗,并转换成ClikcLog对象,并实时统计的结果转化成BookClick对象并写入Hbase,Nginx日志结构如下:
1
192.168.126.1 - - [2017-12-02 19:20:28] “GET /books/1 HTTP/1.1” 200 2403 “-” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36” “-”
object BookCount {
def main(args: Array[String]): Unit = {
//以参数的形式运行SparkStreming应用程序 四个参数为zk地址 ,用户组, 主题,线程数
if (args.length != 4) {
System.err.println("Usage: KafkaReceiverWordCount ")
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf()
//构造StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// Spark Streaming对接Kafka
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val logs = messages.map(_._2)
// 192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"
// 0 1 2 3 4 5 6 7 8
val cleanData = logs.map(line => {
val infos = line.split(" ")
val url = infos(6)
var bookId = 0
val time = infos(3).substring(1) + " " + infos(4).substring(0, 7)
if (url.startsWith("/books/")) {//只关注以books/开头的请求
bookId = url.split("/")(2).toInt
}
ClickLog(infos(0), TimeUtil.newTime(time), bookId, infos(8).toInt)
}).filter(clickLog => clickLog.bookId != 0)//为零表示不满足要求,忽略。
//cleanData.print()
cleanData.map(x => {
(x.time.substring(0, 8) + "_" + x.bookId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
rdd.foreachPartition(record => {
val list= new ListBuffer[BookClick]
record.foreach(pair => {
list.append(BookClick(pair._1,pair._2))
})
BookClickDao.put(list)
})
})
ssc.start()
ssc.awaitTermination()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
}
}
case class ClickLog(ip:String,time:String,bookId:Int,statusCode:Int)
case class BookClick(day_id:String,click_count:Int)
object BookClickDao {
val tableName = “book_clickcount”
val cf = “info”
val colume = “click_count”
def put(list: ListBuffer[BookClick]): Unit = {
val table = HbaseUtils.getInstance().getTable(tableName)
for (ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_id), Bytes.toBytes(cf), Bytes.toBytes(colume), ele.click_count)
}
}
def get(day_id: String): Long = {
val table = HbaseUtils.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_id))
val value = table.get(get).getValue(cf.getBytes, colume.getBytes)
if (value == null)
0l
else
Bytes.toLong(value)
}
}
object TimeUtil {
val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance(“yyyy-MM-dd HH:mm:ss”)
val TARGET_TIME = FastDateFormat.getInstance(“yyyyMMddHHmmss”)
def passTime(time: String)={
YYYYMMDDHHMMSS_FORMAT.parse(time)
}
def newTime(time:String)={
TARGET_TIME.format(passTime(time))
}
def main(args: Array[String]): Unit = {
println(newTime(“2017-12-02 19:13:25”))
}
}
因为流处理框架本身不具备存储能力,最后需要将统计结果入库,并可通过百度的Echart或者阿里的DataV等数据可视化工具,定义sql和时间间隔,对统计结果进行实时的展示。