hyungyun.Moon

Count clicks within 10 minutes

1 import org.apache.spark.SparkConf; 1 import org.apache.spark.SparkConf;
2 +import org.apache.spark.api.java.JavaPairRDD;
2 import org.apache.spark.api.java.JavaRDD; 3 import org.apache.spark.api.java.JavaRDD;
3 import org.apache.spark.api.java.JavaSparkContext; 4 import org.apache.spark.api.java.JavaSparkContext;
4 import org.apache.spark.api.java.function.Function; 5 import org.apache.spark.api.java.function.Function;
...@@ -10,32 +11,57 @@ import org.apache.spark.sql.types.StructType; ...@@ -10,32 +11,57 @@ import org.apache.spark.sql.types.StructType;
10 import scala.Serializable; 11 import scala.Serializable;
11 import scala.Tuple2; 12 import scala.Tuple2;
12 13
13 -import java.util.Arrays; 14 +import java.util.*;
14 -import java.util.Calendar;
15 -import java.util.List;
16 15
17 //ip,app,device,os,channel,click_time,attributed_time,is_attributed 16 //ip,app,device,os,channel,click_time,attributed_time,is_attributed
18 //87540,12,1,13,497,2017-11-07 09:30:38,,0 17 //87540,12,1,13,497,2017-11-07 09:30:38,,0
19 class Record implements Serializable { 18 class Record implements Serializable {
20 - int ip; 19 + Integer ip;
21 - int app; 20 + Integer app;
22 - int device; 21 + Integer device;
23 - int os; 22 + Integer os;
24 - int channel; 23 + Integer channel;
25 Calendar clickTime; 24 Calendar clickTime;
26 Calendar attributedTime; 25 Calendar attributedTime;
27 - boolean isAttributed; 26 + Boolean isAttributed;
27 + Integer clickInTenMins;
28 28
29 // constructor , getters and setters 29 // constructor , getters and setters
30 public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed) { 30 public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed) {
31 - ip = pIp; 31 + ip = new Integer(pIp);
32 - app = pApp; 32 + app = new Integer(pApp);
33 - device = pDevice; 33 + device = new Integer(pDevice);
34 - os = pOs; 34 + os = new Integer(pOs);
35 - channel = pChannel; 35 + channel = new Integer(pChannel);
36 clickTime = pClickTime; 36 clickTime = pClickTime;
37 attributedTime = pAttributedTime; 37 attributedTime = pAttributedTime;
38 - isAttributed = pIsAttributed; 38 + isAttributed = new Boolean(pIsAttributed);
39 + clickInTenMins = new Integer(0);
40 + }
41 +
42 + public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed, int pClickInTenMins) {
43 + ip = new Integer(pIp);
44 + app = new Integer(pApp);
45 + device = new Integer(pDevice);
46 + os = new Integer(pOs);
47 + channel = new Integer(pChannel);
48 + clickTime = pClickTime;
49 + attributedTime = pAttributedTime;
50 + isAttributed = new Boolean(pIsAttributed);
51 + clickInTenMins = new Integer(pClickInTenMins);
52 + }
53 +}
54 +
55 +class RecordComparator implements Comparator<Record> {
56 + @Override
57 + public int compare(Record v1 , Record v2) {
58 +// if(a.ano < b.ano) return -1;
59 +// else if(a.ano == b.ano) return 0;
60 +// else return 1;
61 + if (v1.ip.compareTo(v2.ip) == 0) {
62 + return v1.clickTime.compareTo(v2.clickTime);
63 + }
64 + return v1.ip.compareTo(v2.ip);
39 } 65 }
40 } 66 }
41 67
...@@ -46,15 +72,112 @@ public class MapExample { ...@@ -46,15 +72,112 @@ public class MapExample {
46 static SQLContext sqlContext = new SQLContext(sc); 72 static SQLContext sqlContext = new SQLContext(sc);
47 73
48 public static void main(String[] args) throws Exception { 74 public static void main(String[] args) throws Exception {
49 - JavaRDD<String> file = sc.textFile("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv"); 75 + JavaRDD<String> file = sc.textFile("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv", 1);
50 76
51 final String header = file.first(); 77 final String header = file.first();
52 JavaRDD<String> data = file.filter(line -> !line.equalsIgnoreCase(header)); 78 JavaRDD<String> data = file.filter(line -> !line.equalsIgnoreCase(header));
53 79
54 - JavaRDD<Record> records = data.map((line) -> { 80 + JavaRDD<Record> records = data.map(line -> {
55 String[] fields = line.split(","); 81 String[] fields = line.split(",");
56 Record sd = new Record(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), Integer.parseInt(fields[4]), DateUtil.CalendarFromString(fields[5]), DateUtil.CalendarFromString(fields[6]), "1".equalsIgnoreCase(fields[7].trim())); 82 Record sd = new Record(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), Integer.parseInt(fields[4]), DateUtil.CalendarFromString(fields[5]), DateUtil.CalendarFromString(fields[6]), "1".equalsIgnoreCase(fields[7].trim()));
57 return sd; 83 return sd;
58 }); 84 });
85 +
86 +// JavaRDD<Tuple4<Integer,Double,Long,Integer>> secondSortRDD = firstSortRDD.keyBy(new Function<Tuple4<Integer, Double, Long, Integer>, Tuple2<Double, Long>>(){
87 +// @Override
88 +// public Tuple2<Double, Long> call(Tuple4<Integer, Double, Long, Integer> value) throws Exception {
89 +// return new Tuple2(value._2(),value._3());
90 +// }}).sortByKey(new TupleComparator()).values();
91 +
92 + JavaRDD<Record> firstSorted = records.sortBy(new Function<Record, Calendar>() {
93 + @Override
94 + public Calendar call(Record record) throws Exception {
95 + return record.clickTime;
96 + }
97 + }, true, 1);
98 +
99 + JavaRDD<Record> sortedRecords = firstSorted.sortBy(new Function<Record, Integer>() {
100 + @Override
101 + public Integer call(Record record) throws Exception {
102 + return record.ip.intValue();
103 + }
104 + }, true, 1);
105 +
106 +
107 + /*
108 + //두개를 한번에 정렬해보려 했지만 실패
109 + JavaRDD<Record> sortedRecords = records.keyBy(new Function<Record, Record>(){
110 + @Override
111 + public Record call(Record record) throws Exception {
112 + return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed);
113 + }}).sortByKey(new RecordComparator()).values();
114 + */
115 +
116 +// System.out.println("sortedRecords");
117 +// sortedRecords.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
118 +
119 +// System.out.println("make result");
120 + /*
121 + //map의 다음것을 가져오려했지만 실패
122 + JavaRDD<Record> result = sortedRecords.map(record -> {
123 + System.out.println("make addTen");
124 + Calendar addTen = Calendar.getInstance();
125 + addTen.setTime(record.clickTime.getTime());
126 + addTen.add(Calendar.MINUTE, 10);
127 +
128 + System.out.println("make count");
129 + int count = 0;
130 + for (Record temp: sortedRecords.collect()) {
131 + if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
132 + count++;
133 + }
134 +
135 + return new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count);
136 + });
137 + */
138 +// System.out.println("result");
139 +// result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime());});
140 +
141 + /*
142 +
143 + for (final ListIterator<String> it = list.listIterator(); it.hasNext();) {
144 + final String s = it.next();
145 + System.out.println(it.previousIndex() + ": " + s);
146 + }
147 +
148 + for (ListIterator<Record> it = sortedRecords.collect().listIterator(); it.hasNext(); it = it.nextIndex()) {
149 + it.
150 + if (temp.ip.compareTo(record.ip) == 0 && temp.clickTime.compareTo(record.clickTime) > 0 && temp.clickTime.compareTo(addTen)< 0)
151 + count++;
152 + }
153 + */
154 +
155 +
156 + List<Record> list = sortedRecords.collect();
157 +
158 + List<Record> resultList = new ArrayList<Record>();
159 + for (int i = 0; i < list.size(); i++) {
160 + //System.out.println(list.get(i).ip);
161 +
162 + Record record = list.get(i);
163 +
164 + Calendar addTen = Calendar.getInstance();
165 + addTen.setTime(record.clickTime.getTime());
166 + addTen.add(Calendar.MINUTE, 10);
167 +
168 + int count = 0;
169 +
170 + for (int j = i+1; j < list.size() && list.get(j).ip.compareTo(record.ip) == 0
171 + && list.get(j).clickTime.compareTo(record.clickTime) > 0 &&list.get(j).clickTime.compareTo(addTen) < 0; j++)
172 + count++;
173 +
174 + resultList.add(new Record(record.ip, record.app, record.device, record.os, record.channel, record.clickTime, record.attributedTime, record.isAttributed, count));
175 +
176 + }
177 +
178 +
179 + JavaRDD<Record> result = sc.parallelize(resultList);
180 + result.foreach(record -> {System.out.println(record.ip + " " + record.clickTime.getTime() + " " + record.clickInTenMins);});
181 +
59 } 182 }
60 } 183 }
......