Apache Atlas安装数据治理

Atlas概述

Apache Atlas为组织提供开放式元数据管理和治理功能,用以构建其数据资产目录,对这些资产进行分类和管理,并为数据分析师和数据治理团队,提供围绕这些数据资产的协作功能。

Atlas架构原理

alt

Atlas安装及使用

1)Atlas官网地址:https://atlas.apache.org/

2)文档查看地址:https://atlas.apache.org/0.8.4/index.html

3)下载地址:https://www.apache.org/dyn/closer.cgi/atlas/0.8.4/apache-atlas-0.8.4-sources.tar.gz

HDP安装Solr5.2.1

HDP安装Atlas0.8.2

[root@hadoop101 atlas]$ bin/import-hive.sh
Using Hive configuration directory [/opt/module/hive/conf]
Log file for import is /opt/module/atlas/logs/import-hive.log
log4j:WARN No such property [maxFileSize] in org.apache.log4j.PatternLayout.
log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.PatternLayout.

输入用户名:admin;输入密码:admin
Enter username for atlas :- admin
Enter password for atlas :-
Hive Meta Data import was successful!!!

显示所有hive tables

alt

选择数据库中的表可以看到之间的血缘关系图

alt

字段的血缘关系

alt

总结

Apache Atlas为Hadoop集群提供了包括数据分类、集中策略引擎、数据血缘、安全和生命周期管理在内的元数据治理核心能力,其与Apache Falcon,Apache Ranger相互整合可以形成完整的数据治理解决方案。但是Atlas目前还是Apache孵化项目,尚未成熟,有待发展。

Atlas目前还存在以下一些需要改进之处:

缺乏对元数据的全局视图,对元数据的血缘追溯只能够展示具体某张表或某个SQL的生命周期(其前提是用户必须对Hadoop的元数据结构十分清楚,才能够通过Atlas的查询语句去定位自己需要了解的表)

0.8以前的版本,对元数据只能进行只读操作,例如只能展示Hive的表但是不能创建新表

与Hadoop各组件的集成尚待完善,例如Atlas对Hive的元数据变更操作的捕获只支持hive CLI,不支持beeline/JDBC

SQL优化技巧

###
首先,对于MySQL层优化我一般遵从五个原则:

减少数据访问:设置合理的字段类型,启用压缩,通过索引访问等减少磁盘 IO。

返回更少的数据:只返回需要的字段和数据分页处理,减少磁盘 IO 及网络 IO。

减少交互次数:批量 DML 操作,函数存储等减少数据连接次数。

减少服务器 CPU 开销:尽量减少数据库排序操作以及全表查询,减少 CPU 内存占用。

利用更多资源:使用表分区,可以增加并行操作,更大限度利用 CPU 资源。

总结到 SQL 优化中,就如下三点:

最大化利用索引。

尽可能避免全表扫描。

减少无效数据的查询。

理解 SQL 优化原理 ,首先要搞清楚 SQL 执行顺序

1
2
3
4
5
6
7
8
9
10
SELECT 
DISTINCT <select_list>
FROM <left_table>
<join_type> JOIN <right_table>
ON <join_condition>
WHERE <where_condition>
GROUP BY <group_by_list>
HAVING <having_condition>
ORDER BY <order_by_condition>
LIMIT <limit_number>

SELECT 语句,执行顺序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
FROM
<表名> # 选取表,将多个表数据通过笛卡尔积变成一个表。
ON
<筛选条件> # 对笛卡尔积的虚表进行筛选
JOIN <join, left join, right join...>
<join表> # 指定join,用于添加数据到on之后的虚表中,例如left join会将左表的剩余数据添加到虚表中
WHERE
<where条件> # 对上述虚表进行筛选
GROUP BY
<分组条件> # 分组
<SUM()等聚合函数> # 用于having子句进行判断,在书写上这类聚合函数是写在having判断里面的
HAVING
<分组筛选> # 对分组后的结果进行聚合筛选
SELECT
<返回数据列表> # 返回的单列必须在group by子句中,聚合函数除外
DISTINCT
# 数据除重
ORDER BY
<排序条件> # 排序
LIMIT
<行数限制>

避免不走索引的场景

①尽量避免在字段开头模糊查询,会导致数据库引擎放弃索引进行全表扫描

1
SELECT * FROM t WHERE username LIKE '%陈%'

优化方式:尽量在字段后面使用模糊查询。

1
SELECT * FROM t WHERE username LIKE '陈%'

如果需求是要在前面使用模糊查询:

使用 MySQL 内置函数 INSTR(str,substr)来匹配,作用类似于 Java 中的 indexOf(),查询字符串出现的角标位置。

使用 FullText 全文索引,用 match against 检索。

数据量较大的情况,建议引用 ElasticSearch、Solr,亿级数据量检索速度秒级。

当表数据量较少(几千条儿那种),别整花里胡哨的,直接用 like ‘%xx%’。

尽量避免使用 in 和 not in,会导致引擎走全表扫描

1
SELECT * FROM t WHERE id IN (2,3)

优化方式:如果是连续数值,可以用 between 代替

1
SELECT * FROM t WHERE id BETWEEN 2 AND 3

如果是子查询,可以用 exists 代替。

