Flink与Spark多方面区别对比

本文转载于
场景描述:Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不过现在Spark Streaming已经非常稳定基本都没有更新了,然后重点移到spark sql和structured Streaming了

Flink和Spark的区别在编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面存在不同。

维表join和异步IO

Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming直接与静态数据集的join,可以也可以帮助实现维表的join功能,当然维表要不可变。

Flink支持与维表进行join操作,除了map,flatmap这些算子之外,flink还有异步IO算子,可以用来实现维表,提升性能。

状态管理

状态维护应该是流处理非常核心的概念了,比如join,分组,聚合等操作都需要维护历史状态。那么flink在这方面很好,structured Streaming也是可以,但是spark Streaming就比较弱了,只有个别状态维护算子upstatebykye等,大部分状态需要用户自己维护,虽然这个对用户来说有更大的可操作性和可以更精细控制但是带来了编程的麻烦。flink和Structured Streaming都支持自己完成了join及聚合的状态维护。

Structured Streaming有高级的算子,用户可以完成自定义的mapGroupsWithState和flatMapGroupsWithState,可以理解类似Spark Streaming 的upstatebykey等状态算子。

就拿mapGroupsWithState为例:
由于Flink与Structured Streaming的架构的不同,task是常驻运行的,flink不需要状态算子,只需要状态类型的数据结构。

首先看一下Keyed State下,我们可以用哪些原子状态:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用

reduceFunction,最后合并到一个单一的状态值。

FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。

MapState:即状态值为一个map。用户通过put或putAll方法添加元素。

Join操作

Flink的join操作

flink的join操作没有大的限制,支持种类丰富,比如:

Inner Equi-join

1
SELECT * FROM Orders INNER JOIN Product ONOrders.productId = Product.id

Outer Equi-join

1
2
3
4
5
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders FULL OUTER JOIN Product ONOrders.productId = Product.id

Time-windowed Join

1
SELECT * FROM Oderso,Shipmentss WHEREo.id=s.orderIdAND o.ordertimeBETWEENs.shiptime INTERVAL'4'HOURANDs.shiptime

Expanding arrays into a relation

1
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Join with Table Function

1
2
3
4
5
6
7
8
9
10
11
Inner Join

A row of the left (outer) table is dropped, if its table function call returns an empty result.
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join
If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

Join with Temporal Table

1
2
3
4
5
6
7
8

SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency

Structured Streaming的join操作
Structured Streaming的join限制颇多了,限于篇幅问题在这里只讲一下join的限制
alt

容错机制及一致性语义

Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:

1
2
3
4
5
6
7

Dstream.foreachRDD(rdd=>{
rdd.repartition(1).foreachPartition(partition=>{ // 开启事务
partition.foreach(each=>{// 提交数据
}) // 提交事务
})
})

将结果和 offset 一起提交

也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图所示包含以下组件:

一个source,从Kafka中读取数据(即KafkaConsumer)

一个时间窗口化的聚会操作

一个sink,将结果写回到Kafka(即KafkaProducer)

alt
下面详细讲解 flink 的两段提交思路

alt

Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交

alt

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图
alt
当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

alt
以上就是 flink 实现恰一次处理的基本逻辑。

背压

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。

Spark Streaming 的背压
Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset

PIDRateEsimator 的 compute 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def compute(       
time: Long,
// in milliseconds
numElements: Long,
processingDelay: Long,
// in milliseconds schedulingDelay: Long
// in milliseconds
): Option[Double] = {logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate/ batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)
latestTime = time
if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None }
else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } }
else { logTrace("Rate estimation skipped") None } } }
}
}
}

}

Flink 的背压
与 Spark Streaming 的背压不同的是,Flink 背压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 16 所示:

alt
阻塞占比在 web 上划分了三个等级:

OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了。

表管理

flink和structured streaming都可以讲流注册成一张表,然后使用sql进行分析,不过两者之间区别还是有些的。
Structured Streaming将流注册成临时表,然后用sql进行查询,操作也是很简单跟静态的dataset/dataframe一样。

1
2
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")

其实,此处回想Spark Streaming 如何注册临时表呢?在foreachRDD里,讲rdd转换为dataset/dataframe,然后将其注册成临时表,该临时表特点是代表当前批次的数据,而不是全量数据。Structured Streaming注册的临时表就是流表,针对整个实时流的。Sparksession.sql执行结束后,返回的是一个流dataset/dataframe,当然这个很像spark sql的sql文本执行,所以为了区别一个dataframe/dataset是否是流式数据,可以df.isStreaming来判断。

当然,flink也支持直接注册流表,然后写sql分析,sql文本在flink中使用有两种形式:

1
2
3
4
5

1). tableEnv.sqlQuery("SELECT product,amount FROM Orders WHERE product LIKE '%Rubber%'")

2). tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHEREproduct LIKE '%Rubber%'");

对于第一种形式,sqlQuery执行结束之后会返回一张表也即是Table对象,然后可以进行后续操作或者直接输出,如:result.writeAsCsv(“”);。
而sqlUpdate是直接将结果输出到了tablesink,所以要首先注册tablesink,方式如下:

1
2
3
4
5
6
7
TableSink csvSink = newCsvTableSink("/path/to/file", ...);

String[] fieldNames = {"product","amount"};

TypeInformation[] fieldTypes ={Types.STRING, Types.INT};

tableEnv.registerTableSink("RubberOrders",fieldNames, fieldTypes, csvSink);

flink注册表的形式比较多,直接用数据源注册表,如:

1
2
tableEnv.registerExternalCatalog();
tableEnv.registerTableSource();

也可以从datastream转换成表,如:

1
2
tableEnv.registerDataStream("Orders",ds, "user, product, amount");
Table table = tableEnv.fromDataStream(ds,"user, product, amount");

Flink端到端状态一致性EXACTLY_ONCE实现

状态一致性:

alt
有状态的流处理,内部每个算子任务都可以有自己的状态;

对于流处理器内部(没有接入sink)来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确;

一条数据不应该丢失,也不应该重复计算;

在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正常的

状态一致性分类:

