Spring mvc 框架定时刷新kerberos认证票据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.XXX.counter.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.Timer;
public class TicketScanerListener implements ServletContextListener {
private Object lock = new Object();
public void contextDestroyed(ServletContextEvent arg0) {
System.out.println("web应用关闭...");
}

public void contextInitialized(ServletContextEvent arg0) {
System.out.println("web应用初始化...");
// 创建定时器
Timer timer = new Timer();
// 每隔30秒就定时执行任务
timer.schedule(new TicketScanerTask(lock), 0, 1000 * 1000);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.xxx.counter.listener;

import java.util.TimerTask;

import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
/**
* 定时器,定义定时任务的具体内容
*/
public class TicketScanerTask extends TimerTask{
private static Logger logger = Logger.getLogger(TicketScanerTask.class);
// 存储传递过来的锁
private Object lock;
// 构造方法
TicketScanerTask( Object lock){
this.lock = lock;
}

private static Configuration conf = null;
public static String principal = "dmp_operator1@DATASEA.COM";
public static String keytabPath = "dmp_operator1.keytab";
@Override
public void run() {
// 考虑到多线程的情况,这里必须要同步
synchronized (lock){

logger.info("TicketScanerTask......");

System.setProperty("java.security.krb5.conf",this.getClass().getResource("/krb5.conf").getPath());

try {
UserGroupInformation.loginUserFromKeytab("dmp_operator1@DATASEA.COM", this.getClass().getResource("/dmp_operator1.keytab").getPath());

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("授权失败...");
}


}
}
}
1
2
3
<listener>
<listener-class>com.xxx.counter.listener.TicketScanerListener</listener-class>
</listener>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.xxx.counter.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseConnection {
private static final Logger logger = LoggerFactory.getLogger(HbaseConnection.class);

public static Configuration conf = null;
public static Connection connection = null;

public Connection getConnection () throws IOException {
conf = HBaseConfiguration.create();

conf.addResource(this.getClass().getResource("/hbase-site.xml").getPath());
conf.addResource(this.getClass().getResource("/core-site.xml").getPath());
conf.addResource(this.getClass().getResource("/hdfs-site.xml").getPath());
conf.set("hbase.zookeeper.quorum", "192.168.103.3,192.168.103.4,192.168.103.5");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase-secure");
conf.setLong("hbase.rpc.timeout", 30000);
connection = ConnectionFactory.createConnection(conf);
return connection;
}
}

Spark Scala 获取自然日 属于每周的第一天和每周的最后一天,每月的第一天和每月的最后一天

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.xxxx.bigdata
import java.util.Calendar
import java.text.SimpleDateFormat
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object Date_week_month {
def getNowWeekStart(logdate: String) = {
var period: String = ""
var cal: Calendar = Calendar.getInstance();
var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

cal.setTime(df.parse(logdate))

var d = 0
if (cal.get(Calendar.DAY_OF_WEEK) == 1) {
d = -6
} else {
d = 2 - cal.get(Calendar.DAY_OF_WEEK)
}

cal.add(Calendar.DAY_OF_WEEK, d)

val startdate = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())

cal.add(Calendar.DAY_OF_WEEK, 6)

val enddate = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())

(startdate, enddate)
}

def getNowMonthStart(logdate: String) = {
var cal: Calendar = Calendar.getInstance();
var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
cal.setTime(df.parse(logdate))
cal.set(Calendar.DATE, 1)
val monthstart = df.format(cal.getTime())

cal.set(Calendar.DATE, 1)
cal.roll(Calendar.DATE, -1)
val monthend = df.format(cal.getTime())

(monthstart, monthend)

}
def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("test").setMaster("local")
conf.set("spark.default.parallelism", "200")
conf.set("spark.sql.shuffle.partitions", "200")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("zookeeper.znode.parent", "/hbase-secure")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("hive.exec.max.dynamic.partitions.pernode", "2000")
hiveContext.setConf("hive.exec.max.dynamic.partitions", "5000")
sc.setLogLevel("ERROR")
val startTime = "2018-01-01"

val endTime = "2021-12-31"

val dateFormat = new SimpleDateFormat("yyyy-MM-dd")

val dateFiled = Calendar.DAY_OF_MONTH

var beginDate = dateFormat.parse(startTime)

val endDate = dateFormat.parse(endTime)

val calendar = Calendar.getInstance()

calendar.setTime(beginDate)

val dateArray: ArrayBuffer[String] = ArrayBuffer()

val data = new ArrayBuffer[Row]
while (beginDate.compareTo(endDate) <= 0) {

val login_time = dateFormat.format(beginDate)

val starttime = getNowWeekStart(login_time)._1
val endtime = getNowWeekStart(login_time)._2

val monthstart = getNowMonthStart(login_time)._1
val monthend = getNowMonthStart(login_time)._2

data += Row(login_time, starttime, endtime, monthstart, monthend)

calendar.add(dateFiled, 1)
beginDate = calendar.getTime

}

val rdd = sc.parallelize(data)

val stuct = StructType(
Array(
StructField("login_time", StringType),
StructField("starttime", StringType),
StructField("endtime", StringType),
StructField("monthstart", StringType),
StructField("monthend", StringType)))

val test1 = hiveContext.createDataFrame(rdd, stuct)

val tmpTable = "dateTable"

test1.registerTempTable(tmpTable)

val sql = s"""
select count(*) from
$tmpTable
"""

println("sql = " + sql)

hiveContext.sql(sql).show();

sc.stop()
}
}

Hive SQL 实现以上方法先循环取出,日期之间所有的日期

1
2
3
4
5
6
7
8
9
10
START_DAY='2018-01-01'
END_DAY='2021-12-31'

initSec=`date -d $START_DAY +%s`
finiSec=`date -d $END_DAY +%s`
for ((i=$initSec; i <=$finiSec; i+=24 * 3600 ));do
cur_day=`date -d "1970-1-1 UTC $i seconds" +%F`
echo "$cur_day,,,,">> testa

done

生成了 这样的文件

1
2
3
4
5
6
7
8
2018-01-01,,,,
2018-01-02,,,,
2018-01-03,,,,
2018-01-04,,,,
2018-01-05,,,,
2018-01-06,,,,
2018-01-07,,,,
.............

然后load到Hive表中

1
CREATE TABLE test (dt String,WEEK_START string,WEEK_END string,MONTH_START string,MONTH_END string)row format delimited fields TERMINATED BY ',';

执行SQL

