python用map-reduce(IP地址库匹配省份和城市)

IP地址库文件为city.txt大致内容如下

1
2
3
4
5
6
7
8
9
10
11
12
708104192|708112383|辽宁|葫芦岛
708112384|708116479|辽宁|朝阳
708116480|708124671|辽宁|营口
708124672|708132863|辽宁|阜新
708132864|708141055|辽宁|阜新
708141056|708145151|辽宁|锦州
708145152|708149247|辽宁|葫芦岛
708149248|708153343|辽宁|葫芦岛
708153344|708157439|辽宁|盘锦
708157440|708161535|辽宁|盘锦
708161536|708165631|辽宁|朝阳
708165632|708182015|辽宁|沈阳

经过数据清洗后得出真实的手机号码并插入当前hive表中

1
2
3
4
5
6
7
8
9
INSERT overwrite table true_flase_phone_month select loginname,ipcount,user_ip,loginname regexp '^((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(18[0,5-9]))\\d{8}$',user_ip regexp '^(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9])\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[1-9]|0)\.(25[0-5]|2[0-4][0-9]|[0-1]{1}[0-9]{2}|[1-9]{1}[0-9]{1}|[0-9])$' from log_login_info_desc_month;
CREATE TABLE true_phone_month
(
loginname STRING,
ipcount BIGINT,
user_ip STRING
)ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
INSERT overwrite table true_phone_month
select loginname,ipcount,user_ip from true_flase_phone_month where login_true_false='true' and user_ip_true_false='true';
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
python中map和reduce代码如下
---map
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
import re
import socket
import struct
import os
import gc

i=1
filepath = os.environ['mapreduce_map_input_file']

filename = os.path.split(filepath)[-1]
for line in sys.stdin:
if line.strip()=="":
continue
if filename == 'city.txt':
fields = line[:-1].split("|")
ip_1 = fields[0]
ip_2 = fields[1]
prov = fields[2]
area = fields[3]
print ("%s|%s|%s|%s|%s" % (ip_1,1,i,prov,area))
print ("%s|%s|%s|%s|%s" % (ip_2,1,i,prov,area))
i+=1
else:
fields = line[:-1].split(",")
try:
logon_no=fields[0]
logon_cnt=fields[1]
logon_ip=fields[2]
if logon_cnt and logon_no and logon_ip:
ip_s=socket.ntohl(struct.unpack("I",socket.inet_aton(str(logon_ip)))[0])
print ("%s|%s|%s|%s" % (ip_s,0,logon_no,logon_cnt))
except:
continue
---reduce
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import sys
prov=0
area=0
ip_no1=0
ip_no2=0
for line in sys.stdin:
if line.strip()=="":
continue
a=line[:-1].split("|")
try:
if int(a[1])==1:
ip_no1=a[2]
if ip_no1==ip_no2:
prov=0
area=0
else:
prov=a[3].strip()
area=a[4].strip()
ip_no2=ip_no1

if int(a[1])==0:
logon_time=a[3].strip()
logon_no=a[2].strip()
print ("%s|%s|%s|%s|%s" % (logon_time,logon_no,prov,area,a[0].strip()))
except:
pass
1
hadoop jar /usr/hdp/2.5.0.0-1245/hadoop-mapreduce/hadoop-streaming.jar -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options="-n" -D mapred.map.tasks=10 -D mapred.reduce.tasks=1 -input /data139/ods/ip_address/city.txt -input /apps/hive/warehouse/dw_ods.db/true_phone_month/* -output /data139/bill_tmp/ip_change/${month_start} -file /home/hdfs/ip_change_script/my_map.py -file /home/hdfs/ip_change_script/my_reduce.py -mapper "python my_map.py" -reducer "python my_reduce.py"

HDFS 常用 shell 命令

HDFS 常用 shell 命令

  1. 显示当前目录结构

    1
    2
    3
    4
    5
    6
    # 显示当前目录结构
    hadoop fs -ls <path>
    # 递归显示当前目录结构
    hadoop fs -ls -R <path>
    # 显示根目录下内容
    hadoop fs -ls /
  2. 创建目录

    1
    2
    3
    4
    # 创建目录
    hadoop fs -mkdir <path>
    # 递归创建目录
    hadoop fs -mkdir -p <path>
  3. 删除操作

    1
    2
    3
    4
    # 删除文件
    hadoop fs -rm <path>
    # 递归删除目录和文件
    hadoop fs -rm -R <path>
  4. 从本地加载文件到 HDFS

    1
    2
    3
    # 二选一执行即可
    hadoop fs -put [localsrc] [dst]
    hadoop fs - copyFromLocal [localsrc] [dst]
  5. 从 HDFS 导出文件到本地

    1
    2
    3
    # 二选一执行即可
    hadoop fs -get [dst] [localsrc]
    hadoop fs -copyToLocal [dst] [localsrc]
  6. 查看文件内容

    1
    2
    3
    # 二选一执行即可
    hadoop fs -text <path>
    hadoop fs -cat <path>
  7. 显示文件的最后一千字节

    1
    2
    3
    hadoop fs -tail  <path> 
    # 和Linux下一样,会持续监听文件内容变化 并显示文件的最后一千字节
    hadoop fs -tail -f <path>
  8. 拷贝文件

    1
    hadoop fs -cp [src] [dst]
  9. 移动文件

    1
    hadoop fs -mv [src] [dst]
  10. 统计当前目录下各文件大小
    默认单位字节

-s : 显示所有文件大小总和,
-h : 将以更友好的方式显示文件大小(例如 64.0m 而不是 67108864)

1
hadoop fs -du  <path>
  1. 合并下载多个文件

-nl 在每个文件的末尾添加换行符(LF)
-skip-empty-file 跳过空文件

1
2
3
hadoop fs -getmerge
# 示例 将HDFS上的hbase-policy.xml和hbase-site.xml文件合并后下载到本地的/usr/test.xml
hadoop fs -getmerge -nl /test/hbase-policy.xml /test/hbase-site.xml /usr/test.xml
  1. 统计文件系统的可用空间信息
    1
    hadoop fs -df -h /
© 2015-2020 zhangdeshuai 粤ICP备15075505号 本站总访问量