AT_MOST_ONCE(最多一次),当任务故障时最简单做法是什么都不干,既不恢复丢失状态,也不重播丢失数据。At-most-once语义的含义是最多处理一次事件。

AT_LEAST_ONCE(至少一次),在大多数真实应用场景,我们希望不丢失数据。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。

EXACTLY_ONCE(精确一次),恰好处理一次是最严格的的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。

一致性检查点(Checkpoints)

Flink使用了一种轻量级快照机制 — 检查点(Checkpoint)来保证exactly-one语义;

有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

应用状态的一致性检查点,是Flink故障恢复机制的核心。

alt

端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如kafka)和输出到持久化系统;

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终,每个组件都保证了它自己的一致性;

整个端到端的一致性级别取决于所有组件中一致性最弱的组件;

端到端exactly-once

内部保证 — checkpoint

source端 — 可重设数据的读取位置;可重新读取偏移量

sink端 – 从故障恢复时,数据不会重复写入外部系统:幂等写入和事务写入;

幂等写入(Idempotent Writes):

幂等操作即一个操作可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了;

alt

它的原理不是不重复写入而是重复写完之后结果还是一样;它的瑕疵是不能做到完全意义上exactly-once(在故障恢复时,突然把外部系统写入操作跳到之前的某个状态然后继续往里边写,故障之前发生的那一段的状态变化又重演了直到最后发生故障那一刻追上就正常了;假如中间这段又被读取了就可能会有些问题);

事务写入(Transactional Writes):

事务Transactional Writes

应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销;
具有原子性,一个事务中的一系列的操作要么全部成功,要么一个都不做。
实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中。

实现方式:预习日志和两阶段提交

预习日志(Write-Ahead-Log,WAL)

把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统;

简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定;

DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink;

瑕疵:
A. sink系统没说它支持事务,有可能出现一部分写进去了一部分没写进去(如果算失败,再写一次就写了两次了);

B. checkpoint做完了sink才去真正写入(但其实得等sink都写完checkpoint才能生效,所以WAL这个机制jobmanager确定它写完还不算真正写完,还得有一个外部系统已经确认完成的checkpoint)

两阶段提交(Two–Phase–Commit,2PC)– 真正能够实现exactly-once

对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里;

然后将这些数据写入外部sink系统,但不提交他们 – 这时只是预提交;

当它收到checkpoint完成时的通知,它才正式提交事务,实现结果的真正写入;

这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统,Flink提供了TwoPhaseCommitSinkFunction接口

2PC对外部sink的要求

外部sink系统必须事务支持,或者sink任务必须能够模拟外部系统上的事务;

在checkpoint的间隔期间里,必须能够开启一个事务,并接受数据写入;

在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态,在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失;

sink任务必须能够在进程失败后恢复事务;

提交事务必须是幂等操作;

不同Source和sink的一致性保证:

alt

Flink+kafka端到端状态一致性的保证
Flink和kafka天生就是一对,用kafka做为source,用kafka做完sink <===> 实现端到端的一致性

内部 – 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性;

source – kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性;

sink – kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

alt

alt

alt

默认是AT_LEAST_ONCE

Exactly-once两阶段提交

alt
JobManager协调各个TaskManager进行checkpoint存储;

checkpoint保存在StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存;

alt
当checkpoint启动时,JobManager会将检查点分界线(barrier)注入数据流;

barrier会在算子间传递下去;

alt

每个算子会对当前的状态做个快照,保存到状态后端;

checkpoint机制可以保证内部的状态一致性;

alt
每个内部的transform任务遇到barrier时,都会把状态存到checkpoint里;

sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把状态保存到状态后端,并开启新的预提交事务(以barrier为界之前的数据属于上一个事务,之后的数据属于下一个新的事务);

alt
当所有算子任务的快照完成,也就是这次的checkpoint完成时,JobManager会向所有任务发通知,确认这次checkpoint完成;

sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改完“已确认”;

Exactly-once两阶段提交步骤:

第一条数据来在之后,开启一个kafka的事务(transaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”;

JobManagere触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知JobManagere;

sink连接器接收到barrier,保存当前状态,存入checkpoint,通知JobManager并开启下一阶段的事务,用于提交下个检查点的数据;

JobManager收到所有任务的通知,发出确认信息,表示checkpoint完成;

sink任务收到JobManager的确认信息,正式提交这段时间的数据;

外部kafka关闭事务,提交的数据可以正常消费了。

在代码中真正实现flink和kafak的端到端exactly-once语义:

alt

alt

A. 这里需要配置下,因为它默认的是AT_LEAST_ONCE;

B. 对于外部kafka读取的消费者的隔离级别,默认是read_on_commited,如果默认是可以读未提交的数据,就相当于整个一致性还没得到保证(未提交的数据没有最终确认那边就可以读了,相当于那边已经消费数据了,事务就是假的了) 所以需要修改kafka的隔离级别;

C. timeout超时问题,flink和kafka 默认sink是超时1h,而kafak集群中配置的tranctraction事务的默认超时时间是15min,flink-kafak这边的连接器的时间长,这边还在等着做操作 ,kafak那边等checkpoint等的时间太长直接关闭了。所以两边的超时时间最起码前边要比后边的小

Flink使用assignAscendingTimestamps 生成水印的三个重载方法

先简单介绍一下Timestamp 和Watermark 的概念:

  1. Timestamp和Watermark都是基于事件的时间字段生成的
  2. Timestamp和Watermark是两个不同的东西,并且一旦生成都跟事件数据没有关系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也没关系)
  3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
  4. Watermark 在生成之后与事件数据没有直接关系,Watermark 作为一个消息,和事件数据一样在流中传递(Watermark 和StreamRecord 具有相同的父类:StreamElement)
  5. Timestamp 与 Watermark 在生成之后,会在下游window算子中做比较,判断事件数据是否是过期数据
  6. 只有window算子才会用Watermark判断事件数据是否过期

assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。

1
2
3
4
5
6
7
8
9
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// flink auto create timestamp & watermark
.assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

基于给定的水印生成器生成水印,即使没有新元素到达也会定期检查给定水印生成器的新水印,以指定允许延迟时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks periodically(定期生成水印)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
override def extractTimestamp(element: LateDataEvent): Long = {
println("want watermark : " + sdf.parse(element.createTime).getTime)
sdf.parse(element.createTime).getTime
}
})

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