1
2
3
4
-- 不走索引
select * from A where A.id in (select id from B);
-- 走索引
select * from A where exists (select * from B where B.id = A.id);

尽量避免使用 or,会导致数据库引擎放弃索引进行全表扫描

1
SELECT * FROM t WHERE id = 1 OR id = 3

优化方式:可以用 union 代替 or。

1
2
3
SELECT * FROM t WHERE id = 1
UNION
SELECT * FROM t WHERE id = 3

尽量避免进行 null 值的判断,会导致数据库引擎放弃索引进行全表扫描

1
SELECT * FROM t WHERE score IS NULL

优化方式:可以给字段添加默认值 0,对 0 值进行判断。

1
SELECT * FROM t WHERE score = 0

尽量避免在 where 条件中等号的左侧进行表达式、函数操作,会导致数据库引擎放弃索引进行全表扫描

可以将表达式、函数操作移动到等号右侧,如下

1
2
3
4
-- 全表扫描
SELECT * FROM T WHERE score/10 = 9
-- 走索引
SELECT * FROM T WHERE score = 10*9

当数据量大时,避免使用 where 1=1 的条件

通常为了方便拼装查询条件,我们会默认使用该条件,数据库引擎会放弃索引进行全表扫描

1
2
-- 全表扫描
SELECT username, age, sex FROM T WHERE 1=1

优化方式:用代码拼装 SQL 时进行判断,没 where 条件就去掉 where,有 where 条件就加 and。

查询条件不能用 <> 或者 !=

使用索引列作为条件进行查询时,需要避免使用<>或者!=等判断条件。

如确实业务需要,使用到不等于符号,需要在重新评估索引建立,避免在此字段上建立索引,改由查询条件中其他索引字段代替。

where 条件仅包含复合索引非前置列

如下:复合(联合)索引包含 key_part1,key_part2,key_part3 三列,但 SQL 语句没有包含索引前置列”key_part1”,按照 MySQL 联合索引的最左匹配原则,不会走联合索引

1
select col1 from table where key_part2=1 and key_part3=2

隐式类型转换造成不使用索引

如下 SQL 语句由于索引对列类型为 varchar,但给定的值为数值,涉及隐式类型转换,造成不能正确走索引。

1
select col1 from table where col_varchar=123;

order by 条件要与 where 中条件一致,否则 order by 不会利用索引进行排序

1
2
3
4
5
-- 不走age索引
SELECT * FROM t order by age;

-- 走age索引
SELECT * FROM t where age > 0 order by age;

对于上面的语句,数据库的处理顺序是:

第一步:根据 where 条件和统计信息生成执行计划,得到数据。

第二步:将得到的数据排序。当执行处理数据(order by)时,数据库会先查看第一步的执行计划,看 order by 的字段是否在执行计划中利用了索引。如果是,则可以利用索引顺序而直接取得已经排好序的数据。如果不是,则重新进行排序操作。

第三步:返回排序后的数据。

当 order by 中的字段出现在 where 条件中时,才会利用索引而不再二次排序,更准确的说,order by 中的字段在执行计划中利用了索引时,不用排序操作。

这个结论不仅对 order by 有效,对其他需要排序的操作也有效。比如 group by 、union 、distinct 等。

正确使用 hint 优化语句

MySQL 中可以使用 hint 指定优化器在执行时选择或忽略特定的索引。

一般而言,处于版本变更带来的表结构索引变化,更建议避免使用 hint,而是通过 Analyze table 多收集统计信息。

但在特定场合下,指定 hint 可以排除其他索引干扰而指定更优的执行计划:

USE INDEX 在你查询语句中表名的后面,添加 USE INDEX 来提供希望 MySQL 去参考的索引列表,就可以让 MySQL 不再考虑其他可用的索引。

例子: SELECT col1 FROM table USE INDEX (mod_time, name)…

IGNORE INDEX 如果只是单纯的想让 MySQL 忽略一个或者多个索引,可以使用 IGNORE INDEX 作为 Hint。

例子: SELECT col1 FROM table IGNORE INDEX (priority) …

FORCE INDEX 为强制 MySQL 使用一个特定的索引,可在查询中使用FORCE INDEX 作为 Hint。

例子: SELECT col1 FROM table FORCE INDEX (mod_time) …

在查询的时候,数据库系统会自动分析查询语句,并选择一个最合适的索引。但是很多时候,数据库系统的查询优化器并不一定总是能使用最优索引。

如果我们知道如何选择索引,可以使用 FORCE INDEX 强制查询使用指定的索引。

1
SELECT * FROM students FORCE INDEX (idx_class_id) WHERE class_id = 1 ORDER BY id DESC;

SELECT 语句其他优化

①避免出现 select *

首先,select * 操作在任何类型数据库中都不是一个好的 SQL 编写习惯。

使用 select * 取出全部列,会让优化器无法完成索引覆盖扫描这类优化,会影响优化器对执行计划的选择,也会增加网络带宽消耗,更会带来额外的 I/O,内存和 CPU 消耗。

建议提出业务实际需要的列数,将指定列名以取代 select *。具体详情见《为什么大家都说SELECT * 效率低》

②避免出现不确定结果的函数

特定针对主从复制这类业务场景。由于原理上从库复制的是主库执行的语句,使用如 now()、rand()、sysdate()、current_user() 等不确定结果的函数很容易导致主库与从库相应的数据不一致。

