Datax3.0简介

Datex3.0概览

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
(这是一个单机多任务的ETL工具)

alt

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状

此前已经开源DataX1.0版本,此次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX

DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

alt
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图

alt

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

alt

核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

DataXJob根据分库分表切分成了100个Task。

根据20个并发,DataX计算共需要分配4个TaskGroup。

4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

五、DataX 3.0六大核心优势

可靠的数据质量监控

完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

提供作业全链路的流量、数据量运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

丰富的数据转换功能

DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。

精准的速度控制

还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

1
2
3
4
"speed": {
"channel": 8, ----并发数限速(根据自己CPU合理控制并发数)
"byte": 524288, ----字节流限速(根据自己的磁盘和网络合理控制字节数)
"record": 10000 ----记录流限速(根据数据合理空行数)

强劲的同步性能

DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试

健壮的容错机制

DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
线程内部重试

DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

线程级别重试

目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

作者:香山上的麻雀
链接:https://www.jianshu.com/p/f5f0dc99d5ab
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Flink与Spark多方面区别对比

本文转载于
场景描述:Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不过现在Spark Streaming已经非常稳定基本都没有更新了,然后重点移到spark sql和structured Streaming了

Flink和Spark的区别在编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面存在不同。

维表join和异步IO

Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming直接与静态数据集的join,可以也可以帮助实现维表的join功能,当然维表要不可变。

Flink支持与维表进行join操作,除了map,flatmap这些算子之外,flink还有异步IO算子,可以用来实现维表,提升性能。

状态管理

状态维护应该是流处理非常核心的概念了,比如join,分组,聚合等操作都需要维护历史状态。那么flink在这方面很好,structured Streaming也是可以,但是spark Streaming就比较弱了,只有个别状态维护算子upstatebykye等,大部分状态需要用户自己维护,虽然这个对用户来说有更大的可操作性和可以更精细控制但是带来了编程的麻烦。flink和Structured Streaming都支持自己完成了join及聚合的状态维护。

Structured Streaming有高级的算子,用户可以完成自定义的mapGroupsWithState和flatMapGroupsWithState,可以理解类似Spark Streaming 的upstatebykey等状态算子。

就拿mapGroupsWithState为例:
由于Flink与Structured Streaming的架构的不同,task是常驻运行的,flink不需要状态算子,只需要状态类型的数据结构。

首先看一下Keyed State下,我们可以用哪些原子状态:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用

reduceFunction,最后合并到一个单一的状态值。

FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。

MapState:即状态值为一个map。用户通过put或putAll方法添加元素。

Join操作

Flink的join操作

flink的join操作没有大的限制,支持种类丰富,比如:

Inner Equi-join

1
SELECT * FROM Orders INNER JOIN Product ONOrders.productId = Product.id

Outer Equi-join

1
2
3
4
5
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders FULL OUTER JOIN Product ONOrders.productId = Product.id

Time-windowed Join

1
SELECT * FROM Oderso,Shipmentss WHEREo.id=s.orderIdAND o.ordertimeBETWEENs.shiptime INTERVAL'4'HOURANDs.shiptime

Expanding arrays into a relation

1
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Join with Table Function

1
2
3
4
5
6
7
8
9
10
11
Inner Join

A row of the left (outer) table is dropped, if its table function call returns an empty result.
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join
If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

Join with Temporal Table

1
2
3
4
5
6
7
8

SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency

Structured Streaming的join操作
Structured Streaming的join限制颇多了,限于篇幅问题在这里只讲一下join的限制
alt

容错机制及一致性语义

Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:

1
2
3
4
5
6
7

Dstream.foreachRDD(rdd=>{
rdd.repartition(1).foreachPartition(partition=>{ // 开启事务
partition.foreach(each=>{// 提交数据
}) // 提交事务
})
})

将结果和 offset 一起提交

也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图所示包含以下组件:

一个source,从Kafka中读取数据(即KafkaConsumer)

一个时间窗口化的聚会操作

一个sink,将结果写回到Kafka(即KafkaProducer)

alt
下面详细讲解 flink 的两段提交思路

alt

Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交

alt

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图
alt
当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

alt
以上就是 flink 实现恰一次处理的基本逻辑。

背压

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。

Spark Streaming 的背压
Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset

PIDRateEsimator 的 compute 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def compute(       
time: Long,
// in milliseconds
numElements: Long,
processingDelay: Long,
// in milliseconds schedulingDelay: Long
// in milliseconds
): Option[Double] = {logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate/ batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)
latestTime = time
if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None }
else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } }
else { logTrace("Rate estimation skipped") None } } }
}
}
}

}