此方法仅基于流元素创建水印,对于通过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每个元素,
调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大于以前的水印,会发出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成数百个水印会影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks every event
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
// check extractTimestamp emitted watermark is non-null and large than previously
override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
// generate next watermark
override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
val eventTime = sdf.parse(element.createTime).getTime
eventTime
}
})

转载于

Flink使用SQL操作几种类型的window

Flink SQL 支持三种窗口类型, 分别为 Tumble Windows / HOP Windows 和 Session Windows. 其中 HOP windows 对应 Table API 中的 Sliding Window, 同时每种窗口分别有相应的使用场景和方法

Tumble Window(翻转窗口)
Hop Window(滑动窗口)
Session Window(会话窗口)

HOPWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class HOPWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// HOP(time_attr, interval1, interval2)
// interval1 滑动长度
// interval2 窗口长度
Table result = tEnv.sqlQuery("SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
"HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}

}

TumbleWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class TumbleWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}


}

SessionWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class SessionWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),

/* Start Session */
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),

/* Start Session */
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// SESSION(time_attr, interval)
// interval 表示两条数据触发session的最大间隔
Table result = tEnv.sqlQuery("SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}
}

AppendStreamTableSink、RetractStreamTableSink、UpsertStreamTableSink

AppendStreamTableSink: 可将动态表转换为Append流。适用于动态表只有Insert的场景。

RetractStreamTableSink: 可将动态表转换为Retract流。适用于动态表有Insert、Delete、Update的场景。

UpsertStreamTableSink: 可将动态表转换为Upsert流。适用于动态表有Insert、Delete、Update的场景。

注意:

RetractStreamTableSink中: Insert被编码成一条Add消息; Delete被编码成一条Retract消息;Update被编码成两条消息(先是一条Retract消息,再是一条Add消息),即先删除再增加。

UpsertStreamTableSink: Insert和Update均被编码成一条消息(Upsert消息); Delete被编码成一条Delete消息。

UpsertStreamTableSink和RetractStreamTableSink最大的不同在于Update编码成一条消息,效率上比RetractStreamTableSink高。

上述说的编码指的是动态表转换为DataStream时,表的增删改如何体现到DataStream上。

测试数据

1
2
3
4
5
6
7
8
9
10
11
12
{"userID": "user_1", "eventTime": "2016-01-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_2", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_3", "eventTime": "2016-01-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_4", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_5", "eventTime": "2016-01-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}
{"userID": "user_1", "eventTime": "2016-01-01 10:03:16", "eventType": "browse", "productID": "product_5", "productPrice": 20}

AppendStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class AppendStreamTableSink {

public static void main(String[] args) throws Exception {

// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
// props.setProperty("connector.version", "universal");
// DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer011<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

// DataStream<UserBrowseLog> browseStream=streamExecutionEnvironment
// .addSource(new FlinkKafkaConsumer010<>("demo", new SimpleStringSchema(), props))
// .process(new BrowseKafkaProcessFunction());

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "eventTime", "eventType", "productID", "productPrice",
"eventTimeTimestamp" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.INT(), DataTypes.BIGINT() };
TableSink<Row> myAppendStreamTableSink = new MyAppendStreamTableSink(sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout", myAppendStreamTableSink);

// 5、连续查询

String sql = "insert into sink_stdout select userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp from source_kafka_browse_log where userID='user_1'";
tableEnvironment.sqlUpdate(sql);

tableEnvironment.execute(AppendStreamTableSink.class.getSimpleName());

}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

private static class MyAppendStreamTableSink implements org.apache.flink.table.sinks.AppendStreamTableSink<Row> {

private TableSchema tableSchema;

public MyAppendStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}

@Override
public DataType getConsumedDataType() {
return tableSchema.toRowDataType();
}

// 已过时
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
@Override
public void emitDataStream(DataStream<Row> dataStream) {
}

@Override
public DataStreamSink<Row> consumeDataStream(DataStream<Row> dataStream) {
return dataStream.addSink(new SinkFunction());
}

private static class SinkFunction extends RichSinkFunction<Row> {
public SinkFunction() {
}

@Override
public void invoke(Row value, Context context) throws Exception {
System.out.println(value);
}
}

}

}

结果

1
2
3
4
5
6
user_1,2016-01-01 10:02:00,browse,product_5,20,1451613720000
user_1,2016-01-01 10:02:16,browse,product_5,20,1451613736000
user_1,2016-01-01 10:02:15,browse,product_5,20,1451613735000
user_1,2016-01-01 10:02:02,browse,product_5,20,1451613722000
user_1,2016-01-01 10:02:06,browse,product_5,20,1451613726000
user_1,2016-01-01 10:03:16,browse,product_5,20,1451613796000

RetractStreamTableSink 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.model.UserBrowseLog;

public class RetractStreamTableSink {

public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.RetractStreamTableSink<Row> retractStreamTableSink = new MyRetractStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}