1
2
insert overwrite table test select  dt,date_sub(dt,cast(date_format(dt,'u') as int)-1) as start_time,
date_sub(dt,cast(date_format(dt,'u') as int)-7) as end_time,trunc(dt,'MM'),last_day(dt) from test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| 2021-02-24  | 2021-02-22  | 2021-02-28  | 2021-02-01  | 2021-02-28  |
| 2021-02-25 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-26 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-27 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-28 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-03-01 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-02 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-03 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-04 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-05 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-06 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-07 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-08 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-09 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-10 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-11 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-12 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-13 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-14 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-15 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-16 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-17 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-18 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-19 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-20 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-21 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-22 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-23 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-24 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-25 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-26 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-27 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-28 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-29 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-03-30 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-03-31 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-04-01 | 2021-03-29 | 2021-04-04 | 2021-04-01 | 2021-04-30 |

Kafka HA Kafka一致性重要机制之ISR(kafka replica)

一、kafka replica

当某个topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)。kafka的replica包含leader与follower。
Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica。
所有Partition的Replica默认情况会均匀分布到所有Broker上。

二、Data Replication如何Propagate(扩散出去)消息?

每个Partition有一个leader与多个follower,producer往某个Partition中写入数据是,只会往leader中写入数据,然后数据才会被复制进其他的Replica中。
数据是由leader push过去还是有flower pull过来?
kafka是由follower周期性或者尝试去pull(拉)过来(其实这个过程与consumer消费过程非常相似),写是都往leader上写,但是读并不是任意flower上读都行,读也只在leader上读,flower只是数据的一个备份,保证leader被挂掉后顶上来,并不往外提供服务。

三、Data Replication何时Commit?

同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
Commit:是指leader告诉客户端,这条数据写成功了。kafka尽量保证commit后立即leader挂掉,其他flower都有该条数据

kafka不是完全同步,也不是完全异步,是一种ISR机制

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护
如果一个flower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除
当ISR中所有Replica都向Leader发送ACK时,leader才commit既然所有Replica都向Leader发送ACK时,leader才commit,那么flower怎么会leader落后太多?
producer往kafka中发送数据,不仅可以一次发送一条数据,还可以发送message的数组;批量发送,同步的时候批量发送,异步的时候本身就是就是批量;底层会有队列缓存起来,批量发送,对应broker而言,就会收到很多数据(假设1000),这时候leader发现自己有1000条数据,flower只有500条数据,落后了500条数据,就把它从ISR中移除出去,这时候发现其他的flower与他的差距都很小,就等待;如果因为内存等原因,差距很大,就把它从ISR中移除出去。

server配置

1
2
3
4
5
6
7
rerplica.lag.time.max.ms=10000
# 如果leader发现flower超过10秒没有向它发起fech请求,那么leader考虑这个flower是不是程序出了点问题
# 或者资源紧张调度不过来,它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除。

rerplica.lag.max.messages=4000 # 相差4000条就移除
# flower慢的时候,保证高可用性,同时满足这两个条件后又加入ISR中,
# 在可用性与一致性做了动态平衡 亮点

topic配置

1
min.insync.replicas=1 # 需要保证ISR中至少有多少个replica

Producer配置

1
2
3
4
5
6
request.required.asks=0
# 0:相当于异步的,不需要leader给予回复,producer立即返回,发送就是成功,
那么发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步),
既有可能丢失也可能会重发
# 1:当leader接收到消息之后发送ack,丢会重发,丢的概率很小
# -1:当所有的follower都同步消息成功后发送ack. 丢失消息可能性比较低

Data Replication如何处理Replica恢复

leader挂掉了,从它的follower中选举一个作为leader,并把挂掉的leader从ISR中移除,继续处理数据。一段时间后该leader重新启动了,它知道它之前的数据到哪里了,尝试获取它挂掉后leader处理的数据,获取完成后它就加入了ISR。

Data Replication如何处理Replica全部宕机

等待ISR中任一Replica恢复,并选它为Leader 等待时间较长,降低可用性或ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用
选择第一个恢复的Replica为新的Leader,无论它是否在ISR中并未包含所有已被之前Leader Commit过的消息,因此会造成数据丢失可用性较高

Spark1.X操作DataFrame示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{"id":1, "name":"Ganymede", "age":32}

{"id":2, "name":"Lilei", "age":19}

{"id":3, "name":"Lily", "age":25}

{"id":4, "name":"Hanmeimei", "age":25}

{"id":5, "name":"Lucy", "age":37}

{"id":6, "name":"Tom", "age":27}

1,Ganymede,32

2, Lilei, 19

3, Lily, 25

4, Hanmeimei, 25

5, Lucy, 37

6, wcc, 4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package DataCleaning


import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameTest {
case class People(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameTest").setMaster("local")

val sc = new SparkContext(conf)

val sqlContxt = new SQLContext(sc)

val dfsql = sqlContxt.read.json("people.json")

val dftxt = sc.textFile("people.txt")



//映射表
dfsql.registerTempTable("people")

dfsql.show()
dfsql.printSchema()

dfsql.select(dfsql.col("id"),dfsql.col("name")).foreach( x =&gt; {

println(x.get(0),x.get(1))
})


dfsql.select(dfsql.col("id"),dfsql.col("name")).foreachPartition( Iterator =&gt;
Iterator.foreach(x =&gt; {
println(x.getAs("id"),x.getAs("name"))
}))


sqlContxt.sql("select id,name from people").foreach( x =&gt; {
println("SQL打印出来的````"+x.get(0),x.get(1))
})

sqlContxt.sql("select id,name from people").foreachPartition(Iterable =&gt; {
Iterable.foreach( x =&gt; {
println("SQL打印出来的````"+x.getAs("id"),x.getAs("name"))
})
})


val peopleRowRdd = dftxt.map(x =&gt; x.split(",")).map(data =&gt;{
val id = data(0).trim().toInt
val name = data(1).trim()
val age = data(2).trim().toInt
Row(id,name,age)

})
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name",StringType,true ),
StructField("age", IntegerType, true)
))

val df = sqlContxt.createDataFrame(peopleRowRdd, structType)

df.registerTempTable("people1")
println("-------------------")
df.show()
df.printSchema()



val people = sc.textFile("people.txt")

val peopleRDD = people.map { x =&gt; x.split(",") }.map ( data =&gt;
{
People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)
})

//这里需要隐式转换一把
import sqlContxt.implicits._
val dfDF = peopleRDD.toDF()
dfDF.registerTempTable("people")

println("-------------case class反射来映射注册临时表-----------------------")
dfDF.show()
dfDF.printSchema()

}


}

Spark scala 抽取mysql数据 导入Hive

1
2
3
4
5
6
7
8
9
10
11
12
driverClass=com.mysql.jdbc.Driver
feeds.jdbcUrl=jdbc:mysql://172.16.XX.X:3306/feeds?useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&autoReconnect=true&useSSL=false
feeds.user=root
feeds.password=Xingrui@jiatuimysql

