Spark FastJson 解析 SDK 上报数据

日志格式为

{"CommonInfo":{"env":"iOS","productId":"JiaTuiAPP","userInfo":{"userId":"561239948443779072"},"systemInfo":{"system":"12.2","platform":"iOS","model":"iPhone 5s","pixelRatio":2,"brand":"iPhone","screenWidth":320,"screenHeight":568,"version":"1.0"}},"eventObject":{"eventId":"2048"}}

解析Demo

package com.bigdata.JsonFormat
import com.alibaba.fastjson.JSON
object Test {

def main(args: Array[String]): Unit = {
// println("test")


val jsonStr = """{"CommonInfo":{"env":"iOS","productId":"JiaTuiAPP","userInfo":{"userId":"561239948443779072"},"systemInfo":{"system":"12.2","platform":"iOS","model":"iPhone 5s","pixelRatio":2,"brand":"iPhone","screenWidth":320,"screenHeight":568,"version":"1.0"}},"eventObject":{"eventId":"2048"}}"""

val jsonRoot = JSON.parseObject(jsonStr)
val CommonInfo = JSON.parseObject(jsonRoot.get("CommonInfo") + "")
val userInfo = JSON.parseObject(CommonInfo.get("userInfo") + "")
val systemInfo = JSON.parseObject(CommonInfo.get("systemInfo") + "")
val eventObject=JSON.parseObject(jsonRoot.get("eventObject") + "")

val env=CommonInfo.get("env")
val productId=CommonInfo.get("productId")
val userId=userInfo.get("userId")
val system=systemInfo.get("system")
val platform=systemInfo.get("platform")
val model=systemInfo.get("model")
val pixelRatio=systemInfo.get("pixelRatio")
val brand=systemInfo.get("brand")
val screenWidth=systemInfo.get("screenWidth")
val screenHeight=systemInfo.get("screenHeight")
val version=systemInfo.get("version")
val eventId=eventObject.get("eventId")
println(env)
println(productId)
println(userId)
println(system)
println(platform)
println(model)
println(pixelRatio)
println(screenWidth)
println(screenHeight)
println(version)
println(eventId)

}
}
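补充一个更紧凑的写法:fastjson 的 JSONObject 本身提供 getJSONObject/getString,可以省去"先拼空串再 parse"的步骤。下面是一个示意(日志字段做了截短):

import com.alibaba.fastjson.JSON

object TestNested {

  def main(args: Array[String]): Unit = {

    val jsonStr = """{"CommonInfo":{"env":"iOS","userInfo":{"userId":"561239948443779072"}},"eventObject":{"eventId":"2048"}}"""

    // 直接用 getJSONObject 取嵌套对象,用 getString 取具体字段
    val root = JSON.parseObject(jsonStr)
    val userId = root.getJSONObject("CommonInfo").getJSONObject("userInfo").getString("userId")
    val eventId = root.getJSONObject("eventObject").getString("eventId")

    println(userId + " " + eventId)
  }
}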

Spark Yarn 的两种提交方式

一、前述

Spark可以和Yarn整合,将Application提交到Yarn上运行,和StandAlone提交模式一样,Yarn也有两种提交任务的方式。

具体

1、yarn-client提交任务方式

./spark-submit --master yarn  --class org.apache.spark.examples.SparkPi  ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100


执行原理图解

(执行原理图略)

执行流程
1.客户端提交一个Application,在客户端启动一个Driver进程。
2.Driver进程会向RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
3.RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker节点。
4.AM启动后,会向RS请求一批container资源,用于启动Executor.
5.RS会找到一批NM返回给AM,用于启动Executor。
6.AM会向NM发送命令启动Executor。
7.Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。

总结
1、Yarn-client 模式同样适用于测试。因为 Driver 运行在本地,Driver 会与 Yarn 集群中的 Executor 进行大量通信,会造成客户机网卡流量的大量增加。
2、ApplicationMaster 的作用:
为当前的 Application 申请资源;
给 NodeManager 发送消息启动 Executor。
注意:ApplicationMaster 有 launchExecutor 和申请资源的功能,并没有作业调度的功能。

2、yarn-cluster提交任务方式

提交命令

./spark-submit --master yarn --deploy-mode cluster  --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

结果在yarn的日志里面:

(日志截图略)

执行原理
(执行原理图略)
执行流程
1.客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。
2.RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。
3.AM启动,AM发送请求到RS,请求一批container用于启动Executor。
4.RS返回一批NM节点给AM。
5.AM连接到NM,发送请求到NM启动Executor。
6.Executor反向注册到AM所在的节点的Driver。Driver发送task到Executor。

总结
1、Yarn-cluster 模式主要用于生产环境。因为 Driver 运行在 Yarn 集群中某一台 NodeManager 上,每次提交任务时 Driver 所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象;缺点是任务提交后在客户端看不到日志,只能通过 yarn 查看日志。
2、ApplicationMaster 的作用:
为当前的 Application 申请资源;
给 NodeManager 发送消息启动 Executor;
任务调度。(这里和 client 模式的区别是 AM 具有调度能力,因为它就是 Driver 端,包含 Driver 进程)

ELK 日志采集 logstash output -> elasticsearch 数据写入性能优化

背景

前段时间测试线上 ELK 环境,发现 beats 组件直连 elasticsearch 时数据灌入非常快,但经过 logstash 后数据写入明显迟缓,判定 logstash 的灌入存在瓶颈。以下为 logstash 调优细节。

环境

本次针对的优化对象是线上的日志分析平台,主要数据源为基础服务器数据和应用日志。
ES 节点:顶配 3 节点集群,各方面负载不高,无写入瓶颈
logstash 节点:2 个汇聚端
网络:内网万兆传输,无瓶颈
数据量(条):1~1.5w/s

架构

(架构图略)

分析

logstash 本质上是一个管道:通过 input 灌入数据,filter 过滤数据,output 输出数据。

input:filebeat和metricbeat总连接数为500左右,且观察日志无retry 或 timeout等输出,无明显瓶颈
filter和output:logstash的正则解析过程非常消耗资源,但是我们的节点资源消耗居然不高。在新版的logstash中优化了input,filter,output的线程模式,在配置文件中可以通过配置pipeline.workers来调整filter和output的线程数

优化

查询官网手册后,最影响 logstash 传输效率的参数有以下几个:
1、pipeline.workers:决定 filter 和 output 的线程数,官方建议大于 CPU 核数;如果 logstash 节点是混用服务器,建议等于或小于 CPU 核数。
2、pipeline.batch.size:单个 worker 线程每次调用 ES bulk index API 时攒批的事件数,这些事件会先缓存在内存中。最好的设定值需要不断地测试,测试,测试。
3、pipeline.batch.delay:单个 worker 线程攒批时等待新事件的最长时间(毫秒),超时后即使批次未满也会发送,同样需要结合实际测试。
优化后的 logstash 配置(logstash.yml):
pipeline.workers: 8
pipeline.batch.size: 2500
pipeline.batch.delay: 50

优化结果

(优化结果对比图略)

Spark Structured Streaming StreamExecution:持续查询的运转引擎

StreamExecution 的初始状态

定义好 Dataset/DataFrame 的产生、变换和写出,再启动 StreamExecution 去持续查询。这些 Dataset/DataFrame 的产生、变换和写出的信息,就对应保存在 StreamExecution 非常重要的 3 个成员变量中:
1、sources: streaming data 的产生端(比如 kafka 等)
2、logicalPlan: DataFrame/Dataset 的一系列变换(即计算逻辑)
3、sink: 最终结果写出的接收端(比如 file system 等)

StreamExecution 另外的重要成员变量是:
1、currentBatchId: 当前执行的 id
2、batchCommitLog: 已经成功处理过的批次有哪些
3、offsetLog, availableOffsets, committedOffsets: 当前执行需要处理的 source data 的 meta 信息
4、offsetSeqMetadata: 当前执行的 watermark 信息(event time 相关,本文暂不涉及、另文解析)等

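结合一个最小的 Structured Streaming 查询示意,更容易把这些成员变量和代码对应起来:readStream 对应 sources,中间的变换对应 logicalPlan,writeStream 对应 sink,offsetLog/batchCommitLog 等则保存在 checkpoint 目录下(kafka 地址、topic、checkpoint 路径均为假设值):

import org.apache.spark.sql.SparkSession

object StreamExecutionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StreamExecutionDemo").getOrCreate()
    import spark.implicits._

    // source:对应 StreamExecution 的 sources 成员
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // 假设的 kafka 地址
      .option("subscribe", "demo_topic")                   // 假设的 topic
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // 变换:对应 logicalPlan 成员
    val counts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // sink:对应 sink 成员;start() 之后由 StreamExecution 驱动持续查询
    // offsetLog / batchCommitLog 等 WAL 就保存在 checkpointLocation 下
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/structured_demo_ckpt") // 假设的路径
      .start()

    query.awaitTermination()
  }
}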

StreamExecution 的持续查询

(持续查询一次执行的流程图略)
一次执行的过程如上图;这里有 6 个关键步骤:
1.StreamExecution 通过 Source.getOffset() 获取最新的 offsets,即最新的数据进度;
2.StreamExecution 将 offsets 等写入到 offsetLog 里
这里的 offsetLog 是一个持久化的 WAL (Write-Ahead-Log),是将来可用作故障恢复用
3.StreamExecution 构造本次执行的 LogicalPlan
(3a) 将预先定义好的逻辑(即 StreamExecution 里的 logicalPlan 成员变量)制作一个副本出来
(3b) 给定刚刚取到的 offsets,通过 Source.getBatch(offsets) 获取本执行新收到的数据的 Dataset/DataFrame 表示,并替换到 (3a) 中的副本里
经过 (3a), (3b) 两步,构造完成的 LogicalPlan 就是针对本执行新收到的数据的 Dataset/DataFrame 变换(即整个处理逻辑)了
4.触发对本次执行的 LogicalPlan 的优化,得到 IncrementalExecution
逻辑计划的优化:通过 Catalyst 优化器完成
物理计划的生成与选择:结果是可以直接用于执行的 RDD DAG
逻辑计划、优化的逻辑计划、物理计划、及最后结果 RDD DAG,合并起来就是 IncrementalExecution
5.将表示计算结果的 Dataset/DataFrame (包含 IncrementalExecution) 交给 Sink,即调用 Sink.add(ds/df)
6.计算完成后的 commit
(6a) 通过 Source.commit() 告知 Source 数据已经完整处理结束;Source 可按需完成数据的 garbage-collection
(6b) 将本次执行的批次 id 写入到 batchCommitLog 里

StreamExecution 的持续查询(增量)

Structured Streaming 在编程模型上暴露给用户的是:每次持续查询都看做面对全量数据(而不仅仅是本次执行新收到的数据),所以每次执行的结果是针对全量数据进行计算的结果。

但是在实际执行过程中,由于全量数据会越攒越多,那么每次对全量数据进行计算的代价和消耗会越来越大。

Structured Streaming 的做法是:
1.引入全局范围、高可用的 StateStore
2.转全量为增量,即在每次执行时:
先从 StateStore 里 restore 出上次执行后的状态
然后加入本执行的新数据,再进行计算
如果有状态改变,将把改变的状态重新 save 到 StateStore 里
3.为了在 Dataset/DataFrame 框架里完成对 StateStore 的 restore 和 save,在增量执行的计划里加入相应的状态读取和保存步骤
所以 Structured Streaming 在编程模型上暴露给用户的是,每次持续查询看做面对全量数据,但在具体实现上转换为增量的持续查询。

故障恢复

alt
由于 executor 节点的故障可由 Spark 框架本身很好地 handle,不引起可用性问题,我们本节的故障恢复只讨论 driver 故障恢复。
如果在某个执行过程中发生 driver 故障,那么重新起来的 StreamExecution:
1.读取 WAL offsetlog 恢复出最新的 offsets 等;相当于取代正常流程里的 (1)(2) 步
2.读取 batchCommitLog 决定是否需要重做最近一个批次
3.如果需要,那么重做 (3a), (3b), (4), (5), (6a), (6b) 步
这里第 (5) 步需要分两种情况讨论:
(i) 如果上次执行在 (5) 结束前即失效,那么本次执行里 sink 应该完整写出计算结果
(ii) 如果上次执行在 (5) 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)
这样即可保证每次执行的计算结果,在 sink 这个层面,是 不重不丢 的 —— 即使中间发生过 1 次或以上的失效和恢复。

小结:end-to-end exactly-once guarantees

所以在 Structured Streaming 里,我们总结下面的关系[4]:

(关系图略)
这里的 end-to-end 指的是,如果 source 选用类似 Kafka, HDFS 等,sink 选用类似 HDFS, MySQL 等,那么 Structured Streaming 将自动保证在 sink 里的计算结果是 exactly-once 的 —— Structured Streaming终于把过去需要使用者去维护的 sink 去重逻辑接盘过去了!:-)

通过BulkLoad(MR)快速将海量数据导入到Hbase

原始文件是从 MySQL 导出的 CSV 文件

503003676755886086,503003161271734273,1
503003669797548035,503003161271734273,1
503003568609964035,503003161271734273,1
503003700512428038,503003161271734273,1
503003764244881414,503003161271734273,1
503003647634841604,503003161271734273,4
503003747857739782,503003161271734273,7
503003582082068480,503003161271734273,5
503003646376542208,503003161271734273,3
503003631474180105,503003161271734273,1

使用 MR 对文件进行加盐操作:根据 rowkey 的 MD5 值对 region 数量取模得到盐值前缀

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.security.MessageDigest;


/**
* 计数系统工具类
*/

public class CounterUtils {


private static final Logger logger = LoggerFactory.getLogger(CounterUtils.class);


/** region数量 */
private static Integer region_num = 4;

/** salt前缀 */
private static String salt_prefix = "00";




/**
* 为HBase rowkey产生salt值
* @param key 原始计数键名
* @return salt值
*/

public static String generateSalt(String key){

Integer region_seq = Math.abs(generateMD5(key).hashCode()) % region_num;

return salt_prefix+String.valueOf(region_seq);

}


/**
* 根据字符串产生MD5值
* @param msg
* @return 字符串对应的MD5值
*/

public static String generateMD5(String msg) {

MessageDigest md5 = null;

try {

md5 = MessageDigest.getInstance("MD5");

} catch (Exception e) {


logger.error("产生MD5值报错",e);

return "";
}

char[] charArray = msg.toCharArray();
byte[] byteArray = new byte[charArray.length];

for (int i = 0; i < charArray.length; i++)
byteArray[i] = (byte) charArray[i];

byte[] md5Bytes = md5.digest(byteArray);
StringBuffer hexValue = new StringBuffer();

for (int i = 0; i < md5Bytes.length; i++) {

int val = ((int) md5Bytes[i]) & 0xff;
if (val < 16)
hexValue.append("0");
hexValue.append(Integer.toHexString(val));
}

return hexValue.toString();

}

}
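加盐逻辑本身可以先单独验证一下。下面是 CounterUtils.generateSalt 的一个 Scala 等价示意(region 数量取 4、前缀 "00" 与上面一致;对纯 ASCII 的 rowkey,结果与原实现相同):

import java.security.MessageDigest

object SaltDemo {
  private val regionNum = 4
  private val saltPrefix = "00"

  // 对字符串求 MD5 的十六进制表示
  def generateMD5(msg: String): String =
    MessageDigest.getInstance("MD5")
      .digest(msg.getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString

  // 用 MD5 值的 hashCode 对 region 数量取模,得到 000~003 之间的盐值前缀
  def generateSalt(key: String): String =
    saltPrefix + (math.abs(generateMD5(key).hashCode) % regionNum).toString

  def main(args: Array[String]): Unit = {
    val rowkey = "LOOK_TIMES_503003161271734273_503003676755886086"
    println(generateSalt(rowkey) + rowkey) // 形如 000LOOK_TIMES_... ~ 003LOOK_TIMES_...
  }
}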

加盐后生成新文件的 MR 程序

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.jiatui.bigdata.util.CounterUtils;



public class DateConversion extends Configured implements Tool {

// 构建map类
public static class TestMap extends Mapper<LongWritable, Text, Text, TestWritable> {

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

// 按逗号切分字段
String[] splited = value.toString().split(",");
// 以第一个和第二个为rowkey值
String rowkey1="LOOK_TIMES_" + splited[1] + "_" + splited[0];

String rowkey = CounterUtils.generateSalt(rowkey1)+rowkey1;

Text k2 = new Text(rowkey);
// 第三个为value值
TestWritable v2 = new TestWritable(splited[2]);

context.write(k2, v2);

}

}

// 构建reduce类
public static class TestReduce extends Reducer<Text, TestWritable, Text, TestWritable> {

public void reduce(Text k2, Iterable<TestWritable> v2s, Context context)
throws IOException, InterruptedException {

String value;

// 循环所有的key值和values值
for (TestWritable testWritable : v2s) {

value = testWritable.value;

TestWritable v3 = new TestWritable(value);
context.write(k2, v3);
}

}

}

// main方法启动
public static void main(String[] args) throws IOException, Exception {
ToolRunner.run(new DateConversion(), args);
}

public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("mapred.textoutputformat.separator", ",");

String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job = Job.getInstance(conf, "Test");
// 输出目录已存在时先递归删除,避免 FileAlreadyExistsException
FileSystem fs = FileSystem.get(new URI(argArray[1]), conf);
fs.delete(new Path(argArray[1]), true);
job.setJarByClass(DateConversion.class);
job.setMapperClass(TestMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TestWritable.class);
job.setReducerClass(TestReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TestWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(argArray[0]));
FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
job.waitForCompletion(true);
return 0;
}

static class TestWritable implements Writable {

String value;

public TestWritable(String value) {
this.value = value;

}

// 无参构造方法public class UserBean implements Writable
// 这个应该是在自定义writable的时候需要注意,反射过程中需要调用无参构造。
public TestWritable() {
}

public void readFields(DataInput in) throws IOException {

this.value = in.readUTF();

}

public void write(DataOutput out) throws IOException {

out.writeUTF(value);

}

public String toString() {
return value;
}

}

}

生成文件的形式为

000LOOK_TIMES_503003049082486784_503003525400236039,3
000LOOK_TIMES_503003049082486784_503003558279393283,1
001LOOK_TIMES_503003049099264000_503003517552689161,6
002LOOK_TIMES_503003049099264000_503003586590937094,2
002LOOK_TIMES_503003049099264000_503003611433799685,1
002LOOK_TIMES_503003049099264000_503003638604505093,1
003LOOK_TIMES_503003049099264000_503003646833725450,2
003LOOK_TIMES_503003049099264000_503254267948171264,2
003LOOK_TIMES_503003049099264000_504261308741332992,1
003LOOK_TIMES_503003049099264000_505510528722927616,1

再进行另一个 MR 操作,将生成的文件导入到 HBase 表中

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;





/**
*
* <p>Title: GeneratorHFile</p>
* <p>Description: 先把文件 rowkey 加盐成文件,根据 rowkey ASC 排序后上传 HDFS(排序在 hive 中进行)
* insert overwrite directory '/dmp_operator1/test/'
row format delimited
fields terminated by ','
select * from test order by id asc;
* </p>
* @author zhangshuai
* @date 2018年12月19日
*/
public class GeneratorHFile extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

private static Logger logger = Logger.getLogger(GeneratorHFile.class);



protected void map(LongWritable Key, Text Value,
Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)
throws IOException, InterruptedException {

//切分导入的数据
String Values=Value.toString();
String[] Lines=Values.split(",");


String Rowkey=Lines[0];

Long ColValue=Long.valueOf(Lines[1]);
//拼装rowkey和put;
ImmutableBytesWritable PutRowkey=new ImmutableBytesWritable(Bytes.toBytes(Rowkey));
Put put=new Put(Bytes.toBytes(Rowkey));
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("cnt"), Bytes.toBytes(ColValue));

context.write(PutRowkey,put);



}

}

package com.jiatui.bigdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;

import com.jiatui.bigdata.util.ScheduleKerBeros;

public class GenerateHFileDriver {

public static void main(String[] args) throws Exception {

ScheduleKerBeros scheduleKerBeros = new ScheduleKerBeros();
scheduleKerBeros.scheduled();

/**
* 获取HBase配置,创建连接到目标表。表在Shell中已经创建好,建表语句:create
* 'counter_sys:counter_tbl','cf1',这里注意HBase对大小写很敏感
*/

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.103.3,192.168.103.4,192.168.103.5");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.addResource("/home/dmp_operator1/tickets/hbase-site.xml");
conf.addResource("/home/dmp_operator1/tickets/core-site.xml");
conf.addResource("/home/dmp_operator1/tickets/hdfs-site.xml");
conf.set("zookeeper.znode.parent", "/hbase-secure");

conf.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(64 * 1024 * 1024));
conf.set("mapred.min.split.size", String.valueOf(64 * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.node", String.valueOf(64 * 1024 * 1024));
conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack", String.valueOf(64 * 1024 * 1024));


Connection conn = ConnectionFactory.createConnection(conf);
Table table = conn.getTable(TableName.valueOf("counter_sys:counter_tbl"));
Admin admin = conn.getAdmin();

final String InputFile = "hdfs://DATASEA/user/hbase/test/input";
final String OutputFile = "hdfs://DATASEA/user/hbase/test/output";
final Path OutputPath = new Path(OutputFile);

// 设置相关类名
Job job = Job.getInstance(conf, "counter_sys:counter_tbl");
job.setJarByClass(GenerateHFileDriver.class);
job.setMapperClass(GeneratorHFile.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);

// 设置文件的输入路径和输出路径
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat2.class);
FileInputFormat.setInputPaths(job, InputFile);
FileOutputFormat.setOutputPath(job, OutputPath);

// 配置MapReduce作业,以执行增量加载到给定表中。
HFileOutputFormat2.configureIncrementalLoad(job, table,
conn.getRegionLocator(TableName.valueOf("counter_sys:counter_tbl")));

// MapReduce作业完成,告知RegionServers在哪里找到这些文件,将文件加载到HBase中
if (job.waitForCompletion(true)) {
LoadIncrementalHFiles Loader = new LoadIncrementalHFiles(conf);
Loader.doBulkLoad(OutputPath, admin, table,
conn.getRegionLocator(TableName.valueOf("counter_sys:counter_tbl")));
}

}
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.time.LocalTime;

/**
*
* <p>
* Title: ScheduleTest
* </p>
* <p>
* Description:
* </p>
*
* @author zhangshuai
* @date 2018年12月6日
*/


public class ScheduleKerBeros {
private static final Logger logger = LoggerFactory.getLogger(ScheduleKerBeros.class);

/**
*
* <p>Title: scheduled</p>
* <p>Description: </p>
*/

public void scheduled() {

logger.info("开始授权...");
//System.setProperty("java.security.krb5.conf", this.getClass().getResource("/krb5.conf").getPath());
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
try {
logger.info("授权中1...");
UserGroupInformation.loginUserFromKeytab("hbase/dev-bg-m01@DATASEA.COM","/etc/security/keytabs/hbase.service.keytab");
//UserGroupInformation.loginUserFromKeytab("dmp_operator1@DATASEA.COM",this.getClass().getResource("/dmp_operator1.keytab").getPath());


logger.info("授权成功1...");

} catch (Exception e) {
logger.error("Kerberos 授权失败", e);
}

}




}

Spring MVC 框架定时刷新 Kerberos 认证票据

package com.XXX.counter.listener;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import java.util.Timer;
public class TicketScanerListener implements ServletContextListener {
private Object lock = new Object();
public void contextDestroyed(ServletContextEvent arg0) {
System.out.println("web应用关闭...");
}

public void contextInitialized(ServletContextEvent arg0) {
System.out.println("web应用初始化...");
// 创建定时器
Timer timer = new Timer();
// 每隔 1000 秒(1000 * 1000 毫秒)执行一次刷新任务
timer.schedule(new TicketScanerTask(lock), 0, 1000 * 1000);
}

}
package com.xxx.counter.listener;

import java.util.TimerTask;

import org.apache.log4j.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;
/**
* 定时器,定义定时任务的具体内容
*/
public class TicketScanerTask extends TimerTask{
private static Logger logger = Logger.getLogger(TicketScanerTask.class);
// 存储传递过来的锁
private Object lock;
// 构造方法
TicketScanerTask( Object lock){
this.lock = lock;
}

private static Configuration conf = null;
public static String principal = "dmp_operator1@DATASEA.COM";
public static String keytabPath = "dmp_operator1.keytab";
@Override
public void run() {
// 考虑到多线程的情况,这里必须要同步
synchronized (lock){

logger.info("TicketScanerTask......");

System.setProperty("java.security.krb5.conf",this.getClass().getResource("/krb5.conf").getPath());

try {
UserGroupInformation.loginUserFromKeytab("dmp_operator1@DATASEA.COM", this.getClass().getResource("/dmp_operator1.keytab").getPath());

} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("授权失败...");
}


}
}
}
<listener>
<listener-class>com.xxx.counter.listener.TicketScanerListener</listener-class>
</listener>
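定时任务里除了每次重新走一遍 loginUserFromKeytab,也可以考虑 Hadoop 自带的票据检查接口 checkTGTAndReloginFromKeytab,它只在 TGT 快过期时才真正重新登录。下面是一个 Scala 写法的示意(principal 与上文一致,keytab 路径按实际部署填写):

import org.apache.hadoop.security.UserGroupInformation

object KerberosTicketRefresher {

  // 启动时登录一次
  def login(keytabPath: String): Unit = {
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")
    UserGroupInformation.loginUserFromKeytab("dmp_operator1@DATASEA.COM", keytabPath)
  }

  // 定时调用:TGT 接近过期时才会重新从 keytab 登录,开销比每次全量登录小
  def refresh(): Unit =
    UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab()
}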
package com.xxx.counter.util;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseConnection {
private static final Logger logger = LoggerFactory.getLogger(HbaseConnection.class);

public static Configuration conf = null;
public static Connection connection = null;

public Connection getConnection () throws IOException {
conf = HBaseConfiguration.create();

conf.addResource(this.getClass().getResource("/hbase-site.xml").getPath());
conf.addResource(this.getClass().getResource("/core-site.xml").getPath());
conf.addResource(this.getClass().getResource("/hdfs-site.xml").getPath());
conf.set("hbase.zookeeper.quorum", "192.168.103.3,192.168.103.4,192.168.103.5");
conf.set("hbase.zookeeper.property.clientPort", "2181");
conf.set("zookeeper.znode.parent", "/hbase-secure");
conf.setLong("hbase.rpc.timeout", 30000);
connection = ConnectionFactory.createConnection(conf);
return connection;
}
}

Spark Scala 获取自然日所属周的第一天和最后一天、所属月的第一天和最后一天

例子

package com.xxxx.bigdata
import java.util.Calendar
import java.text.SimpleDateFormat
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

object Date_week_month {
def getNowWeekStart(logdate: String) = {
var period: String = ""
var cal: Calendar = Calendar.getInstance();
var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");

cal.setTime(df.parse(logdate))

var d = 0
if (cal.get(Calendar.DAY_OF_WEEK) == 1) {
d = -6
} else {
d = 2 - cal.get(Calendar.DAY_OF_WEEK)
}

cal.add(Calendar.DAY_OF_WEEK, d)

val startdate = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())

cal.add(Calendar.DAY_OF_WEEK, 6)

val enddate = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime())

(startdate, enddate)
}

def getNowMonthStart(logdate: String) = {
var cal: Calendar = Calendar.getInstance();
var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
cal.setTime(df.parse(logdate))
cal.set(Calendar.DATE, 1)
val monthstart = df.format(cal.getTime())

cal.set(Calendar.DATE, 1)
cal.roll(Calendar.DATE, -1)
val monthend = df.format(cal.getTime())

(monthstart, monthend)

}
def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("test").setMaster("local")
conf.set("spark.default.parallelism", "200")
conf.set("spark.sql.shuffle.partitions", "200")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("zookeeper.znode.parent", "/hbase-secure")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hiveContext.setConf("hive.exec.max.dynamic.partitions.pernode", "2000")
hiveContext.setConf("hive.exec.max.dynamic.partitions", "5000")
sc.setLogLevel("ERROR")
val startTime = "2018-01-01"

val endTime = "2021-12-31"

val dateFormat = new SimpleDateFormat("yyyy-MM-dd")

val dateFiled = Calendar.DAY_OF_MONTH

var beginDate = dateFormat.parse(startTime)

val endDate = dateFormat.parse(endTime)

val calendar = Calendar.getInstance()

calendar.setTime(beginDate)

val dateArray: ArrayBuffer[String] = ArrayBuffer()

val data = new ArrayBuffer[Row]
while (beginDate.compareTo(endDate) <= 0) {

val login_time = dateFormat.format(beginDate)

val starttime = getNowWeekStart(login_time)._1
val endtime = getNowWeekStart(login_time)._2

val monthstart = getNowMonthStart(login_time)._1
val monthend = getNowMonthStart(login_time)._2

data += Row(login_time, starttime, endtime, monthstart, monthend)

calendar.add(dateFiled, 1)
beginDate = calendar.getTime

}

val rdd = sc.parallelize(data)

val stuct = StructType(
Array(
StructField("login_time", StringType),
StructField("starttime", StringType),
StructField("endtime", StringType),
StructField("monthstart", StringType),
StructField("monthend", StringType)))

val test1 = hiveContext.createDataFrame(rdd, stuct)

val tmpTable = "dateTable"

test1.registerTempTable(tmpTable)

val sql = s"""
select count(*) from
$tmpTable
"""

println("sql = " + sql)

hiveContext.sql(sql).show();

sc.stop()
}
}
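两个日期函数可以先单独验证一下(以 2021-03-03 周三为例,结果与下文 Hive 的输出一致):

package com.xxxx.bigdata

// 放在与 Date_week_month 相同的包下即可直接调用
object DateWeekMonthCheck {
  def main(args: Array[String]): Unit = {
    println(Date_week_month.getNowWeekStart("2021-03-03"))  // (2021-03-01,2021-03-07)
    println(Date_week_month.getNowMonthStart("2021-03-03")) // (2021-03-01,2021-03-31)
  }
}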

用 Hive SQL 实现以上逻辑:先用 shell 循环生成两个日期之间的所有日期

START_DAY='2018-01-01'
END_DAY='2021-12-31'

initSec=`date -d $START_DAY +%s`
finiSec=`date -d $END_DAY +%s`
for ((i=$initSec; i <=$finiSec; i+=24 * 3600 ));do
cur_day=`date -d "1970-1-1 UTC $i seconds" +%F`
echo "$cur_day,,,,">> testa

done

生成了这样的文件

2018-01-01,,,,
2018-01-02,,,,
2018-01-03,,,,
2018-01-04,,,,
2018-01-05,,,,
2018-01-06,,,,
2018-01-07,,,,
.............

然后load到Hive表中

CREATE TABLE test (dt String,WEEK_START string,WEEK_END string,MONTH_START string,MONTH_END string)row format delimited fields TERMINATED BY ',';

执行SQL

insert overwrite table test select  dt,date_sub(dt,cast(date_format(dt,'u') as int)-1) as start_time,
date_sub(dt,cast(date_format(dt,'u') as int)-7) as end_time,trunc(dt,'MM'),last_day(dt) from test
| 2021-02-24  | 2021-02-22  | 2021-02-28  | 2021-02-01  | 2021-02-28  |
| 2021-02-25 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-26 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-27 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-02-28 | 2021-02-22 | 2021-02-28 | 2021-02-01 | 2021-02-28 |
| 2021-03-01 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-02 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-03 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-04 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-05 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-06 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-07 | 2021-03-01 | 2021-03-07 | 2021-03-01 | 2021-03-31 |
| 2021-03-08 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-09 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-10 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-11 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-12 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-13 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-14 | 2021-03-08 | 2021-03-14 | 2021-03-01 | 2021-03-31 |
| 2021-03-15 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-16 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-17 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-18 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-19 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-20 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-21 | 2021-03-15 | 2021-03-21 | 2021-03-01 | 2021-03-31 |
| 2021-03-22 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-23 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-24 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-25 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-26 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-27 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-28 | 2021-03-22 | 2021-03-28 | 2021-03-01 | 2021-03-31 |
| 2021-03-29 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-03-30 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-03-31 | 2021-03-29 | 2021-04-04 | 2021-03-01 | 2021-03-31 |
| 2021-04-01 | 2021-03-29 | 2021-04-04 | 2021-04-01 | 2021-04-30 |

Spark1.X操作DataFrame示例

{"id":1, "name":"Ganymede", "age":32}

{"id":2, "name":"Lilei", "age":19}

{"id":3, "name":"Lily", "age":25}

{"id":4, "name":"Hanmeimei", "age":25}

{"id":5, "name":"Lucy", "age":37}

{"id":6, "name":"Tom", "age":27}

1,Ganymede,32

2, Lilei, 19

3, Lily, 25

4, Hanmeimei, 25

5, Lucy, 37

6, wcc, 4
package DataCleaning


import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrameTest {
case class People(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameTest").setMaster("local")

val sc = new SparkContext(conf)

val sqlContxt = new SQLContext(sc)

val dfsql = sqlContxt.read.json("people.json")

val dftxt = sc.textFile("people.txt")



//映射表
dfsql.registerTempTable("people")

dfsql.show()
dfsql.printSchema()

dfsql.select(dfsql.col("id"),dfsql.col("name")).foreach( x => {

println(x.get(0),x.get(1))
})


dfsql.select(dfsql.col("id"),dfsql.col("name")).foreachPartition( Iterator =>
Iterator.foreach(x => {
println(x.getAs("id"),x.getAs("name"))
}))


sqlContxt.sql("select id,name from people").foreach( x => {
println("SQL打印出来的````"+x.get(0),x.get(1))
})

sqlContxt.sql("select id,name from people").foreachPartition(Iterable => {
Iterable.foreach( x => {
println("SQL打印出来的````"+x.getAs("id"),x.getAs("name"))
})
})


val peopleRowRdd = dftxt.map(x => x.split(",")).map(data =>{
val id = data(0).trim().toInt
val name = data(1).trim()
val age = data(2).trim().toInt
Row(id,name,age)

})
val structType = StructType(Array(
StructField("id", IntegerType, true),
StructField("name",StringType,true ),
StructField("age", IntegerType, true)
))

val df = sqlContxt.createDataFrame(peopleRowRdd, structType)

df.registerTempTable("people1")
println("-------------------")
df.show()
df.printSchema()



val people = sc.textFile("people.txt")

val peopleRDD = people.map { x => x.split(",") }.map ( data =>
{
People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt)
})

//这里需要隐式转换一把
import sqlContxt.implicits._
val dfDF = peopleRDD.toDF()
dfDF.registerTempTable("people")

println("-------------case class反射来映射注册临时表-----------------------")
dfDF.show()
dfDF.printSchema()

}


}

Kafka HA:一致性重要机制之 ISR(kafka replica)

一、kafka replica

当某个topic的replication-factor为N且N大于1时,每个Partition都会有N个副本(Replica)。kafka的replica包含leader与follower。
Replica的个数小于等于Broker的个数,也就是说,对于每个Partition而言,每个Broker上最多只会有一个Replica,因此可以使用Broker id 指定Partition的Replica。
所有Partition的Replica默认情况会均匀分布到所有Broker上。

二、Data Replication如何Propagate(扩散出去)消息?

每个Partition有一个leader与多个follower,producer往某个Partition中写入数据时,只会往leader中写入数据,然后数据才会被复制进其他的Replica中。
数据是由leader push过去,还是由follower pull过来?
kafka是由follower周期性地主动去pull(拉)过来(这个过程与consumer消费过程非常相似)。写都往leader上写,读也只在leader上读,并不是在任意follower上读都行;follower只是数据的一个备份,保证leader被挂掉后能顶上来,并不对外提供服务。

三、Data Replication何时Commit?

同步复制: 只有所有的follower把数据拿过去后才commit,一致性好,可用性不高。
异步复制: 只要leader拿到数据立即commit,等follower慢慢去复制,可用性高,立即返回,一致性差一些。
Commit:是指leader告诉客户端,这条数据写成功了。kafka尽量保证即使commit后leader立即挂掉,其他follower上也都已有该条数据。

kafka不是完全同步,也不是完全异步,是一种ISR机制

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica)。每个Partition都会有一个ISR,而且是由leader动态维护的。
如果一个follower比leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其从ISR中移除。
只有当ISR中所有Replica都向leader发送ACK时,leader才commit。既然如此,follower怎么会比leader落后太多?
producer往kafka中发送数据,不仅可以一次发送一条,还可以批量发送message数组(同步时批量发送,异步时底层有队列缓存,本身就是批量)。对broker而言,一次就会收到很多数据(假设1000条):这时leader发现自己有1000条数据,而某个follower只有500条,落后了500条,就把它从ISR中移除;如果发现其他follower与自己的差距都很小,就继续等待;如果某个follower因为内存等原因差距很大,同样把它从ISR中移除。

server配置

replica.lag.time.max.ms=10000
# 如果leader发现follower超过10秒没有向它发起fetch请求,那么leader认为这个follower可能程序出了问题,
# 或者资源紧张调度不过来;它太慢了,不希望它拖慢后面的进度,就把它从ISR中移除。

replica.lag.max.messages=4000 # 相差4000条就移除
# follower慢的时候先移除以保证高可用性,之后重新满足这两个条件时又会加入ISR,
# 在可用性与一致性之间做了动态平衡,这是亮点。

topic配置

min.insync.replicas=1 # 需要保证ISR中至少有多少个replica

Producer配置

request.required.acks=0
# 0:相当于异步,不需要leader给予回复,producer立即返回,发出去就算成功;
#   如果发送消息时网络超时或broker crash(1.Partition的Leader还没有commit消息 2.Leader与Follower数据不同步),
#   消息既有可能丢失也可能会重发
# 1:当leader接收到消息之后发送ack,丢了会重发,丢的概率很小
# -1:当所有的follower都同步消息成功后才发送ack,丢失消息的可能性比较低
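request.required.acks 是老版本(0.8 系)producer 的参数名;新版 Java/Scala 客户端里对应的是 acks。下面是一个发送端设置的示意(broker 地址和 topic 均为假设值):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AcksDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // 假设的 broker 地址
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // acks=all 等价于 -1:ISR 中所有副本都确认后才算发送成功
    props.put("acks", "all")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("demo_topic", "key", "value"))
    producer.close()
  }
}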

Data Replication如何处理Replica恢复

leader挂掉了,从它的follower中选举一个作为leader,并把挂掉的leader从ISR中移除,继续处理数据。一段时间后该leader重新启动了,它知道它之前的数据到哪里了,尝试获取它挂掉后leader处理的数据,获取完成后它就加入了ISR。

Data Replication如何处理Replica全部宕机

方案一:等待ISR中任一Replica恢复,并选它为Leader。缺点是等待时间可能较长,降低可用性;如果ISR中的所有Replica都无法恢复或者数据丢失,则该Partition将永不可用。
方案二:选择第一个恢复的Replica为新的Leader,无论它是否在ISR中。它并不一定包含所有已被之前Leader Commit过的消息,因此会造成数据丢失,但可用性较高。

Spark Scala 抽取 MySQL 数据导入 Hive

driverClass=com.mysql.jdbc.Driver
feeds.jdbcUrl=jdbc:mysql://172.16.XX.X:3306/feeds?useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&autoReconnect=true&useSSL=false
feeds.user=root
feeds.password=Xingrui@jiatuimysql

initialPoolSize=3
minPoolSize=3
acquireIncrement=3
maxPoolSize=15
maxIdleTime=10
acquireRetryAttempts=30
acquireRetryDelay=1000
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata_cardevent_etl</groupId>
<artifactId>bigdata_cardevent_etl</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>${project.artifactId}</name>


<properties>
<encoding>UTF-8</encoding>
<spark.version>1.6.3</spark.version>
<mysql.version>5.1.47</mysql.version>
<c3p0.version>0.9.5-pre4</c3p0.version>
<fastjson.version>1.2.47</fastjson.version>

</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>${c3p0.version}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- scala 编译插件 -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<source>1.8</source>
<target>1.8</target>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.jiatui.bigdata.format.sdklog.main.FormatSdkLogApp</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.jiatui.bigdata.util

import com.mchange.v2.c3p0.ComboPooledDataSource
import java.util.Properties
import java.io.File
import java.io.FileInputStream
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import java.sql.ResultSet
/**
* c3p0连接
*/
class DBUtilC3p0() extends Serializable{
private val cpds: ComboPooledDataSource =new ComboPooledDataSource(true)

private val prop=new Properties

{
try {
val file: File = new File("c3p0.properties")
val in = new FileInputStream(file)
prop.load(in)
println("prop:" + prop)
cpds.setJdbcUrl(prop.getProperty("feeds.jdbcUrl"))
cpds.setDriverClass(prop.getProperty("driverClass"))
cpds.setUser(prop.getProperty("feeds.user"))
cpds.setPassword(prop.getProperty("feeds.password"))
cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize")))
cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize")))
cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement")))
cpds.setInitialPoolSize(Integer.valueOf(prop.getProperty("initialPoolSize")))
cpds.setMaxIdleTime(Integer.valueOf(prop.getProperty("maxIdleTime")))
} catch {
case ex: Exception => ex.printStackTrace()
}
}



def getConnection: Connection = {
try {
cpds.getConnection()
} catch {
case ex: Exception =>
ex.printStackTrace()
null
}
}
}
object DBUtilC3p0 {
var dbUtilC3p0: DBUtilC3p0 = _
def getDBUtilC3p0(): DBUtilC3p0 = {
synchronized {
if (dbUtilC3p0 == null) {
dbUtilC3p0 = new DBUtilC3p0()
}
}
dbUtilC3p0
}
}
package com.jiatui.bigdata.util

import java.sql.ResultSet
import java.sql.Connection
import java.sql.PreparedStatement
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.sql.DriverManager
import jodd.util.PropertiesUtil
import scala.collection.mutable.ArrayBuffer

/*
* <p>Title: CardEventEtl</p>
*
* <p>Description:获取feeds所有数据表名 </p>
*
* @author zhangshuai
*
* @date 2019年3月1日
*/

object DBUtil {

def getFeedsTableNames() = {
var data = new ArrayBuffer[String]()
val conn = DBUtilC3p0.getDBUtilC3p0().getConnection
try {
val stat = conn.createStatement()
val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")

while (rs.next()) {
val tableName = rs.getString(1)

data += tableName

}
rs.close()
stat.close()
data
} catch {
case t: Exception =>
t.printStackTrace()
println("获取feeds表表名时候出错.")
} finally {
try {
if (conn != null) {
conn.close()
}
} catch {
case t: Exception =>
t.printStackTrace()
}
}
data
}



// private var connection: Connection = _
//
// private var preparedStatement: PreparedStatement = _
//
// private var resultSet: ResultSet = _
//
// def getCardEventTableNames = {
// var data = new ArrayBuffer[String]()
// val conn = DBUtil.getConnection
// try {
// val stat = conn.createStatement()
// val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")
//
// while (rs.next()) {
// val tableName = rs.getString(1)
//
// data += tableName
//
// }
// rs.close()
// stat.close()
// data
// } catch {
// case t: Exception =>
// t.printStackTrace()
// println("获取统计表表名时候出错.")
// } finally {
//
// try {
// if (conn != null) {
// conn.close()
//
// }
//
// } catch {
// case t: Exception =>
// t.printStackTrace()
// }
//
// }
// data
//
// }
//
//
// def getFeedsConnection: Connection ={
// DBUtil.getConnection
// }
//
//
//
// // 数据库驱动类
//
// private val driverClass: String = ConfigurationManager.getProperty("driverClass")
// // 数据库连接地址
// private val url: String = ConfigurationManager.getProperty("feeds.jdbcUrl")
//
// // 数据库连接用户名
// private val username: String = ConfigurationManager.getProperty("feeds.user")
// // 数据库连接密码
// private val password: String = ConfigurationManager.getProperty("feeds.password")
//
// // 加载数据库驱动
// // Class.forName(driverClass)
// Class.forName("com.mysql.jdbc.Driver")
//
// // 连接池大小
// val poolSize: Int = ConfigurationManager.getProperty("poolSize").toInt
//
// // 连接池 - 同步队列
// private val pool: BlockingQueue[Connection] = new LinkedBlockingQueue[Connection]()
//
// /**
// * 初始化连接池
// */
// for (i <- 1 to poolSize) {
// DBUtil.pool.put(DriverManager.getConnection(url, username, password))
// }
//
// /**
// * 从连接池中获取一个Connection
// * @return
// */
// private def getConnection: Connection = {
// pool.take()
// }
//
// /**
// * 向连接池归还一个Connection
// * @param conn
// */
// private def returnConnection(conn: Connection): Unit = {
// DBUtil.pool.put(conn)
// }
//
// /**
// * 启动守护线程释放资源
// */
// def releaseResource() = {
// val thread = new Thread(new CloseRunnable)
// thread.setDaemon(true)
// thread.start()
// }
//
// /**
// * 关闭连接池连接资源类
// */
// class CloseRunnable extends Runnable {
// override def run(): Unit = {
// while (DBUtil.pool.size > 0) {
// try {
// // println(s"当前连接池大小: ${DBUtil.pool.size}")
// DBUtil.pool.take().close()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// }

}
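有了上面的连接池和取表名工具类,抽取入 Hive 的主流程大致如下(示意:Spark 1.6 写法,Hive 目标表 ods_card_event 为假设的名字,JDBC 账号密码按 c3p0.properties 填写):

package com.jiatui.bigdata

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

import com.jiatui.bigdata.util.DBUtil

object CardEventEtlDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CardEventEtl"))
    val hiveContext = new HiveContext(sc)

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "******") // 按 c3p0.properties 中的配置填写
    props.put("driver", "com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://172.16.XX.X:3306/feeds?useUnicode=true&characterEncoding=utf8"

    // 逐张 %_card_event 分表抽取,union 后整体写入 Hive
    val tables = DBUtil.getFeedsTableNames()
    val all = tables.map(t => hiveContext.read.jdbc(url, t, props)).reduce(_ unionAll _)
    all.write.mode(SaveMode.Overwrite).saveAsTable("ods_card_event")

    sc.stop()
  }
}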