Kafka 是如何保证数据可靠性和一致性

数据可靠性

Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从 Producter 往 Broker 发送消息、Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性。

Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。

Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Producer 往 Broker 发送消息

如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:

acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。

acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃。

acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。

根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

Leader 选举

在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见《图文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

Kafka 生产者分区策略

1)分区的原因
1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic
又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以 Partition 为单位读写了。
2)分区的原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。

消费者分区分配策略

Range 范围分区(默认的)

假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
最后分配结果如下

C1:0,1,2,3
C2:4,5,6
C3:7,8,9

如果有11个分区将会是:

C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)

在这种情况下,C1多消费了两个分区

RoundRobin 轮询分区

把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer

假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)

有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)

分区将会按照一定的顺序排列起来,消费者将会组成一个环状的结构,然后开始轮询。
P0-0分配给C0
P0-1分配给C1但是C1并没订阅T0,于是跳过C1把P0-1分配给C2,
P0-2分配给C0
P1-0分配给C1,
P1-1分配给C0,
P2-0分配给C1,
P2-1分配给C2,
P2-2分配给C1,
p2-3分配给C2

C0: P0-0,P0-2,P1-1
C1:P1-0,P2-0,P2-2
C2:P0-1,P2-1,P2-3

Python数据质量检测

1. 重复值检查

1.1 什么是重复值
重复值的检查首先要明确一点,即重复值的定义。对于一份二维表形式的数据集来说,什么是重复值?主要有两个层次:
① 关键字段出现重复记录,比如主索引字段出现重复;
② 所有字段出现重复记录。
第一个层次是否是重复,必须从这份数据的业务含义进行确定。比如一张表,从业务上讲,一个用户应该只会有一条记录,那么如果某个用户出现了超过一条的记录,那么这就是重复值。第二个层次,就一定是重复值了。
1.2 重复值产生的原因
重复值的产生主要有两个原因,一是上游源数据造成的,二是数据准备脚本中的数据关联造成的。从数据准备角度来看,首先检查数据准备的脚本,判断使用的源表是否有重复记录,同时检查关联语句的正确性和严谨性,比如关联条件是否合理、是否有限定数据周期等等。
比如:检查源表数据是否重复的SQL:

1
2
3
SELECT MON_ID,COUNT(*),COUNT(DISTINCT USER_ID)
FROM TABLE_NAME
GROUP BY MON_ID;

如果是上游源数据出现重复,那么应该及时反映给上游进行修正;如果是脚本关联造成的,修改脚本,重新生成数据即可。
还有一份情况,这份数据集是一份单独的数据集,并不是在数据仓库中开发得到的数据,既没有上游源数据,也不存在生成数据的脚本,比如公开数据集,那么如何处理其中的重复值?一般的处理方式就是直接删除重复值。

1
2
3
4
5
6
7
8
import pandas as pd
dataset = pd.read_excel("/labcenter/python/dataset.xlsx")
#判断重复数据
dataset.duplicated() #全部字段重复是重复数据
dataset.duplicated(['col2']) #col2字段重复是重复数据
#删除重复数据
dataset.drop_duplicates() #全部字段重复是重复数据
dataset.drop_duplicates(['col2']) #col2字段重复是重复数据

2. 缺失值检查

缺失值主要是指数据集中部分记录存在部分字段的信息缺失。

2.1 缺失值出现的原因
出现趋势值主要有三种原因:
① 上游源系统因为技术或者成本原因无法完全获取到这一信息,比如对用户手机APP上网记录的解析;
② 从业务上讲,这一信息本来就不存在,比如一个学生的收入,一个未婚者的配偶姓名;
③ 数据准备脚本开发中的错误造成的。
第一种原因,短期内无法解决;第二种原因,数据的缺失并不是错误,无法避免;第三种原因,则只需通过查证修改脚本即可。
缺失值的存在既代表了某一部分信息的丢失,也影响了挖掘分析结论的可靠性与稳定性,因此,必须对缺失值进行处理。
如果缺失值记录数超过了全部记录数的50%,则应该从数据集中直接剔除掉该字段,尝试从业务上寻找替代字段;
如果缺失值记录数没有超过50%,则应该首先看这个字段在业务上是否有替代字段,如果有,则直接剔除掉该字段,如果没有,则必须对其进行处理。