/**
* 自定义RetractStreamTableSink
*
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* DataStream里的数据格式是Tuple2类型,如Tuple2<Boolean, Row>。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
private static class MyRetractStreamTableSink implements org.apache.flink.table.sinks.RetractStreamTableSink<Row> {

private TableSchema tableSchema;

public MyRetractStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames, fieldTypes).build();
}

public TableSchema getTableSchema() {
return tableSchema;
}

// 已过时
public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
}

// 最终会转换成DataStream处理
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}

public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if (flag) {
System.out.println("增加... " + value);
} else {
System.out.println("删除... " + value);
}
}
}
}

}

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
增加... (true,user_2,1)
删除... (false,user_2,1)
增加... (true,user_2,2)
增加... (true,user_1,1)
删除... (false,user_1,1)
增加... (true,user_1,2)
删除... (false,user_1,2)
增加... (true,user_1,3)
删除... (false,user_1,3)
增加... (true,user_1,4)
删除... (false,user_1,4)
增加... (true,user_1,5)
删除... (false,user_1,5)
增加... (true,user_1,6)

UpsertStreamTableSink示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.shuai.test;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSON;
import com.shuai.test.RetractStreamTableSink.BrowseKafkaProcessFunction;
import com.shuai.test.model.UserBrowseLog;

public class UpsertStreamTableSink {
public static void main(String[] args) throws Exception {
// 设置运行环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment,
environmentSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("demo", new SimpleStringSchema(), props);

DataStream<UserBrowseLog> browseStream = streamExecutionEnvironment.addSource(consumer)
.process(new BrowseKafkaProcessFunction()).setParallelism(1);

tableEnvironment.registerDataStream("source_kafka_browse_log", browseStream,
"userID,eventTime,eventType,productID,productPrice,eventTimeTimestamp");

// 4、注册AppendStreamTableSink
String[] sinkFieldNames = { "userID", "browseNumber" };
DataType[] sinkFieldTypes = { DataTypes.STRING(), DataTypes.BIGINT() };
org.apache.flink.table.sinks.UpsertStreamTableSink<Row> retractStreamTableSink = new MyUpsertStreamTableSink(
sinkFieldNames, sinkFieldTypes);
tableEnvironment.registerTableSink("sink_stdout",retractStreamTableSink);

//5、连续查询
//统计每个Uid的浏览次数
String sql="insert into sink_stdout select userID,count(1) as browseNumber from source_kafka_browse_log where userID in ('user_1','user_2') group by userID ";
tableEnvironment.sqlUpdate(sql);


//6、开始执行
tableEnvironment.execute(RetractStreamTableSink.class.getSimpleName());



}

public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {

@Override
public void processElement(String value, ProcessFunction<String, UserBrowseLog>.Context ctx,
Collector<UserBrowseLog> out) throws Exception {
UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
// 增加一个long类型的时间戳
// 指定eventTime为yyyy-MM-dd HH:mm:ss格式的北京时间
java.time.format.DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), format)
.atOffset(ZoneOffset.of("+08:00"));
// 转换成毫秒时间戳
long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
browseLog.setEventTimeTimestamp(eventTimeTimestamp);

out.collect(browseLog);

}

}


/**
* 自定义UpsertStreamTableSink
* Table在内部被转换成具有Add(增加)和Retract(撤消/删除)的消息流,最终交由DataStream的SinkFunction处理。
* Boolean是Add(增加)或Retract(删除)的flag(标识)。Row是真正的数据类型。
* Table中的Insert被编码成一条Add消息。如Tuple2<True, Row>。
* Table中的Update被编码成一条Add消息。如Tuple2<True, Row>。
* 在SortLimit(即order by ... limit ...)的场景下,被编码成两条消息。一条删除消息Tuple2<False, Row>,一条增加消息Tuple2<True, Row>。
*/
@SuppressWarnings("unused")
private static class MyUpsertStreamTableSink implements org.apache.flink.table.sinks.UpsertStreamTableSink<Row> {

private TableSchema tableSchema;

@SuppressWarnings("unused")
public MyUpsertStreamTableSink(String[] fieldNames, DataType[] fieldTypes) {
this.tableSchema = TableSchema.builder().fields(fieldNames,fieldTypes).build();
}

@Override
public TableSchema getTableSchema() {
return tableSchema;
}


// 设置Unique Key
// 如上SQL中有GroupBy,则这里的唯一键会自动被推导为GroupBy的字段
@Override
public void setKeyFields(String[] keys) {}

// 是否只有Insert
// 如上SQL场景,需要Update,则这里被推导为isAppendOnly=false
@Override
public void setIsAppendOnly(Boolean isAppendOnly) {}

@Override
public TypeInformation<Row> getRecordType() {
return new RowTypeInfo(tableSchema.getFieldTypes(),tableSchema.getFieldNames());
}

// 已过时
public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {}

// 最终会转换成DataStream处理

public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
return dataStream.addSink(new SinkFunction());
}


public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
return null;
}

private static class SinkFunction extends RichSinkFunction<Tuple2<Boolean, Row>> {
/**
*
*/
private static final long serialVersionUID = 1L;

public SinkFunction() {
}

@Override
public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
Boolean flag = value.f0;
if(flag){
System.out.println("增加... "+value);
}else {
System.out.println("删除... "+value);
}
}
}
}


}

结果

1
2
3
4
5
6
7
8
增加... (true,user_1,1)
增加... (true,user_1,2)
增加... (true,user_1,3)
增加... (true,user_1,4)
增加... (true,user_1,5)
增加... (true,user_1,6)
增加... (true,user_2,1)
增加... (true,user_2,2)

Kafka to Flink - HDFS

kafka制造数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.flink.etl;

import lombok.AllArgsConstructor;
import lombok.ToString;

@lombok.Data
@ToString

public class Data {

public long getTimestamp() {
return timestamp;
}

public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}

public String getUuid() {
return uuid;
}

public void setUuid(String uuid) {
this.uuid = uuid;
}

private long timestamp;
private String event;
private String uuid;

