process.py 2.21 KB
import sys
import pymysql
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

f = open('config.json')
config = json.load(f)
f.close()

s = config['database']['host'].split(':')
host = s[0]
port = 3306
if len(s) == 2:
    port = int(s[1])

db = pymysql.connect(
    user=config['database']['user'],
    passwd=config['database']['password'],
    host=host,
    port=port,
    db=config['database']['name'],
    charset='utf8'
)

cursor = db.cursor()

ext_no = int(sys.argv[1])

cursor.execute("SELECT `type`, `number`, `duration` FROM calls WHERE `extraction_no`=%s", (ext_no))
calls = cursor.fetchall()

cursor.execute("SELECT `type`, `address` FROM messages WHERE `extraction_no`=%s", (ext_no))
messages = cursor.fetchall()

regions = {
    '02': 'Seoul',
    '031': 'Gyeonggi',
    '032': 'Incheon',
    '033': 'Gangwon',
    '041': 'Chungnam',
    '042': 'Daejeon',
    '043': 'Chungbuk',
    '044': 'Sejong',
    '051': 'Busan',
    '052': 'Ulsan',
    '053': 'Daegu',
    '054': 'Gyeongbuk',
    '055': 'Gyeongnam',
    '061': 'Jeonnam',
    '062': 'Gwangju',
    '063': 'Jeonbuk',
    '064': 'Jeju'
}

spark = SparkSession.builder.getOrCreate()

cdf = spark.createDataFrame(list(calls), schema=['type', 'number', 'duration'])
mdf = spark.createDataFrame(list(messages), schema=['type', 'address'])

result = None
for key, val in regions.items():
    crdf = cdf[cdf['number'][0:len(key)] == key]
    mrdf = mdf[mdf['address'][0:len(key)] == key]

    duration = crdf.select(F.sum('duration')).collect()[0][0]
    if duration == None:
        duration = 0

    rdf = spark.createDataFrame(
        [(
            val,
            crdf[crdf['type'] == 1].count(),
            crdf[crdf['type'] == 2].count(),
            duration,
            mrdf[mrdf['type'] == 1].count(),
            mrdf[mrdf['type'] == 2].count()
        )],
        schema=['region', 'incoming', 'outgoing', 'duration', 'receive', 'send'])
    if result == None:
        result = rdf
    else:
        result = result.union(rdf)

rows = result.collect()
for r in rows:
    sql = "INSERT INTO region_stats VALUES (%s, %s, %s, %s, %s, %s, %s)"
    cursor.execute(sql, (ext_no, r[0], r[1], r[2], r[3], r[4], r[5]))

db.commit()
db.close()

spark.stop()