initialPoolSize=3
minPoolSize=3
acquireIncrement=3
maxPoolSize=15
maxIdleTime=10
acquireRetryAttempts=30
acquireRetryDelay=1000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata_cardevent_etl</groupId>
<artifactId>bigdata_cardevent_etl</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>${project.artifactId}</name>


<properties>
<encoding>UTF-8</encoding>
<spark.version>1.6.3</spark.version>
<mysql.version>5.1.47</mysql.version>
<c3p0.version>0.9.5-pre4</c3p0.version>
<fastjson.version>1.2.47</fastjson.version>

</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>${c3p0.version}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<source>1.8</source>
<target>1.8</target>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.jiatui.bigdata.format.sdklog.main.FormatSdkLogApp</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.jiatui.bigdata.util

import com.mchange.v2.c3p0.ComboPooledDataSource
import java.util.Properties
import java.io.File
import java.io.FileInputStream
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import java.sql.ResultSet
/**
* c3p0连接
*/
class DBUtilC3p0() extends Serializable{
private val cpds: ComboPooledDataSource =new ComboPooledDataSource(true)

private val prop=new Properties

{
try {
val file: File = new File("c3p0.properties")
val in = new FileInputStream(file)
prop.load(in)
println("prop:" + prop)
cpds.setJdbcUrl(prop.getProperty("feeds.jdbcUrl"))
cpds.setDriverClass(prop.getProperty("driverClass"))
cpds.setUser(prop.getProperty("feeds.user"))
cpds.setPassword(prop.getProperty("feeds.password"))
cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize")))
cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize")))
cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement")))
cpds.setInitialPoolSize(Integer.valueOf(prop.getProperty("initialPoolSize")))
cpds.setMaxIdleTime(Integer.valueOf(prop.getProperty("maxIdleTime")))
} catch {
case ex: Exception => ex.printStackTrace()
}
}



def getConnection: Connection = {
try {
cpds.getConnection()
} catch {
case ex: Exception =>
ex.printStackTrace()
null
}
}
}
object DBUtilC3p0 {
var dbUtilC3p0: DBUtilC3p0 = _
def getDBUtilC3p0(): DBUtilC3p0 = {
synchronized {
if (dbUtilC3p0 == null) {
dbUtilC3p0 = new DBUtilC3p0()
}
}
dbUtilC3p0
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.jiatui.bigdata.util

import java.sql.ResultSet
import java.sql.Connection
import java.sql.PreparedStatement
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.sql.DriverManager
import jodd.util.PropertiesUtil
import scala.collection.mutable.ArrayBuffer

/*
* <p>Title: CardEventEtl</p>
*
* <p>Description:获取feeds所有数据表名 </p>
*
* @author zhangshuai
*
* @date 2019年3月1日
*/

object DBUtil {

def getFeedsTableNames() = {
var data = new ArrayBuffer[String]()
val conn = DBUtilC3p0.getDBUtilC3p0().getConnection
try {
val stat = conn.createStatement()
val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")

while (rs.next()) {
val tableName = rs.getString(1)

data += tableName

}
rs.close()
stat.close()
data
} catch {
case t: Exception =>
t.printStackTrace()
println("获取feeds表表名时候出错.")
} finally {
try {
if (conn != null) {
conn.close()
}
} catch {
case t: Exception =>
t.printStackTrace()
}
}
data
}



// private var connection: Connection = _
//
// private var preparedStatement: PreparedStatement = _
//
// private var resultSet: ResultSet = _
//
// def getCardEventTableNames = {
// var data = new ArrayBuffer[String]()
// val conn = DBUtil.getConnection
// try {
// val stat = conn.createStatement()
// val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")
//
// while (rs.next()) {
// val tableName = rs.getString(1)
//
// data += tableName
//
// }
// rs.close()
// stat.close()
// data
// } catch {
// case t: Exception =>
// t.printStackTrace()
// println("获取统计表表名时候出错.")
// } finally {
//
// try {
// if (conn != null) {
// conn.close()
//
// }
//
// } catch {
// case t: Exception =>
// t.printStackTrace()
// }
//
// }
// data
//
// }
//
//
// def getFeedsConnection: Connection ={
// DBUtil.getConnection
// }
//
//
//
// // 数据库驱动类
//
// private val driverClass: String = ConfigurationManager.getProperty("driverClass")
// // 数据库连接地址
// private val url: String = ConfigurationManager.getProperty("feeds.jdbcUrl")
//
// // 数据库连接用户名
// private val username: String = ConfigurationManager.getProperty("feeds.user")
// // 数据库连接密码
// private val password: String = ConfigurationManager.getProperty("feeds.password")
//
// // 加载数据库驱动
// // Class.forName(driverClass)
// Class.forName("com.mysql.jdbc.Driver")
//
// // 连接池大小
// val poolSize: Int = ConfigurationManager.getProperty("poolSize").toInt
//
// // 连接池 - 同步队列
// private val pool: BlockingQueue[Connection] = new LinkedBlockingQueue[Connection]()
//
// /**
// * 初始化连接池
// */
// for (i <- 1 to poolSize) {
// DBUtil.pool.put(DriverManager.getConnection(url, username, password))
// }
//
// /**
// * 从连接池中获取一个Connection
// * @return
// */
// private def getConnection: Connection = {
// pool.take()
// }
//
// /**
// * 向连接池归还一个Connection
// * @param conn
// */
// private def returnConnection(conn: Connection): Unit = {
// DBUtil.pool.put(conn)
// }
//
// /**
// * 启动守护线程释放资源
// */
// def releaseResource() = {
// val thread = new Thread(new CloseRunnable)
// thread.setDaemon(true)
// thread.start()
// }
//
// /**
// * 关闭连接池连接资源类
// */
// class CloseRunnable extends Runnable {
// override def run(): Unit = {
// while (DBUtil.pool.size > 0) {
// try {
// // println(s"当前连接池大小: ${DBUtil.pool.size}")
// DBUtil.pool.take().close()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.jiatui.bigdata.util

import java.sql.ResultSet
import java.sql.Connection
import java.sql.PreparedStatement
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.sql.DriverManager
import jodd.util.PropertiesUtil
import scala.collection.mutable.ArrayBuffer

/*
* <p>Title: CardEventEtl</p>
*
* <p>Description:获取feeds所有数据表名 </p>
*
* @author zhangshuai
*
* @date 2019年3月1日
*/

object DBUtil {

def getFeedsTableNames() = {
var data = new ArrayBuffer[String]()
val conn = DBUtilC3p0.getDBUtilC3p0().getConnection
try {
val stat = conn.createStatement()
val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")

while (rs.next()) {
val tableName = rs.getString(1)

data += tableName

}
rs.close()
stat.close()
data
} catch {
case t: Exception =>
t.printStackTrace()
println("获取feeds表表名时候出错.")
} finally {
try {
if (conn != null) {
conn.close()
}
} catch {
case t: Exception =>
t.printStackTrace()
}
}
data
}



// private var connection: Connection = _
//
// private var preparedStatement: PreparedStatement = _
//
// private var resultSet: ResultSet = _
//
// def getCardEventTableNames = {
// var data = new ArrayBuffer[String]()
// val conn = DBUtil.getConnection
// try {
// val stat = conn.createStatement()
// val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")
//
// while (rs.next()) {
// val tableName = rs.getString(1)
//
// data += tableName
//
// }
// rs.close()
// stat.close()
// data
// } catch {
// case t: Exception =>
// t.printStackTrace()
// println("获取统计表表名时候出错.")
// } finally {
//
// try {
// if (conn != null) {
// conn.close()
//
// }
//
// } catch {
// case t: Exception =>
// t.printStackTrace()
// }
//
// }
// data
//
// }
//
//
// def getFeedsConnection: Connection ={
// DBUtil.getConnection
// }
//
//
//
// // 数据库驱动类
//
// private val driverClass: String = ConfigurationManager.getProperty("driverClass")
// // 数据库连接地址
// private val url: String = ConfigurationManager.getProperty("feeds.jdbcUrl")
//
// // 数据库连接用户名
// private val username: String = ConfigurationManager.getProperty("feeds.user")
// // 数据库连接密码
// private val password: String = ConfigurationManager.getProperty("feeds.password")
//
// // 加载数据库驱动
// // Class.forName(driverClass)
// Class.forName("com.mysql.jdbc.Driver")
//
// // 连接池大小
// val poolSize: Int = ConfigurationManager.getProperty("poolSize").toInt
//
// // 连接池 - 同步队列
// private val pool: BlockingQueue[Connection] = new LinkedBlockingQueue[Connection]()
//
// /**
// * 初始化连接池
// */
// for (i <- 1 to poolSize) {
// DBUtil.pool.put(DriverManager.getConnection(url, username, password))
// }
//
// /**
// * 从连接池中获取一个Connection
// * @return
// */
// private def getConnection: Connection = {
// pool.take()
// }
//
// /**
// * 向连接池归还一个Connection
// * @param conn
// */
// private def returnConnection(conn: Connection): Unit = {
// DBUtil.pool.put(conn)
// }
//
// /**
// * 启动守护线程释放资源
// */
// def releaseResource() = {
// val thread = new Thread(new CloseRunnable)
// thread.setDaemon(true)
// thread.start()
// }
//
// /**
// * 关闭连接池连接资源类
// */
// class CloseRunnable extends Runnable {
// override def run(): Unit = {
// while (DBUtil.pool.size > 0) {
// try {
// // println(s"当前连接池大小: ${DBUtil.pool.size}")
// DBUtil.pool.take().close()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// }

}

Kafka生产者详解

一、生产者发送消息的过程

首先介绍一下 Kafka 生产者发送消息的过程:

Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。

alt

二、创建生产者

2.1 项目依赖
本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 kafka-clients 依赖,如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>

2.2 创建生产者
创建 Kafka 生产者时,以下三个属性是必须指定的:

bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
key.serializer :指定键的序列化器;
value.serializer :指定值的序列化器。

创建的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SimpleProducer {

public static void main(String[] args) {

String topicName = "Hello-Kafka";

Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/*创建生产者*/
Producer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i,
"world" + i);
/* 发送消息*/
producer.send(record);
}
/*关闭生产者*/
producer.close();
}
}

2.3 测试