1
2
3
4
5
6
7
##查看哪些字段有缺失值   
dataset.isnull().any() #获取含有NaN的字段
##统计各字段的缺失值个数
dataset.isnull().apply(pd.value_counts)
##删除含有缺失值的字段
nan_col = dataset.isnull().any()
dataset.drop(nan_col[nan_col].index,axis=1)

2.2 缺失值的处理
缺失值的处理主要有两种方式:过滤和填充。

(1)缺失值的过滤
直接删除含有缺失值的记录,总体上会影响样本个数,如果删除样本过多或者数据集本来就是小数据集时,这种方式并不建议采用。

1
2
##删除含有缺失值的记录
dataset.dropna()

(2)缺失值的填充
缺失值的填充主要三种方法:
① 方法一:使用特定值填充
使用缺失值字段的平均值、中位数、众数等统计量填充。
优点:简单、快速
缺点:容易产生数据倾斜
② 方法二:使用算法预测填充
将缺失值字段作为因变量,将没有缺失值字段作为自变量,使用决策树、随机森林、KNN、回归等预测算法进行缺失值的预测,用预测结果进行填充。
优点:相对精确
缺点:效率低,如果缺失值字段与其他字段相关性不大,预测效果差
③ 方法三:将缺失值单独作为一个分组,指定值进行填充
从业务上选择一个单独的值进行填充,使缺失值区别于其他值而作为一个分组,从而不影响算法计算。
优点:简单,实用
缺点:效率低

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
##使用Pandas进行特定值填充
dataset.fillna(0) ##不同字段的缺失值都用0填充
dataset.fillna({'col2':20,'col5':0}) ##不同字段使用不同的填充值
dataset.fillna(dataset.mean()) ##分别使用各字段的平均值填充
dataset.fillna(dataset.median()) ##分别使用个字段的中位数填充

