Flink使用assignAscendingTimestamps 生成水印的三个重载方法

先简单介绍一下Timestamp 和Watermark 的概念:

  1. Timestamp和Watermark都是基于事件的时间字段生成的
  2. Timestamp和Watermark是两个不同的东西,并且一旦生成都跟事件数据没有关系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也没关系)
  3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
  4. Watermark 在生成之后与事件数据没有直接关系,Watermark 作为一个消息,和事件数据一样在流中传递(Watermark 和StreamRecord 具有相同的父类:StreamElement)
  5. Timestamp 与 Watermark 在生成之后,会在下游window算子中做比较,判断事件数据是否是过期数据
  6. 只有window算子才会用Watermark判断事件数据是否过期

assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。

1
2
3
4
5
6
7
8
9
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// flink auto create timestamp & watermark
.assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

基于给定的水印生成器生成水印,即使没有新元素到达也会定期检查给定水印生成器的新水印,以指定允许延迟时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks periodically(定期生成水印)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
override def extractTimestamp(element: LateDataEvent): Long = {
println("want watermark : " + sdf.parse(element.createTime).getTime)
sdf.parse(element.createTime).getTime
}
})

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

此方法仅基于流元素创建水印,对于通过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每个元素,
调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大于以前的水印,会发出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成数百个水印会影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks every event
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
// check extractTimestamp emitted watermark is non-null and large than previously
override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
// generate next watermark
override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
val eventTime = sdf.parse(element.createTime).getTime
eventTime
}
})

转载于

Flink使用SQL操作几种类型的window

Flink SQL 支持三种窗口类型, 分别为 Tumble Windows / HOP Windows 和 Session Windows. 其中 HOP windows 对应 Table API 中的 Sliding Window, 同时每种窗口分别有相应的使用场景和方法

Tumble Window(翻转窗口)
Hop Window(滑动窗口)
Session Window(会话窗口)

HOPWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class HOPWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// HOP(time_attr, interval1, interval2)
// interval1 滑动长度
// interval2 窗口长度
Table result = tEnv.sqlQuery("SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
"HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}

}

TumbleWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class TumbleWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}


}

SessionWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class SessionWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),

/* Start Session */
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),

/* Start Session */
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// SESSION(time_attr, interval)
// interval 表示两条数据触发session的最大间隔
Table result = tEnv.sqlQuery("SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}
}

AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink

AppendStreamTableSink: 可将动态表转换为Append流。适用于动态表只有Insert的场景。

RetractStreamTableSink: 可将动态表转换为Retract流。适用于动态表有Insert、Delete、Update的场景。

UpsertStreamTableSink: 可将动态表转换为Upsert流。适用于动态表有Insert、Delete、Update的场景。

注意:

RetractStreamTableSink中: Insert被编码成一条Add消息; Delete被编码成一条Retract消息;Update被编码成两条消息(先是一条Retract消息,再是一条Add消息),即先删除再增加。

UpsertStreamTableSink: Insert和Update均被编码成一条消息(Upsert消息); Delete被编码成一条Delete消息。

UpsertStreamTableSink和RetractStreamTableSink最大的不同在于Update编码成一条消息,效率上比RetractStreamTableSink高。

上述说的编码指的是动态表转换为DataStream时,表的增删改如何体现到DataStream上。

测试数据

1
2
3
4
5
6
7
8
9
10
11
12
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:03:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

AppendStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class AppendStreamTableSink {

public static void main(String[] args) throws Exception {

// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
// props.setProperty("connector.version", "universal");
// DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer011<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

// DataStream<UserBrowseLog> browseStream=streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer010<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "eventTime", "eventType", "productID", "productPrice",
"eventTimeTimestamp" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT() };
TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout", myAppendStreamTableSink);

// 5、连续查询

String sql = "insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnvironment.sqlUpdate(sql);

tableEnvironment.execute(AppendStreamTableSink.class.getSimpleName());

}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

private static class MyAppendStreamTableSink implements org.apache.flink.table.sinks.AppendStreamTableSink<Row> {

private TableSchema tableSchema;

public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}

// 已过时
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
@Override
public void emitDataStream(DataStream<Row> dataStream) {
}

@Override
public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new SinkFunction());
}

private static class SinkFunction extends RichSinkFunction<Row> {
public SinkFunction() {
}

@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}

}

}

结果

1
2
3
4
5
6
user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000
user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
user_1,2016-01-01 10:03:16,browse,product_5,20,1451613796000

RetractStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class RetractStreamTableSink {

public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.RetractStreamTableSink<Row> retractStreamTableSink = new MyRetractStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

/**
* 自定义RetractStreamTableSink
*
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* DataStream里的数据格式是Tuple2类型,如Tuple2<Boolean, Row>。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
private static class MyRetractStreamTableSink implements org.apache.flink.table.sinks.RetractStreamTableSink<Row> {

private TableSchema tableSchema;

public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

public TableSchema getTableSchema() {
return tableSchema;
}

// 已过时
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
}

// 最终会转换成DataStream处理
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if (flag) {
System.out.println("增加... " + value);
} else {
System.out.println("删除... " + value);
}
}
}
}

}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
增加... (true,user_2,1)
删除... (false,user_2,1)
增加... (true,user_2,2)
增加... (true,user_1,1)
删除... (false,user_1,1)
增加... (true,user_1,2)
删除... (false,user_1,2)
增加... (true,user_1,3)
删除... (false,user_1,3)
增加... (true,user_1,4)
删除... (false,user_1,4)
增加... (true,user_1,5)
删除... (false,user_1,5)
增加... (true,user_1,6)

UpsertStreamTableSink示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.RetractStreamTableSink.BrowseKafkaProcessFunction;
import com.shuai.test.model.UserBrowseLog;

public class UpsertStreamTableSink {
public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.UpsertStreamTableSink<Row> retractStreamTableSink = new MyUpsertStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}


/**
* 自定义UpsertStreamTableSink
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成一条Add消息。如Tuple2<True, Row>。
* 在SortLimit(即order by ... limit ...)的场景下,被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
@SuppressWarnings("unused")
private static class MyUpsertStreamTableSink implements org.apache.flink.table.sinks.UpsertStreamTableSink<Row> {

private TableSchema tableSchema;

@SuppressWarnings("unused")
public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}


// 设置Unique Key
// 如上SQL中有GroupBy,则这里的唯一键会自动被推导为GroupBy的字段
@Override
public void setKeyFields(String[] keys) {}

// 是否只有Insert
// 如上SQL场景,需要Update,则这里被推导为isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// 最终会转换成DataStream处理

public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}


public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
/**
*
*/
private static final long serialVersionUID = 1L;

public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}


}