  1. 启动Kakfa
    Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:
1
2
3
4
5
# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties

启动单节点 kafka 用于测试:

1
# bin/kafka-server-start.sh config/server.properties
  1. 创建topic
    1
    2
    3
    4
    5
    6
    7
    8
    # 创建用于测试主题
    bin/kafka-topics.sh --create \
    --bootstrap-server hadoop001:9092 \
    --replication-factor 1 --partitions 1 \
    --topic Hello-Kafka

    # 查看所有主题
    bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
  2. 运行项目
    此时可以看到消费者控制台,输出如下,这里 kafka-console-consumer 只会打印出值信息,不会打印出键信息。
    alt

2.4 可能出现的问题
在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下,此时需要对 server.properties 文件中的 listeners 配置进行更改

1
2
# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092

二、发送消息

上面的示例程序调用了 send 方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。

2.1 同步发送
在调用 send 方法后可以接着调用 get() 方法,send 方法的返回值是一个 Future对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
for (int i = 0; i < 10; i++) {
try {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*同步发送消息*/
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 Hello-Kafka 主题时候,使用 –partitions 指定其分区数为 1,即只有一个分区。

1
2
3
4
5
6
7
8
9
10
topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41
topic=Hello-Kafka, partition=0, offset=42
topic=Hello-Kafka, partition=0, offset=43
topic=Hello-Kafka, partition=0, offset=44
topic=Hello-Kafka, partition=0, offset=45
topic=Hello-Kafka, partition=0, offset=46
topic=Hello-Kafka, partition=0, offset=47
topic=Hello-Kafka, partition=0, offset=48
topic=Hello-Kafka, partition=0, offset=49

2.2 异步发送
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
/*异步发送消息,并监听回调*/
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.out.println("进行异常处理");
} else {
System.out.printf("topic=%s, partition=%d, offset=%s \n",
metadata.topic(), metadata.partition(), metadata.offset());
}
}
});
}

三、自定义分区器

Kafka 有着默认的分区机制:

如果键值为 null, 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上;
如果键值不为 null,那么 Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。
某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例

3.1 自定义分区器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 自定义分区器
*/
public class CustomPartitioner implements Partitioner {

private int passLine;

@Override
public void configure(Map<String, ?> configs) {
/*从生产者配置中获取分数线*/
passLine = (Integer) configs.get("pass.line");
}

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
/*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
return (Integer) key >= passLine ? 1 : 0;
}

@Override
public void close() {
System.out.println("分区器关闭");
}
}

需要在创建生产者时指定分区器,和分区器所需要的配置参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ProducerWithPartitioner {

public static void main(String[] args) {

String topicName = "Kafka-Partitioner-Test";

Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

/*传递自定义分区器*/
props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
/*传递分区器所需的参数*/
props.put("pass.line", 6);

Producer<Integer, String> producer = new KafkaProducer<>(props);

for (int i = 0; i <= 10; i++) {
String score = "score:" + i;
ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
/*异步发送消息*/
producer.send(record, (metadata, exception) ->
System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
}

producer.close();
}
}

3.2 测试
需要创建一个至少有两个分区的主题:

1
2
3
4
bin/kafka-topics.sh --create \
--bootstrap-server hadoop001:9092 \
--replication-factor 1 --partitions 2 \
--topic Kafka-Partitioner-Test

