Flink Table & SQL StreamTableSink has three interfaces: AppendStreamTableSink, RetractStreamTableSink, UpsertStreamTableSink

AppendStreamTableSink: converts a dynamic table into an append stream. Suitable when the dynamic table is modified only by INSERT.

RetractStreamTableSink: converts a dynamic table into a retract stream. Suitable when the dynamic table is modified by INSERT, DELETE, and UPDATE.

UpsertStreamTableSink: converts a dynamic table into an upsert stream. Suitable when the dynamic table is modified by INSERT, DELETE, and UPDATE.

Note:

In a RetractStreamTableSink, an INSERT is encoded as an add message, a DELETE as a retract message, and an UPDATE as two messages: a retract message for the old row followed by an add message for the new row, i.e. delete first, then add.

In an UpsertStreamTableSink, both INSERT and UPDATE are encoded as a single upsert message, and a DELETE as a single delete message.

The main difference is that an UpsertStreamTableSink encodes an UPDATE as one message instead of two, which makes it more efficient than a RetractStreamTableSink.

"Encoding" here refers to how the table's inserts, deletes, and updates are represented on the DataStream when the dynamic table is converted to one.
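
To see these encodings directly, here is a minimal sketch (an illustration, not part of the original examples) using the Table API's built-in toAppendStream/toRetractStream conversions instead of a custom sink. It assumes the same tableEnvironment and the source_kafka_browse_log table registered in the examples below, plus an import of org.apache.flink.table.api.Table:

// Insert-only query: every table row becomes exactly one record on the stream.
Table appendOnly = tableEnvironment.sqlQuery(
        "select userID, productID from source_kafka_browse_log");
DataStream<Row> appendStream = tableEnvironment.toAppendStream(appendOnly, Row.class);

// Updating query: each record is a Tuple2<Boolean, Row>; an update arrives as
// (false, oldRow) followed by (true, newRow), i.e. retract first, then add.
Table counts = tableEnvironment.sqlQuery(
        "select userID, count(1) as browseNumber from source_kafka_browse_log group by userID");
DataStream<Tuple2<Boolean, Row>> retractStream =
        tableEnvironment.toRetractStream(counts, Row.class);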

Test data

{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:03:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

AppendStreamTableSink example

package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class AppendStreamTableSink {

public static void main(String[] args) throws Exception {

// Set up the execution environment
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
// props.setProperty("connector.version", "universal");
// DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer011<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

// DataStream<UserBrowseLog> browseStream=streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer010<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4. Register the AppendStreamTableSink
String[] sinkFieldNames = { "userID", "eventTime", "eventType", "productID", "productPrice",
"eventTimeTimestamp" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT() };
TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout", myAppendStreamTableSink);

// 5. Continuous query

String sql = "insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnvironment.sqlUpdate(sql);

tableEnvironment.execute(AppendStreamTableSink.class.getSimpleName());

}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// Add a long timestamp field
// eventTime is Beijing time (UTC+8) in yyyy-MM-dd HH:mm:ss format
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// Convert to a millisecond timestamp
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

private static class MyAppendStreamTableSink implements org.apache.flink.table.sinks.AppendStreamTableSink<Row> {

private TableSchema tableSchema;

public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}

// Deprecated
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// Deprecated
@Override
public void emitDataStream(DataStream<Row> dataStream) {
}

@Override
public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new SinkFunction());
}

private static class SinkFunction extends RichSinkFunction<Row> {
public SinkFunction() {
}

@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}

}

}

Result

user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000
user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
user_1,2016-01-01 10:03:16,browse,product_5,20,1451613796000
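
Only the rows with userID='user_1' reach the sink, and the query is insert-only, so each matching input record produces exactly one output row. The lines likely appear out of input order because only the source ProcessFunction runs with parallelism 1, while the query and sink run at the default parallelism, so several sink subtasks print interleaved.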

RetractStreamTableSink example

package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class RetractStreamTableSink {

public static void main(String[] args) throws Exception {
// Set up the execution environment
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4. Register the RetractStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.RetractStreamTableSink<Row> retractStreamTableSink = new MyRetractStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

// 5. Continuous query
// Count browse events per userID
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


// 6. Execute
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// Add a long timestamp field
// eventTime is Beijing time (UTC+8) in yyyy-MM-dd HH:mm:ss format
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// Convert to a millisecond timestamp
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

/**
* Custom RetractStreamTableSink
*
* The Table is internally converted into a message stream with add and retract messages,
* which is finally handled by the DataStream's SinkFunction.
* Each record in the DataStream is a Tuple2<Boolean, Row>:
* the Boolean is the add/retract flag, and the Row carries the actual data.
* An INSERT on the table is encoded as one add message, e.g. Tuple2<true, Row>.
* An UPDATE is encoded as two messages: a retract message Tuple2<false, Row> followed by an add message Tuple2<true, Row>.
*/
private static class MyRetractStreamTableSink implements org.apache.flink.table.sinks.RetractStreamTableSink<Row> {

private TableSchema tableSchema;

public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

public TableSchema getTableSchema() {
return tableSchema;
}

// Deprecated
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// Deprecated
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
}

// Eventually handled as a DataStream
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if (flag) {
System.out.println("增加... " + value);
} else {
System.out.println("删除... " + value);
}
}
}
}

}

Result

add... (true,user_2,1)
delete... (false,user_2,1)
add... (true,user_2,2)
add... (true,user_1,1)
delete... (false,user_1,1)
add... (true,user_1,2)
delete... (false,user_1,2)
add... (true,user_1,3)
delete... (false,user_1,3)
add... (true,user_1,4)
delete... (false,user_1,4)
add... (true,user_1,5)
delete... (false,user_1,5)
add... (true,user_1,6)
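
Each new event for a user first retracts the previous count and then adds the new one, so every update shows up as a (false, oldCount) / (true, newCount) pair; only the very first count per user is a single add message.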

UpsertStreamTableSink example

package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class UpsertStreamTableSink {
public static void main(String[] args) throws Exception {
// Set up the execution environment
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4. Register the UpsertStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.UpsertStreamTableSink<Row> upsertStreamTableSink = new MyUpsertStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout", upsertStreamTableSink);

// 5. Continuous query
// Count browse events per userID
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


// 6. Execute
tableEnvironment.execute(UpsertStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// Add a long timestamp field
// eventTime is Beijing time (UTC+8) in yyyy-MM-dd HH:mm:ss format
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// Convert to a millisecond timestamp
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}


/**
* Custom UpsertStreamTableSink
* The Table is internally converted into a message stream with upsert (true) and delete (false)
* messages, which is finally handled by the DataStream's SinkFunction.
* The Boolean is the upsert/delete flag, and the Row carries the actual data.
* An INSERT on the table is encoded as one upsert message, e.g. Tuple2<true, Row>.
* An UPDATE is also encoded as a single upsert message, e.g. Tuple2<true, Row>.
* In a SortLimit scenario (i.e. order by ... limit ...), an UPDATE is encoded as two messages:
* a delete message Tuple2<false, Row> and an upsert message Tuple2<true, Row>.
*/
@SuppressWarnings("unused")
private static class MyUpsertStreamTableSink implements org.apache.flink.table.sinks.UpsertStreamTableSink<Row> {

private TableSchema tableSchema;

@SuppressWarnings("unused")
public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}


// Set the unique key
// For the SQL above, which has a GROUP BY, the unique key is automatically derived from the GROUP BY fields
@Override
public void setKeyFields(String[] keys) {}

// Whether the table is insert-only
// For the SQL above, which produces updates, this is derived as isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

// Deprecated
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// Eventually handled as a DataStream

public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}


public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
private static final long serialVersionUID = 1L;

public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if (flag) {
System.out.println("add... " + value);
} else {
System.out.println("delete... " + value);
}
}
}
}


}

Result

add... (true,user_1,1)
add... (true,user_1,2)
add... (true,user_1,3)
add... (true,user_1,4)
add... (true,user_1,5)
add... (true,user_1,6)
add... (true,user_2,1)
add... (true,user_2,2)
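
Compared with the RetractStreamTableSink output above, each count update here arrives as a single message with flag true instead of a retract/add pair, which is exactly the efficiency difference noted at the start.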