##使用sklearn中的预处理方法进行缺失值填充(只适用于连续型字段)
from sklearn.preprocessing import Imputer
dataset2 = dataset.drop(['col4'],axis=1)
colsets = dataset2.columns
nan_rule1 = Imputer(missing_values='NaN',strategy='mean',axis=0) ##创建填充规则(平均值填充)
pd.DataFrame(nan_rule1.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule2 = Imputer(missing_values='median',strategy='mean',axis=0) ##创建填充规则(中位数填充)
pd.DataFrame(nan_rule2.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule3 = Imputer(missing_values='most_frequent',strategy='mean',axis=0) ##创建填充规则(众数填充)
pd.DataFrame(nan_rule3.fit_transform(dataset2),columns=colsets) ##应用规则

Oozie常用命令

Oozie任务重试

1
oozie job -rerun 0000000-191106143754176-oozie-oozi-W  -D oozie.wf.rerun.failnodes=true

检查xml格式

1
xmllint -noout hive-site.xml

查看coordinator状态

1
oozie jobs -jobtype coordinator  -filter status=RUNNING

查看执行错误的workflow.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[root@dlbdn3 ~]# oozie job -info 0000027-181203143359779-oozie-oozi-C -oozie http://localhost:11000/oozie -order=desc -timezone='Asia/Shanghai' -filter status=KILLED AND FAILED
Job ID : 0000027-181203143359779-oozie-oozi-C
------------------------------------------------------------------------------------------------------------------------------------
Job Name : PG-LANDING-JOB-Coordinator
App Path : hdfs://dlbdn3:8020/user/hue/oozie/deployments/_ericsson_-oozie-13-1543825800.46
Status : KILLED
Start Time : 2018-09-13 00:00 CST
End Time : 2040-09-01 15:15 CST
Pause Time : -
Concurrency : 1
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Err Code Created Nominal Time
0000027-181203143359779-oozie-oozi-C@30 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@29 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@28 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@27 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@26 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@25 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@24 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:55 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@23 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:50 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@22 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:45 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@21 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:40 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@20 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:35 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@19 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:30 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@18 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@17 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@16 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@15 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@14 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@13 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@12 KILLED 0000039-181203143359779-oozie-oozi-W - 2018-12-03 16:34 CST 2018-09-13 00:55 CST
------------------------------------------------------------------------------------------------------------------------------------

查看一个具体的workflow信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@dlbdn3 ~]# oozie job --oozie http://localhost:11000/oozie -info 0000039-181203143359779-oozie-oozi-W -localtime
Job ID : 0000039-181203143359779-oozie-oozi-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : PG-LANDING-JOB
App Path : hdfs://dlbdn3:8020/user/hue/oozie/workspaces/hue-oozie-1535681485.06
Status : KILLED
Run : 0
User : ericsson
Group : -
Created : 2018-12-03 16:42 CST
Started : 2018-12-03 16:42 CST
Last Modified : 2018-12-03 16:43 CST
Ended : 2018-12-03 16:43 CST
CoordAction ID: 0000027-181203143359779-oozie-oozi-C@12

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Ext Status Err Code
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@:start: OK - OK -
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@spark-23f2 KILLED job_1543800485319_0062 KILLED -
------------------------------------------------------------------------------------------------------------------------------------

查看当前运行的workflow有哪些

1
2
3
4
5
6
7
8
9
10
11
12
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf  -filter status=RUNNING
No Jobs match your criteria!
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 08:50 GMT -
------------------------------------------------------------------------------------------------------------------------------------
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING -localtime
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 16:50 CST -
------------------------------------------------------------------------------------------------------------------------------------

深入理解Kafka副本机制

一、Kafka集群

Kafka 使用 Zookeeper 来维护集群成员 (brokers) 的信息。每个 broker 都有一个唯一标识 broker.id,用于标识自己在集群中的身份,可以在配置文件 server.properties 中进行配置,或者由程序自动生成。下面是 Kafka brokers 集群自动创建的过程:

每一个 broker 启动的时候,它会在 Zookeeper 的 /brokers/ids 路径下创建一个 临时节点,并将自己的 broker.id 写入,从而将自身注册到集群;

当有多个 broker 时,所有 broker 会竞争性地在 Zookeeper 上创建 /controller 节点,由于 Zookeeper 上的节点不会重复,所以必然只会有一个 broker 创建成功,此时该 broker 称为 controller broker。它除
了具备其他 broker 的功能外,还负责管理主题分区及其副本的状态。

当 broker 出现宕机或者主动退出从而导致其持有的 Zookeeper 会话超时时,会触发注册在 Zookeeper 上的 watcher 事件,此时 Kafka 会进行相应的容错处理;如果宕机的是 controller broker 时,还会触发新的controller 选举。

二、副本机制

为了保证高可用,kafka 的分区是多副本的,如果一个副本丢失了,那么还可以从其他副本中获取分区数据。但是这要求对应副本的数据必须是完整的,这是 Kafka 数据一致性的基础,所以才需要使用 controller broker 来进行专门的管理。下面将详解介绍 Kafka 的副本机制。

2.1 分区和副本
Kafka 的主题被分为多个分区 ,分区是 Kafka 最基本的存储单位。每个分区可以有多个副本 (可以在创建主题时使用 replication-factor 参数进行指定)。其中一个副本是首领副本 (Leader replica),所有的事件都直接发送给首领副本;其他副本是跟随者副本 (Follower replica),需要通过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。
alt

2.2 ISR机制
每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:

与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
在规定的时间内从首领副本那里低延迟地获取过消息。

如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。

这里给出一个主题创建的示例:使用 –replication-factor 指定副本系数为 3,创建成功后使用 –describe 命令可以看到分区 0 的有 0,1,2 三个副本,且三个副本都在 ISR 列表中,其中 1 为首领副本。
alt

2.3 不完全的首领选举
对于副本机制,在 broker 级别有一个可选的配置参数 unclean.leader.election.enable,默认值为 fasle,代表禁止不完全的首领选举。这是针对当首领副本挂掉且 ISR 中没有其他可用副本时,是否允许某个不完全同步的副本成为首领副本,这可能会导致数据丢失或者数据不一致,在某些对数据一致性要求较高的场景 (如金融领域),这可能无法容忍的,所以其默认值为 false,如果你能够允许部分数据不一致的话,可以配置为 true。

2.4 最少同步副本
ISR 机制的另外一个相关参数是 min.insync.replicas , 可以在 broker 或者主题级别进行配置,代表 ISR 列表中至少要有几个可用副本。这里假设设置为 2,那么当可用副本数量小于该值时,就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常 org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

2.5 发送确认
Kafka 在生产者上有一个可选的参数 ack,该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入成功:
acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

三、数据请求

3.1 元数据请求机制
在所有副本中,只有领导副本才能进行消息的读写处理。由于不同分区的领导副本可能在不同的 broker 上,如果某个 broker 收到了一个分区请求,但是该分区的领导副本并不在该 broker 上,那么它就会向客户端返回一个 Not a Leader for Partition 的错误响应。 为了解决这个问题,Kafka 提供了元数据请求机制。

首先集群中的每个 broker 都会缓存所有主题的分区副本信息,客户端会定期发送发送元数据请求,然后将获取的元数据进行缓存。定时刷新元数据的时间间隔可以通过为客户端配置 metadata.max.age.ms 来进行指定。有了元数据信息后,客户端就知道了领导副本所在的 broker,之后直接将读写请求发送给对应的 broker 即可。

如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时还有可能会收到 Not a Leader for Partition 的错误响应,这种情况下客户端会再次求发出元数据请求,然后刷新本地缓存,之后再去正确的 broker 上执行对应的操作,过程如下图:
alt

3.2 数据可见性
需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
alt

3.3 零拷贝
Kafka 所有数据的写入和读取都是通过零拷贝来实现的。传统拷贝与零拷贝的区别如下
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过 Socket 将内存中的数据发送出去。

1
2
buffer = File.read
Socket.send(buffer)

这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝),然后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝),接着用户程序通过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝),最后通过 DMA 拷贝将数据拷贝到 NIC Buffer。同时,还伴随着四次上下文切换,如下图所示:
alt