Flink 的背压
与 Spark Streaming 的背压不同的是,Flink 背压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 16 所示:

alt
阻塞占比在 web 上划分了三个等级:

OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了。

表管理

flink和structured streaming都可以讲流注册成一张表,然后使用sql进行分析,不过两者之间区别还是有些的。
Structured Streaming将流注册成临时表,然后用sql进行查询,操作也是很简单跟静态的dataset/dataframe一样。

1
2
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")

其实,此处回想Spark Streaming 如何注册临时表呢?在foreachRDD里,讲rdd转换为dataset/dataframe,然后将其注册成临时表,该临时表特点是代表当前批次的数据,而不是全量数据。Structured Streaming注册的临时表就是流表,针对整个实时流的。Sparksession.sql执行结束后,返回的是一个流dataset/dataframe,当然这个很像spark sql的sql文本执行,所以为了区别一个dataframe/dataset是否是流式数据,可以df.isStreaming来判断。

当然,flink也支持直接注册流表,然后写sql分析,sql文本在flink中使用有两种形式:

1
2
3
4
5

1). tableEnv.sqlQuery("SELECT product,amount FROM Orders WHERE product LIKE '%Rubber%'")

2). tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHEREproduct LIKE '%Rubber%'");

对于第一种形式,sqlQuery执行结束之后会返回一张表也即是Table对象,然后可以进行后续操作或者直接输出,如:result.writeAsCsv(“”);。
而sqlUpdate是直接将结果输出到了tablesink,所以要首先注册tablesink,方式如下:

1
2
3
4
5
6
7
TableSink csvSink = newCsvTableSink("/path/to/file", ...);

String[] fieldNames = {"product","amount"};

TypeInformation[] fieldTypes ={Types.STRING, Types.INT};

tableEnv.registerTableSink("RubberOrders",fieldNames, fieldTypes, csvSink);

flink注册表的形式比较多,直接用数据源注册表,如:

1
2
tableEnv.registerExternalCatalog();
tableEnv.registerTableSource();

也可以从datastream转换成表,如:

1
2
tableEnv.registerDataStream("Orders",ds, "user, product, amount");
Table table = tableEnv.fromDataStream(ds,"user, product, amount");

Flink端到端状态一致性EXACTLY_ONCE实现

状态一致性:

alt
有状态的流处理,内部每个算子任务都可以有自己的状态;

对于流处理器内部(没有接入sink)来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确;

一条数据不应该丢失,也不应该重复计算;

在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正常的

状态一致性分类:

AT_MOST_ONCE(最多一次),当任务故障时最简单做法是什么都不干,既不恢复丢失状态,也不重播丢失数据。At-most-once语义的含义是最多处理一次事件。

AT_LEAST_ONCE(至少一次),在大多数真实应用场景,我们希望不丢失数据。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。

EXACTLY_ONCE(精确一次),恰好处理一次是最严格的的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。

一致性检查点(Checkpoints)

Flink使用了一种轻量级快照机制 — 检查点(Checkpoint)来保证exactly-one语义;

有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

应用状态的一致性检查点,是Flink故障恢复机制的核心。

alt

端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如kafka)和输出到持久化系统;

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终,每个组件都保证了它自己的一致性;

整个端到端的一致性级别取决于所有组件中一致性最弱的组件;

端到端exactly-once

内部保证 — checkpoint

source端 — 可重设数据的读取位置;可重新读取偏移量

sink端 – 从故障恢复时,数据不会重复写入外部系统:幂等写入和事务写入;

幂等写入(Idempotent Writes):

幂等操作即一个操作可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了;

alt