public Data(long timestamp, String event, String uuid) {

this.timestamp=timestamp;
this.event=event;
this.uuid=uuid;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.flink.etl;

import com.alibaba.fastjson.JSON;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;



public class DataGenerator {

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.3.122:9092");
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
List<String> events = Arrays.asList("page_view", "adv_click", "thumbs_up");
Random random = new Random();
Data data = null;
ProducerRecord<String, String> record = null;
try {
while (true) {
long timestamp = System.currentTimeMillis();
String event = events.get(random.nextInt(events.size()));
String uuid = UUID.randomUUID().toString();
data = new Data(timestamp, event, uuid);
record = new ProducerRecord<>("data-collection-topic", JSON.toJSONString(data));
producer.send(record);
Thread.sleep(3000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}

}

StreamingJob

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.flink.etl;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.3.122:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
"data-collection-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
.create()
.withMaxPartSize(1024*1024*128) // 设置每个文件的最大大小 ,默认是128M。这里设置为128M
.withRolloverInterval(TimeUnit.MINUTES.toMillis(86400)) // 滚动写入新文件的时间,默认60s。这里设置为无限大
.withInactivityInterval(TimeUnit.MINUTES.toMillis(60)) // 60s空闲,就滚动写入新的文件
.build();
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("hdfs://DATASEA/home/dmp_operator1/bpointdata"), new SimpleStringEncoder<String>())
.withBucketAssigner(new EventTimeBucketAssigner()).withRollingPolicy(rollingPolicy).build();
stream.addSink(sink);
env.enableCheckpointing(10_000);
env.setParallelism(1);
env.setStateBackend((StateBackend) new FsStateBackend("hdfs://DATASEA/home/dmp_operator1/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}

EventTimeBucketAssigner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.flink.etl;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {
private ObjectMapper mapper = new ObjectMapper();

@Override
public String getBucketId(String element, Context context) {
JsonNode node = null;
long date = System.currentTimeMillis();
try {
node = mapper.readTree(element);
// date = (long) (node.path("timestamp").floatValue() * 1000);
date = (long) (node.path("timestamp").floatValue());
} catch (IOException e) {
e.printStackTrace();
}
// String partitionValue = new SimpleDateFormat("yyyyMMddHHmm").format(new Date(date));
String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
return "dt=" + partitionValue;
}

@Override
public SimpleVersionedSerializer<String> getSerializer() {
return SimpleVersionedStringSerializer.INSTANCE;
}
}

Druid原理和架构

Druid简介

概念:主要是解决低延迟下实时数据摄入与查询的平台,本质是一个数据存储,但是数据仍然是保存在(hdfs、文件系统等)中。
特点:
① 列式存储格式:
可以将列作为索引,为仅查看几列的查询提供了巨大的速度提升
② 高可用、高并发:
① 集群扩展、缩小、删除、宕机都不会停止服务,全天候运行
② HA、sql的并行化执行、可扩展、容灾等
③ 支持1000+的并发用户,并提供隔离机制支持多租户模式(多租户就是并发互不影响)
④ 低延迟
Druid采用了列式存储、倒排索引、位图索引等关键技术,能够在亚秒级别内完成海量数据的过滤、聚合以及多维分析等操作。
⑤ 存储时候聚合:
无论是实时数据消费还是批量数据处理,Druid在基于DataSource结构存储数据时即可选择对任意的指标列进行聚合操作
聚合:提前做好sum,count等操作

Druid架构

alt
总体可以分为:四个节点+三个依赖

四个节点:

实时节点(RealtimeNode)(新版本的druid好像没有实时节点的说法了):

实时摄入数据,对于旧的数据周期性的生成segment数据文件,上传到deepstorage中
为了避免单点故障,索引服务(Indexer)的主从架构已经逐渐替代了实时节点,所以现在的实时节点,其实里面包含了很多角色:
作用:可以通过索引服务的API,写数据导入任务,用以新增、删除、合并Segment等。是一个主从架构:

统治节点(overlord):

类似于YarnResourceManager:负责集群资源的管理和分配
监视数据服务器上的MiddleManager进程,将提取任务分配给MiddleManager

中间管理者(middlemanager):

类似于YarnNodeManager:负责单个节点资源的管理和分配
新数据提取到群集中的过程。他们负责从外部数据源读取并发布新的段

苦工(peon):

类似于Yarncontainer:负责具体任务的执行
Peon进程是由MiddleManagers产生的任务执行引擎。
每个Peon运行一个单独的JVM,并负责执行单个任务。
Peon总是与生成它们的MiddleManager在同一主机上运行

Router(路由:可选):

可在Druid代理,统治节点和协调器之前提供统一的API网关
注:统治节点和中间管理者的通信是通过zookeeper完成的

历史节点(HistoricalNode):

加载已生成的segment数据文件,以供数据查询
启动或者受到协调节点通知的时候,通过druid_rules表去查找需要加载的数据,然后检查自身的本地缓存中已存在的Segment数据文件,
然后从DeepStorage中下载其他不在本地的Segment数据文件,后加载到内存!!!再提供查询。

查询节点(BrokerNode):

对外提供数据查询服务,并同时从实时节点与历史节点查询数据,合并后返回给调用方
缓存:外部:第三方的一些缓存系统内部:在历史节点或者查询节点做缓存

协调节点(CoodinatorNode):

负责历史节点的数据负载均衡,以及通过规则(Rule)管理数据的生命周期
① 通过从MySQL读取元数据信息,来决定深度存储上哪些数据段应该在那个历史节点中被加载,
② 通过ZK感知历史节点,历史节点增加,会自动分配相关的Segment,历史节点删除,会将原本在这台节点上的Segment分配给其他的历史节点
注:Coordinator是定期运行的,并且运行间隔可以通过配置参数配置

三个依赖:

1)Mysql:

存储关于Druid中的metadata,规则数据,配置数据等,
主要包含以下几张表:
“druid_config”(通常是空的),
“druid_rules”(协作节点使用的一些规则信息,比如哪个segment从哪个node去load)
“druid_segments”(存储每个segment的metadata信息);

2)Deepstorage:

存储segments,Druid目前已经支持本地磁盘,NFS挂载磁盘,HDFS,S3等。

3)ZooKeeper:

① 查询节点通过Zk来感知实时节点和历史节点的存在,提供查询服务。
② 协调节点通过ZK感知历史节点,实现负载均衡
③ 统治节点、协调节点的lead选举

实时Segment数据文件的流动:

生成:

① 实时节点(中间管理者)会周期性的将同一时间段生成的数据合并成一个Segment数据文件,并上传到DeepStorage中。
② Segment数据文件的相关元数据信息保存到MetaStore中(如mysql,derby等)。
③ 协调节点定时(默认1分钟)从MetaSotre中获取到Segment数据文件的相关元信息后,将按配置的规则分配到符合条件的历史节点中。
④ 协调节点会通知一个历史节点去读
⑤ 历史节点收到协调节点的通知后,会从DeepStorage中拉取该Segment数据文件到本地磁盘,并通过zookeeper向集群声明可以提供查询了。
⑥ 实时节点会丢弃该Segment数据文件,并通过zookeeper向集群声明不在提供该Sgment的查询服务。              //其实第四步已经可以提供查询服务了
⑦ 而对于全局数据来说,查询节点(BrokerNode)会同时从实时节点与历史节点分别查询,对结果整合后返回用户。

查询:

查询首先进入Broker,按照时间进行查询划分
确定哪些历史记录和MiddleManager正在为这些段提供服务
Historical/MiddleManager进程将接受查询,对其进行处理并返回结果
###DataSource
alt
每个datasource按照时间划分。每个时间范围称为一个chunk(一般都是以天分区,则一个chunk为一天)!!!//也可以按其他属性划分
在chunk中数据被分为一个或多个segment,每个segment都是一个单独的文件,通常包含几百万行数据
注:这些segment是按照时间组织成的chunk,所以在按照时间查询数据时,效率非常高。

数据分区:

任何分布式存储/计算系统,都需要对数据进行合理的分区,从而实现存储和计算的均衡,以及数据并行化。
而Druid本身处理的是事件数据,每条数据都会带有一个时间戳,所以很自然的就可以使用时间进行分区。
为什么一个chunk中的数据包含多个segment!!!????原因就是二级分区

二级分区:

很可能每个chunk的数据量是不均衡的,而Duid为了解决这种问题,提供了“二级分区”,每一个二级分区称为一个Shard(分片)
其实chunk、datasource都是抽象的,实际的就是每个分区就是一个Shard,每个Shard只包含一个Segment!!!,因为Segment是Shard持久化的结果
Druid目前支持两种Shard策略:
Hash(基于维值的Hash)
Range(基于某个维度的取值范围)
譬如:
2000-01-01,2000-01-02中的每一个分区都是一个Shard
2000-01-02的数据量比较多,所以有两个Shard,分为partition0、partition1。每个分区都是一个Shard
Shard经过持久化之后就称为了Segment,Segment是数据存储、复制、均衡(Historical的负载均衡)和计算的基本单元了。
Segment具有不可变性,一个Segment一旦创建完成后(MiddleManager节点发布后)就无法被修改,
只能通过生成一个新的Segment来代替旧版本的Segment。

Segment内部存储结构:

Segment内部采用列式存储          //并不是说每列都是一个独立的文件,而是说每列有独立的数据结构,所有列都会存储在一个文件中
Segment中的数据类型主要分为三种:
时间戳
维度列
指标列
对于时间戳列和指标列,实际存储是一个数组
对于维度列不会像指标列和时间戳这么简单,因为它需要支持filter和groupby:
所以Druid使用了字典编码(DictionaryEncoding)和位图索引(BitmapIndex)来存储每个维度列。每个维度列需要三个数据结构:
1、需要一个字典数据结构,将维值(维度列值都会被认为是字符串类型)映射成一个整数ID。
2、使用上面的字典编码,将该列所有维值放在一个列表中。
3、对于列中不同的值,使用bitmap数据结构标识哪些行包含这些值。      //位图索引,这个需要记住
注:使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量)
Druid针对维度列之所以使用这三个数据结构,是因为:
使用字典将字符串映射成整数ID,可以紧凑的表示结构2和结构3中的值。
使用Bitmap位图索引可以执行快速过滤操作(找到符合条件的行号,以减少读取的数据量),因为Bitmap可以快速执行AND和OR操作。
对于groupby和TopN操作需要使用结构2中的列值列表
实例:
1.使用字典将列值映射为整数
{
“JustinBieher”:0,
“ke$ha”:1
}
2.使用1中的编码,将列值放到一个列表中
[0,0,1,1]
3.使用bitmap来标识不同列值
value=0:[1,1,0,0]//1代表该行含有该值,0标识不含有
value=1:[0,0,1,1]
因为是一个稀疏矩阵,所以比较好压缩!!
Druid而且运用了RoaringBitmap能够对压缩后的位图直接进行布尔运算,可以大大提高查询效率和存储效率(不需要解压缩)