sendfile和transferTo实现零拷贝
Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。数据通过 DMA 拷贝到内核态 Buffer 后,直接通过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件到网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示:
alt

从具体实现来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 的 transferFrom 方法通过调用 Java NIO 中 FileChannel 的 transferTo 方法实现零拷贝,如下所示:

1
2
3
4
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供 sendfile 这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

四、物理存储

4.1 分区分配
在创建主题时,Kafka 会首先决定如何在 broker 间分配分区副本,它遵循以下原则:

在所有 broker 上均匀地分配分区副本;
确保分区的每个副本分布在不同的 broker 上;
如果使用了 broker.rack 参数为 broker 指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的 broker 上,以避免一个机架不可用而导致整个分区不可用。

基于以上原因,如果你在一个单节点上创建一个 3 副本的主题,通常会抛出下面的异常:

1
2
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor   
Exception: Replication factor: 3 larger than available brokers: 1.

4.2 分区数据保留规则
保留数据是 Kafka 的一个基本特性, 但是 Kafka 不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反, Kafka 为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。分别对应以下四个参数:

log.retention.bytes :删除数据前允许的最大数据量;默认值-1,代表没有限制;
log.retention.ms:保存数据文件的毫秒数,如果未设置,则使用 log.retention.minutes 中的值,默认为 null;
log.retention.minutes:保留数据文件的分钟数,如果未设置,则使用 log.retention.hours 中的值,默认为 null;
log.retention.hours:保留数据文件的小时数,默认值为 168,也就是一周。
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以 Kafka 把分区分成若干个片段,当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除。如果按照默认值保留数据一周,而且每天使用一个新片段,那么你就会看到,在每天使用一个新片段的同时会删除一个最老的片段,所以大部分时间该分区会有 7 个片段存在。

4.3 文件格式
通常保存在磁盘上的数据格式与生产者发送过来消息格式是一样的。 如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送 (格式如下所示) ,然后保存到磁盘上。之后消费者读取后再自己解压这个包装消息,获取每条消息的具体信息。
alt

Spark Yarn 的提交二种方式

一、前述

Spark可以和Yarn整合,将Application提交到Yarn上运行,和StandAlone提交模式一样,Yarn也有两种提交任务的方式。

具体