它的原理不是不重复写入而是重复写完之后结果还是一样;它的瑕疵是不能做到完全意义上exactly-once(在故障恢复时,突然把外部系统写入操作跳到之前的某个状态然后继续往里边写,故障之前发生的那一段的状态变化又重演了直到最后发生故障那一刻追上就正常了;假如中间这段又被读取了就可能会有些问题);

事务写入(Transactional Writes):

事务Transactional Writes

应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销;
具有原子性,一个事务中的一系列的操作要么全部成功,要么一个都不做。
实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中。

实现方式:预习日志和两阶段提交

预习日志(Write-Ahead-Log,WAL)

把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统;

简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定;

DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink;

瑕疵:
A. sink系统没说它支持事务,有可能出现一部分写进去了一部分没写进去(如果算失败,再写一次就写了两次了);

B. checkpoint做完了sink才去真正写入(但其实得等sink都写完checkpoint才能生效,所以WAL这个机制jobmanager确定它写完还不算真正写完,还得有一个外部系统已经确认完成的checkpoint)

两阶段提交(Two–Phase–Commit,2PC)– 真正能够实现exactly-once

对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里;

然后将这些数据写入外部sink系统,但不提交他们 – 这时只是预提交;

当它收到checkpoint完成时的通知,它才正式提交事务,实现结果的真正写入;

这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统,Flink提供了TwoPhaseCommitSinkFunction接口

2PC对外部sink的要求

外部sink系统必须事务支持,或者sink任务必须能够模拟外部系统上的事务;

在checkpoint的间隔期间里,必须能够开启一个事务,并接受数据写入;

在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态,在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失;

sink任务必须能够在进程失败后恢复事务;

提交事务必须是幂等操作;

不同Source和sink的一致性保证:

alt

Flink+kafka端到端状态一致性的保证
Flink和kafka天生就是一对,用kafka做为source,用kafka做完sink <===> 实现端到端的一致性

内部 – 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性;

source – kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性;

sink – kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

alt

alt

alt

默认是AT_LEAST_ONCE

Exactly-once两阶段提交

alt
JobManager协调各个TaskManager进行checkpoint存储;

checkpoint保存在StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存;

alt
当checkpoint启动时,JobManager会将检查点分界线(barrier)注入数据流;

barrier会在算子间传递下去;

alt

每个算子会对当前的状态做个快照,保存到状态后端;

checkpoint机制可以保证内部的状态一致性;

alt
每个内部的transform任务遇到barrier时,都会把状态存到checkpoint里;

sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把状态保存到状态后端,并开启新的预提交事务(以barrier为界之前的数据属于上一个事务,之后的数据属于下一个新的事务);

alt
当所有算子任务的快照完成,也就是这次的checkpoint完成时,JobManager会向所有任务发通知,确认这次checkpoint完成;

sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改完“已确认”;

Exactly-once两阶段提交步骤:

第一条数据来在之后,开启一个kafka的事务(transaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”;

JobManagere触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知JobManagere;

sink连接器接收到barrier,保存当前状态,存入checkpoint,通知JobManager并开启下一阶段的事务,用于提交下个检查点的数据;

JobManager收到所有任务的通知,发出确认信息,表示checkpoint完成;

sink任务收到JobManager的确认信息,正式提交这段时间的数据;

外部kafka关闭事务,提交的数据可以正常消费了。

在代码中真正实现flink和kafak的端到端exactly-once语义:

alt

alt

A. 这里需要配置下,因为它默认的是AT_LEAST_ONCE;

B. 对于外部kafka读取的消费者的隔离级别,默认是read_on_commited,如果默认是可以读未提交的数据,就相当于整个一致性还没得到保证(未提交的数据没有最终确认那边就可以读了,相当于那边已经消费数据了,事务就是假的了) 所以需要修改kafka的隔离级别;

C. timeout超时问题,flink和kafka 默认sink是超时1h,而kafak集群中配置的tranctraction事务的默认超时时间是15min,flink-kafak这边的连接器的时间长,这边还在等着做操作 ,kafak那边等checkpoint等的时间太长直接关闭了。所以两边的超时时间最起码前边要比后边的小

Flink使用assignAscendingTimestamps 生成水印的三个重载方法

先简单介绍一下Timestamp 和Watermark 的概念:

  1. Timestamp和Watermark都是基于事件的时间字段生成的
  2. Timestamp和Watermark是两个不同的东西,并且一旦生成都跟事件数据没有关系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也没关系)
  3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
  4. Watermark 在生成之后与事件数据没有直接关系,Watermark 作为一个消息,和事件数据一样在流中传递(Watermark 和StreamRecord 具有相同的父类:StreamElement)
  5. Timestamp 与 Watermark 在生成之后,会在下游window算子中做比较,判断事件数据是否是过期数据
  6. 只有window算子才会用Watermark判断事件数据是否过期

assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。

1
2
3
4
5
6
7
8
9
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// flink auto create timestamp & watermark
.assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

基于给定的水印生成器生成水印,即使没有新元素到达也会定期检查给定水印生成器的新水印,以指定允许延迟时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks periodically(定期生成水印)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
override def extractTimestamp(element: LateDataEvent): Long = {
println("want watermark : " + sdf.parse(element.createTime).getTime)
sdf.parse(element.createTime).getTime
}
})

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

此方法仅基于流元素创建水印,对于通过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每个元素,
调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大于以前的水印,会发出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成数百个水印会影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks every event
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
// check extractTimestamp emitted watermark is non-null and large than previously
override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
// generate next watermark
override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
val eventTime = sdf.parse(element.createTime).getTime
eventTime
}
})

转载于

Flink使用SQL操作几种类型的window

Flink SQL 支持三种窗口类型, 分别为 Tumble Windows / HOP Windows 和 Session Windows. 其中 HOP windows 对应 Table API 中的 Sliding Window, 同时每种窗口分别有相应的使用场景和方法

Tumble Window(翻转窗口)
Hop Window(滑动窗口)
Session Window(会话窗口)

HOPWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class HOPWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// HOP(time_attr, interval1, interval2)
// interval1 滑动长度
// interval2 窗口长度
Table result = tEnv.sqlQuery("SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
"HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}

}

TumbleWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class TumbleWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}


}

SessionWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class SessionWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),

/* Start Session */
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),

/* Start Session */
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// SESSION(time_attr, interval)
// interval 表示两条数据触发session的最大间隔
Table result = tEnv.sqlQuery("SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}
}

AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink

AppendStreamTableSink: 可将动态表转换为Append流。适用于动态表只有Insert的场景。

RetractStreamTableSink: 可将动态表转换为Retract流。适用于动态表有Insert、Delete、Update的场景。

UpsertStreamTableSink: 可将动态表转换为Upsert流。适用于动态表有Insert、Delete、Update的场景。

注意:

RetractStreamTableSink中: Insert被编码成一条Add消息; Delete被编码成一条Retract消息;Update被编码成两条消息(先是一条Retract消息,再是一条Add消息),即先删除再增加。

UpsertStreamTableSink: Insert和Update均被编码成一条消息(Upsert消息); Delete被编码成一条Delete消息。

UpsertStreamTableSink和RetractStreamTableSink最大的不同在于Update编码成一条消息,效率上比RetractStreamTableSink高。

上述说的编码指的是动态表转换为DataStream时,表的增删改如何体现到DataStream上。

测试数据

1
2
3
4
5
6
7
8
9
10
11
12
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:03:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

AppendStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class AppendStreamTableSink {

public static void main(String[] args) throws Exception {

// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
// props.setProperty("connector.version", "universal");
// DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer011<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

// DataStream<UserBrowseLog> browseStream=streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer010<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "eventTime", "eventType", "productID", "productPrice",
"eventTimeTimestamp" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT() };
TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout", myAppendStreamTableSink);

// 5、连续查询

String sql = "insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnvironment.sqlUpdate(sql);

tableEnvironment.execute(AppendStreamTableSink.class.getSimpleName());

}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

private static class MyAppendStreamTableSink implements org.apache.flink.table.sinks.AppendStreamTableSink<Row> {

private TableSchema tableSchema;

public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}