另外不确定值的函数,产生的 SQL 语句无法利用 query cache。

③多表关联查询时,小表在前,大表在后

在 MySQL 中,执行 from 后的表关联查询是从左往右执行的(Oracle 相反),第一张表会涉及到全表扫描。

所以将小表放在前面,先扫小表,扫描快效率较高,在扫描后面的大表,或许只扫描大表的前 100 行就符合返回条件并 return 了。

例如:表 1 有 50 条数据,表 2 有 30 亿条数据;如果全表扫描表 2,你品,那就先去吃个饭再说吧是吧。

④使用表的别名

当在 SQL 语句中连接多个表时,请使用表的别名并把别名前缀于每个列名上。这样就可以减少解析的时间并减少哪些友列名歧义引起的语法错误。

⑤用 where 字句替换 HAVING 字句

避免使用 HAVING 字句,因为 HAVING 只会在检索出所有记录之后才对结果集进行过滤,而 where 则是在聚合前刷选记录,如果能通过 where 字句限制记录的数目,那就能减少这方面的开销。

HAVING 中的条件一般用于聚合函数的过滤,除此之外,应该将条件写在 where 字句中。

where 和 having 的区别:where 后面不能使用组函数。

⑥调整 Where 字句中的连接顺序

MySQL 采用从左往右,自上而下的顺序解析 where 子句。根据这个原理,应将过滤数据多的条件往前放,最快速度缩小结果集。

增删改 DML 语句优化

①大批量插入数据
如果同时执行大量的插入,建议使用多个值的 INSERT 语句(方法二)。这比使用分开 INSERT 语句快(方法一),一般情况下批量插入效率有几倍的差别。
方法一:

1
2
3
4
5
insert into T values(1,2); 

insert into T values(1,3);

insert into T values(1,4);

方法二:

1
Insert into T values(1,2),(1,3),(1,4);

选择后一种方法的原因有三:
减少 SQL 语句解析的操作,MySQL 没有类似 Oracle 的 share pool,采用方法二,只需要解析一次就能进行数据的插入操作。

在特定场景可以减少对 DB 连接次数。

SQL 语句较短,可以减少网络传输的 IO。

②适当使用 commit
适当使用 commit 可以释放事务占用的资源而减少消耗,commit 后能释放的资源如下:

事务占用的 undo 数据块。

事务在 redo log 中记录的数据块。

释放事务施加的,减少锁争用影响性能。特别是在需要使用 delete 删除大量数据的时候,必须分解删除量并定期 commit。

③避免重复查询更新的数据
针对业务中经常出现的更新行同时又希望获得改行信息的需求,MySQL 并不支持 PostgreSQL 那样的 UPDATE RETURNING 语法,在 MySQL 中可以通过变量实现。

例如,更新一行记录的时间戳,同时希望查询当前记录中存放的时间戳是什么?
简单方法实现:

1
2
3
Update t1 set time=now() where col1=1; 

Select time from t1 where id =1;
1
2
3
Update t1 set time=now () where col1=1 and @now: = now (); 

Select @now;

前后二者都需要两次网络来回,但使用变量避免了再次访问数据表,特别是当 t1 表数据量较大时,后者比前者快很多。

④查询优先还是更新(insert、update、delete)优先
MySQL 还允许改变语句调度的优先级,它可以使来自多个客户端的查询更好地协作,这样单个客户端就不会由于锁定而等待很长时间。改变优先级还可以确保特定类型的查询被处理得更快

我们首先应该确定应用的类型,判断应用是以查询为主还是以更新为主的,是确保查询效率还是确保更新的效率,决定是查询优先还是更新优先

下面我们提到的改变调度策略的方法主要是针对只存在表锁的存储引擎,比如 MyISAM 、MEMROY、MERGE,对于 Innodb 存储引擎,语句的执行是由获得行锁的顺序决定的。

MySQL 的默认的调度策略可用总结如下:
写入操作优先于读取操作。

对某张数据表的写入操作某一时刻只能发生一次,写入请求按照它们到达的次序来处理。

对某张数据表的多个读取操作可以同时地进行。

MySQL 提供了几个语句调节符,允许你修改它的调度策略:
LOW_PRIORITY 关键字应用于 DELETE、INSERT、LOAD DATA、REPLACE 和 UPDATE。

HIGH_PRIORITY 关键字应用于 SELECT 和 INSERT 语句。

DELAYED 关键字应用于 INSERT 和 REPLACE 语句

如果写入操作是一个 LOW_PRIORITY(低优先级)请求,那么系统就不会认为它的优先级高于读取操作。

在这种情况下,如果写入者在等待的时候,第二个读取者到达了,那么就允许第二个读取者插到写入者之前。

只有在没有其它的读取者的时候,才允许写入者开始操作。这种调度修改可能存在 LOW_PRIORITY 写入操作永远被阻塞的情况。

SELECT 查询的 HIGH_PRIORITY(高优先级)关键字也类似。它允许 SELECT 插入正在等待的写入操作之前,即使在正常情况下写入操作的优先级更高。

另外一种影响是,高优先级的 SELECT 在正常的 SELECT 语句之前执行,因为这些语句会被写入操作阻塞。

