Kafka to Flink to HDFS

Producing test data to Kafka

First, a simple POJO for the events, followed by a standalone producer that writes one JSON-serialized event to the data-collection-topic topic every 3 seconds.

package com.flink.etl;

import lombok.AllArgsConstructor;

// @lombok.Data is written fully qualified because importing it would clash with
// this class's own name. It already generates the getters, setters, equals,
// hashCode and toString, so hand-written accessors (and @ToString) are
// unnecessary; @AllArgsConstructor replaces the manual constructor.
@lombok.Data
@AllArgsConstructor
public class Data {

    private long timestamp;
    private String event;
    private String uuid;
}
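
For reference, this is roughly what each record on the topic looks like once fastjson has serialized the bean. The demo class and sample values below are our addition, not part of the original post; note that fastjson orders fields alphabetically by default.

package com.flink.etl;

import com.alibaba.fastjson.JSON;

// Hypothetical helper, only to show the wire format the producer emits.
public class DataJsonDemo {
    public static void main(String[] args) {
        Data d = new Data(1577808000000L, "page_view",
                "123e4567-e89b-12d3-a456-426614174000"); // sample values
        System.out.println(JSON.toJSONString(d));
        // {"event":"page_view","timestamp":1577808000000,"uuid":"123e4567-e89b-12d3-a456-426614174000"}
    }
}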
package com.flink.etl;

import com.alibaba.fastjson.JSON;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DataGenerator {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.3.122:9092");
        properties.put("key.serializer", StringSerializer.class.getName());
        properties.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        List<String> events = Arrays.asList("page_view", "adv_click", "thumbs_up");
        Random random = new Random();
        try {
            // Emit one random event as a JSON string every 3 seconds.
            while (true) {
                long timestamp = System.currentTimeMillis();
                String event = events.get(random.nextInt(events.size()));
                String uuid = UUID.randomUUID().toString();
                Data data = new Data(timestamp, event, uuid);
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("data-collection-topic", JSON.toJSONString(data));
                producer.send(record);
                Thread.sleep(3000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.flush();
            producer.close();
        }
    }
}
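
Note that producer.send() is asynchronous, and in the fire-and-forget form above a broker-side failure passes silently. A minimal sketch of the same call with an error-logging callback (our addition; the lambda requires Java 8, and Callback is part of the Kafka clients API):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The send failed after retries; log it instead of losing it silently.
        exception.printStackTrace();
    }
});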

StreamingJob

The Flink job consumes the topic and writes row-format files to HDFS through StreamingFileSink, bucketing records by event date and checkpointing state to HDFS.

package com.flink.etl;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.3.122:9092");
        props.setProperty("group.id", "test");
        FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                "data-collection-topic", new SimpleStringSchema(), props);
        DataStream<String> stream = env.addSource(consumer);

        DefaultRollingPolicy rollingPolicy = DefaultRollingPolicy
                .create()
                // Maximum size of a part file before rolling; the default is 128 MB,
                // which is kept here.
                .withMaxPartSize(1024 * 1024 * 128)
                // Time-based rollover; the default is 60 s. 86400 minutes (60 days)
                // effectively disables it, so files roll on size or inactivity only.
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(86400))
                // Roll to a new part file after 60 minutes without new records.
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(60))
                .build();

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("hdfs://DATASEA/home/dmp_operator1/bpointdata"),
                        new SimpleStringEncoder<String>())
                .withBucketAssigner(new EventTimeBucketAssigner())
                .withRollingPolicy(rollingPolicy)
                .build();
        stream.addSink(sink);

        env.enableCheckpointing(10_000);
        env.setParallelism(1);
        // The cast selects the non-deprecated setStateBackend(StateBackend) overload.
        env.setStateBackend((StateBackend) new FsStateBackend("hdfs://DATASEA/home/dmp_operator1/flink/checkpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
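
One point worth stressing: StreamingFileSink moves part files from in-progress/pending to finished only when a checkpoint completes, so the enableCheckpointing(10_000) call above is not optional; without it the output files would never be finalized. If checkpoints start to interfere with each other at higher volumes, settings like the following are commonly tuned (a sketch with illustrative values, not from the original job):

CheckpointConfig conf = env.getCheckpointConfig();
conf.setMinPauseBetweenCheckpoints(5_000); // breathing room between checkpoints
conf.setCheckpointTimeout(60_000);         // abort checkpoints that hang
conf.setMaxConcurrentCheckpoints(1);       // never overlap checkpoints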

EventTimeBucketAssigner

The custom BucketAssigner extracts the timestamp field from each JSON record and maps it to a dt=yyyyMMdd bucket, i.e. one HDFS directory per day.

package com.flink.etl;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EventTimeBucketAssigner implements BucketAssigner<String, String> {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public String getBucketId(String element, Context context) {
        // Fall back to processing time if the record cannot be parsed.
        long date = System.currentTimeMillis();
        try {
            JsonNode node = mapper.readTree(element);
            // asLong() avoids the precision loss of reading an epoch-millis
            // value through floatValue() and casting back to long.
            date = node.path("timestamp").asLong();
            // date = node.path("timestamp").asLong() * 1000; // if the producer sent seconds
        } catch (IOException e) {
            e.printStackTrace();
        }
        // One bucket (HDFS directory) per day; use "yyyyMMddHHmm" for per-minute buckets.
        String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
        return "dt=" + partitionValue;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
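
The assigner can be sanity-checked without a cluster. A hypothetical snippet, not part of the original post: getBucketId() ignores its Context argument, so passing null is acceptable here, and the printed bucket depends on the JVM's default timezone because SimpleDateFormat uses it.

package com.flink.etl;

// Hypothetical quick check for the bucket assigner.
public class BucketAssignerCheck {
    public static void main(String[] args) {
        EventTimeBucketAssigner assigner = new EventTimeBucketAssigner();
        String element = "{\"timestamp\":1577808000000,\"event\":\"page_view\","
                + "\"uuid\":\"123e4567-e89b-12d3-a456-426614174000\"}";
        // 1577808000000 ms is 2020-01-01T00:00:00 in UTC+8 (Asia/Shanghai),
        // so this prints dt=20200101 there and dt=20191231 under UTC.
        System.out.println(assigner.getBucketId(element, null));
    }
}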