

1 -import;
2 -import org.apache.spark.sql.Dataset;
3 -import org.apache.spark.sql.Encoders;
4 -import org.apache.spark.sql.Row;
5 -import org.apache.spark.sql.SparkSession;
6 -import org.apache.spark.sql.expressions.Window;
7 -import org.apache.spark.sql.expressions.WindowSpec;
8 -
9 -import java.util.ArrayList;
10 -import java.util.List;
11 -
12 -import static org.apache.spark.sql.functions.*;
13 -import static org.apache.spark.sql.functions.lit;
14 -import static org.apache.spark.sql.functions.when;
15 -
16 -public class Aggregation {
17 -
18 - public static void main(String[] args) throws Exception {
19 -
20 - //Create Session
21 - SparkSession spark = SparkSession
22 - .builder()
23 - .appName("Detecting Fraud Clicks")
24 - .master("local")
25 - .getOrCreate();
26 -
27 - // Aggregation
28 - Aggregation agg = new Aggregation();
29 -
30 - Dataset<Row> dataset = agg.loadCSVDataSet("/home/chris/.kaggle/competitions/talkingdata-adtracking-fraud-detection/mnt/ssd/kaggle-talkingdata2/competition_files/train_sample.csv", spark);
31 - dataset = agg.changeTimestempToLong(dataset);
32 - dataset = agg.averageValidClickCount(dataset);
33 - dataset = agg.clickTimeDelta(dataset);
34 - dataset = agg.countClickInTenMinutes(dataset);
35 -
36 - long start = System.currentTimeMillis();
37 -
38 - List<String> logs_with_features =>row.toString(), Encoders.STRING()).collectAsList();
39 - String[][] contents = new String[(int)dataset.count()][11];
40 - for (int i =0; i<logs_with_features.size();i++){
41 - String str_to_split = logs_with_features.get(i);
42 - String[] tmp = str_to_split.substring(1,str_to_split.length()-1).split(",");
43 - contents[i] = tmp;
44 - }
45 -
46 - long end = System.currentTimeMillis();
47 - System.out.println("JK's Procedure time elapsed : " + (end-start)/1000.0);
48 -
49 - start = System.currentTimeMillis();
50 - List<String> stringDataset = dataset.toJSON().collectAsList();
51 - end = System.currentTimeMillis();
52 - System.out.println("Steve's Procedure 1 time elapsed : " + (end-start)/1000.0);
53 - new GUI(stringDataset, contents);
54 -
55 -
56 - }
57 -
58 -
59 - private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
60 - // Read SCV to DataSet
61 - return"csv")
62 - .option("inferSchema", "true")
63 - .option("header", "true")
64 - .load(path);
65 - }
66 -
67 - private Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){
68 - // cast timestamp to long
69 - Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
70 - newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
71 - newDF = newDF.drop("click_time").drop("attributed_time");
72 - return newDF;
73 - }
74 -
75 - private Dataset<Row> averageValidClickCount(Dataset<Row> dataset){
76 - // set Window partition by 'ip' and 'app' order by 'utc_click_time' select rows between 1st row to current row
77 - WindowSpec w = Window.partitionBy("ip", "app")
78 - .orderBy("utc_click_time")
79 - .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
80 -
81 - // aggregation
82 - Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
83 - newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
84 - newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
85 - newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
86 - return newDF;
87 - }
88 -
89 - private Dataset<Row> clickTimeDelta(Dataset<Row> dataset){
90 - WindowSpec w = Window.partitionBy ("ip")
91 - .orderBy("utc_click_time");
92 -
93 - Dataset<Row> newDF = dataset.withColumn("lag(utc_click_time)", lag("utc_click_time",1).over(w));
94 - newDF = newDF.withColumn("click_time_delta", when(col("lag(utc_click_time)").isNull(),
95 - lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(),
96 - lit(0)).otherwise(col("lag(utc_click_time)"))));
97 - newDF = newDF.drop("lag(utc_click_time)");
98 - return newDF;
99 - }
100 -
101 - private Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset){
102 - WindowSpec w = Window.partitionBy("ip")
103 - .orderBy("utc_click_time")
104 - .rangeBetween(Window.currentRow(),Window.currentRow()+600);
105 -
106 - Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
107 - (count("utc_click_time").over(w)).minus(1)); //TODO 본인것 포함할 것인지 정해야함.
108 - return newDF;
109 - }
110 -}
...\ No newline at end of file ...\ No newline at end of file
This diff is collapsed. Click to expand it.