此时输入如下,可以看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。

1
2
3
4
5
6
7
8
9
10
11
12
score:6, partition=1, 
score:7, partition=1,
score:8, partition=1,
score:9, partition=1,
score:10, partition=1,
score:0, partition=0,
score:1, partition=0,
score:2, partition=0,
score:3, partition=0,
score:4, partition=0,
score:5, partition=0,
分区器关闭

四、生产者其他属性

上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有很多可配置属性,如下:

  1. acks
    acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:

acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
2. buffer.memory
设置生产者内存缓冲区的大小。

  1. compression.type
    默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。

  2. retries
    发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。

  3. batch.size
    当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

  4. linger.ms
    该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。

  5. clent.id
    客户端 id,服务器用来识别消息的来源。

  6. max.in.flight.requests.per.connection
    指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1 可以保证消息是按照发送的顺序写入服务器,即使发生了重试。

  7. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms
    timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;
    request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;
    metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。

  8. max.block.ms
    指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

  9. max.request.size
    该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K ,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。

  10. receive.buffer.bytes & send.buffer.byte
    这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

Kafka消费者详解

一、消费者和消费者群组

在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段。
alt
需要注意的是:同一个分区只能被同一个消费者群组里面的一个消费者读取,不可能存在同一个分区被同一个消费者群里多个消费者共同读取的情况,如图:
alt

可以看到即便消费者 Consumer5 空闲了,但是也不会去读取任何一个分区的数据,这同时也提醒我们在使用时应该合理设置消费者的数量,以免造成闲置和额外开销。

二、分区再均衡

因为群组里的消费者共同读取主题的分区,所以当一个消费者被关闭或发生崩溃时,它就离开了群组,原本由它读取的分区将由群组里的其他消费者来读取。同时在主题发生变化时 , 比如添加了新的分区,也会发生分区与消费者的重新分配,分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。正是因为再均衡,所以消费费者群组才能保证高可用性和伸缩性。

消费者通过向群组协调器所在的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的所有权。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发再均衡。

三、创建Kafka消费者

在创建消费者的时候以下以下三个选项是必选的:

bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
key.deserializer :指定键的反序列化器;
value.deserializer :指定值的反序列化器。

除此之外你还需要指明你需要想订阅的主题,可以使用如下两个 API :

consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合。

最后只需要通过轮询 API(poll) 向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理。 示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

/*订阅主题 (s)*/
consumer.subscribe(Collections.singletonList(topic));

try {
while (true) {
/*轮询获取数据*/
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} finally {
consumer.close();
}

三、 自动提交偏移量

3.1 偏移量的重要性
Kafka 的每一条消息都有一个偏移量属性,记录了其在分区中的位置,偏移量是一个单调递增的整数。消费者通过往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。 如果消费者一直处于运行状态,那么偏移量就没有 什么用处。不过,如果有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。 因为这个原因,所以如果不能正确提交偏移量,就可能会导致数据丢失或者重复出现消费,比如下面情况:

如果提交的偏移量小于客户端处理的最后一个消息的偏移量 ,那么处于两个偏移量之间的消息就会被重复消费;
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

3.2 自动提交偏移量
Kafka 支持自动提交和手动提交偏移量两种方式。这里先介绍比较简单的自动提交:

只需要将消费者的 enable.auto.commit 属性配置为 true 即可完成自动提交的配置。 此时每隔固定的时间,消费者就会把 poll() 方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms 属性进行配置,默认值是 5s。

使用自动提交是存在隐患的,假设我们使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。基于这个原因,Kafka 也提供了手动提交偏移量的 API,使得用户可以更为灵活的提交偏移量。

四、手动提交偏移量

用户可以通过将 enable.auto.commit 设为 false,然后手动提交偏移量。基于用户需求手动提交偏移量可以分为两大类:
手动提交当前偏移量:即手动提交当前轮询的最大偏移量;
手动提交固定偏移量:即按照业务需求,提交某一个固定的偏移量。

而按照 Kafka API,手动提交偏移量又可以分为同步提交和异步提交。
4.1 同步提交
通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。

1
2
3
4
5
6
7
8
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/*同步提交*/
consumer.commitSync();
}

如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。基于这个原因,Kafka 还提供了异步提交的 API。

4.2 异步提交
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
/*异步提交并定义回调*/
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.out.println("错误处理");
offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
x.topic(), x.partition(), y.offset()));
}
}
});
}

异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了 200 和 300 的偏移量,此时 200 的偏移量失败的,但是紧随其后的 300 的偏移量成功了,此时如果重试就会存在 200 覆盖 300 偏移量的可能。同步提交就不存在这个问题,因为在同步提交的情况下,300 的提交请求必须等待服务器返回 200 提交请求的成功反馈后才会发出。基于这个原因,某些情况下,需要同时组合同步和异步两种提交方式。

注:虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个 Map<TopicPartition, Integer> offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。

4.3 同步加异步提交
下面这种情况,在正常的轮询中使用异步提交来保证吞吐量,但是因为在最后即将要关闭消费者了,所以此时需要用同步提交来保证最大限度的提交成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
}
// 异步提交
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// 因为即将要关闭消费者,所以要用同步提交保证提交成功
consumer.commitSync();
} finally {
consumer.close();
}
}

4.4 提交特定偏移量
在上面同步和异步提交的 API 中,实际上我们都没有对 commit 方法传递参数,此时默认提交的是当前轮询的最大偏移量,如果你需要提交特定的偏移量,可以调用它们的重载方法。

1
2
3
4
/*同步提交特定偏移量*/
commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
/*异步提交特定偏移量*/
commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

需要注意的是,因为你可以订阅多个主题,所以 offsets 中必须要包含所有主题的每个分区的偏移量,示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
/*记录每个主题的每个分区的偏移量*/
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData");
/*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
offsets.put(topicPartition, offsetAndMetadata);
}
/*提交特定偏移量*/
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}

五、监听分区再均衡

因为分区再均衡会导致分区与消费者的重新划分,有时候你可能希望在再均衡前执行一些操作:比如提交已经处理但是尚未提交的偏移量,关闭数据库连接等。此时可以在订阅主题时候,调用 subscribe 的重载方法传入自定义的分区再均衡监听器。

1
2
3
4
 /*订阅指定集合内的所有主题*/
subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
/*使用正则匹配需要订阅的主题*/
subscribe(Pattern pattern, ConsumerRebalanceListener listener)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
/*该方法会在消费者停止读取消息之后,再均衡开始之前就调用*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("再均衡即将触发");
// 提交已经处理的偏移量
consumer.commitSync(offsets);
}

/*该方法会在重新分配分区之后,消费者开始读取消息之前被调用*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

}
});

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData");
/*TopicPartition 重写过 hashCode 和 equals 方法,所以能够保证同一主题和分区的实例不会被重复添加*/
offsets.put(topicPartition, offsetAndMetadata);
}
consumer.commitAsync(offsets, null);
}
} finally {
consumer.close();
}