如果希望所有支持 LOW_PRIORITY 选项的语句都默认地按照低优先级来处理,那么请使用–low-priority-updates 选项来启动服务器。

通过使用 INSERTHIGH_PRIORITY 来把 INSERT 语句提高到正常的写入优先级,可以消除该选项对单个 INSERT 语句的影响。

查询条件优化

①对于复杂的查询,可以使用中间临时表暂存数据

②优化 group by 语句

默认情况下,MySQL 会对 GROUP BY 分组的所有值进行排序,如 “GROUP BY col1,col2,….;” 查询的方法如同在查询中指定 “ORDER BY col1,col2,…;” 。

如果显式包括一个包含相同的列的 ORDER BY 子句,MySQL 可以毫不减速地对它进行优化,尽管仍然进行排序。

因此,如果查询包括 GROUP BY 但你并不想对分组的值进行排序,你可以指定 ORDER BY NULL 禁止排序。

例如:

1
SELECT col1, col2, COUNT(*) FROM table GROUP BY col1, col2 ORDER BY NULL ;

③优化 join 语句
MySQL 中可以通过子查询来使用 SELECT 语句来创建一个单列的查询结果,然后把这个结果作为过滤条件用在另一个查询中。

使用子查询可以一次性的完成很多逻辑上需要多个步骤才能完成的 SQL 操作,同时也可以避免事务或者表锁死,并且写起来也很容易。但是,有些情况下,子查询可以被更有效率的连接(JOIN)..替代。

例子:假设要将所有没有订单记录的用户取出来,可以用下面这个查询完成:

1
SELECT col1 FROM customerinfo WHERE CustomerID NOT in (SELECT CustomerID FROM salesinfo )

如果使用连接(JOIN)..来完成这个查询工作,速度将会有所提升。

尤其是当 salesinfo 表中对 CustomerID 建有索引的话,性能将会更好,查询如下:

1
2
3
SELECT col1 FROM customerinfo 
LEFT JOIN salesinfoON customerinfo.CustomerID=salesinfo.CustomerID
WHERE salesinfo.CustomerID IS NULL

连接(JOIN)..之所以更有效率一些,是因为 MySQL 不需要在内存中创建临时表来完成这个逻辑上的需要两个步骤的查询工作。

④优化 union 查询

MySQL 通过创建并填充临时表的方式来执行 union 查询。除非确实要消除重复的行,否则建议使用 union all。

原因在于如果没有 all 这个关键词,MySQL 会给临时表加上 distinct 选项,这会导致对整个临时表的数据做唯一性校验,这样做的消耗相当高。

高效:

1
2
3
4
5
SELECT COL1, COL2, COL3 FROM TABLE WHERE COL1 = 10 

UNION ALL

SELECT COL1, COL2, COL3 FROM TABLE WHERE COL3= 'TEST';

低效:

1
2
3
4
5
SELECT COL1, COL2, COL3 FROM TABLE WHERE COL1 = 10 

UNION

SELECT COL1, COL2, COL3 FROM TABLE WHERE COL3= 'TEST';

⑤拆分复杂 SQL 为多个小 SQL,避免大事务
如下:
简单的 SQL 容易使用到 MySQL 的 QUERY CACHE。

减少锁表时间特别是使用 MyISAM 存储引擎的表。

可以使用多核 CPU。

⑥使用 truncate 代替 delete
当删除全表中记录时,使用 delete 语句的操作会被记录到 undo 块中,删除记录也记录 binlog。

当确认需要删除全表时,会产生很大量的 binlog 并占用大量的 undo 数据块,此时既没有很好的效率也占用了大量的资源.

使用 truncate 替代,不会记录可恢复的信息,数据不能被恢复。也因此使用 truncate 操作有其极少的资源占用与极快的时间。另外,使用 truncate 可以回收表的水位,使自增字段值归零。

⑦使用合理的分页方式以提高分页效率

使用合理的分页方式以提高分页效率 针对展现等分页需求,合适的分页方式能够提高分页的效率。
案例 1:

1
2
select * from t where thread_id = 10000 and deleted = 0 
order by gmt_create asc limit 0, 15;

上述例子通过一次性根据过滤条件取出所有字段进行排序返回。数据访问开销=索引 IO+索引全部记录结果对应的表数据 IO。

因此,该种写法越翻到后面执行效率越差,时间越长,尤其表数据量很大的时候。

适用场景:当中间结果集很小(10000 行以下)或者查询条件复杂(指涉及多个不同查询字段或者多表连接)时适用。

案例 2:

1
2
3
select t.* from (select id from t where thread_id = 10000 and deleted = 0
order by gmt_create asc limit 0, 15) a, t
where a.id = t.id;

上述例子必须满足 t 表主键是 id 列,且有覆盖索引 secondary key:(thread_id, deleted, gmt_create)。

通过先根据过滤条件利用覆盖索引取出主键 id 进行排序,再进行 join 操作取出其他字段。

数据访问开销=索引 IO+索引分页后结果(例子中是 15 行)对应的表数据 IO。因此,该写法每次翻页消耗的资源和时间都基本相同,就像翻第一页一样。

适用场景:当查询和排序字段(即 where 子句和 order by 子句涉及的字段)有对应覆盖索引时,且中间结果集很大的情况时适用。

建表优化

