Spark scala 抽取mysql数据 导入Hive

1
2
3
4
5
6
7
8
9
10
11
12
# JDBC driver class (MySQL Connector/J 5.x class name).
driverClass=com.mysql.jdbc.Driver
# JDBC URL of the "feeds" MySQL database (host address redacted).
feeds.jdbcUrl=jdbc:mysql://172.16.XX.X:3306/feeds?useUnicode=true&characterEncoding=utf8&tinyInt1isBit=false&autoReconnect=true&useSSL=false
# NOTE(review): prefer a least-privilege read-only account over root for an extraction job.
feeds.user=root
# SECURITY: a real password was committed here and has been redacted. Supply the actual
# value at deployment time, keep this file out of version control, and rotate the leaked one.
feeds.password=CHANGE_ME

# c3p0 pool sizing.
initialPoolSize=3
minPoolSize=3
acquireIncrement=3
maxPoolSize=15
# Idle time before a pooled connection is discarded (c3p0 documents this in seconds).
maxIdleTime=10
# Acquisition retry count and delay between retries (c3p0 documents the delay in milliseconds).
acquireRetryAttempts=30
acquireRetryDelay=1000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!-- Maven build for the card-event ETL job: reads MySQL tables via JDBC/c3p0 and runs on Spark 1.6 with Hive support. -->
<modelVersion>4.0.0</modelVersion>
<groupId>bigdata_cardevent_etl</groupId>
<artifactId>bigdata_cardevent_etl</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>${project.artifactId}</name>


<!-- Library versions. The Scala binary version (2.10) is fixed by the _2.10 suffixes of the Spark artifacts below. -->
<properties>
<encoding>UTF-8</encoding>
<spark.version>1.6.3</spark.version>
<mysql.version>5.1.47</mysql.version>
<!-- NOTE(review): the "pre4" suffix marks a pre-release of c3p0; consider a final release. -->
<c3p0.version>0.9.5-pre4</c3p0.version>
<!-- NOTE(review): fastjson 1.2.47 has publicly reported autoType deserialization issues; consider upgrading. Verify it is used at all: the code in this project shown here does not reference it. -->
<fastjson.version>1.2.47</fastjson.version>

</properties>

<dependencies>
<!-- Spark is "provided": expected on the cluster classpath at runtime, so it is not bundled into the shaded jar. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- Compile-scope dependencies below are bundled into the shaded jar. -->
<!-- NOTE(review): DBUtil imports jodd.util.PropertiesUtil, but no jodd dependency is declared here; it presumably resolves transitively through a provided Spark/Hive artifact. Confirm, or drop the unused import. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>${c3p0.version}</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- Scala compiler plugin: compiles src/main/scala. It does not set the JDK version. -->
<!-- NOTE(review): only the "compile" goal is bound, so sources under src/test/scala are not compiled; add "testCompile" if tests exist. -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Shade plugin: builds a single fat jar containing the compile-scope dependencies. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>1.6</version>
<!-- NOTE(review): source/target below are compiler settings, not shade-plugin parameters; they appear to have no effect here. -->
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<source>1.8</source>
<target>1.8</target>
<!-- Strip signature files from all dependencies; stale signatures would make the merged jar fail verification. -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Concatenate reference.conf files, merge META-INF/services entries, and set the jar's Main-Class. -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<!-- NOTE(review): this main class (package format.sdklog) does not match this project's name (cardevent_etl); confirm it is the intended entry point. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.jiatui.bigdata.format.sdklog.main.FormatSdkLogApp</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.jiatui.bigdata.util

import com.mchange.v2.c3p0.ComboPooledDataSource
import java.util.Properties
import java.io.File
import java.io.FileInputStream
import java.sql.Connection
import scala.collection.mutable.ArrayBuffer
import java.sql.ResultSet
/**
* c3p0连接
*/
/**
 * c3p0-backed JDBC connection pool for the "feeds" MySQL database.
 *
 * Settings are read from a file named `c3p0.properties`, resolved as a plain `java.io.File`.
 * It is therefore relative to the JVM's current working directory, not the classpath.
 *
 * Required keys:
 *   - `feeds.jdbcUrl`, `driverClass`, `feeds.user`, `feeds.password`
 *   - `maxPoolSize`, `minPoolSize`, `acquireIncrement`, `initialPoolSize`, `maxIdleTime`
 *
 * Optional keys: `acquireRetryAttempts`, `acquireRetryDelay`.
 *
 * Any configuration failure is printed and swallowed. Examples: a missing file, a missing or
 * non-numeric value, or a bad driver class. Settings applied before the failure are kept, and
 * the rest stay at the data source's defaults.
 */