Segment命名:

如果一个Datasource下有几百万个Segment文件,我们又如何快速找出我们所需要的文件呢?答案就是通过文件名称快速索引查找。
Segment的命名包含四部分:
数据源(Datasource)、时间间隔(包含开始时间和结束时间两部分)、版本号和分区(Segment有分片的情况下才会有)。
eg:wikipedia_2015-09-12T00:00:00.000Z_2015-09-13T00:00:00.000Z_2019-09-09T10:06:02.498Z
wikipedia:Datasource名称
开始时间:2015-09-12T00:00:00.000Z//该Segment所存储最早的数据,时间格式是ISO8601
结束时间:2015-09-13T00:00:00.000Z//该segment所存储最晚的数据,时间格式是ISO8601
版本号:2019-09-09T10:06:02.498Z//此Segment的启动时间,因为Druid支持批量覆盖操作,
//当批量摄入与之前相同数据源、相同时间间隔数据时,数据就会被覆盖,这时候版本号就会被更新
分片号:从0开始,如果分区号为0,可以省略//分区的表现其实就是分目录
注:单机形式运行Druid,这样Druid生成的Segment文件都在${DRUID_HOME}/var/druid/segments目录下
注:为了保证Druid的查询效率,每个Segment文件的大小建议在300MB~700MB之间
注:版本号的意义:
在druid,如果您所做的只是追加数据,那么每个时间chunk只会有一个版本。
但是当您覆盖数据时,因为druid通过首先加载新数据(但不允许查询)来处理这个问题,一旦新数据全部加载,
切换所有新查询以使用这些新数据。然后它在几分钟后掉落旧段!!!

存储聚合

无论是实时数据消费还是批量数据处理,Druid在基于DataSource机构存储数据时即可选择对任意的指标列进行聚合操作:
1、基于维度列:相同的维度列数据会进行聚合
2、基于时间段:某一时间段的所有行会进行聚合,时间段可以通过queryGranularity参数指定
聚合:提前做好sum,count等操作

Segment生命周期:

在元数据存储中!每个Segment都会有一个used字段,标记该段是否能用于查询

is_Published:

当Segment构建完毕,就将元数据存储在元数据存储区中,此Segment为发布状态

is_available:

如果Segment当前可用于查询(实时任务或历史进程),则为true。

is_realtime:

如果是由实时任务产生的,那么会为true,但是一段时间之后,也会变为false

is_overshadowed:

标记该段是否已被其他段覆盖!处于此状态的段很快就会将其used标志自动设置为false。