结果

1
2
3
4
5
6
7
8
增加... (true,user_1,1)
增加... (true,user_1,2)
增加... (true,user_1,3)
增加... (true,user_1,4)
增加... (true,user_1,5)
增加... (true,user_1,6)
增加... (true,user_2,1)
增加... (true,user_2,2)

Kafka to Flink - HDFS

kafka制造数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.flink.etl;

import lombok.AllArgsConstructor;
import lombok.ToString;

@lombok.Data
@ToString

public class Data {

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}

public String getUuid() {
return uuid;
}

public void setUuid(String uuid) {
this.uuid = uuid;
}

private long timestamp;
private String event;
private String uuid;

public Data(long timestamp, String event, String uuid) {

this.timestamp=timestamp;
this.event=event;
this.uuid=uuid;
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.flink.etl;

import com.alibaba.fastjson.JSON;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;



public class DataGenerator {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.3.122:9092");
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
List<String> events = Arrays.asList("page_view", "adv_click", "thumbs_up");
Random random = new Random();
Data data = null;
ProducerRecord<String, String> record = null;
try {
while (true) {
long timestamp = System.currentTimeMillis();
String event = events.get(random.nextInt(events.size()));
String uuid = UUID.randomUUID().toString();
data = new Data(timestamp, event, uuid);
record = new ProducerRecord<>("data-collection-topic", JSON.toJSONString(data));
producer.send(record);
Thread.sleep(3000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}

}

StreamingJob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.flink.etl;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
"data-collection-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.create()
.withMaxPartSize(1024*1024*128) // 设置每个文件的最大大小 ,默认是128M。这里设置为128M
.withRolloverInterval(TimeUnit.MINUTES.toMillis(86400)) // 滚动写入新文件的时间,默认60s。这里设置为无限大
.withInactivityInterval(TimeUnit.MINUTES.toMillis(60)) // 60s空闲,就滚动写入新的文件
.build();
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("hdfs://DATASEA/home/dmp_operator1/bpointdata"), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner()).withRollingPolicy(rollingPolicy).build();
stream.addSink(sink);
env.enableCheckpointing(10_000);
env.setParallelism(1);
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://DATASEA/home/dmp_operator1/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}

EventTimeBucketAssigner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.flink.etl;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
private ObjectMapper mapper = new ObjectMapper();

@Override
public String getBucketId(String element, Context context) {
JsonNode node = null;
long date = System.currentTimeMillis();
try {
node = mapper.readTree(element);
// date = (long) (node.path("timestamp").floatValue() * 1000);
date = (long) (node.path("timestamp").floatValue());
} catch (IOException e) {
e.printStackTrace();
}
// String partitionValue = new SimpleDateFormat("yyyyMMddHHmm").format(new Date(date));
String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
return "dt=" + partitionValue;
}

@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}

Druid原理和架构

Druid简介

概念:主要是解决低延迟下实时数据摄入与查询的平台,本质是一个数据存储,但是数据仍然是保存在(hdfs、文件系统等)中。
特点:
① 列式存储格式:
可以将列作为索引,为仅查看几列的查询提供了巨大的速度提升
② 高可用、高并发:
① 集群扩展、缩小、删除、宕机都不会停止服务,全天候运行
② HA、sql的并行化执行、可扩展、容灾等
③ 支持1000+的并发用户,并提供隔离机制支持多租户模式(多租户就是并发互不影响)
④ 低延迟
Druid采用了列式存储、倒排索引、位图索引等关键技术,能够在亚秒级别内完成海量数据的过滤、聚合以及多维分析等操作。
⑤ 存储时候聚合:
无论是实时数据消费还是批量数据处理,Druid在基于DataSource结构存储数据时即可选择对任意的指标列进行聚合操作
聚合:提前做好sum,count等操作

Druid架构

alt
总体可以分为:四个节点+三个依赖

四个节点:

实时节点(RealtimeNode)(新版本的druid好像没有实时节点的说法了):

实时摄入数据,对于旧的数据周期性的生成segment数据文件,上传到deepstorage中
为了避免单点故障,索引服务(Indexer)的主从架构已经逐渐替代了实时节点,所以现在的实时节点,其实里面包含了很多角色:
作用:可以通过索引服务的API,写数据导入任务,用以新增、删除、合并Segment等。是一个主从架构:

统治节点(overlord):

类似于YarnResourceManager:负责集群资源的管理和分配
监视数据服务器上的MiddleManager进程,将提取任务分配给MiddleManager

中间管理者(middlemanager):

类似于YarnNodeManager:负责单个节点资源的管理和分配
新数据提取到群集中的过程。他们负责从外部数据源读取并发布新的段

苦工(peon):

类似于Yarncontainer:负责具体任务的执行
Peon进程是由MiddleManagers产生的任务执行引擎。
每个Peon运行一个单独的JVM,并负责执行单个任务。
Peon总是与生成它们的MiddleManager在同一主机上运行

Router(路由:可选):

可在Druid代理,统治节点和协调器之前提供统一的API网关
注:统治节点和中间管理者的通信是通过zookeeper完成的

历史节点(HistoricalNode):

加载已生成的segment数据文件,以供数据查询
启动或者受到协调节点通知的时候,通过druid_rules表去查找需要加载的数据,然后检查自身的本地缓存中已存在的Segment数据文件,
然后从DeepStorage中下载其他不在本地的Segment数据文件,后加载到内存!!!再提供查询。

查询节点(BrokerNode):

对外提供数据查询服务,并同时从实时节点与历史节点查询数据,合并后返回给调用方
缓存:外部:第三方的一些缓存系统内部:在历史节点或者查询节点做缓存

协调节点(CoodinatorNode):

负责历史节点的数据负载均衡,以及通过规则(Rule)管理数据的生命周期
① 通过从MySQL读取元数据信息,来决定深度存储上哪些数据段应该在那个历史节点中被加载,
② 通过ZK感知历史节点,历史节点增加,会自动分配相关的Segment,历史节点删除,会将原本在这台节点上的Segment分配给其他的历史节点
注:Coordinator是定期运行的,并且运行间隔可以通过配置参数配置

三个依赖:

1)Mysql:

存储关于Druid中的metadata,规则数据,配置数据等,
主要包含以下几张表:
“druid_config”(通常是空的),
“druid_rules”(协作节点使用的一些规则信息,比如哪个segment从哪个node去load)
“druid_segments”(存储每个segment的metadata信息);

2)Deepstorage:

存储segments,Druid目前已经支持本地磁盘,NFS挂载磁盘,HDFS,S3等。

3)ZooKeeper:

① 查询节点通过Zk来感知实时节点和历史节点的存在,提供查询服务。
② 协调节点通过ZK感知历史节点,实现负载均衡
③ 统治节点、协调节点的lead选举

实时Segment数据文件的流动:

生成:

① 实时节点(中间管理者)会周期性的将同一时间段生成的数据合并成一个Segment数据文件,并上传到DeepStorage中。
② Segment数据文件的相关元数据信息保存到MetaStore中(如mysql,derby等)。
③ 协调节点定时(默认1分钟)从MetaSotre中获取到Segment数据文件的相关元信息后,将按配置的规则分配到符合条件的历史节点中。
④ 协调节点会通知一个历史节点去读
⑤ 历史节点收到协调节点的通知后,会从DeepStorage中拉取该Segment数据文件到本地磁盘,并通过zookeeper向集群声明可以提供查询了。
⑥ 实时节点会丢弃该Segment数据文件,并通过zookeeper向集群声明不在提供该Sgment的查询服务。              //其实第四步已经可以提供查询服务了
⑦ 而对于全局数据来说,查询节点(BrokerNode)会同时从实时节点与历史节点分别查询,对结果整合后返回用户。

查询:

查询首先进入Broker,按照时间进行查询划分
确定哪些历史记录和MiddleManager正在为这些段提供服务
Historical/MiddleManager进程将接受查询,对其进行处理并返回结果
###DataSource
alt
每个datasource按照时间划分。每个时间范围称为一个chunk(一般都是以天分区,则一个chunk为一天)!!!//也可以按其他属性划分
在chunk中数据被分为一个或多个segment,每个segment都是一个单独的文件,通常包含几百万行数据
注:这些segment是按照时间组织成的chunk,所以在按照时间查询数据时,效率非常高。

数据分区:

任何分布式存储/计算系统,都需要对数据进行合理的分区,从而实现存储和计算的均衡,以及数据并行化。
而Druid本身处理的是事件数据,每条数据都会带有一个时间戳,所以很自然的就可以使用时间进行分区。
为什么一个chunk中的数据包含多个segment!!!????原因就是二级分区

二级分区:

很可能每个chunk的数据量是不均衡的,而Duid为了解决这种问题,提供了“二级分区”,每一个二级分区称为一个Shard(分片)
其实chunk、datasource都是抽象的,实际的就是每个分区就是一个Shard,每个Shard只包含一个Segment!!!,因为Segment是Shard持久化的结果
Druid目前支持两种Shard策略:
Hash(基于维值的Hash)
Range(基于某个维度的取值范围)
譬如:
2000-01-01,2000-01-02中的每一个分区都是一个Shard
2000-01-02的数据量比较多,所以有两个Shard,分为partition0、partition1。每个分区都是一个Shard
Shard经过持久化之后就称为了Segment,Segment是数据存储、复制、均衡(Historical的负载均衡)和计算的基本单元了。
Segment具有不可变性,一个Segment一旦创建完成后(MiddleManager节点发布后)就无法被修改,
只能通过生成一个新的Segment来代替旧版本的Segment。

Segment内部存储结构:

Segment内部采用列式存储          //并不是说每列都是一个独立的文件,而是说每列有独立的数据结构,所有列都会存储在一个文件中
Segment中的数据类型主要分为三种:
时间戳
维度列
指标列
对于时间戳列和指标列,实际存储是一个数组
对于维度列不会像指标列和时间戳这么简单,因为它需要支持filter和groupby:
所以Druid使用了字典编码(DictionaryEncoding)和位图索引(BitmapIndex)来存储每个维度列。每个维度列需要三个数据结构:
1、需要一个字典数据结构,将维值(维度列值都会被认为是字符串类型)映射成一个整数ID。
2、使用上面的字典编码,将该列所有维值放在一个列表中。
3、对于列中不同的值,使用bitmap数据结构标识哪些行包含这些值。      //位图索引,这个需要记住
注:使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量)
Druid针对维度列之所以使用这三个数据结构,是因为:
使用字典将字符串映射成整数ID,可以紧凑的表示结构2和结构3中的值。
使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量),因为Bitmap可以快速执行AND和OR操作。
对于groupby和TopN操作需要使用结构2中的列值列表
实例:
1.使用字典将列值映射为整数
{
“JustinBieher”:0,
“ke$ha”:1
}
2.使用1中的编码,将列值放到一个列表中
[0,0,1,1]
3.使用bitmap来标识不同列值
value=0:[1,1,0,0]//1代表该行含有该值,0标识不含有
value=1:[0,0,1,1]
因为是一个稀疏矩阵,所以比较好压缩!!
Druid而且运用了RoaringBitmap能够对压缩后的位图直接进行布尔运算,可以大大提高查询效率和存储效率(不需要解压缩)

Segment命名:

如果一个Datasource下有几百万个Segment文件,我们又如何快速找出我们所需要的文件呢?答案就是通过文件名称快速索引查找。
Segment的命名包含四部分:
数据源(Datasource)、时间间隔(包含开始时间和结束时间两部分)、版本号和分区(Segment有分片的情况下才会有)。
eg:wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-09T10:06:02.498Z
wikipedia:Datasource名称
开始时间:2015-09-12T00:00:00.000Z//该Segment所存储最早的数据,时间格式是ISO8601
结束时间:2015-09-13T00:00:00.000Z//该segment所存储最晚的数据,时间格式是ISO8601
版本号:2019-09-09T10:06:02.498Z//此Segment的启动时间,因为Druid支持批量覆盖操作,
//当批量摄入与之前相同数据源、相同时间间隔数据时,数据就会被覆盖,这时候版本号就会被更新
分片号:从0开始,如果分区号为0,可以省略//分区的表现其实就是分目录
注:单机形式运行Druid,这样Druid生成的Segment文件都在${DRUID_HOME}/var/druid/segments目录下
注:为了保证Druid的查询效率,每个Segment文件的大小建议在300MB~700MB之间
注:版本号的意义:
在druid,如果您所做的只是追加数据,那么每个时间chunk只会有一个版本。
但是当您覆盖数据时,因为druid通过首先加载新数据(但不允许查询)来处理这个问题,一旦新数据全部加载,
切换所有新查询以使用这些新数据。然后它在几分钟后掉落旧段!!!

存储聚合

无论是实时数据消费还是批量数据处理,Druid在基于DataSource机构存储数据时即可选择对任意的指标列进行聚合操作:
1、基于维度列:相同的维度列数据会进行聚合
2、基于时间段:某一时间段的所有行会进行聚合,时间段可以通过queryGranularity参数指定
聚合:提前做好sum,count等操作

Segment生命周期:

在元数据存储中!每个Segment都会有一个used字段,标记该段是否能用于查询

is_Published:

当Segment构建完毕,就将元数据存储在元数据存储区中,此Segment为发布状态

is_available:

如果Segment当前可用于查询(实时任务或历史进程),则为true。

is_realtime:

如果是由实时任务产生的,那么会为true,但是一段时间之后,也会变为false

is_overshadowed:

标记该段是否已被其他段覆盖!处于此状态的段很快就会将其used标志自动设置为false。

Kafka 是如何保证数据可靠性和一致性

数据可靠性

Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从 Producter 往 Broker 发送消息、Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性。

Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。

Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Producer 往 Broker 发送消息

如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:

acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。

acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃。

acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。

根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

Leader 选举

在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见《图文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

Kafka 生产者分区策略

1)分区的原因
1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic
又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以 Partition 为单位读写了。
2)分区的原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。

消费者分区分配策略

Range 范围分区(默认的)

假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
最后分配结果如下

C1:0,1,2,3
C2:4,5,6
C3:7,8,9

如果有11个分区将会是:

C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)

在这种情况下,C1多消费了两个分区

RoundRobin 轮询分区

把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer

假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)

有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)

分区将会按照一定的顺序排列起来,消费者将会组成一个环状的结构,然后开始轮询。
P0-0分配给C0
P0-1分配给C1但是C1并没订阅T0,于是跳过C1把P0-1分配给C2,
P0-2分配给C0
P1-0分配给C1,
P1-1分配给C0,
P2-0分配给C1,
P2-1分配给C2,
P2-2分配给C1,
p2-3分配给C2

C0: P0-0,P0-2,P1-1
C1:P1-0,P2-0,P2-2
C2:P0-1,P2-1,P2-3

Python数据质量检测

1. 重复值检查

1.1 什么是重复值
重复值的检查首先要明确一点,即重复值的定义。对于一份二维表形式的数据集来说,什么是重复值?主要有两个层次:
① 关键字段出现重复记录,比如主索引字段出现重复;
② 所有字段出现重复记录。
第一个层次是否是重复,必须从这份数据的业务含义进行确定。比如一张表,从业务上讲,一个用户应该只会有一条记录,那么如果某个用户出现了超过一条的记录,那么这就是重复值。第二个层次,就一定是重复值了。
1.2 重复值产生的原因
重复值的产生主要有两个原因,一是上游源数据造成的,二是数据准备脚本中的数据关联造成的。从数据准备角度来看,首先检查数据准备的脚本,判断使用的源表是否有重复记录,同时检查关联语句的正确性和严谨性,比如关联条件是否合理、是否有限定数据周期等等。
比如:检查源表数据是否重复的SQL:

1
2
3
SELECT MON_ID,COUNT(*),COUNT(DISTINCT USER_ID)
FROM TABLE_NAME
GROUP BY MON_ID;

如果是上游源数据出现重复,那么应该及时反映给上游进行修正;如果是脚本关联造成的,修改脚本,重新生成数据即可。
还有一份情况,这份数据集是一份单独的数据集,并不是在数据仓库中开发得到的数据,既没有上游源数据,也不存在生成数据的脚本,比如公开数据集,那么如何处理其中的重复值?一般的处理方式就是直接删除重复值。

1
2
3
4
5
6
7
8
import pandas as pd
dataset = pd.read_excel("/labcenter/python/dataset.xlsx")
#判断重复数据
dataset.duplicated() #全部字段重复是重复数据
dataset.duplicated(['col2']) #col2字段重复是重复数据
#删除重复数据
dataset.drop_duplicates() #全部字段重复是重复数据
dataset.drop_duplicates(['col2']) #col2字段重复是重复数据

2. 缺失值检查

缺失值主要是指数据集中部分记录存在部分字段的信息缺失。

2.1 缺失值出现的原因
出现趋势值主要有三种原因:
① 上游源系统因为技术或者成本原因无法完全获取到这一信息,比如对用户手机APP上网记录的解析;
② 从业务上讲,这一信息本来就不存在,比如一个学生的收入,一个未婚者的配偶姓名;
③ 数据准备脚本开发中的错误造成的。
第一种原因,短期内无法解决;第二种原因,数据的缺失并不是错误,无法避免;第三种原因,则只需通过查证修改脚本即可。
缺失值的存在既代表了某一部分信息的丢失,也影响了挖掘分析结论的可靠性与稳定性,因此,必须对缺失值进行处理。
如果缺失值记录数超过了全部记录数的50%,则应该从数据集中直接剔除掉该字段,尝试从业务上寻找替代字段;
如果缺失值记录数没有超过50%,则应该首先看这个字段在业务上是否有替代字段,如果有,则直接剔除掉该字段,如果没有,则必须对其进行处理。

1
2
3
4
5
6
7
##查看哪些字段有缺失值   
dataset.isnull().any() #获取含有NaN的字段
##统计各字段的缺失值个数
dataset.isnull().apply(pd.value_counts)
##删除含有缺失值的字段
nan_col = dataset.isnull().any()
dataset.drop(nan_col[nan_col].index,axis=1)

2.2 缺失值的处理
缺失值的处理主要有两种方式:过滤和填充。

(1)缺失值的过滤
直接删除含有缺失值的记录,总体上会影响样本个数,如果删除样本过多或者数据集本来就是小数据集时,这种方式并不建议采用。

1
2
##删除含有缺失值的记录
dataset.dropna()

(2)缺失值的填充
缺失值的填充主要三种方法:
① 方法一:使用特定值填充
使用缺失值字段的平均值、中位数、众数等统计量填充。
优点:简单、快速
缺点:容易产生数据倾斜
② 方法二:使用算法预测填充
将缺失值字段作为因变量,将没有缺失值字段作为自变量,使用决策树、随机森林、KNN、回归等预测算法进行缺失值的预测,用预测结果进行填充。
优点:相对精确
缺点:效率低,如果缺失值字段与其他字段相关性不大,预测效果差
③ 方法三:将缺失值单独作为一个分组,指定值进行填充
从业务上选择一个单独的值进行填充,使缺失值区别于其他值而作为一个分组,从而不影响算法计算。
优点:简单,实用
缺点:效率低

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
##使用Pandas进行特定值填充
dataset.fillna(0) ##不同字段的缺失值都用0填充
dataset.fillna({'col2':20,'col5':0}) ##不同字段使用不同的填充值
dataset.fillna(dataset.mean()) ##分别使用各字段的平均值填充
dataset.fillna(dataset.median()) ##分别使用个字段的中位数填充