1、yarn-client提交任务方式

1
2
3
./spark-submit --master yarn  --class org.apache.spark.examples.SparkPi  ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-lient --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

alt

执行原理图解

alt

执行流程
1.客户端提交一个Application,在客户端启动一个Driver进程。
2.Driver进程会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
3.RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
4.AM启动后,会向RS请求一批container资源,用于启动Executor.
5.RS会找到一批NM返回给AM,用于启动Executor。
6.AM会向NM发送命令启动Executor。
7.Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。

总结
1、Yarn-client模式同样是适用于测试,因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加.

2、 ApplicationMaster的作用:

         为当前的Application申请资源

         给NodeManager发送消息启动Executor。

注意:ApplicationMaster有launchExecutor和申请资源的功能,并没有作业调度的功能。

2、yarn-cluster提交任务方式

提交命令

1
2
./spark-submit --master yarn --deploy-mode cluster  --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

结果在yarn的日志里面:

alt

执行原理
alt
执行流程
1.客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
2.RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
3.AM启动,AM发送请求到RS,请求一批container用于启动Executor。
4.RS返回一批NM节点给AM。
5.AM连接到NM,发送请求到NM启动Executor。
6.Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。

总结
1、Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。

2.ApplicationMaster的作用:

       为当前的Application申请资源

       给nodemanager发送消息 启动Excutor。

       任务调度。(这里和client模式的区别是AM具有调度能力,因为其就是Driver端,包含Driver进程)

ELK日志采集logstash output -> elasticsearch 数据写入性能优化。

背景

前些时间测试线上ELK环境,发现beats组件直连elasticsearch数据灌入异常快,但是过了logstash数据明显迟缓。判定logstah的灌入存在瓶颈。以下为logstash调优细节。

环境

本次针对的优化对象是线上的日志分析平台,主要数据源为基础服务器数据和应用日志
ES节点:顶配3点集群,各方面负载不高,无写入瓶颈
logstah节点:2个汇聚端,
网络:内网万兆传输,无瓶颈
数据量(条):1~1.5w/s

架构

alt

分析

logstah的功能是一个管道,通过input灌入数据,filter过滤数据,output输入数据

input:filebeat和metricbeat总连接数为500左右,且观察日志无retry 或 timeout等输出,无明显瓶颈
filter和output:logstash的正则解析过程非常消耗资源,但是我们的节点资源消耗居然不高。在新版的logstash中优化了input,filter,output的线程模式,在配置文件中可以通过配置pipeline.workers来调整filter和output的线程数

优化

查询官网手册后,最影响logstash传输效率的参数有以下几个:
1、pipeline.workers:决定filter和output的线程数,官方建议大于CPU数,如果logstah节点是混用服务器,建议等于或小于CPU数
2、pipeline.batch.size:单个线程每次调用ES bulk index API时的事件数。这些时间将被放到内存中。最好的设定值是不断地测试,测试,测试。
3、pipeline.batch.size:单个线程每次调用ES bulk index API时的事件数。这些时间将被放到内存中。最好的设定值是不断地测试,测试,测试。
优化后的logstash
pipeline.batch.size: 2500
pipeline.batch.delay: 5
pipeline.workers: 8
pipeline.batch.size: 2500
pipeline.batch.delay: 50

优化结果

alt

SparkStructured StreamExecution:持续查询的运转引擎

StreamExecution 的初始状态

定义好 Dataset/DataFrame 的产生、变换和写出,再启动 StreamExection 去持续查询。这些 Dataset/DataFrame 的产生、变换和写出的信息就对应保存在 StreamExecution 非常重要的 3 个成员变量中:
1、sources: streaming data 的产生端(比如 kafka 等)
2、logicalPlan: DataFrame/Dataset 的一系列变换(即计算逻辑)
3、sink: 最终结果写出的接收端(比如 file system 等)

StreamExection 另外的重要成员变量是:
1、currentBatchId: 当前执行的 id
2、batchCommitLog: 已经成功处理过的批次有哪些
3、offsetLog, availableOffsets, committedOffsets: 当前执行需要处理的 source data 的 meta 信息
4、offsetSeqMetadata: 当前执行的 watermark 信息(event time 相关,本文暂不涉及、另文解析)等