Kafka 是如何保证数据可靠性和一致性

数据可靠性

Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从 Producter 往 Broker 发送消息、Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性。

Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。

Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Producer 往 Broker 发送消息

如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:

acks = 0:意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka 。在这种情况下还是有可能发生错误,比如发送的对象无能被序列化或者网卡发生故障,但如果是分区离线或整个集群长时间不可用,那就不会收到任何错误。在 acks=0 模式下的运行速度是非常快的(这就是为什么很多基准测试都是基于这个模式),你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式, 一定会丢失一些消息。

acks = 1:意味若 Leader 在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应。在这个模式下,如果发生正常的 Leader 选举,生产者会在选举时收到一个 LeaderNotAvailableException 异常,如果生产者能恰当地处理这个错误,它会重试发送悄息,最终消息会安全到达新的 Leader 那里。不过在这个模式下仍然有可能丢失数据,比如消息已经成功写入 Leader,但在消息被复制到 follower 副本之前 Leader发生崩溃。

acks = all(这个和 request.required.acks = -1 含义一样):意味着 Leader 在返回确认或错误响应之前,会等待所有同步副本都收到悄息。如果和 min.insync.replicas 参数结合起来,就可以决定在返回确认前至少有多少个副本能够收到悄息,生产者会一直重试直到消息被成功提交。不过这也是最慢的做法,因为生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。

根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。
另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

Leader 选举

在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见《图文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

producer 级别:acks=all(或者 request.required.acks=-1),同时发生模式为同步 producer.type=sync
topic 级别:设置 replication.factor>=3,并且 min.insync.replicas>=2;
broker 级别:关闭不完全的 Leader 选举,即 unclean.leader.election.enable=false;

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

Kafka 生产者分区策略

1)分区的原因
1)方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic
又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
(2)可以提高并发,因为可以以 Partition 为单位读写了。
2)分区的原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象。
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition
数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后
面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition
值,也就是常说的 round-robin 算法。

消费者分区分配策略

Range 范围分区(默认的)

假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个
最后分配结果如下

C1:0,1,2,3
C2:4,5,6
C3:7,8,9

如果有11个分区将会是:

C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)

在这种情况下,C1多消费了两个分区

RoundRobin 轮询分区

把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer

假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)

有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)

分区将会按照一定的顺序排列起来,消费者将会组成一个环状的结构,然后开始轮询。
P0-0分配给C0
P0-1分配给C1但是C1并没订阅T0,于是跳过C1把P0-1分配给C2,
P0-2分配给C0
P1-0分配给C1,
P1-1分配给C0,
P2-0分配给C1,
P2-1分配给C2,
P2-2分配给C1,
p2-3分配给C2

C0: P0-0,P0-2,P1-1
C1:P1-0,P2-0,P2-2
C2:P0-1,P2-1,P2-3

Python数据质量检测

1. 重复值检查

1.1 什么是重复值
重复值的检查首先要明确一点,即重复值的定义。对于一份二维表形式的数据集来说,什么是重复值?主要有两个层次:
① 关键字段出现重复记录,比如主索引字段出现重复;
② 所有字段出现重复记录。
第一个层次是否是重复,必须从这份数据的业务含义进行确定。比如一张表,从业务上讲,一个用户应该只会有一条记录,那么如果某个用户出现了超过一条的记录,那么这就是重复值。第二个层次,就一定是重复值了。
1.2 重复值产生的原因
重复值的产生主要有两个原因,一是上游源数据造成的,二是数据准备脚本中的数据关联造成的。从数据准备角度来看,首先检查数据准备的脚本,判断使用的源表是否有重复记录,同时检查关联语句的正确性和严谨性,比如关联条件是否合理、是否有限定数据周期等等。
比如:检查源表数据是否重复的SQL:

1
2
3
SELECT MON_ID,COUNT(*),COUNT(DISTINCT USER_ID)
FROM TABLE_NAME
GROUP BY MON_ID;

如果是上游源数据出现重复,那么应该及时反映给上游进行修正;如果是脚本关联造成的,修改脚本,重新生成数据即可。
还有一份情况,这份数据集是一份单独的数据集,并不是在数据仓库中开发得到的数据,既没有上游源数据,也不存在生成数据的脚本,比如公开数据集,那么如何处理其中的重复值?一般的处理方式就是直接删除重复值。

1
2
3
4
5
6
7
8
import pandas as pd
dataset = pd.read_excel("/labcenter/python/dataset.xlsx")
#判断重复数据
dataset.duplicated() #全部字段重复是重复数据
dataset.duplicated(['col2']) #col2字段重复是重复数据
#删除重复数据
dataset.drop_duplicates() #全部字段重复是重复数据
dataset.drop_duplicates(['col2']) #col2字段重复是重复数据

2. 缺失值检查

缺失值主要是指数据集中部分记录存在部分字段的信息缺失。

2.1 缺失值出现的原因
出现趋势值主要有三种原因:
① 上游源系统因为技术或者成本原因无法完全获取到这一信息,比如对用户手机APP上网记录的解析;
② 从业务上讲,这一信息本来就不存在,比如一个学生的收入,一个未婚者的配偶姓名;
③ 数据准备脚本开发中的错误造成的。
第一种原因,短期内无法解决;第二种原因,数据的缺失并不是错误,无法避免;第三种原因,则只需通过查证修改脚本即可。
缺失值的存在既代表了某一部分信息的丢失,也影响了挖掘分析结论的可靠性与稳定性,因此,必须对缺失值进行处理。
如果缺失值记录数超过了全部记录数的50%,则应该从数据集中直接剔除掉该字段,尝试从业务上寻找替代字段;
如果缺失值记录数没有超过50%,则应该首先看这个字段在业务上是否有替代字段,如果有,则直接剔除掉该字段,如果没有,则必须对其进行处理。

1
2
3
4
5
6
7
##查看哪些字段有缺失值   
dataset.isnull().any() #获取含有NaN的字段
##统计各字段的缺失值个数
dataset.isnull().apply(pd.value_counts)
##删除含有缺失值的字段
nan_col = dataset.isnull().any()
dataset.drop(nan_col[nan_col].index,axis=1)