①在表中建立索引,优先考虑 where、order by 使用到的字段。

②尽量使用数字型字段(如性别,男:1 女:2),若只含数值信息的字段尽量不要设计为字符型,这会降低查询和连接的性能,并会增加存储开销。

这是因为引擎在处理查询和连接时会 逐个比较字符串中每一个字符,而对于数字型而言只需要比较一次就够了。

③查询数据量大的表 会造成查询缓慢。主要的原因是扫描行数过多。这个时候可以通过程序,分段分页进行查询,循环遍历,将结果合并处理进行展示。

要查询 100000 到 100050 的数据,如下:

1
2
SELECT * FROM (SELECT ROW_NUMBER() OVER(ORDER BY ID ASC) AS rowid,* 
FROM infoTab)t WHERE t.rowid > 100000 AND t.rowid <= 100050

④用 varchar/nvarchar 代替 char/nchar。

尽可能的使用 varchar/nvarchar 代替 char/nchar ,因为首先变长字段存储空间小,可以节省存储空间,其次对于查询来说,在一个相对较小的字段内搜索效率显然要高些。

不要以为 NULL 不需要空间,比如:char(100) 型,在字段建立时,空间就固定了, 不管是否插入值(NULL 也包含在内),都是占用 100 个字符的空间的,如果是 varchar 这样的变长字段, null 不占用空间。

Datax3.0简介

Datex3.0概览

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
(这是一个单机多任务的ETL工具)

alt

设计理念

为了解决异构数据源同步问题,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

当前使用现状

此前已经开源DataX1.0版本,此次介绍为阿里云开源全新版本DataX3.0,有了更多更强大的功能和更好的使用体验。Github主页地址:https://github.com/alibaba/DataX

DataX3.0框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

alt
Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX3.0插件体系

DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图

alt

DataX3.0核心架构

DataX 3.0 开源版本支持单机多线程模式完成同步作业运行,本小节按一个DataX作业生命周期的时序图,从整体架构设计非常简要说明DataX各个模块相互关系。

alt

核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。

切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。

每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。

DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程:

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:

DataXJob根据分库分表切分成了100个Task。

根据20个并发,DataX计算共需要分配4个TaskGroup。

4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

五、DataX 3.0六大核心优势

可靠的数据质量监控

完美解决数据传输个别类型失真问题
DataX旧版对于部分数据类型(比如时间戳)传输一直存在毫秒阶段等数据失真情况,新版本DataX3.0已经做到支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,让数据可以完整无损的传输到目的端。

提供作业全链路的流量、数据量运行时监控
DataX3.0运行过程中可以将作业本身状态、数据流量、数据速度、执行进度等信息进行全面的展示,让用户可以实时了解作业状态。并可在作业执行过程中智能判断源端和目的端的速度对比情况,给予用户更多性能排查信息。

提供脏数据探测
在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。DataX目前可以实现脏数据精确过滤、识别、采集、展示,为用户提供多种的脏数据处理模式,让用户准确把控数据质量大关!

丰富的数据转换功能

DataX作为一个服务于大数据的ETL工具,除了提供数据快照搬迁功能之外,还提供了丰富数据转换的功能,让数据在传输过程中可以轻松完成数据脱敏,补全,过滤等数据转换功能,另外还提供了自动groovy函数,让用户自定义转换函数。详情请看DataX3的transformer详细介绍。

精准的速度控制

还在为同步过程对在线存储压力影响而担心吗?新版本DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