alt

StreamExecution 的持续查询

alt
一次执行的过程如上图;这里有 6 个关键步骤:
1.StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度;
2.StreamExecution 将 offsets 等写入到 offsetLog 里
这里的 offsetLog 是一个持久化的 WAL (Write-Ahead-Log),是将来可用作故障恢复用
3.StreamExecution 构造本次执行的 LogicalPlan
(3a) 将预先定义好的逻辑(即 StreamExecution 里的 logicalPlan 成员变量)制作一个副本出来
(3b) 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 Dataset/DataFrame 表示,并替换到 (3a) 中的副本里
经过 (3a), (3b) 两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 Dataset/DataFrame 变换(即整个处理逻辑)了
4.触发对本次执行的 LogicalPlan 的优化,得到 IncrementalExecution
逻辑计划的优化:通过 Catalyst 优化器完成
物理计划的生成与选择:结果是可以直接用于执行的 RDD DAG
逻辑计划、优化的逻辑计划、物理计划、及最后结果 RDD DAG,合并起来就是 IncrementalExecution
5.将表示计算结果的 Dataset/DataFrame (包含 IncrementalExecution) 交给 Sink,即调用 Sink.add(ds/df)
6.计算完成后的 commit
(6a) 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection
(6b) 将本次执行的批次 id 写入到 batchCommitLog 里

StreamExecution 的持续查询(增量)

alt
Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据(而不仅仅是本次执行信收到的数据),所以每次执行的结果是针对全量数据进行计算的结果。

但是在实际执行过程中,由于全量数据会越攒越多,那么每次对全量数据进行计算的代价和消耗会越来越大。

Structured Streaming 的做法是:
1.引入全局范围、高可用的 StateStore
2.转全量为增量,即在每次执行时:
先从 StateStore 里 restore 出上次执行后的状态
然后加入本执行的新数据,再进行计算
如果有状态改变,将把改变的状态重新 save 到 StateStore 里
3.为了在 Dataset/DataFrame 框架里完成对 StateStore 的 restore 和 s如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:
所以 Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据,但在具体实现上转换为增量的持续查询。

故障恢复

alt
由于 exectutor 节点的故障可由 Spark 框架本身很好的 handle,不引起可用性问题,我们本节的故障恢复只讨论 driver 故障恢复。
1.如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:
2.读取 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步
3.读取 batchCommitLog 决定是否需要重做最近一个批次
4.如果需要,那么重做 (3a), (3b), (4), (5), (6a), (6b) 步
这里第 (5) 步需要分两种情况讨论
(i) 如果上次执行在 (5) 结束前即失效,那么本次执行里 sink 应该完整写出计算结果
(ii) 如果上次执行在 (5) 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)
这样即可保证每次执行的计算结果,在 sink 这个层面,是 不重不丢 的 —— 即使中间发生过 1 次或以上的失效和恢复。

小结:end-to-end exactly-once guarantees

所以在 Structured Streaming 里,我们总结下面的关系[4]:

alt
这里的 end-to-end 指的是,如果 source 选用类似 Kafka, HDFS 等,sink 选用类似 HDFS, MySQL 等,那么 Structured Streaming 将自动保证在 sink 里的计算结果是 exactly-once 的 —— Structured Streaming终于把过去需要使用者去维护的 sink 去重逻辑接盘过去了!:-)

通过BulkLoad(MR)快速将海量数据导入到Hbase

原始文件从mysq导出来的csv文件

1
2
3
4
5
6
7
8
9
10
503003676755886086,503003161271734273,1
503003669797548035,503003161271734273,1
503003568609964035,503003161271734273,1
503003700512428038,503003161271734273,1
503003764244881414,503003161271734273,1
503003647634841604,503003161271734273,4
503003747857739782,503003161271734273,7
503003582082068480,503003161271734273,5
503003646376542208,503003161271734273,3
503003631474180105,503003161271734273,1

使用MR 对文件进行加盐操作根据MD5/region数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.MessageDigest;


/**
* 计数系统工具类
*/

