신은섭(Shin Eun Seop)

Merge branch 'develop' into ml

File mode changed
This diff is collapsed. Click to expand it.
1 +<?xml version="1.0" encoding="UTF-8"?>
2 +<project version="4">
3 + <component name="MarkdownExportedFiles">
4 + <htmlFiles />
5 + <imageFiles />
6 + <otherFiles />
7 + </component>
8 +</project>
...\ No newline at end of file ...\ No newline at end of file
...@@ -8,7 +8,17 @@ ...@@ -8,7 +8,17 @@
8 </list> 8 </list>
9 </option> 9 </option>
10 </component> 10 </component>
11 - <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="true" project-jdk-name="1.8" project-jdk-type="JavaSDK"> 11 + <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
12 <output url="file://$PROJECT_DIR$/out" /> 12 <output url="file://$PROJECT_DIR$/out" />
13 </component> 13 </component>
14 + <component name="MavenProjectsManager">
15 + <option name="originalFiles">
16 + <list>
17 + <option value="$PROJECT_DIR$/pom.xml" />
18 + </list>
19 + </option>
20 + </component>
21 + <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="false" project-jdk-name="1.8" project-jdk-type="JavaSDK">
22 + <output url="file:///tmp" />
23 + </component>
14 </project> 24 </project>
...\ No newline at end of file ...\ No newline at end of file
......
File mode changed
File mode changed
...@@ -26,7 +26,30 @@ ...@@ -26,7 +26,30 @@
26 <artifactId>spark-mllib_2.11</artifactId> 26 <artifactId>spark-mllib_2.11</artifactId>
27 <version>2.3.0</version> 27 <version>2.3.0</version>
28 </dependency> 28 </dependency>
29 - 29 + <dependency>
30 + <groupId>org.apache.spark</groupId>
31 + <artifactId>spark-sql_2.11</artifactId>
32 + <version>2.3.0</version>
33 + </dependency>
34 + <dependency>
35 + <groupId>com.databricks</groupId>
36 + <artifactId>spark-csv_2.11</artifactId>
37 + <version>1.5.0</version>
38 + </dependency>
30 </dependencies> 39 </dependencies>
40 +
41 + <build>
42 + <plugins>
43 + <plugin>
44 + <groupId>org.apache.maven.plugins</groupId>
45 + <artifactId>maven-compiler-plugin</artifactId>
46 + <version>3.6.1</version>
47 + <configuration>
48 + <source>1.8</source>
49 + <target>1.8</target>
50 + </configuration>
51 + </plugin>
52 + </plugins>
53 + </build>
31 54
32 </project> 55 </project>
...\ No newline at end of file ...\ No newline at end of file
......
1 +import org.apache.spark.sql.Dataset;
2 +import org.apache.spark.sql.Row;
3 +import org.apache.spark.sql.SparkSession;
4 +import org.apache.spark.sql.expressions.Window;
5 +import org.apache.spark.sql.expressions.WindowSpec;
6 +
7 +import static org.apache.spark.sql.functions.*;
8 +import static org.apache.spark.sql.functions.lit;
9 +import static org.apache.spark.sql.functions.when;
10 +
11 +public class Aggregation {
12 +
13 + public static void main(String[] args) throws Exception {
14 +
15 + //Create Session
16 + SparkSession spark = SparkSession
17 + .builder()
18 + .appName("Detecting Fraud Clicks")
19 + .master("local")
20 + .getOrCreate();
21 +
22 + // Aggregation
23 + Aggregation agg = new Aggregation();
24 +
25 + Dataset<Row> dataset = agg.loadCSVDataSet("./train_sample.csv", spark);
26 + dataset = agg.changeTimestempToLong(dataset);
27 + dataset = agg.averageValidClickCount(dataset);
28 + dataset = agg.clickTimeDelta(dataset);
29 + dataset = agg.countClickInTenMinutes(dataset);
30 +
31 + //test
32 + dataset.where("ip == '5348' and app == '19'").show(10);
33 + }
34 +
35 +
36 + private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
37 + // Read SCV to DataSet
38 + return spark.read().format("csv")
39 + .option("inferSchema", "true")
40 + .option("header", "true")
41 + .load(path);
42 + }
43 +
44 + private Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){
45 + // cast timestamp to long
46 + Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
47 + newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
48 + newDF = newDF.drop("click_time").drop("attributed_time");
49 + return newDF;
50 + }
51 +
52 + private Dataset<Row> averageValidClickCount(Dataset<Row> dataset){
53 + // set Window partition by 'ip' and 'app' order by 'utc_click_time' select rows between 1st row to current row
54 + WindowSpec w = Window.partitionBy("ip", "app")
55 + .orderBy("utc_click_time")
56 + .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
57 +
58 + // aggregation
59 + Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
60 + newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
61 + newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
62 + newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
63 + return newDF;
64 + }
65 +
66 + private Dataset<Row> clickTimeDelta(Dataset<Row> dataset){
67 + WindowSpec w = Window.partitionBy ("ip")
68 + .orderBy("utc_click_time");
69 +
70 + Dataset<Row> newDF = dataset.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
71 + newDF = newDF.withColumn("click_time_delta", when(col("lag(utc_click_time)").isNull(),
72 + lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),
73 + lit(0)).otherwise(col("lag(utc_click_time)"))));
74 + newDF = newDF.drop("lag(utc_click_time)");
75 + return newDF;
76 + }
77 +
78 + private Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset){
79 + WindowSpec w = Window.partitionBy("ip")
80 + .orderBy("utc_click_time")
81 + .rangeBetween(Window.currentRow(),Window.currentRow()+600);
82 +
83 + Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
84 + (count("utc_click_time").over(w)).minus(1)); //TODO 본인것 포함할 것인지 정해야함.
85 + return newDF;
86 + }
87 +}
1 -import java.text.ParseException;
2 -import java.text.SimpleDateFormat;
3 -import java.util.Calendar;
4 -
5 -/**
6 - * Calendar 객체 관련 기능들을 모아놓은 유틸리티 클래스
7 - *
8 - * @author croute
9 - * @since 2011.02.10
10 - */
11 -public class DateUtil
12 -{
13 -
14 - /**
15 - * 캘린더 객체를 yyyy-MM-dd HH:mm:ss 형태의 문자열로 변환합니다.
16 - *
17 - * @param cal 캘린더 객체
18 - * @return 변환된 문자열
19 - */
20 - public static String StringFromCalendar(Calendar cal)
21 - {
22 - // 날짜를 통신용 문자열로 변경
23 - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
24 - return formatter.format(cal.getTime());
25 - }
26 -
27 - /**
28 - * 캘린더 객체를 yyyy-MM-dd형태의 문자열로 변환합니다.
29 - *
30 - * @param cal 캘린더 객체
31 - * @return 변환된 문자열
32 - */
33 - public static String StringSimpleFromCalendar(Calendar cal)
34 - {
35 - // 날짜를 통신용 문자열로 변경
36 - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
37 - return formatter.format(cal.getTime());
38 - }
39 -
40 - /**
41 - * yyyy-MM-dd HH:mm:ss 형태의 문자열을 캘린더 객체로 변환합니다.
42 - * 만약 변환에 실패할 경우 오늘 날짜를 반환합니다.
43 - *
44 - * @param date 날짜를 나타내는 문자열
45 - * @return 변환된 캘린더 객체
46 - */
47 - public static Calendar CalendarFromString(String date)
48 - {
49 - if (date.length() == 0)
50 - return null;
51 - Calendar cal = Calendar.getInstance();
52 - try
53 - {
54 - //String oldstring = "2011-01-18 00:00:00.0";
55 - // Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").parse(oldstring);
56 - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
57 - cal.setTime(formatter.parse(date));
58 - }
59 - catch(ParseException e)
60 - {
61 - e.printStackTrace();
62 - }
63 - return cal;
64 - }
65 -
66 - /**
67 - * yyyy-MM-dd 형태의 문자열을 캘린더 객체로 변환합니다.
68 - * 만약 변환에 실패할 경우 오늘 날짜를 반환합니다.
69 - *
70 - * @param date 날짜를 나타내는 문자열
71 - * @return 변환된 캘린더 객체
72 - */
73 - public static Calendar CalendarFromStringSimple(String date)
74 - {
75 - Calendar cal = Calendar.getInstance();
76 -
77 - try
78 - {
79 - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
80 - cal.setTime(formatter.parse(date));
81 - }
82 - catch(ParseException e)
83 - {
84 - e.printStackTrace();
85 - }
86 - return cal;
87 - }
88 -}
...\ No newline at end of file ...\ No newline at end of file
...@@ -22,22 +22,6 @@ import java.util.*; ...@@ -22,22 +22,6 @@ import java.util.*;
22 22
23 // ml 23 // ml
24 24
25 -//ip,app,device,os,channel,click_time,attributed_time,is_attributed
26 -//87540,12,1,13,497,2017-11-07 09:30:38,,0
27 -
28 -class RecordComparator implements Comparator<Record> {
29 - @Override
30 - public int compare(Record v1 , Record v2) {
31 -// if(a.ano < b.ano) return -1;
32 -// else if(a.ano == b.ano) return 0;
33 -// else return 1;
34 - if (v1.ip.compareTo(v2.ip) == 0) {
35 - return v1.clickTime.compareTo(v2.clickTime);
36 - }
37 - return v1.ip.compareTo(v2.ip);
38 - }
39 -}
40 -
41 public class MapExample { 25 public class MapExample {
42 26
43 static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco"); 27 static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
...@@ -45,122 +29,10 @@ public class MapExample { ...@@ -45,122 +29,10 @@ public class MapExample {
45 static SQLContext sqlContext = new SQLContext(sc); 29 static SQLContext sqlContext = new SQLContext(sc);
46 30
47 public static void main(String[] args) throws Exception { 31 public static void main(String[] args) throws Exception {
48 - JavaRDD<String> file = sc.textFile("data/train.csv", 1); 32 +
49 -
50 - final String header = file.first();
51 - JavaRDD<String> data = file.filter(line -> !line.equalsIgnoreCase(header));
52 -
53 - JavaRDD<Record> records = data.map(line -> {
54 - String[] fields = line.split(",");
55 - Record sd = new Record(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), Integer.parseInt(fields[4]), fields[5], fields[6], Integer.parseInt(fields[7].trim()));
56 - return sd;
57 - });
58 -
59 -// JavaRDD<Tuple4<Integer,Double,Long,Integer>> secondSortRDD = firstSortRDD.keyBy(new Function<Tuple4<Integer, Double, Long, Integer>, Tuple2<Double, Long>>(){
60 -// @Override
61 -// public Tuple2<Double, Long> call(Tuple4<Integer, Double, Long, Integer> value) throws Exception {
62 -// return new Tuple2(value._2(),value._3());
63 -// }}).sortByKey(new TupleComparator()).values();
64 -
65 - JavaRDD<Record> firstSorted = records.sortBy(new Function<Record, String>() {
66 - @Override
67 - public String call(Record record) throws Exception {
68 - return record.clickTime;
69 - }
70 - }, true, 1);
71 -
72 - JavaRDD<Record> sortedRecords = firstSorted.sortBy(new Function<Record, Integer>() {
73 - @Override
74 - public Integer call(Record record) throws Exception {
75 - return record.ip.intValue();
76 - }
77 - }, true, 1);
78 -
79 -
80 - /*
81 - //두개를 한번에 정렬해보려 했지만 실패
82 - JavaRDD<Record> sortedRecords = records.keyBy(new Function<Record, Record>(){
83 - @Override
84 - public Record call(Record record) throws Exception {
85 - return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed);
86 - }}).sortByKey(new RecordComparator()).values();
87 - */
88 -
89 -// System.out.println("sortedRecords");
90 -// sortedRecords.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
91 -
92 -// System.out.println("make result");
93 - /*
94 - //map의 다음것을 가져오려했지만 실패
95 - JavaRDD<Record> result = sortedRecords.map(record -> {
96 - System.out.println("make addTen");
97 - Calendar addTen = Calendar.getInstance();
98 - addTen.setTime(record.clickTime.getTime());
99 - addTen.add(Calendar.MINUTE, 10);
100 -
101 - System.out.println("make count");
102 - int count = 0;
103 - for (Record temp: sortedRecords.collect()) {
104 - if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
105 - count++;
106 - }
107 -
108 - return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count);
109 - });
110 - */
111 -// System.out.println("result");
112 -// result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
113 -
114 - /*
115 -
116 - for (final ListIterator<String> it = list.listIterator(); it.hasNext();) {
117 - final String s = it.next();
118 - System.out.println(it.previousIndex() + ": " + s);
119 - }
120 -
121 - for (ListIterator<Record> it = sortedRecords.collect().listIterator(); it.hasNext(); it = it.nextIndex()) {
122 - it.
123 - if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
124 - count++;
125 - }
126 - */
127 -
128 -
129 - List<Record> list = sortedRecords.collect();
130 -
131 - List<Record> resultList = new ArrayList<Record>();
132 - for (int i = 0; i < list.size(); i++) {
133 - //System.out.println(list.get(i).ip);
134 -
135 - Record record = list.get(i);
136 -
137 - Calendar recordI = DateUtil.CalendarFromString(record.clickTime);
138 -
139 - Calendar addTen = Calendar.getInstance();
140 - addTen.setTime(recordI.getTime());
141 - addTen.add(Calendar.MINUTE, 10);
142 -
143 - int count = 0;
144 -
145 - for (int j = i+1; j < list.size() && list.get(j).ip.compareTo(record.ip) == 0; j++) {
146 - Calendar recordJ = DateUtil.CalendarFromString(list.get(j).clickTime);
147 - if (recordJ.compareTo(recordI) > 0 && recordJ.compareTo(addTen) < 0) {
148 - count++;
149 - } else {
150 - break;
151 - }
152 - }
153 -
154 - resultList.add(new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count));
155 -
156 - }
157 -
158 - JavaRDD<Record> result = sc.parallelize(resultList);
159 -// result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime() + " " + record.clickInTenMins);});
160 -
161 // Automatically identify categorical features, and index them. 33 // Automatically identify categorical features, and index them.
162 // Set maxCategories so features with > 4 distinct values are treated as continuous. 34 // Set maxCategories so features with > 4 distinct values are treated as continuous.
163 - Dataset<Row> resultds = sqlContext.createDataFrame(result, Record.class); 35 + Dataset<Row> resultds = sqlContext.createDataFrame(result);
164 36
165 System.out.println("schema start"); 37 System.out.println("schema start");
166 resultds.printSchema(); 38 resultds.printSchema();
......
1 -import scala.Serializable;
2 -
3 -public class Record implements Serializable {
4 - Integer ip;
5 - Integer app;
6 - Integer device;
7 - Integer os;
8 - Integer channel;
9 - String clickTime;
10 - String attributedTime;
11 - Integer isAttributed;
12 - Integer clickInTenMins;
13 -
14 - // constructor , getters and setters
15 - public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, String pClickTime, String pAttributedTime, Integer pIsAttributed) {
16 - ip = new Integer(pIp);
17 - app = new Integer(pApp);
18 - device = new Integer(pDevice);
19 - os = new Integer(pOs);
20 - channel = new Integer(pChannel);
21 - clickTime = pClickTime;
22 - attributedTime = pAttributedTime;
23 - isAttributed = new Integer(pIsAttributed);
24 - clickInTenMins = new Integer(0);
25 - }
26 -
27 - public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, String pClickTime, String pAttributedTime, Integer pIsAttributed, int pClickInTenMins) {
28 - ip = new Integer(pIp);
29 - app = new Integer(pApp);
30 - device = new Integer(pDevice);
31 - os = new Integer(pOs);
32 - channel = new Integer(pChannel);
33 - clickTime = pClickTime;
34 - attributedTime = pAttributedTime;
35 - isAttributed = new Integer(pIsAttributed);
36 - clickInTenMins = new Integer(pClickInTenMins);
37 - }
38 -
39 - public Integer getIp() {
40 - return ip;
41 - }
42 -
43 - public void setIp(Integer ip) {
44 - this.ip = ip;
45 - }
46 -
47 - public Integer getApp() {
48 - return app;
49 - }
50 -
51 - public void setApp(Integer app) {
52 - this.app = app;
53 - }
54 -
55 - public Integer getDevice() {
56 - return device;
57 - }
58 -
59 - public void setDevice(Integer device) {
60 - this.device = device;
61 - }
62 -
63 - public Integer getOs() {
64 - return os;
65 - }
66 -
67 - public void setOs(Integer os) {
68 - this.os = os;
69 - }
70 -
71 - public Integer getChannel() {
72 - return channel;
73 - }
74 -
75 - public void setChannel(Integer channel) {
76 - this.channel = channel;
77 - }
78 -
79 - public String getClickTime() {
80 - return clickTime;
81 - }
82 -
83 - public void setClickTime(String clickTime) {
84 - this.clickTime = clickTime;
85 - }
86 -
87 - public String getAttributedTime() {
88 - return attributedTime;
89 - }
90 -
91 - public void setAttributedTime(String attributedTime) {
92 - this.attributedTime = attributedTime;
93 - }
94 -
95 - public Integer getAttributed() {
96 - return isAttributed;
97 - }
98 -
99 - public void setAttributed(Integer attributed) {
100 - isAttributed = attributed;
101 - }
102 -
103 - public Integer getClickInTenMins() {
104 - return clickInTenMins;
105 - }
106 -
107 - public void setClickInTenMins(Integer clickInTenMins) {
108 - this.clickInTenMins = clickInTenMins;
109 - }
110 -}
...\ No newline at end of file ...\ No newline at end of file
1 -public class valid {
2 - private int x;
3 -
4 - valid() {
5 - x = 0;
6 - }
7 -
8 - void printX(){
9 - System.out.println(x);
10 - }
11 -
12 - public static void main(String[] args){
13 - valid v = new valid();
14 - v.printX();
15 - }
16 -
17 -}
File mode changed