六 、退出轮询

Kafka 提供了 consumer.wakeup() 方法用于退出轮询,它通过抛出 WakeupException 异常来跳出循环。需要注意的是,在退出线程时最好显示的调用 consumer.close() , 此时消费者会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡 ,而不需要等待会话超时。

下面的示例代码为监听控制台输出,当输入 exit 时结束轮询,关闭消费者并退出程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*调用 wakeup 优雅的退出*/
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
/*等待主线程完成提交偏移量、关闭消费者等操作*/
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> rd : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset());
}
}
} catch (WakeupException e) {
//对于 wakeup() 调用引起的 WakeupException 异常可以不必处理
} finally {
consumer.close();
System.out.println("consumer 关闭");
}

七、独立的消费者

因为 Kafka 的设计目标是高吞吐和低延迟,所以在 Kafka 中,消费者通常都是从属于某个群组的,这是因为单个消费者的处理能力是有限的。但是某些时候你的需求可能很简单,比如可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据,这个时候就不需要消费者群组和再均衡了, 只需要把主题或者分区分配给消费者,然后开始读取消息井提交偏移量即可。

在这种情况下,就不需要订阅主题, 取而代之的是消费者为自己分配分区。 一个消费者可以订阅主题(井加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。 分配分区的示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

/*可以指定读取哪些分区 如这里假设只读取主题的 0 分区*/
for (PartitionInfo partition : partitionInfos) {
if (partition.partition()==0){
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
}

// 为消费者指定分区
consumer.assign(partitions);


while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<Integer, String> record : records) {
System.out.printf("partition = %s, key = %d, value = %s\n",
record.partition(), record.key(), record.value());
}
consumer.commitSync();
}

附录 : Kafka消费者可选属性

  1. fetch.min.byte
    消费者从服务器获取记录的最小字节数。如果可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。

  2. fetch.max.wait.ms
    broker 返回给消费者数据的等待时间,默认是 500ms。

  3. max.partition.fetch.bytes
    该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

  4. session.timeout.ms
    消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。

  5. auto.offset.reset
    该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

latest (默认值) :在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的最新记录);
earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录。
6. enable.auto.commit
是否自动提交偏移量,默认值是 true。为了避免出现重复消费和数据丢失,可以把它设置为 false。

  1. client.id
    客户端 id,服务器用来识别消息的来源。

  2. max.poll.records
    单次调用 poll() 方法能够返回的记录数量。

  3. receive.buffer.bytes & send.buffer.byte
    这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

Oozie任务调度使用详细代码

job.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
nameNode=hdfs://dev-bg-m01:8020
jobTracker=dev-bg-m01:8050
queueName=default
oozie.use.system.libpath=true
#oozie.libpath=/user/dmp_operator1/share/libs
jdbcURL=jdbc:hive2://dev-bg-m01:10000/default
jdbcPrincipal=hive/dev-bg-m01@DATASEA.COM
appsDir=user/dmp_operator1/OozieApps
dataDir=user/dmp_operator1/data
jdbcPrincipal=hive/dev-bg-m01@DATASEA.COM
jdbcURL=jdbc:hive2://dev-bg-m01:10000/default
oozie.coord.application.path=${nameNode}/${appsDir}/shop/t_order
#oozie.wf.application.path=${nameNode}/${appsDir}/shop/t_order
workflowAppUri=${nameNode}/${appsDir}/shop/t_order
start=2018-12-14T01:00+0800
end=2018-12-30T01:00+0800

getDate.sh

1
2
3
4
5
#!/bin/bash
last_day=`date -d last-day +%Y-%m-%d`
current_day=`date -d today +%Y-%m-%d`
echo start_day=$last_day
echo end_day=$current_day

coordinator.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<coordinator-app name="t_order_sqoop" frequency="40 11 * * *" start="${start}" end="${end}" timezone="GMT+0800"
xmlns="uri:oozie:coordinator:0.4">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>

workflow.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
<?xml version="1.0" encoding="UTF-8"?><!-- Licensed to the Apache Software 
Foundation (ASF) under one or more contributor license agreements. See the
NOTICE file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file to you under the Apache License,
Version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
the specific language governing permissions and limitations under the License. -->
<workflow-app xmlns="uri:oozie:workflow:0.5" name="dmp-base-jobs">

<credentials>
<credential name="hs2-creds" type="hive2">
<property>
<name>hive2.server.principal</name>
<value>${jdbcPrincipal}</value>
</property>
<property>
<name>hive2.jdbc.url</name>
<value>${jdbcURL}</value>
</property>
</credential>
</credentials>


<start to="shell-date" />
<action name="shell-date">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>2048</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx1638m</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>2048</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx1638m</value>
</property>
</configuration>
<exec>getDate.sh</exec>
<file>getDate.sh</file>
<capture-output />
</shell>
<ok to="appForkStart" />
<error to="fail" />
</action>
<fork name="appForkStart">
<path start="t_dynamic_info_import" />
</fork>
<action name="t_dynamic_info_import">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>5120</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4096m</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>5120</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx4096m</value>
</property>
</configuration>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://192.168.3.119/jt_ai_forum?tinyInt1isBit=false</arg>
<arg>--username</arg>
<arg>root</arg>
<arg>--password</arg>
<arg>Xingrui@DCDB123</arg>
<arg>--driver</arg>
<arg>com.mysql.jdbc.Driver</arg>
<arg>--query</arg>
<arg>select
dynamic_id,company_id,card_id,dynamic_type,title,content,images,video_style,video_url,is_pub,is_top,pub_time,gmt_create,gmt_modified,old_db_id,programa_id
from t_dynamic_info WHERE 1=1 and $CONDITIONS
</arg>
<arg>--hive-drop-import-delims</arg>
<arg>--null-string</arg>
<arg>\\N</arg>
<arg>--null-non-string</arg>
<arg>\\N</arg>
<arg>--delete-target-dir</arg>
<arg>--target-dir</arg>
<arg>/apps/hive/warehouse/ods.db/ods_t_dynamic_info/dt=${(wf:actionData('shell-date')['start_date'])}
</arg>
<arg>--fields-terminated-by</arg>
<arg>\001</arg>
<arg>-m</arg>
<arg>1</arg>
</sqoop>
<ok to="appForkEnd" />
<error to="fail" />
</action>

<join name="appForkEnd" to="DWD_APP_USER_INCREASED_RECORD_ETL" />