public class CounterUtils {


private static final Logger logger = LoggerFactory.getLogger(CounterUtils.class);


/** region数量 */
private static Integer region_num = 4;

/** salt前缀 */
private static String salt_prefix = "00";




/**
* 为HBase rowkey产生salt值
* @param key 原始计数键名
* @return salt值
*/

public static String generateSalt(String key){

Integer region_seq = Math.abs(generateMD5(key).hashCode()) % region_num;

return salt_prefix+String.valueOf(region_seq);

}


/**
* 根据字符串产生MD5值
* @param msg
* @return 字符串对应的MD5值
*/

public static String generateMD5(String msg) {

MessageDigest md5 = null;

try {

md5 = MessageDigest.getInstance("MD5");

} catch (Exception e) {


logger.error("产生MD5值报错",e);

return "";
}

char[] charArray = msg.toCharArray();
byte[] byteArray = new byte[charArray.length];

for (int i = 0; i < charArray.length; i++)
byteArray[i] = (byte) charArray[i];

byte[] md5Bytes = md5.digest(byteArray);
StringBuffer hexValue = new StringBuffer();

for (int i = 0; i < md5Bytes.length; i++) {

int val = ((int) md5Bytes[i]) & 0xff;
if (val < 16)
hexValue.append("0");
hexValue.append(Integer.toHexString(val));
}

return hexValue.toString();

}

}

加盐生成另一种文件MR程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jiatui.bigdata.util.CounterUtils;



public class DateConversion extends Configured implements Tool {

// 构建map类
public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 根据$开始切割字段名
String[] splited = value.toString().split(",");
// 以第一个和第二个为rowkey值
String rowkey1="LOOK_TIMES_" + splited[1] + "_" + splited[0];

String rowkey = CounterUtils.generateSalt(rowkey1)+rowkey1;

Text k2 = new Text(rowkey);
// 第三个为value值
TestWritable v2 = new TestWritable(splited[2]);

context.write(k2, v2);

}

}

// 构建reduce类
public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {

public void reduce(Text k2, Iterable<TestWritable> v2s, Context context)
throws IOException, InterruptedException {

String value;

// 循环所有的key值和values值
for (TestWritable testWritable : v2s) {

value = testWritable.value;

TestWritable v3 = new TestWritable(value);
context.write(k2, v3);
}

}

}

// main方法启动
public static void main(String[] args) throws IOException, Exception {
ToolRunner.run(new DateConversion(), args);
}

public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", ",");

String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job = Job.getInstance(conf, "Test");
FileSystem fs = FileSystem.get(new URI(args[1]), conf);
fs.delete(new Path(args[1]));
job.setJarByClass(DateConversion.class);
job.setMapperClass(TestMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TestWritable.class);
job.setReducerClass(TestReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TestWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}

static class TestWritable implements Writable {

String value;

public TestWritable(String value) {
this.value = value;

}

// 无参构造方法public class UserBean implements Writable
// 这个应该是在自定义writable的时候需要注意,反射过程中需要调用无参构造。
public TestWritable() {
}

public void readFields(DataInput in) throws IOException {

this.value = in.readUTF();

}

public void write(DataOutput out) throws IOException {

out.writeUTF(value);

}

public String toString() {
return value;
}

}

}

生成文件的形式为

1
2
3
4
5
6
7
8
9
10
000LOOK_TIMES_503003049082486784_503003525400236039,3
000LOOK_TIMES_503003049082486784_503003558279393283,1
001LOOK_TIMES_503003049099264000_503003517552689161,6
002LOOK_TIMES_503003049099264000_503003586590937094,2
002LOOK_TIMES_503003049099264000_503003611433799685,1
002LOOK_TIMES_503003049099264000_503003638604505093,1
003LOOK_TIMES_503003049099264000_503003646833725450,2
003LOOK_TIMES_503003049099264000_503254267948171264,2
003LOOK_TIMES_503003049099264000_504261308741332992,1
003LOOK_TIMES_503003049099264000_505510528722927616,1

在进行另一个MR操作将生成的文件导入到Hbase表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;





