process.py
import sys
import pymysql
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
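# process.py aggregates one extraction's call and message logs by South Korean
# area-code region using Spark and stores the per-region totals in MySQL
# (table region_stats).
# Assumed invocation (not stated in the source): python process.py <extraction_no>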
# Load database settings from config.json in the working directory.
with open('config.json') as f:
    config = json.load(f)
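# Example config.json layout (an assumption inferred from the keys read below;
# the real file may contain additional fields, and all values here are placeholders):
# {
#     "database": {
#         "host": "db.example.com:3306",
#         "user": "analyst",
#         "password": "secret",
#         "name": "forensics"
#     }
# }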
# The host entry may carry an explicit port ("host:port"); default to 3306.
s = config['database']['host'].split(':')
host = s[0]
port = 3306
if len(s) == 2:
    port = int(s[1])
# Connect to MySQL using the settings above.
db = pymysql.connect(
    user=config['database']['user'],
    passwd=config['database']['password'],
    host=host,
    port=port,
    db=config['database']['name'],
    charset='utf8'
)
cursor = db.cursor()
# The extraction to process is given as the first command-line argument.
ext_no = int(sys.argv[1])

# Pull the call log and message log rows for this extraction.
cursor.execute("SELECT `type`, `number`, `duration` FROM calls WHERE `extraction_no`=%s", (ext_no,))
calls = cursor.fetchall()
cursor.execute("SELECT `type`, `address` FROM messages WHERE `extraction_no`=%s", (ext_no,))
messages = cursor.fetchall()
# South Korean telephone area codes mapped to region names.
regions = {
    '02': 'Seoul',
    '031': 'Gyeonggi',
    '032': 'Incheon',
    '033': 'Gangwon',
    '041': 'Chungnam',
    '042': 'Daejeon',
    '043': 'Chungbuk',
    '044': 'Sejong',
    '051': 'Busan',
    '052': 'Ulsan',
    '053': 'Daegu',
    '054': 'Gyeongbuk',
    '055': 'Gyeongnam',
    '061': 'Jeonnam',
    '062': 'Gwangju',
    '063': 'Jeonbuk',
    '064': 'Jeju'
}
# Spin up Spark and load the query results as DataFrames.
spark = SparkSession.builder.getOrCreate()
cdf = spark.createDataFrame(list(calls), schema=['type', 'number', 'duration'])
mdf = spark.createDataFrame(list(messages), schema=['type', 'address'])
# Build one aggregated row per region: call counts and total duration plus
# message counts for numbers/addresses beginning with that region's area code.
result = None
for key, val in regions.items():
    # Rows whose phone number / address starts with this area code.
    crdf = cdf[cdf['number'].startswith(key)]
    mrdf = mdf[mdf['address'].startswith(key)]
    # Total call duration for the region; sum() is NULL when there are no rows.
    duration = crdf.select(F.sum('duration')).collect()[0][0]
    if duration is None:
        duration = 0
    # type 1 counts feed the incoming/receive columns, type 2 the outgoing/send
    # columns, per the output schema below.
    rdf = spark.createDataFrame(
        [(
            val,
            crdf[crdf['type'] == 1].count(),
            crdf[crdf['type'] == 2].count(),
            duration,
            mrdf[mrdf['type'] == 1].count(),
            mrdf[mrdf['type'] == 2].count()
        )],
        schema=['region', 'incoming', 'outgoing', 'duration', 'receive', 'send'])
    if result is None:
        result = rdf
    else:
        result = result.union(rdf)
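# The INSERT below writes seven values per region. The region_stats layout is not
# shown in this file; a hypothetical definition consistent with those values:
#
#   CREATE TABLE region_stats (
#       extraction_no INT,
#       region        VARCHAR(32),
#       incoming      INT,
#       outgoing      INT,
#       duration      BIGINT,
#       receive       INT,
#       send          INT
#   );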
# Persist the per-region statistics for this extraction, then clean up.
rows = result.collect()
for r in rows:
    sql = "INSERT INTO region_stats VALUES (%s, %s, %s, %s, %s, %s, %s)"
    cursor.execute(sql, (ext_no, r[0], r[1], r[2], r[3], r[4], r[5]))
db.commit()
db.close()
spark.stop()