2.2 缺失值的处理
缺失值的处理主要有两种方式:过滤和填充。

(1)缺失值的过滤
直接删除含有缺失值的记录,总体上会影响样本个数,如果删除样本过多或者数据集本来就是小数据集时,这种方式并不建议采用。

1
2
##删除含有缺失值的记录
dataset.dropna()

(2)缺失值的填充
缺失值的填充主要三种方法:
① 方法一:使用特定值填充
使用缺失值字段的平均值、中位数、众数等统计量填充。
优点:简单、快速
缺点:容易产生数据倾斜
② 方法二:使用算法预测填充
将缺失值字段作为因变量,将没有缺失值字段作为自变量,使用决策树、随机森林、KNN、回归等预测算法进行缺失值的预测,用预测结果进行填充。
优点:相对精确
缺点:效率低,如果缺失值字段与其他字段相关性不大,预测效果差
③ 方法三:将缺失值单独作为一个分组,指定值进行填充
从业务上选择一个单独的值进行填充,使缺失值区别于其他值而作为一个分组,从而不影响算法计算。
优点:简单,实用
缺点:效率低

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
##使用Pandas进行特定值填充
dataset.fillna(0) ##不同字段的缺失值都用0填充
dataset.fillna({'col2':20,'col5':0}) ##不同字段使用不同的填充值
dataset.fillna(dataset.mean()) ##分别使用各字段的平均值填充
dataset.fillna(dataset.median()) ##分别使用个字段的中位数填充

##使用sklearn中的预处理方法进行缺失值填充(只适用于连续型字段)
from sklearn.preprocessing import Imputer
dataset2 = dataset.drop(['col4'],axis=1)
colsets = dataset2.columns
nan_rule1 = Imputer(missing_values='NaN',strategy='mean',axis=0) ##创建填充规则(平均值填充)
pd.DataFrame(nan_rule1.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule2 = Imputer(missing_values='median',strategy='mean',axis=0) ##创建填充规则(中位数填充)
pd.DataFrame(nan_rule2.fit_transform(dataset2),columns=colsets) ##应用规则
nan_rule3 = Imputer(missing_values='most_frequent',strategy='mean',axis=0) ##创建填充规则(众数填充)
pd.DataFrame(nan_rule3.fit_transform(dataset2),columns=colsets) ##应用规则

Oozie常用命令

Oozie任务重试

1
oozie job -rerun 0000000-191106143754176-oozie-oozi-W  -D oozie.wf.rerun.failnodes=true

检查xml格式

1
xmllint -noout hive-site.xml

查看coordinator状态

1
oozie jobs -jobtype coordinator  -filter status=RUNNING

查看执行错误的workflow.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
[root@dlbdn3 ~]# oozie job -info 0000027-181203143359779-oozie-oozi-C -oozie http://localhost:11000/oozie -order=desc -timezone='Asia/Shanghai' -filter status=KILLED AND FAILED
Job ID : 0000027-181203143359779-oozie-oozi-C
------------------------------------------------------------------------------------------------------------------------------------
Job Name : PG-LANDING-JOB-Coordinator
App Path : hdfs://dlbdn3:8020/user/hue/oozie/deployments/_ericsson_-oozie-13-1543825800.46
Status : KILLED
Start Time : 2018-09-13 00:00 CST
End Time : 2040-09-01 15:15 CST
Pause Time : -
Concurrency : 1
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Err Code Created Nominal Time
0000027-181203143359779-oozie-oozi-C@30 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@29 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@28 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@27 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@26 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@25 KILLED - - 2018-12-03 16:39 CST 2018-09-13 02:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@24 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:55 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@23 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:50 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@22 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:45 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@21 KILLED - - 2018-12-03 16:39 CST 2018-09-13 01:40 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@20 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:35 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@19 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:30 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@18 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:25 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@17 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:20 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@16 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:15 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@15 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:10 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@14 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:05 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@13 KILLED - - 2018-12-03 16:34 CST 2018-09-13 01:00 CST
------------------------------------------------------------------------------------------------------------------------------------
0000027-181203143359779-oozie-oozi-C@12 KILLED 0000039-181203143359779-oozie-oozi-W - 2018-12-03 16:34 CST 2018-09-13 00:55 CST
------------------------------------------------------------------------------------------------------------------------------------

查看一个具体的workflow信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@dlbdn3 ~]# oozie job --oozie http://localhost:11000/oozie -info 0000039-181203143359779-oozie-oozi-W -localtime
Job ID : 0000039-181203143359779-oozie-oozi-W
------------------------------------------------------------------------------------------------------------------------------------
Workflow Name : PG-LANDING-JOB
App Path : hdfs://dlbdn3:8020/user/hue/oozie/workspaces/hue-oozie-1535681485.06
Status : KILLED
Run : 0
User : ericsson
Group : -
Created : 2018-12-03 16:42 CST
Started : 2018-12-03 16:42 CST
Last Modified : 2018-12-03 16:43 CST
Ended : 2018-12-03 16:43 CST
CoordAction ID: 0000027-181203143359779-oozie-oozi-C@12

Actions
------------------------------------------------------------------------------------------------------------------------------------
ID Status Ext ID Ext Status Err Code
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@:start: OK - OK -
------------------------------------------------------------------------------------------------------------------------------------
0000039-181203143359779-oozie-oozi-W@spark-23f2 KILLED job_1543800485319_0062 KILLED -
------------------------------------------------------------------------------------------------------------------------------------

查看当前运行的workflow有哪些

1
2
3
4
5
6
7
8
9
10
11
12
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf  -filter status=RUNNING
No Jobs match your criteria!
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 08:50 GMT -
------------------------------------------------------------------------------------------------------------------------------------
[root@dlbdn3 ~]# oozie jobs -oozie http://localhost:11000/oozie -jobtype wf -filter status=RUNNING -localtime
Job ID App Name Status User Group Started Ended
------------------------------------------------------------------------------------------------------------------------------------
0000041-181203143359779-oozie-oozi-W PG-LANDING-JOBRUNNING ericsson - 2018-12-03 16:50 CST -
------------------------------------------------------------------------------------------------------------------------------------
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量