/**
*
* <p>Title: GeneratorHFile</p>
* <p>Description: 先把文件rowkey加盐成文件,根据rowkey ASC 排序 上传HDFS(排序在hive总进行)
* insert overwrite directory '/dmp_operator1/test/'
row format delimited
fields terminated by ','
select * from test order by id asc;
* </p>
* @author zhangshuai
* @date 2018年12月19日
*/
public class GeneratorHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

private static Logger logger = Logger.getLogger(GeneratorHFile.class);



protected void map(LongWritable Key, Text Value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {

//切分导入的数据
String Values=Value.toString();
String[] Lines=Values.split(",");


String Rowkey=Lines[0];

Long ColValue=Long.valueOf(Lines[1]);
//拼装rowkey和put;
ImmutableBytesWritable PutRowkey=new ImmutableBytesWritable(Bytes.toBytes(Rowkey));
Put put=new Put(Bytes.toBytes(Rowkey));
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cnt"), Bytes.toBytes(ColValue));

context.write(PutRowkey,put);



}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package com.jiatui.bigdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;

import com.jiatui.bigdata.util.ScheduleKerBeros;

public class GenerateHFileDriver {

public static void main(String[] args) throws Exception {

ScheduleKerBeros scheduleKerBeros = new ScheduleKerBeros();
scheduleKerBeros.scheduled();

/**
* 获取Hbase配置,创建连接到目标表,表在Shell中已经创建好,建表语句create
* 'counter_sys.counter_tbl','cf1',这里注意HBase对大小写很敏感
*/

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.103.3,192.168.103.4,192.168.103.5");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.addResource("/home/dmp_operator1/tickets/hbase-site.xml");
conf.addResource("/home/dmp_operator1/tickets/core-site.xml");
conf.addResource("/home/dmp_operator1/tickets/hdfs-site.xml");
conf.set("zookeeper.znode.parent", "/hbase-secure");

conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(64 * 1024 * 1024));
conf.set("mapred.min.split.size", String.valueOf(64 * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(64 * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(64 * 1024 * 1024));


Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("counter_sys:counter_tbl"));
Admin admin = conn.getAdmin();

final String InputFile = "hdfs://DATASEA/user/hbase/test/input";
final String OutputFile = "hdfs://DATASEA/user/hbase/test/output";
final Path OutputPath = new Path(OutputFile);

// 设置相关类名
Job job = Job.getInstance(conf, "counter_sys:counter_tbl");
job.setJarByClass(GenerateHFileDriver.class);
job.setMapperClass(GeneratorHFile.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

// 设置文件的输入路径和输出路径
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job, InputFile);
FileOutputFormat.setOutputPath(job, OutputPath);

// 配置MapReduce作业,以执行增量加载到给定表中。
HFileOutputFormat2.configureIncrementalLoad(job, table,
conn.getRegionLocator(TableName.valueOf("counter_sys:counter_tbl")));

// MapReduce作业完成,告知RegionServers在哪里找到这些文件,将文件加载到HBase中
if (job.waitForCompletion(true)) {
LoadIncrementalHFiles Loader = new LoadIncrementalHFiles(conf);
Loader.doBulkLoad(OutputPath, admin, table,
conn.getRegionLocator(TableName.valueOf("counter_sys:counter_tbl")));
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import com.sun.tools.extcheck.Main;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.time.LocalTime;

/**
*
* <p>
* Title: ScheduleTest
* </p>
* <p>
* Description:
* </p>
*
* @author zhangshuai
* @date 2018年12月6日
*/


public class ScheduleKerBeros {
private static final Logger logger = LoggerFactory.getLogger(ScheduleKerBeros.class);

/**
*
* <p>Title: scheduled</p>
* <p>Description: </p>
*/

public void scheduled() {

logger.info("开始授权...");
//System.setProperty("java.security.krb5.conf", this.getClass().getResource("/krb5.conf").getPath());
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
try {
logger.info("授权中1...");
UserGroupInformation.loginUserFromKeytab("hbase/dev-bg-m01@DATASEA.COM","/etc/security/keytabs/hbase.service.keytab");
//UserGroupInformation.loginUserFromKeytab("dmp_operator1@DATASEA.COM",this.getClass().getResource("/dmp_operator1.keytab").getPath());


logger.info("授权成功1...");

} catch (Exception e) {

}

}




}
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量