<action name="DWD_APP_USER_INCREASED_RECORD_ETL">
<shell xmlns="uri:oozie:shell-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.memory.mb</name>
<value>5120</value>
</property>
<property>
<name>oozie.launcher.mapreduce.map.java.opts</name>
<value>-Xmx4096m</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name>
<value>5120</value>
</property>
<property>
<name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>
<value>-Xmx4096m</value>
</property>
</configuration>
<exec>DWD/DWD_APP_USER_INCREASED_RECORD/DWD_APP_USER_INCREASED_RECORD_ETL.sh
</exec>
<argument>${(wf:actionData('shell-date')['start_date'])}</argument>
<argument>${(wf:actionData('shell-date')['end_date'])}</argument>
<file>DWD/DWD_APP_USER_INCREASED_RECORD/DWD_APP_USER_INCREASED_RECORD_ETL.sh
</file>
<capture-output />
</shell>
<ok to="bdl_company_info_etl" />
<error to="fail" />
</action>
<kill name="fail">
<message>Shell action failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end" />
</workflow-app>

Apache Kylin

一、kylin解决了什么关键问题?

Apache Kylin的初衷就是解决千亿、万亿条记录的秒级查询问题,其中的关键就是打破查询时间随着数据量呈线性增长的这一规律。
大数据OLAP,我们可以注意到两个事实:
• 大数据查询要的一般是统计结果,是多条记录经过聚合函数计算后的统计值。原始的记录则不是必需的,或者被访问的频率和概率极低。
• 聚合是按维度进行的,而维度的聚合可能性是有限的,一般不随数据的膨胀而线性增长。
基于以上两点,我们得到一个新的思路——“预计算”。应尽量多地预先计算聚合结果,在查询时刻也尽量使用预计算的结果得出查询结果,从而避免直接扫描可能无限增长的原始记录。
举例来说,要用下面的SQL来查询10月1日那天销量最高的商品。

1
SELECT item, SUM(sell_amount) FROM sell_details WHERE sell_date='2016-10-01' GROUP BY item ORDER BY SUM(sell_amount) DESC

传统的方法需要扫描所有的记录,找到10月1日的销售记录,然后按商品聚合销售额,最后排序返回。

假如10月1日有1亿条交易,那么查询必需读取并累计至少1亿条记录,且查询速度会随将来销量的增加而逐步下降,如果日交易量提高至2亿条,那查询执行的时间可能会增加一倍

而预计算的方法则会事先按维度[sell_date,item]计算SUM(sell_amount)并将其存储下来,在查询时找到10月1日的销售商品就可以直接排序返回了。

读取的记录数最大不超过维度[sell_date,item]的组合数。显然这个数字将远远小于实际的销售记录,比如10月1日的1亿条交易包含了100万种商品,那么预计算后就只有100万条记录了,是原来的百分之一。并且这些记录是已经按商品聚合的结果,省去了运行时的聚合运算。从未来的发展来看,查询速度只会随日期和商品数目的增长而变化,与销售记录总数不再有直接联系。假如日交易量提高一倍到2亿,但只要商品总数不变,那么预计算的结果记录总数就不会变,查询的速度也不会变。

“预计算”就是Kylin在“大规模并行处理”和“列式存储”之外,提供给大数据分析的第三个关键技术。

二、kylin工作原理

2.1 维度和度量简介
在说明MOLAP Cube之前,需要先介绍一下维度(dimension)和度量(measure)这两个概念。

简单来讲,维度就是观察数据的角度。比如电商的销售数据,可以从时间的维度来观察(如图1-2的左图所示),也可以进一步细化从时间和地区的维度来观察(如图1-2的右图所示)。

维度一般是一组离散的值,比如时间维度上的每一个独立的日期,或者商品维度上的每一件独立的商品。因此,统计时可以把维度值相同的记录聚合起来,应用聚合函数做累加、平均、去重复计数等聚合计算。

alt

度量就是被聚合的统计值,也是聚合运算的结果,它一般是连续值,如图1-2中的销售额,抑或是销售商品的总件数。通过比较和测算度量,分析师可以对数据进行评估,比如今年的销售额相比去年有多大的增长、增长的速度是否达到预期、不同商品类别的增长比例是否合理等。

2.2 Cube和Cuboid
了解了维度和度量,就可以对数据表或者数据模型上的所有字段进行分类了,它们要么是维度,要么是度量(可以被聚合)。于是就有了根据维度、度量做预计算的Cube理论。

给定一个数据模型,我们可以对其上所有维度进行组合。对于N个维度来说,所有组合的可能性有2N种。对每一种维度的组合,将度量做聚合运算,运算的结果保存为一个物化视图,称为Cuboid。将所有维度组合的Cuboid作为一个整体,被称为Cube。所以简单来说,一个Cube就是许多按维度聚合的物化视图的集合。

举一个具体的例子。假定有一个电商的销售数据集,其中维度有时间(Time)、商品(Item)、地点(Location)和供应商(Supplier),度量有销售额(GMV)。那么,所有维度的组合就有24=16种(如图),比如一维度(1D)的组合有[Time][Item][Location][Supplier]四种;二维度(2D)的组合有[Time,Item][Time,Location][Time、Supplier][Item,Location][Item,Supplier][Location,Supplier]六种;三维度(3D)的组合也有四种;最后,零维度(0D)和四维度(4D)的组合各有一种,共计16种组合。

计算Cuboid,就是按维度来聚合销售额(GMV)。如果用SQL来表达计算Cuboid[Time,Location],那就是:

1
select Time, Location, Sum(GMV) as GMV from Sales group by Time, Location

alt

将计算的结果保存为物化视图,所有Cuboid物化视图的总称就是Cube了。

2.3 工作原理
Apache Kylin的工作原理就是对数据模型做Cube预计算,并利用计算的结果加速查询。过程如下:

(1)指定数据模型,定义维度和度量。

(2)预计算Cube,计算所有Cuboid并将其保存为物化视图。

(3)执行查询时,读取Cuboid,进行加工运算产生查询结果。

由于Kylin的查询过程不会扫描原始记录,而是通过预计算预先完成表的关联、聚合等复杂运算,并利用预计算的结果来执行查询,因此其速度相比非预计算的查询技术一般要快一个到两个数量级。并且在超大数据集上其优势更明显。当数据集达到千亿乃至万亿级别时,Kylin的速度甚至可以超越其他非预计算技术1000倍以上。

三、kylin技术架构

Apache Kylin系统可以分为在线查询和离线构建两部分,其技术架构如图所示。在线查询主要由上半区组成,离线构建在下半区。

先看离线构建的部分。从图1-4中可以看到,数据源在左侧,目前主要是Hadoop、Hive、Kafka和RDBMS,其中保存着待分析的用户数据。