##使用sklearn中的预处理方法进行缺失值填充(只适用于连续型字段)
from sklearn.preprocessing import Imputer
dataset2 = dataset.drop(['col4'],axis=1)
colsets = dataset2.columns
nan_rule1 = Imputer(missing_values='NaN',strategy='mean',axis=0) ##创建填充规则(平均值填充)
pd.DataFrame(nan_rule1.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule2 = Imputer(missing_values='median',strategy='mean',axis=0) ##创建填充规则(中位数填充)
pd.DataFrame(nan_rule2.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule3 = Imputer(missing_values='most_frequent',strategy='mean',axis=0) ##创建填充规则(众数填充)
pd.DataFrame(nan_rule3.fit_transform(dataset2),columns=colsets) ##应用规则

Oozie常用命令

Oozie任务重试

1
oozie job -rerun 0000000-191106143754176-oozie-oozi-W  -D oozie.wf.rerun.failnodes=true

检查xml格式

1
xmllint -noout hive-site.xml

查看coordinator状态

1
oozie jobs -jobtype coordinator  -filter status=RUNNING

查看执行错误的workflow.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[root@dlbdn3 ~]# oozie job -info 0000027-181203143359779-oozie-oozi-C -oozie http://localhost:11000/oozie -order=desc -timezone='Asia/Shanghai' -filter status=KILLED AND FAILED
Job ID : 0000027-181203143359779-oozie-oozi-C
------------------------------------------------------------------------------------------------------------------------------------
Job Name : PG-LANDING-JOB-Coordinator
App Path : hdfs://dlbdn3:8020/user/hue/oozie/deployments/_ericsson_-oozie-13-1543825800.46
Status : KILLED
Start Time : 2018-09-13 00:00 CST
End Time : 2040-09-01 15:15 CST
Pause Time : -
Concurrency : 1
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Err Code Created Nominal Time
0000027-181203143359779-oozie-oozi-C@30 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@29 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@28 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@27 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@26 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@25 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@24 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:55 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@23 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:50 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@22 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:45 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@21 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:40 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@20 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:35 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@19 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:30 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@18 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@17 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@16 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@15 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@14 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@13 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@12 KILLED 0000039-181203143359779-oozie-oozi-W - 2018-12-03 16:34 CST 2018-09-13 00:55 CST
------------------------------------------------------------------------------------------------------------------------------------

查看一个具体的workflow信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@dlbdn3 ~]# oozie job --oozie http://localhost:11000/oozie -info 0000039-181203143359779-oozie-oozi-W -localtime
Job ID : 0000039-181203143359779-oozie-oozi-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : PG-LANDING-JOB
App Path : hdfs://dlbdn3:8020/user/hue/oozie/workspaces/hue-oozie-1535681485.06
Status : KILLED
Run : 0
User : ericsson
Group : -
Created : 2018-12-03 16:42 CST
Started : 2018-12-03 16:42 CST
Last Modified : 2018-12-03 16:43 CST
Ended : 2018-12-03 16:43 CST
CoordAction ID: 0000027-181203143359779-oozie-oozi-C@12

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Ext Status Err Code
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@:start: OK - OK -
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@spark-23f2 KILLED job_1543800485319_0062 KILLED -
------------------------------------------------------------------------------------------------------------------------------------

查看当前运行的workflow有哪些

1
2
3
4
5
6
7
8
9
10
11
12
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf  -filter status=RUNNING
No Jobs match your criteria!
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 08:50 GMT -
------------------------------------------------------------------------------------------------------------------------------------
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING -localtime
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 16:50 CST -
------------------------------------------------------------------------------------------------------------------------------------

深入理解Kafka副本机制

一、Kafka集群

Kafka 使用 Zookeeper 来维护集群成员 (brokers) 的信息。每个 broker 都有一个唯一标识 broker.id,用于标识自己在集群中的身份,可以在配置文件 server.properties 中进行配置,或者由程序自动生成。下面是 Kafka brokers 集群自动创建的过程:

每一个 broker 启动的时候,它会在 Zookeeper 的 /brokers/ids 路径下创建一个 临时节点,并将自己的 broker.id 写入,从而将自身注册到集群;

当有多个 broker 时,所有 broker 会竞争性地在 Zookeeper 上创建 /controller 节点,由于 Zookeeper 上的节点不会重复,所以必然只会有一个 broker 创建成功,此时该 broker 称为 controller broker。它除
了具备其他 broker 的功能外,还负责管理主题分区及其副本的状态。

当 broker 出现宕机或者主动退出从而导致其持有的 Zookeeper 会话超时时,会触发注册在 Zookeeper 上的 watcher 事件,此时 Kafka 会进行相应的容错处理;如果宕机的是 controller broker 时,还会触发新的controller 选举。

二、副本机制

为了保证高可用,kafka 的分区是多副本的,如果一个副本丢失了,那么还可以从其他副本中获取分区数据。但是这要求对应副本的数据必须是完整的,这是 Kafka 数据一致性的基础,所以才需要使用 controller broker 来进行专门的管理。下面将详解介绍 Kafka 的副本机制。

2.1 分区和副本
Kafka 的主题被分为多个分区 ,分区是 Kafka 最基本的存储单位。每个分区可以有多个副本 (可以在创建主题时使用 replication-factor 参数进行指定)。其中一个副本是首领副本 (Leader replica),所有的事件都直接发送给首领副本;其他副本是跟随者副本 (Follower replica),需要通过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。
alt

2.2 ISR机制
每个分区都有一个 ISR(in-sync Replica) 列表,用于维护所有同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副本来说,它需要满足以下条件才能被认为是同步副本:

与 Zookeeper 之间有一个活跃的会话,即必须定时向 Zookeeper 发送心跳;
在规定的时间内从首领副本那里低延迟地获取过消息。

如果副本不满足上面条件的话,就会被从 ISR 列表中移除,直到满足条件才会被再次加入。

这里给出一个主题创建的示例:使用 –replication-factor 指定副本系数为 3,创建成功后使用 –describe 命令可以看到分区 0 的有 0,1,2 三个副本,且三个副本都在 ISR 列表中,其中 1 为首领副本。
alt

2.3 不完全的首领选举
对于副本机制,在 broker 级别有一个可选的配置参数 unclean.leader.election.enable,默认值为 fasle,代表禁止不完全的首领选举。这是针对当首领副本挂掉且 ISR 中没有其他可用副本时,是否允许某个不完全同步的副本成为首领副本,这可能会导致数据丢失或者数据不一致,在某些对数据一致性要求较高的场景 (如金融领域),这可能无法容忍的,所以其默认值为 false,如果你能够允许部分数据不一致的话,可以配置为 true。

2.4 最少同步副本
ISR 机制的另外一个相关参数是 min.insync.replicas , 可以在 broker 或者主题级别进行配置,代表 ISR 列表中至少要有几个可用副本。这里假设设置为 2,那么当可用副本数量小于该值时,就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常 org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

2.5 发送确认
Kafka 在生产者上有一个可选的参数 ack,该参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入成功:
acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

三、数据请求

3.1 元数据请求机制
在所有副本中,只有领导副本才能进行消息的读写处理。由于不同分区的领导副本可能在不同的 broker 上,如果某个 broker 收到了一个分区请求,但是该分区的领导副本并不在该 broker 上,那么它就会向客户端返回一个 Not a Leader for Partition 的错误响应。 为了解决这个问题,Kafka 提供了元数据请求机制。

首先集群中的每个 broker 都会缓存所有主题的分区副本信息,客户端会定期发送发送元数据请求,然后将获取的元数据进行缓存。定时刷新元数据的时间间隔可以通过为客户端配置 metadata.max.age.ms 来进行指定。有了元数据信息后,客户端就知道了领导副本所在的 broker,之后直接将读写请求发送给对应的 broker 即可。

如果在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已经过时了,此时还有可能会收到 Not a Leader for Partition 的错误响应,这种情况下客户端会再次求发出元数据请求,然后刷新本地缓存,之后再去正确的 broker 上执行对应的操作,过程如下图:
alt

3.2 数据可见性
需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
alt

3.3 零拷贝
Kafka 所有数据的写入和读取都是通过零拷贝来实现的。传统拷贝与零拷贝的区别如下
传统模式下的四次拷贝与四次上下文切换
以将磁盘文件通过网络发送为例。传统模式下,一般使用如下伪代码所示的方法先将文件数据读入内存,然后通过 Socket 将内存中的数据发送出去。

1
2
buffer = File.read
Socket.send(buffer)

这一过程实际上发生了四次数据拷贝。首先通过系统调用将文件数据读入到内核态 Buffer(DMA 拷贝),然后应用程序将内存态 Buffer 数据读入到用户态 Buffer(CPU 拷贝),接着用户程序通过 Socket 发送数据时将用户态 Buffer 数据拷贝到内核态 Buffer(CPU 拷贝),最后通过 DMA 拷贝将数据拷贝到 NIC Buffer。同时,还伴随着四次上下文切换,如下图所示:
alt

sendfile和transferTo实现零拷贝
Linux 2.4+ 内核通过 sendfile 系统调用,提供了零拷贝。数据通过 DMA 拷贝到内核态 Buffer 后,直接通过 DMA 拷贝到 NIC Buffer,无需 CPU 拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件到网络发送由一个 sendfile 调用完成,整个过程只有两次上下文切换,因此大大提高了性能。零拷贝过程如下图所示:
alt

从具体实现来看,Kafka 的数据传输通过 TransportLayer 来完成,其子类 PlaintextTransportLayer 的 transferFrom 方法通过调用 Java NIO 中 FileChannel 的 transferTo 方法实现零拷贝,如下所示:

1
2
3
4
@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与操作系统相关,如果操作系统提供 sendfile 这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

四、物理存储

4.1 分区分配
在创建主题时,Kafka 会首先决定如何在 broker 间分配分区副本,它遵循以下原则:

在所有 broker 上均匀地分配分区副本;
确保分区的每个副本分布在不同的 broker 上;
如果使用了 broker.rack 参数为 broker 指定了机架信息,那么会尽可能的把每个分区的副本分配到不同机架的 broker 上,以避免一个机架不可用而导致整个分区不可用。

基于以上原因,如果你在一个单节点上创建一个 3 副本的主题,通常会抛出下面的异常:

1
2
Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor   
Exception: Replication factor: 3 larger than available brokers: 1.

4.2 分区数据保留规则
保留数据是 Kafka 的一个基本特性, 但是 Kafka 不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。相反, Kafka 为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。分别对应以下四个参数:

log.retention.bytes :删除数据前允许的最大数据量;默认值-1,代表没有限制;
log.retention.ms:保存数据文件的毫秒数,如果未设置,则使用 log.retention.minutes 中的值,默认为 null;
log.retention.minutes:保留数据文件的分钟数,如果未设置,则使用 log.retention.hours 中的值,默认为 null;
log.retention.hours:保留数据文件的小时数,默认值为 168,也就是一周。
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以 Kafka 把分区分成若干个片段,当前正在写入数据的片段叫作活跃片段。活动片段永远不会被删除。如果按照默认值保留数据一周,而且每天使用一个新片段,那么你就会看到,在每天使用一个新片段的同时会删除一个最老的片段,所以大部分时间该分区会有 7 个片段存在。

4.3 文件格式
通常保存在磁盘上的数据格式与生产者发送过来消息格式是一样的。 如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送 (格式如下所示) ,然后保存到磁盘上。之后消费者读取后再自己解压这个包装消息,获取每条消息的具体信息。
alt

APP数据统计-用户活跃统计周活跃,月活跃(不是按照自然周计算,每天的前7天 前30天)

code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import com.jiatui.utils.Constants
import com.jiatui.utils.CommonUtils
import org.apache.spark.storage.StorageLevel

object App_user_active_statistics {

def main(args: Array[String]): Unit = {
val Array(appName, logLevel, parallelismStr, dstTbl, start_date, end_date,enabled) = args
val conf = new SparkConf().setAppName(appName)
conf.set("spark.default.parallelism", parallelismStr)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.shuffle.partitions", parallelismStr)
conf.set("spark.sql.adaptive.enabled", enabled)
conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "128000000")
val sc = new SparkContext(conf)
sc.setLogLevel(logLevel)

val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("hive.exec.max.dynamic.partitions.pernode", "2000")
hiveContext.setConf("hive.exec.max.dynamic.partitions", "5000")

val ts = CommonUtils.getCurrentTime("yyyy-MM-dd HH:mm:ss")
// (2019-05-01 2019-07-16)
val allDate = CommonUtils.findDates(start_date, end_date)
// 2019-05-01,2019-05-02,2019-05-03
val allDate_bc = sc.broadcast(allDate)
//起始日期再推30天
val twenty_nine_days_ago = CommonUtils.getDateByStep(start_date, -29)
val all_data_sql = s"""
SELECT app_user_id,user_type,company_id,snapshot_date
FROM
dwd.DWD_APP_USER_ACTIVE_RECORD
WHERE dt between '$twenty_nine_days_ago' AND '$end_date'
"""

val all_active_data = hiveContext.sql(all_data_sql).persist(StorageLevel.MEMORY_AND_DISK)

val day_active_data_rdd = all_active_data.flatMap(x => {

val data = new ArrayBuffer[Row]()

try {

val app_user_id = x.getAs[String]("app_user_id")
val user_type = x.getAs[String]("user_type")
val company_id = x.getAs[String]("company_id")
val snapshot_date = x.getAs[String]("snapshot_date")

//循环日期
val allDatefor = allDate_bc.value

for (singleDate <- allDatefor) {

if (singleDate.equals(snapshot_date)) {

data += modifyRecord(0, 1L, user_type, company_id, singleDate)
}
}

} catch {

case t: Throwable => t.printStackTrace()

}

data

})

//周活和月活数据
val week_month_active_data_rdd = all_active_data.flatMap(x => {

val data = new ArrayBuffer[(( String, String, String, String), Int)]()
try {

val app_user_id = x.getAs[String]("app_user_id")
val user_type = x.getAs[String]("user_type")
val company_id = x.getAs[String]("company_id")
val snapshot_date = x.getAs[String]("snapshot_date")

val allDatefor = allDate_bc.value

for (singleDate <- allDatefor) {
// 二个日期的差值
val date_diff = CommonUtils.getDateDiff(snapshot_date, singleDate)
if (date_diff >= 0 && date_diff <= 6) {
data += (((singleDate, company_id, app_user_id, user_type), 1))
}
}
} catch {
case t: Throwable => t.printStackTrace()
}
data
}).reduceByKey {
case (x, y) =>
x
}.flatMap {
x =>
Some(modifyRecord(1, 1L, x._1._4, x._1._2, x._1._1))

}

//月活跃
val month_active_data_rdd = all_active_data.flatMap(x => {

val data = new ArrayBuffer[(( String, String, String, String), Int)]()
try {

val app_user_id = x.getAs[String]("app_user_id")
val user_type = x.getAs[String]("user_type")
val company_id = x.getAs[String]("company_id")
val snapshot_date = x.getAs[String]("snapshot_date")

val allDatefor = allDate_bc.value

for (singleDate <- allDatefor) {
// 二个日期的差值
val date_diff = CommonUtils.getDateDiff(snapshot_date, singleDate)
if (date_diff >= 0 && date_diff <= 29) {
data += ((( singleDate, company_id, app_user_id, user_type),2))
}
}
} catch {
case t: Throwable => t.printStackTrace()
}
data
}).reduceByKey {
case (x, y) =>
x
}.flatMap {
x =>
Some(modifyRecord(2, 1L, x._1._4, x._1._2, x._1._1))

}

val total_new_app_user_sql = s"""
SELECT app_user_id,user_type,company_id,snapshot_date
FROM
dwd.dwd_app_user_increased_record
WHERE dt <= '$end_date'
"""

val total_new_app_user_rdd = hiveContext.sql(total_new_app_user_sql).flatMap(x => {

val data = new ArrayBuffer[Row]()
try {

val app_user_id = x.getAs[String]("app_user_id")
val user_type = x.getAs[String]("user_type")
val company_id = x.getAs[String]("company_id")
val snapshot_date = x.getAs[String]("snapshot_date")


val allDatefor = allDate_bc.value

for (singleDate <- allDatefor) {

if (singleDate.compareTo(snapshot_date) >= 0) {

data += modifyRecord(4, 1L, user_type, company_id, singleDate)

}
if (singleDate.equals(snapshot_date)) {

data += modifyRecord(3, 1L, user_type, company_id, singleDate)

}
}


} catch {
case t: Throwable => t.printStackTrace()
}
data
})

val row_rdd = day_active_data_rdd.union(week_month_active_data_rdd).union(month_active_data_rdd).union(total_new_app_user_rdd)
val struct = StructType(
Array(
StructField("day_active_app_user_cnt", LongType),
StructField("week_active_app_user_cnt", LongType),
StructField("month_active_app_user_cnt", LongType),
StructField("new_app_user_cnt", LongType),
StructField("total_app_user_cnt", LongType),
StructField("user_type", StringType),
StructField("company_id", StringType),
StructField("snapshot_date", StringType)))

val df = hiveContext.createDataFrame(row_rdd, struct)

val tmpTable = "tmp"
df.registerTempTable(tmpTable)

val final_sql = s"""
INSERT OVERWRITE TABLE $dstTbl partition(dt)
SELECT
day_active_app_user_cnt,
week_active_app_user_cnt,
month_active_app_user_cnt,
new_app_user_cnt,
total_app_user_cnt,
user_type,
company_id,
from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
snapshot_date,
snapshot_date
FROM $tmpTable
"""

println("sql = " + final_sql)
hiveContext.sql(final_sql)

sc.stop()
}

def modifyRecord(index: Int, new_value: Long, user_type: String, company_id: String, snapshot_date: String) = {
val L1 = List(0L, 0L, 0L, 0L, 0L, user_type, company_id, snapshot_date)
val L2 = L1.updated(index, new_value)
Row.fromSeq(L2)

}
}
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量