// 已过时
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
@Override
public void emitDataStream(DataStream<Row> dataStream) {
}

@Override
public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new SinkFunction());
}

private static class SinkFunction extends RichSinkFunction<Row> {
public SinkFunction() {
}

@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}

}

}

结果

1
2
3
4
5
6
user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000
user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
user_1,2016-01-01 10:03:16,browse,product_5,20,1451613796000

RetractStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class RetractStreamTableSink {

public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.RetractStreamTableSink<Row> retractStreamTableSink = new MyRetractStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

/**
* 自定义RetractStreamTableSink
*
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* DataStream里的数据格式是Tuple2类型,如Tuple2<Boolean, Row>。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
private static class MyRetractStreamTableSink implements org.apache.flink.table.sinks.RetractStreamTableSink<Row> {

private TableSchema tableSchema;

public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

public TableSchema getTableSchema() {
return tableSchema;
}

// 已过时
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
}

// 最终会转换成DataStream处理
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if (flag) {
System.out.println("增加... " + value);
} else {
System.out.println("删除... " + value);
}
}
}
}

}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
增加... (true,user_2,1)
删除... (false,user_2,1)
增加... (true,user_2,2)
增加... (true,user_1,1)
删除... (false,user_1,1)
增加... (true,user_1,2)
删除... (false,user_1,2)
增加... (true,user_1,3)
删除... (false,user_1,3)
增加... (true,user_1,4)
删除... (false,user_1,4)
增加... (true,user_1,5)
删除... (false,user_1,5)
增加... (true,user_1,6)

UpsertStreamTableSink示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.RetractStreamTableSink.BrowseKafkaProcessFunction;
import com.shuai.test.model.UserBrowseLog;

public class UpsertStreamTableSink {
public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.UpsertStreamTableSink<Row> retractStreamTableSink = new MyUpsertStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}


/**
* 自定义UpsertStreamTableSink
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成一条Add消息。如Tuple2<True, Row>。
* 在SortLimit(即order by ... limit ...)的场景下,被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
@SuppressWarnings("unused")
private static class MyUpsertStreamTableSink implements org.apache.flink.table.sinks.UpsertStreamTableSink<Row> {

private TableSchema tableSchema;

@SuppressWarnings("unused")
public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}


// 设置Unique Key
// 如上SQL中有GroupBy,则这里的唯一键会自动被推导为GroupBy的字段
@Override
public void setKeyFields(String[] keys) {}

// 是否只有Insert
// 如上SQL场景,需要Update,则这里被推导为isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// 最终会转换成DataStream处理

public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}


public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
/**
*
*/
private static final long serialVersionUID = 1L;

public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}


}

结果

1
2
3
4
5
6
7
8
增加... (true,user_1,1)
增加... (true,user_1,2)
增加... (true,user_1,3)
增加... (true,user_1,4)
增加... (true,user_1,5)
增加... (true,user_1,6)
增加... (true,user_2,1)
增加... (true,user_2,2)

Kafka to Flink - HDFS

kafka制造数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.flink.etl;

import lombok.AllArgsConstructor;
import lombok.ToString;

@lombok.Data
@ToString

public class Data {

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}

public String getUuid() {
return uuid;
}

public void setUuid(String uuid) {
this.uuid = uuid;
}

private long timestamp;
private String event;
private String uuid;