1
2
3
4
"speed": {
"channel": 8, ----并发数限速(根据自己CPU合理控制并发数)
"byte": 524288, ----字节流限速(根据自己的磁盘和网络合理控制字节数)
"record": 10000 ----记录流限速(根据数据合理空行数)

强劲的同步性能

DataX3.0每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个Task并行执行,单机多线程执行模型可以让DataX速度随并发成线性增长。在源端和目的端性能都足够的情况下,单个作业一定可以打满网卡。另外,DataX团队对所有的已经接入的插件都做了极致的性能优化,并且做了完整的性能测试

健壮的容错机制

DataX作业是极易受外部因素的干扰,网络闪断、数据源不稳定等因素很容易让同步到一半的作业报错停止。因此稳定性是DataX的基本要求,在DataX 3.0的设计中,重点完善了框架和插件的稳定性。目前DataX3.0可以做到线程级别、进程级别(暂时未开放)、作业级别多层次局部/全局的重试,保证用户的作业稳定运行。
线程内部重试

DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

线程级别重试

目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

作者:香山上的麻雀
链接:https://www.jianshu.com/p/f5f0dc99d5ab
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Flink与Spark多方面区别对比

本文转载于
场景描述:Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不过现在Spark Streaming已经非常稳定基本都没有更新了,然后重点移到spark sql和structured Streaming了

Flink和Spark的区别在编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面存在不同。

维表join和异步IO

Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming直接与静态数据集的join,可以也可以帮助实现维表的join功能,当然维表要不可变。

Flink支持与维表进行join操作,除了map,flatmap这些算子之外,flink还有异步IO算子,可以用来实现维表,提升性能。

状态管理

状态维护应该是流处理非常核心的概念了,比如join,分组,聚合等操作都需要维护历史状态。那么flink在这方面很好,structured Streaming也是可以,但是spark Streaming就比较弱了,只有个别状态维护算子upstatebykye等,大部分状态需要用户自己维护,虽然这个对用户来说有更大的可操作性和可以更精细控制但是带来了编程的麻烦。flink和Structured Streaming都支持自己完成了join及聚合的状态维护。

Structured Streaming有高级的算子,用户可以完成自定义的mapGroupsWithState和flatMapGroupsWithState,可以理解类似Spark Streaming 的upstatebykey等状态算子。

就拿mapGroupsWithState为例:
由于Flink与Structured Streaming的架构的不同,task是常驻运行的,flink不需要状态算子,只需要状态类型的数据结构。

首先看一下Keyed State下,我们可以用哪些原子状态:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用

reduceFunction,最后合并到一个单一的状态值。

FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。

MapState:即状态值为一个map。用户通过put或putAll方法添加元素。

Join操作

Flink的join操作

flink的join操作没有大的限制,支持种类丰富,比如:

Inner Equi-join

1
SELECT * FROM Orders INNER JOIN Product ONOrders.productId = Product.id

Outer Equi-join

1
2
3
4
5
SELECT * FROM Orders LEFT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId =Product.id

SELECT * FROM Orders FULL OUTER JOIN Product ONOrders.productId = Product.id

Time-windowed Join

1
SELECT * FROM Oderso,Shipmentss WHEREo.id=s.orderIdAND o.ordertimeBETWEENs.shiptime INTERVAL'4'HOURANDs.shiptime

Expanding arrays into a relation

1
SELECT users, tag FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Join with Table Function

1
2
3
4
5
6
7
8
9
10
11
Inner Join

A row of the left (outer) table is dropped, if its table function call returns an empty result.
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag

Left Outer Join
If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE

Join with Temporal Table

1
2
3
4
5
6
7
8

SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency

Structured Streaming的join操作
Structured Streaming的join限制颇多了,限于篇幅问题在这里只讲一下join的限制
alt

容错机制及一致性语义

Spark Streaming 保证仅一次处理

对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。

对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。

由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:

1
2
3
4
5
6
7

Dstream.foreachRDD(rdd=>{
rdd.repartition(1).foreachPartition(partition=>{ // 开启事务
partition.foreach(each=>{// 提交数据
}) // 提交事务
})
})

将结果和 offset 一起提交

也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。

Flink 与 kafka 0.11 保证仅一次处理

若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。

在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。

本例中的 Flink 应用如图所示包含以下组件:

一个source,从Kafka中读取数据(即KafkaConsumer)

一个时间窗口化的聚会操作

一个sink,将结果写回到Kafka(即KafkaProducer)

alt
下面详细讲解 flink 的两段提交思路

alt

Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。

这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。

当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交

alt

当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图
alt
当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。

本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:

alt
以上就是 flink 实现恰一次处理的基本逻辑。

背压

消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。

Spark Streaming 的背压
Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset

PIDRateEsimator 的 compute 方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def compute(       
time: Long,
// in milliseconds
numElements: Long,
processingDelay: Long,
// in milliseconds schedulingDelay: Long
// in milliseconds
): Option[Double] = {logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {if (time > latestTime && numElements > 0 && processingDelay > 0) {
val delaySinceUpdate = (time - latestTime).toDouble / 1000
val processingRate = numElements.toDouble / processingDelay * 1000
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate/ batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate)
logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin)
latestTime = time
if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None }
else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } }
else { logTrace("Rate estimation skipped") None } } }
}
}
}

}

Flink 的背压
与 Spark Streaming 的背压不同的是,Flink 背压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 16 所示:

alt
阻塞占比在 web 上划分了三个等级:

OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了。

表管理

flink和structured streaming都可以讲流注册成一张表,然后使用sql进行分析,不过两者之间区别还是有些的。
Structured Streaming将流注册成临时表,然后用sql进行查询,操作也是很简单跟静态的dataset/dataframe一样。

1
2
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")

其实,此处回想Spark Streaming 如何注册临时表呢?在foreachRDD里,讲rdd转换为dataset/dataframe,然后将其注册成临时表,该临时表特点是代表当前批次的数据,而不是全量数据。Structured Streaming注册的临时表就是流表,针对整个实时流的。Sparksession.sql执行结束后,返回的是一个流dataset/dataframe,当然这个很像spark sql的sql文本执行,所以为了区别一个dataframe/dataset是否是流式数据,可以df.isStreaming来判断。

当然,flink也支持直接注册流表,然后写sql分析,sql文本在flink中使用有两种形式:

1
2
3
4
5

1). tableEnv.sqlQuery("SELECT product,amount FROM Orders WHERE product LIKE '%Rubber%'")