根据元数据定义,下方构建引擎从数据源中抽取数据,并构建Cube。

数据以关系表的形式输入,且必须符合星形模型(Star Schema)或雪花模型(Snowflake Schema)。

用户可以选择使用MapReduce或Spark进行构建。

构建后的Cube保存在右侧的存储引擎中,目前HBase是默认的存储引擎。

alt
完成离线构建后,用户可以从上方查询系统发送SQL来进行查询分析。Kylin提供了多样的REST API、JDBC/ODBC接口。无论从哪个接口进入,最终SQL都会来到REST服务层,再转交给查询引擎进行处理。这里需要注意的是,SQL语句是基于数据源的关系模型书写的,而不是Cube。Kylin在设计时刻意对查询用户屏蔽了Cube的概念,分析师只需要理解简单的关系模型就可以使用Kylin,没有额外的学习门槛,传统的SQL应用也更容易迁移。查询引擎解析SQL,生成基于关系表的逻辑执行计划,然后将其转译为基于Cube的物理执行计划,最后查询预计算生成的Cube产生结果。整个过程不访问原始数据源。

  注意 对于查询引擎下方的路由选择,在最初设计时考虑过将Kylin不能执行的查询引导到Hive中继续执行。但在实践后发现Hive与Kylin的执行速度差异过大,导致用户无法对查询的速度有一致的期望,大多语句很可能查询几秒就返回了,而有些要等几分钟到几十分钟,用户体验非常糟糕。最后这个路由功能在发行版中默认被关闭。

Apache Kylin v1.5版本引入了“可扩展架构”的概念。图1-4所示为Rest Server、Cube Build Engine和数据源表示的抽象层。可扩展是指Kylin可以对其三个主要依赖模块——数据源、构建引擎和存储引擎,做任意的扩展和替换。在设计之初,作为Hadoop家族的一员,这三者分别是Hive、MapReduce和HBase。但随着Apache Kylin的推广和使用的深入,用户发现它们存在不足之处。

比如,实时分析可能会希望从Kafka导入数据而不是从Hive;而Spark的迅速崛起,又使我们不得不考虑将MapReduce替换为Spark以提高Cube的构建速度;至于HBase,它的读性能可能不如Cassandra等。可见,是否可以将某种技术替换为另一种技术已成为一个常见的问题。于是,我们对Apache Kylin v1.5版本的系统架构进行了重构,将数据源、构建引擎、存储引擎三大主要依赖模块抽象为接口,而Hive、MapReduce、HBase只是默认实现。其他实现还有:数据源还可以是Kafka、Hadoop或RDBMS;构建引擎还可以是Spark、Flink。资深用户可以根据自己的需要做二次开发,将其中的一个或者多个技术替换为更适合自身需要的技术。

这也为Kylin技术的与时俱进奠定了基础。如果将来有更先进的分布式计算技术可以取代MapReduce,或者有更高效的存储系统全面超越了HBase,Kylin可以用较小的代价将一个子系统替换掉,从而保证Kylin紧跟技术发展的最新潮流,保持最高的技术水平。

可扩展架构也带来了额外的灵活性,比如,它可以允许多个引擎并存。例如,Kylin可以同时对接Hive、Kafka和其他第三方数据源;抑或用户可以为不同的Cube指定不同的构建引擎或存储引擎,以期达到极致的性能和功能定制。

四、kylin特点

Apache Kylin的主要特点包括:
支持SQL接口
支持超大数据集
秒级响应
可伸缩性
高吞吐率
BI及可视化工具集成

Spark 累加器与广播变量

一、简介

在 Spark 中,提供了两种类型的共享变量:累加器 (accumulator) 与广播变量 (broadcast variable):

累加器:用来对信息进行聚合,主要用于累计计数等场景;
广播变量:主要用于在节点间高效分发大对象。

二、累加器

这里先看一个具体的场景,对于正常的累计求和,如果在集群模式中使用下面的代码进行计算,会发现执行结果并非预期

1
2
3
4
var counter = 0
val data = Array(1, 2, 3, 4, 5)
sc.parallelize(data).foreach(x => counter += x)
println(counter)

counter 最后的结果是 0,导致这个问题的主要原因是闭包。
alt

2.1 理解闭包

  1. Scala 中闭包的概念

这里先介绍一下 Scala 中关于闭包的概念:

1
2
var more = 10
val addMore = (x: Int) => x + more

如上函数 addMore 中有两个变量 x 和 more:

x : 是一个绑定变量 (bound variable),因为其是该函数的入参,在函数的上下文中有明确的定义;
more : 是一个自由变量 (free variable),因为函数字面量本生并没有给 more 赋予任何含义。
按照定义:在创建函数时,如果需要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。

  1. Spark 中的闭包

在实际计算时,Spark 会将对 RDD 操作分解为 Task,Task 运行在 Worker Node 上。在执行之前,Spark 会对任务进行闭包,如果闭包内涉及到自由变量,则程序会进行拷贝,并将副本变量放在闭包中,之后闭包被序列化并发送给每个执行者。因此,当在 foreach 函数中引用 counter 时,它将不再是 Driver 节点上的 counter,而是闭包中的副本 counter,默认情况下,副本 counter 更新后的值不会回传到 Driver,所以 counter 的最终值仍然为零。

需要注意的是:在 Local 模式下,有可能执行 foreach 的 Worker Node 与 Diver 处在相同的 JVM,并引用相同的原始 counter,这时候更新可能是正确的,但是在集群模式下一定不正确。所以在遇到此类问题时应优先使用累加器。

累加器的原理实际上很简单:就是将每个副本变量的最终值传回 Driver,由 Driver 聚合后得到最终值,并更新原始变量。
alt

2.2 使用累加器
SparkContext 中定义了所有创建累加器的方法,需要注意的是:被中横线划掉的累加器方法在 Spark 2.0.0 之后被标识为废弃。

alt

使用示例和执行结果分别如下:

1
2
3
4
5
6
val data = Array(1, 2, 3, 4, 5)
// 定义累加器
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(data).foreach(x => accum.add(x))
// 获取累加器的值
accum.value

alt

三、广播变量

在上面介绍中闭包的过程中我们说道每个 Task 任务的闭包都会持有自由变量的副本,如果变量很大且 Task 任务很多的情况下,这必然会对网络 IO 造成压力,为了解决这个情况,Spark 提供了广播变量。

广播变量的做法很简单:就是不把副本变量分发到每个 Task 中,而是将其分发到每个 Executor,Executor 中的所有 Task 共享一个副本变量。

1
2
3
4
// 把一个数组定义为一个广播变量
val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5))
// 之后用到该数组时应优先使用广播变量,而不是原值
sc.parallelize(broadcastVar.value).map(_ * 10).collect()
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量