public Data(long timestamp, String event, String uuid) {

this.timestamp=timestamp;
this.event=event;
this.uuid=uuid;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.flink.etl;

import com.alibaba.fastjson.JSON;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;



public class DataGenerator {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.3.122:9092");
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
List<String> events = Arrays.asList("page_view", "adv_click", "thumbs_up");
Random random = new Random();
Data data = null;
ProducerRecord<String, String> record = null;
try {
while (true) {
long timestamp = System.currentTimeMillis();
String event = events.get(random.nextInt(events.size()));
String uuid = UUID.randomUUID().toString();
data = new Data(timestamp, event, uuid);
record = new ProducerRecord<>("data-collection-topic", JSON.toJSONString(data));
producer.send(record);
Thread.sleep(3000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}

}

StreamingJob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.flink.etl;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
"data-collection-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.create()
.withMaxPartSize(1024*1024*128) // 设置每个文件的最大大小 ,默认是128M。这里设置为128M
.withRolloverInterval(TimeUnit.MINUTES.toMillis(86400)) // 滚动写入新文件的时间,默认60s。这里设置为无限大
.withInactivityInterval(TimeUnit.MINUTES.toMillis(60)) // 60s空闲,就滚动写入新的文件
.build();
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("hdfs://DATASEA/home/dmp_operator1/bpointdata"), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner()).withRollingPolicy(rollingPolicy).build();
stream.addSink(sink);
env.enableCheckpointing(10_000);
env.setParallelism(1);
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://DATASEA/home/dmp_operator1/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}

EventTimeBucketAssigner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.flink.etl;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
private ObjectMapper mapper = new ObjectMapper();

@Override
public String getBucketId(String element, Context context) {
JsonNode node = null;
long date = System.currentTimeMillis();
try {
node = mapper.readTree(element);
// date = (long) (node.path("timestamp").floatValue() * 1000);
date = (long) (node.path("timestamp").floatValue());
} catch (IOException e) {
e.printStackTrace();
}
// String partitionValue = new SimpleDateFormat("yyyyMMddHHmm").format(new Date(date));
String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
return "dt=" + partitionValue;
}

@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}

Druid原理和架构

Druid简介

概念:主要是解决低延迟下实时数据摄入与查询的平台,本质是一个数据存储,但是数据仍然是保存在(hdfs、文件系统等)中。
特点:
① 列式存储格式:
可以将列作为索引,为仅查看几列的查询提供了巨大的速度提升
② 高可用、高并发:
① 集群扩展、缩小、删除、宕机都不会停止服务,全天候运行
② HA、sql的并行化执行、可扩展、容灾等
③ 支持1000+的并发用户,并提供隔离机制支持多租户模式(多租户就是并发互不影响)
④ 低延迟
Druid采用了列式存储、倒排索引、位图索引等关键技术,能够在亚秒级别内完成海量数据的过滤、聚合以及多维分析等操作。
⑤ 存储时候聚合:
无论是实时数据消费还是批量数据处理,Druid在基于DataSource结构存储数据时即可选择对任意的指标列进行聚合操作
聚合:提前做好sum,count等操作

Druid架构

alt
总体可以分为:四个节点+三个依赖

四个节点:

实时节点(RealtimeNode)(新版本的druid好像没有实时节点的说法了):

实时摄入数据,对于旧的数据周期性的生成segment数据文件,上传到deepstorage中
为了避免单点故障,索引服务(Indexer)的主从架构已经逐渐替代了实时节点,所以现在的实时节点,其实里面包含了很多角色:
作用:可以通过索引服务的API,写数据导入任务,用以新增、删除、合并Segment等。是一个主从架构:

统治节点(overlord):

类似于YarnResourceManager:负责集群资源的管理和分配
监视数据服务器上的MiddleManager进程,将提取任务分配给MiddleManager

中间管理者(middlemanager):

类似于YarnNodeManager:负责单个节点资源的管理和分配
新数据提取到群集中的过程。他们负责从外部数据源读取并发布新的段

苦工(peon):

类似于Yarncontainer:负责具体任务的执行
Peon进程是由MiddleManagers产生的任务执行引擎。
每个Peon运行一个单独的JVM,并负责执行单个任务。
Peon总是与生成它们的MiddleManager在同一主机上运行

Router(路由:可选):

可在Druid代理,统治节点和协调器之前提供统一的API网关
注:统治节点和中间管理者的通信是通过zookeeper完成的

历史节点(HistoricalNode):

加载已生成的segment数据文件,以供数据查询
启动或者受到协调节点通知的时候,通过druid_rules表去查找需要加载的数据,然后检查自身的本地缓存中已存在的Segment数据文件,
然后从DeepStorage中下载其他不在本地的Segment数据文件,后加载到内存!!!再提供查询。

查询节点(BrokerNode):

对外提供数据查询服务,并同时从实时节点与历史节点查询数据,合并后返回给调用方
缓存:外部:第三方的一些缓存系统内部:在历史节点或者查询节点做缓存

协调节点(CoodinatorNode):

负责历史节点的数据负载均衡,以及通过规则(Rule)管理数据的生命周期
① 通过从MySQL读取元数据信息,来决定深度存储上哪些数据段应该在那个历史节点中被加载,
② 通过ZK感知历史节点,历史节点增加,会自动分配相关的Segment,历史节点删除,会将原本在这台节点上的Segment分配给其他的历史节点
注:Coordinator是定期运行的,并且运行间隔可以通过配置参数配置

三个依赖:

1)Mysql:

存储关于Druid中的metadata,规则数据,配置数据等,
主要包含以下几张表:
“druid_config”(通常是空的),
“druid_rules”(协作节点使用的一些规则信息,比如哪个segment从哪个node去load)
“druid_segments”(存储每个segment的metadata信息);

2)Deepstorage:

存储segments,Druid目前已经支持本地磁盘,NFS挂载磁盘,HDFS,S3等。

3)ZooKeeper:

① 查询节点通过Zk来感知实时节点和历史节点的存在,提供查询服务。
② 协调节点通过ZK感知历史节点,实现负载均衡
③ 统治节点、协调节点的lead选举

实时Segment数据文件的流动:

生成:

① 实时节点(中间管理者)会周期性的将同一时间段生成的数据合并成一个Segment数据文件,并上传到DeepStorage中。
② Segment数据文件的相关元数据信息保存到MetaStore中(如mysql,derby等)。
③ 协调节点定时(默认1分钟)从MetaSotre中获取到Segment数据文件的相关元信息后,将按配置的规则分配到符合条件的历史节点中。
④ 协调节点会通知一个历史节点去读
⑤ 历史节点收到协调节点的通知后,会从DeepStorage中拉取该Segment数据文件到本地磁盘,并通过zookeeper向集群声明可以提供查询了。
⑥ 实时节点会丢弃该Segment数据文件,并通过zookeeper向集群声明不在提供该Sgment的查询服务。              //其实第四步已经可以提供查询服务了
⑦ 而对于全局数据来说,查询节点(BrokerNode)会同时从实时节点与历史节点分别查询,对结果整合后返回用户。

查询:

查询首先进入Broker,按照时间进行查询划分
确定哪些历史记录和MiddleManager正在为这些段提供服务
Historical/MiddleManager进程将接受查询,对其进行处理并返回结果
###DataSource
alt
每个datasource按照时间划分。每个时间范围称为一个chunk(一般都是以天分区,则一个chunk为一天)!!!//也可以按其他属性划分
在chunk中数据被分为一个或多个segment,每个segment都是一个单独的文件,通常包含几百万行数据
注:这些segment是按照时间组织成的chunk,所以在按照时间查询数据时,效率非常高。

数据分区:

任何分布式存储/计算系统,都需要对数据进行合理的分区,从而实现存储和计算的均衡,以及数据并行化。
而Druid本身处理的是事件数据,每条数据都会带有一个时间戳,所以很自然的就可以使用时间进行分区。
为什么一个chunk中的数据包含多个segment!!!????原因就是二级分区

二级分区:

很可能每个chunk的数据量是不均衡的,而Duid为了解决这种问题,提供了“二级分区”,每一个二级分区称为一个Shard(分片)
其实chunk、datasource都是抽象的,实际的就是每个分区就是一个Shard,每个Shard只包含一个Segment!!!,因为Segment是Shard持久化的结果
Druid目前支持两种Shard策略:
Hash(基于维值的Hash)
Range(基于某个维度的取值范围)
譬如:
2000-01-01,2000-01-02中的每一个分区都是一个Shard
2000-01-02的数据量比较多,所以有两个Shard,分为partition0、partition1。每个分区都是一个Shard
Shard经过持久化之后就称为了Segment,Segment是数据存储、复制、均衡(Historical的负载均衡)和计算的基本单元了。
Segment具有不可变性,一个Segment一旦创建完成后(MiddleManager节点发布后)就无法被修改,
只能通过生成一个新的Segment来代替旧版本的Segment。