class DBUtilC3p0() extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)

  private val prop = new Properties

  // Configure the pool as part of construction (after cpds and prop are initialized).
  configure()

  /**
   * Loads `c3p0.properties` into `prop` and applies it to `cpds`.
   * The input stream is closed on every path.
   */
  private def configure(): Unit = {
    var in: FileInputStream = null
    try {
      val file: File = new File("c3p0.properties")
      in = new FileInputStream(file)
      prop.load(in)
      // Log key names only: printing `prop` itself would expose feeds.password.
      println("prop keys:" + prop.stringPropertyNames())
      cpds.setJdbcUrl(prop.getProperty("feeds.jdbcUrl"))
      cpds.setDriverClass(prop.getProperty("driverClass"))
      cpds.setUser(prop.getProperty("feeds.user"))
      cpds.setPassword(prop.getProperty("feeds.password"))
      cpds.setMaxPoolSize(Integer.valueOf(prop.getProperty("maxPoolSize")))
      cpds.setMinPoolSize(Integer.valueOf(prop.getProperty("minPoolSize")))
      cpds.setAcquireIncrement(Integer.valueOf(prop.getProperty("acquireIncrement")))
      cpds.setInitialPoolSize(Integer.valueOf(prop.getProperty("initialPoolSize")))
      cpds.setMaxIdleTime(Integer.valueOf(prop.getProperty("maxIdleTime")))
      // Retry settings are optional; when a key is absent the data source keeps its default.
      Option(prop.getProperty("acquireRetryAttempts")).foreach(v => cpds.setAcquireRetryAttempts(Integer.valueOf(v)))
      Option(prop.getProperty("acquireRetryDelay")).foreach(v => cpds.setAcquireRetryDelay(Integer.valueOf(v)))
    } catch {
      case ex: Exception => ex.printStackTrace()
    } finally {
      if (in != null) {
        try {
          in.close()
        } catch {
          case ex: Exception => ex.printStackTrace()
        }
      }
    }
  }

  /**
   * Borrows a connection from the pool.
   *
   * @return a pooled connection, or `null` if acquisition failed (the error is printed).
   *         Callers must null-check the result and `close()` it to release it back to the pool.
   */
  def getConnection: Connection = {
    try {
      cpds.getConnection()
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}
/** Companion holding the process-wide [[DBUtilC3p0]] instance. */
object DBUtilC3p0 {
  /** The shared instance; stays `null` until the first call to `getDBUtilC3p0()`. */
  var dbUtilC3p0: DBUtilC3p0 = _

  /**
   * Returns the shared pool wrapper, creating it on the first call.
   * Creation happens while holding this object's monitor, so concurrent first calls build only one instance.
   */
  def getDBUtilC3p0(): DBUtilC3p0 = {
    this.synchronized {
      if (dbUtilC3p0 eq null) dbUtilC3p0 = new DBUtilC3p0()
    }
    dbUtilC3p0
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.jiatui.bigdata.util

import java.sql.ResultSet
import java.sql.Connection
import java.sql.PreparedStatement
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.sql.DriverManager
import jodd.util.PropertiesUtil
import scala.collection.mutable.ArrayBuffer

/*
* <p>Title: CardEventEtl</p>
*
* <p>Description:获取feeds所有数据表名 </p>
*
* @author zhangshuai
*
* @date 2019年3月1日
*/

object DBUtil {

  /**
   * Lists the tables in the feeds database that match `show tables like '%_card_event'`.
   *
   * A connection is borrowed from the shared [[DBUtilC3p0]] pool. The result set, the statement
   * and the connection are always closed before returning; closing the pooled connection hands
   * it back to the pool. Failures are printed rather than thrown. In that case, the names read
   * so far (possibly none) are returned.
   *
   * NOTE(review): in a MySQL LIKE pattern `_` matches any single character, so this also matches
   * names such as `xcard_event`. Use `'%\_card\_event'` if a literal underscore is intended.
   *
   * @return matching table names, possibly empty
   */
  def getFeedsTableNames(): ArrayBuffer[String] = {
    val data = new ArrayBuffer[String]()
    val conn = DBUtilC3p0.getDBUtilC3p0().getConnection
    if (conn == null) {
      // getConnection already printed the acquisition failure and returned null.
      println("获取feeds表表名时候出错.")
    } else {
      var stat: java.sql.Statement = null
      var rs: ResultSet = null
      try {
        stat = conn.createStatement()
        rs = stat.executeQuery("show tables like '%_card_event'")
        while (rs.next()) {
          data += rs.getString(1)
        }
      } catch {
        case t: Exception =>
          t.printStackTrace()
          println("获取feeds表表名时候出错.")
      } finally {
        // Close in reverse order of acquisition, even when the query failed.
        closeQuietly(rs)
        closeQuietly(stat)
        closeQuietly(conn)
      }
    }
    data
  }

  /** Closes a JDBC resource if it is non-null, printing rather than propagating any failure. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case t: Exception => t.printStackTrace()
      }
    }
  }



// private var connection: Connection = _
//
// private var preparedStatement: PreparedStatement = _
//
// private var resultSet: ResultSet = _
//
// def getCardEventTableNames = {
// var data = new ArrayBuffer[String]()
// val conn = DBUtil.getConnection
// try {
// val stat = conn.createStatement()
// val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")
//
// while (rs.next()) {
// val tableName = rs.getString(1)
//
// data += tableName
//
// }
// rs.close()
// stat.close()
// data
// } catch {
// case t: Exception =>
// t.printStackTrace()
// println("获取统计表表名时候出错.")
// } finally {
//
// try {
// if (conn != null) {
// conn.close()
//
// }
//
// } catch {
// case t: Exception =>
// t.printStackTrace()
// }
//
// }
// data
//
// }
//
//
// def getFeedsConnection: Connection ={
// DBUtil.getConnection
// }
//
//
//
// // 数据库驱动类
//
// private val driverClass: String = ConfigurationManager.getProperty("driverClass")
// // 数据库连接地址
// private val url: String = ConfigurationManager.getProperty("feeds.jdbcUrl")
//
// // 数据库连接用户名
// private val username: String = ConfigurationManager.getProperty("feeds.user")
// // 数据库连接密码
// private val password: String = ConfigurationManager.getProperty("feeds.password")
//
// // 加载数据库驱动
// // Class.forName(driverClass)
// Class.forName("com.mysql.jdbc.Driver")
//
// // 连接池大小
// val poolSize: Int = ConfigurationManager.getProperty("poolSize").toInt
//
// // 连接池 - 同步队列
// private val pool: BlockingQueue[Connection] = new LinkedBlockingQueue[Connection]()
//
// /**
// * 初始化连接池
// */
// for (i <- 1 to poolSize) {
// DBUtil.pool.put(DriverManager.getConnection(url, username, password))
// }
//
// /**
// * 从连接池中获取一个Connection
// * @return
// */
// private def getConnection: Connection = {
// pool.take()
// }
//
// /**
// * 向连接池归还一个Connection
// * @param conn
// */
// private def returnConnection(conn: Connection): Unit = {
// DBUtil.pool.put(conn)
// }
//
// /**
// * 启动守护线程释放资源
// */
// def releaseResource() = {
// val thread = new Thread(new CloseRunnable)
// thread.setDaemon(true)
// thread.start()
// }
//
// /**
// * 关闭连接池连接资源类
// */
// class CloseRunnable extends Runnable {
// override def run(): Unit = {
// while (DBUtil.pool.size > 0) {
// try {
// // println(s"当前连接池大小: ${DBUtil.pool.size}")
// DBUtil.pool.take().close()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.jiatui.bigdata.util

import java.sql.ResultSet
import java.sql.Connection
import java.sql.PreparedStatement
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.sql.DriverManager
import jodd.util.PropertiesUtil
import scala.collection.mutable.ArrayBuffer

/*
* <p>Title: CardEventEtl</p>
*
* <p>Description:获取feeds所有数据表名 </p>
*
* @author zhangshuai
*
* @date 2019年3月1日
*/

object DBUtil {

  /**
   * Lists the tables in the feeds database that match `show tables like '%_card_event'`.
   *
   * A connection is borrowed from the shared [[DBUtilC3p0]] pool. The result set, the statement
   * and the connection are always closed before returning; closing the pooled connection hands
   * it back to the pool. Failures are printed rather than thrown. In that case, the names read
   * so far (possibly none) are returned.
   *
   * NOTE(review): in a MySQL LIKE pattern `_` matches any single character, so this also matches
   * names such as `xcard_event`. Use `'%\_card\_event'` if a literal underscore is intended.
   *
   * @return matching table names, possibly empty
   */
  def getFeedsTableNames(): ArrayBuffer[String] = {
    val data = new ArrayBuffer[String]()
    val conn = DBUtilC3p0.getDBUtilC3p0().getConnection
    if (conn == null) {
      // getConnection already printed the acquisition failure and returned null.
      println("获取feeds表表名时候出错.")
    } else {
      var stat: java.sql.Statement = null
      var rs: ResultSet = null
      try {
        stat = conn.createStatement()
        rs = stat.executeQuery("show tables like '%_card_event'")
        while (rs.next()) {
          data += rs.getString(1)
        }
      } catch {
        case t: Exception =>
          t.printStackTrace()
          println("获取feeds表表名时候出错.")
      } finally {
        // Close in reverse order of acquisition, even when the query failed.
        closeQuietly(rs)
        closeQuietly(stat)
        closeQuietly(conn)
      }
    }
    data
  }

  /** Closes a JDBC resource if it is non-null, printing rather than propagating any failure. */
  private def closeQuietly(resource: AutoCloseable): Unit = {
    if (resource != null) {
      try {
        resource.close()
      } catch {
        case t: Exception => t.printStackTrace()
      }
    }
  }



// private var connection: Connection = _
//
// private var preparedStatement: PreparedStatement = _
//
// private var resultSet: ResultSet = _
//
// def getCardEventTableNames = {
// var data = new ArrayBuffer[String]()
// val conn = DBUtil.getConnection
// try {
// val stat = conn.createStatement()
// val rs: ResultSet = stat.executeQuery("show tables like '%_card_event'")
//
// while (rs.next()) {
// val tableName = rs.getString(1)
//
// data += tableName
//
// }
// rs.close()
// stat.close()
// data
// } catch {
// case t: Exception =>
// t.printStackTrace()
// println("获取统计表表名时候出错.")
// } finally {
//
// try {
// if (conn != null) {
// conn.close()
//
// }
//
// } catch {
// case t: Exception =>
// t.printStackTrace()
// }
//
// }
// data
//
// }
//
//
// def getFeedsConnection: Connection ={
// DBUtil.getConnection
// }
//
//
//
// // 数据库驱动类
//
// private val driverClass: String = ConfigurationManager.getProperty("driverClass")
// // 数据库连接地址
// private val url: String = ConfigurationManager.getProperty("feeds.jdbcUrl")
//
// // 数据库连接用户名
// private val username: String = ConfigurationManager.getProperty("feeds.user")
// // 数据库连接密码
// private val password: String = ConfigurationManager.getProperty("feeds.password")
//
// // 加载数据库驱动
// // Class.forName(driverClass)
// Class.forName("com.mysql.jdbc.Driver")
//
// // 连接池大小
// val poolSize: Int = ConfigurationManager.getProperty("poolSize").toInt
//
// // 连接池 - 同步队列
// private val pool: BlockingQueue[Connection] = new LinkedBlockingQueue[Connection]()
//
// /**
// * 初始化连接池
// */
// for (i <- 1 to poolSize) {
// DBUtil.pool.put(DriverManager.getConnection(url, username, password))
// }
//
// /**
// * 从连接池中获取一个Connection
// * @return
// */
// private def getConnection: Connection = {
// pool.take()
// }
//
// /**
// * 向连接池归还一个Connection
// * @param conn
// */
// private def returnConnection(conn: Connection): Unit = {
// DBUtil.pool.put(conn)
// }
//
// /**
// * 启动守护线程释放资源
// */
// def releaseResource() = {
// val thread = new Thread(new CloseRunnable)
// thread.setDaemon(true)
// thread.start()
// }
//
// /**
// * 关闭连接池连接资源类
// */
// class CloseRunnable extends Runnable {
// override def run(): Unit = {
// while (DBUtil.pool.size > 0) {
// try {
// // println(s"当前连接池大小: ${DBUtil.pool.size}")
// DBUtil.pool.take().close()
// } catch {
// case e: Exception => e.printStackTrace()
// }
// }
// }
// }

}
文章目录
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量