2). tableEnv.sqlUpdate(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHEREproduct LIKE '%Rubber%'");

对于第一种形式,sqlQuery执行结束之后会返回一张表也即是Table对象,然后可以进行后续操作或者直接输出,如:result.writeAsCsv(“”);。
而sqlUpdate是直接将结果输出到了tablesink,所以要首先注册tablesink,方式如下:

1
2
3
4
5
6
7
TableSink csvSink = newCsvTableSink("/path/to/file", ...);

String[] fieldNames = {"product","amount"};

TypeInformation[] fieldTypes ={Types.STRING, Types.INT};

tableEnv.registerTableSink("RubberOrders",fieldNames, fieldTypes, csvSink);

flink注册表的形式比较多,直接用数据源注册表,如:

1
2
tableEnv.registerExternalCatalog();
tableEnv.registerTableSource();

也可以从datastream转换成表,如:

1
2
tableEnv.registerDataStream("Orders",ds, "user, product, amount");
Table table = tableEnv.fromDataStream(ds,"user, product, amount");

Flink端到端状态一致性EXACTLY_ONCE实现

状态一致性:

alt
有状态的流处理,内部每个算子任务都可以有自己的状态;

对于流处理器内部(没有接入sink)来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确;

一条数据不应该丢失,也不应该重复计算;

在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正常的

状态一致性分类:

AT_MOST_ONCE(最多一次),当任务故障时最简单做法是什么都不干,既不恢复丢失状态,也不重播丢失数据。At-most-once语义的含义是最多处理一次事件。

AT_LEAST_ONCE(至少一次),在大多数真实应用场景,我们希望不丢失数据。这种类型的保障称为at-least-once,意思是所有的事件都得到了处理,而一些事件还可能被处理多次。

EXACTLY_ONCE(精确一次),恰好处理一次是最严格的的保证,也是最难实现的。恰好处理一次语义不仅仅意味着没有事件丢失,还意味着针对每一个数据,内部状态仅仅更新一次。

一致性检查点(Checkpoints)

Flink使用了一种轻量级快照机制 — 检查点(Checkpoint)来保证exactly-one语义;

有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

应用状态的一致性检查点,是Flink故障恢复机制的核心。

alt

端到端(end-to-end)状态一致性

目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如kafka)和输出到持久化系统;

端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终,每个组件都保证了它自己的一致性;

整个端到端的一致性级别取决于所有组件中一致性最弱的组件;

端到端exactly-once

内部保证 — checkpoint

source端 — 可重设数据的读取位置;可重新读取偏移量

sink端 – 从故障恢复时,数据不会重复写入外部系统:幂等写入和事务写入;

幂等写入(Idempotent Writes):

幂等操作即一个操作可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了;

alt

它的原理不是不重复写入而是重复写完之后结果还是一样;它的瑕疵是不能做到完全意义上exactly-once(在故障恢复时,突然把外部系统写入操作跳到之前的某个状态然后继续往里边写,故障之前发生的那一段的状态变化又重演了直到最后发生故障那一刻追上就正常了;假如中间这段又被读取了就可能会有些问题);

事务写入(Transactional Writes):

事务Transactional Writes

应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤销;
具有原子性,一个事务中的一系列的操作要么全部成功,要么一个都不做。
实现思想:构建的事务对应着checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中。

实现方式:预习日志和两阶段提交

预习日志(Write-Ahead-Log,WAL)

把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统;

简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定;

DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink;

瑕疵:
A. sink系统没说它支持事务,有可能出现一部分写进去了一部分没写进去(如果算失败,再写一次就写了两次了);

B. checkpoint做完了sink才去真正写入(但其实得等sink都写完checkpoint才能生效,所以WAL这个机制jobmanager确定它写完还不算真正写完,还得有一个外部系统已经确认完成的checkpoint)

两阶段提交(Two–Phase–Commit,2PC)– 真正能够实现exactly-once

对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里;

然后将这些数据写入外部sink系统,但不提交他们 – 这时只是预提交;

当它收到checkpoint完成时的通知,它才正式提交事务,实现结果的真正写入;

这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统,Flink提供了TwoPhaseCommitSinkFunction接口

2PC对外部sink的要求

外部sink系统必须事务支持,或者sink任务必须能够模拟外部系统上的事务;

在checkpoint的间隔期间里,必须能够开启一个事务,并接受数据写入;

在收到checkpoint完成的通知之前,事务必须是“等待提交”的状态,在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失;

sink任务必须能够在进程失败后恢复事务;

提交事务必须是幂等操作;

不同Source和sink的一致性保证:

alt

Flink+kafka端到端状态一致性的保证
Flink和kafka天生就是一对,用kafka做为source,用kafka做完sink <===> 实现端到端的一致性

内部 – 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性;

source – kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性;

sink – kafka producer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction

alt

alt

alt

默认是AT_LEAST_ONCE

Exactly-once两阶段提交

alt
JobManager协调各个TaskManager进行checkpoint存储;

checkpoint保存在StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存;

alt
当checkpoint启动时,JobManager会将检查点分界线(barrier)注入数据流;

barrier会在算子间传递下去;

alt

每个算子会对当前的状态做个快照,保存到状态后端;

checkpoint机制可以保证内部的状态一致性;

alt
每个内部的transform任务遇到barrier时,都会把状态存到checkpoint里;

sink任务首先把数据写入外部kafka,这些数据都属于预提交的事务;遇到barrier时,把状态保存到状态后端,并开启新的预提交事务(以barrier为界之前的数据属于上一个事务,之后的数据属于下一个新的事务);

alt
当所有算子任务的快照完成,也就是这次的checkpoint完成时,JobManager会向所有任务发通知,确认这次checkpoint完成;

sink任务收到确认通知,正式提交之前的事务,kafka中未确认数据改完“已确认”;

Exactly-once两阶段提交步骤:

第一条数据来在之后,开启一个kafka的事务(transaction),正常写入kafka分区日志但标记为未提交,这就是“预提交”;

JobManagere触发checkpoint操作,barrier从source开始向下传递,遇到barrier的算子将状态存入状态后端,并通知JobManagere;

sink连接器接收到barrier,保存当前状态,存入checkpoint,通知JobManager并开启下一阶段的事务,用于提交下个检查点的数据;

JobManager收到所有任务的通知,发出确认信息,表示checkpoint完成;

sink任务收到JobManager的确认信息,正式提交这段时间的数据;

外部kafka关闭事务,提交的数据可以正常消费了。

在代码中真正实现flink和kafak的端到端exactly-once语义:

alt

alt

A. 这里需要配置下,因为它默认的是AT_LEAST_ONCE;

B. 对于外部kafka读取的消费者的隔离级别,默认是read_on_commited,如果默认是可以读未提交的数据,就相当于整个一致性还没得到保证(未提交的数据没有最终确认那边就可以读了,相当于那边已经消费数据了,事务就是假的了) 所以需要修改kafka的隔离级别;

C. timeout超时问题,flink和kafka 默认sink是超时1h,而kafak集群中配置的tranctraction事务的默认超时时间是15min,flink-kafak这边的连接器的时间长,这边还在等着做操作 ,kafak那边等checkpoint等的时间太长直接关闭了。所以两边的超时时间最起码前边要比后边的小

Flink使用assignAscendingTimestamps 生成水印的三个重载方法

先简单介绍一下Timestamp 和Watermark 的概念:

  1. Timestamp和Watermark都是基于事件的时间字段生成的
  2. Timestamp和Watermark是两个不同的东西,并且一旦生成都跟事件数据没有关系了(所有即使事件中不再包含生成Timestamp和Watermark的字段也没关系)
  3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
  4. Watermark 在生成之后与事件数据没有直接关系,Watermark 作为一个消息,和事件数据一样在流中传递(Watermark 和StreamRecord 具有相同的父类:StreamElement)
  5. Timestamp 与 Watermark 在生成之后,会在下游window算子中做比较,判断事件数据是否是过期数据
  6. 只有window算子才会用Watermark判断事件数据是否过期

assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]

此方法是数据流的快捷方式,其中已知元素时间戳在每个并行流中单调递增。在这种情况下,系统可以通过跟踪上升时间戳自动且完美地生成水印。

1
2
3
4
5
6
7
8
9
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// flink auto create timestamp & watermark
.assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:这种方法创建时间戳与水印最简单,返回一个long类型的数字就可以了

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

基于给定的水印生成器生成水印,即使没有新元素到达也会定期检查给定水印生成器的新水印,以指定允许延迟时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks periodically(定期生成水印)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
override def extractTimestamp(element: LateDataEvent): Long = {
println("want watermark : " + sdf.parse(element.createTime).getTime)
sdf.parse(element.createTime).getTime
}
})

assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]

此方法仅基于流元素创建水印,对于通过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每个元素,
调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,如果返回的水印值大于以前的水印,会发出新的水印,
此方法可以完全控制水印的生成,但是要注意,每秒生成数百个水印会影响性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val input = env.addSource(source)
.map(json => {
val id = json.get("id").asText()
val createTime = json.get("createTime").asText()
val amt = json.get("amt").asText()
LateDataEvent("key", id, createTime, amt)
})
// assign timestamp & watermarks every event
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
// check extractTimestamp emitted watermark is non-null and large than previously
override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp)
}
// generate next watermark
override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
val eventTime = sdf.parse(element.createTime).getTime
eventTime
}
})

转载于

Flink使用SQL操作几种类型的window

Flink SQL 支持三种窗口类型, 分别为 Tumble Windows / HOP Windows 和 Session Windows. 其中 HOP windows 对应 Table API 中的 Sliding Window, 同时每种窗口分别有相应的使用场景和方法

Tumble Window(翻转窗口)
Hop Window(滑动窗口)
Session Window(会话窗口)

HOPWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class HOPWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// HOP(time_attr, interval1, interval2)
// interval1 滑动长度
// interval2 窗口长度
Table result = tEnv.sqlQuery("SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start," +
"HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}

}

TumbleWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class TumbleWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

Table result = tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}


}

SessionWindowExample

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package sql.window;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;
import java.util.Arrays;

public class SessionWindowExample {

public static void main(String[] args) throws Exception {

// 获取 environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定系统时间概念为 event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


// 初始数据
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),

/* Start Session */
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),

/* Start Session */
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)
));

// 指定时间戳
SingleOutputStreamOperator<Tuple3<Long, String, Integer>> logWithTime = log.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, String, Integer>>() {

@Override
public long extractAscendingTimestamp(Tuple3<Long, String, Integer> element) {
return element.f0;
}
});

// 转换为 Table
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");

// SESSION(time_attr, interval)
// interval 表示两条数据触发session的最大间隔
Table result = tEnv.sqlQuery("SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start," +
"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)");

TypeInformation<Tuple3<Timestamp,Timestamp,Integer>> tpinf = new TypeHint<Tuple3<Timestamp,Timestamp,Integer>>(){}.getTypeInfo();
tEnv.toAppendStream(result, tpinf).print();

env.execute();
}
}
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量