Segment内部存储结构:

Segment内部采用列式存储          //并不是说每列都是一个独立的文件,而是说每列有独立的数据结构,所有列都会存储在一个文件中
Segment中的数据类型主要分为三种:
时间戳
维度列
指标列
对于时间戳列和指标列,实际存储是一个数组
对于维度列不会像指标列和时间戳这么简单,因为它需要支持filter和groupby:
所以Druid使用了字典编码(DictionaryEncoding)和位图索引(BitmapIndex)来存储每个维度列。每个维度列需要三个数据结构:
1、需要一个字典数据结构,将维值(维度列值都会被认为是字符串类型)映射成一个整数ID。
2、使用上面的字典编码,将该列所有维值放在一个列表中。
3、对于列中不同的值,使用bitmap数据结构标识哪些行包含这些值。      //位图索引,这个需要记住
注:使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量)
Druid针对维度列之所以使用这三个数据结构,是因为:
使用字典将字符串映射成整数ID,可以紧凑的表示结构2和结构3中的值。
使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量),因为Bitmap可以快速执行AND和OR操作。
对于groupby和TopN操作需要使用结构2中的列值列表
实例:
1.使用字典将列值映射为整数
{
“JustinBieher”:0,
“ke$ha”:1
}
2.使用1中的编码,将列值放到一个列表中
[0,0,1,1]
3.使用bitmap来标识不同列值
value=0:[1,1,0,0]//1代表该行含有该值,0标识不含有
value=1:[0,0,1,1]
因为是一个稀疏矩阵,所以比较好压缩!!
Druid而且运用了RoaringBitmap能够对压缩后的位图直接进行布尔运算,可以大大提高查询效率和存储效率(不需要解压缩)

Segment命名:

如果一个Datasource下有几百万个Segment文件,我们又如何快速找出我们所需要的文件呢?答案就是通过文件名称快速索引查找。
Segment的命名包含四部分:
数据源(Datasource)、时间间隔(包含开始时间和结束时间两部分)、版本号和分区(Segment有分片的情况下才会有)。
eg:wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-09T10:06:02.498Z
wikipedia:Datasource名称
开始时间:2015-09-12T00:00:00.000Z//该Segment所存储最早的数据,时间格式是ISO8601
结束时间:2015-09-13T00:00:00.000Z//该segment所存储最晚的数据,时间格式是ISO8601
版本号:2019-09-09T10:06:02.498Z//此Segment的启动时间,因为Druid支持批量覆盖操作,
//当批量摄入与之前相同数据源、相同时间间隔数据时,数据就会被覆盖,这时候版本号就会被更新
分片号:从0开始,如果分区号为0,可以省略//分区的表现其实就是分目录
注:单机形式运行Druid,这样Druid生成的Segment文件都在${DRUID_HOME}/var/druid/segments目录下
注:为了保证Druid的查询效率,每个Segment文件的大小建议在300MB~700MB之间
注:版本号的意义:
在druid,如果您所做的只是追加数据,那么每个时间chunk只会有一个版本。
但是当您覆盖数据时,因为druid通过首先加载新数据(但不允许查询)来处理这个问题,一旦新数据全部加载,
切换所有新查询以使用这些新数据。然后它在几分钟后掉落旧段!!!

存储聚合

无论是实时数据消费还是批量数据处理,Druid在基于DataSource机构存储数据时即可选择对任意的指标列进行聚合操作:
1、基于维度列:相同的维度列数据会进行聚合
2、基于时间段:某一时间段的所有行会进行聚合,时间段可以通过queryGranularity参数指定
聚合:提前做好sum,count等操作

Segment生命周期:

在元数据存储中!每个Segment都会有一个used字段,标记该段是否能用于查询

is_Published:

当Segment构建完毕,就将元数据存储在元数据存储区中,此Segment为发布状态

is_available:

如果Segment当前可用于查询(实时任务或历史进程),则为true。

is_realtime:

如果是由实时任务产生的,那么会为true,但是一段时间之后,也会变为false

is_overshadowed:

标记该段是否已被其他段覆盖!处于此状态的段很快就会将其used标志自动设置为false。

© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量