신은섭(Shin Eun Seop)

refactoring aggregation functions

close Java-Cesco/Detecting_fraud_clicks#3
This diff could not be displayed because it is too large.
-ip,app,device,os,channel,click_time,attributed_time,is_attributed
-117898,12,1,13,497,2017-11-07 09:30:38,,0
-117898,12,1,13,497,2017-11-07 09:30:38,,0
-117898,12,1,13,497,2017-11-07 09:31:38,,0
-117898,12,1,13,497,2017-11-07 09:31:38,,0
-117898,12,1,13,497,2017-11-07 09:31:38,,0
-117898,12,1,13,497,2017-11-07 09:39:38,,0
-117898,12,1,13,497,2017-11-07 09:40:38,,0
\ No newline at end of file
@@ -19,25 +19,26 @@ public class Aggregation {
                 .master("local")
                 .getOrCreate();
 
+        // Aggregation
         Aggregation agg = new Aggregation();
 
         Dataset<Row> dataset = agg.loadCSVDataSet("./train_sample.csv", spark);
         dataset = agg.changeTimestempToLong(dataset);
         dataset = agg.averageValidClickCount(dataset);
         dataset = agg.clickTimeDelta(dataset);
+        dataset = agg.countClickInTenMinutes(dataset);
 
-        dataset.where("ip == '5348' and app == '19'").show();
-
+        // test
+        dataset.where("ip == '5348' and app == '19'").show(10);
     }
 
 
     private Dataset<Row> loadCSVDataSet(String path, SparkSession spark){
         // Read CSV to DataSet
-        Dataset<Row> dataset = spark.read().format("csv")
+        return spark.read().format("csv")
                 .option("inferSchema", "true")
                 .option("header", "true")
-                .load("train_sample.csv");
-        return dataset;
+                .load(path);
     }
 
     private Dataset<Row> changeTimestempToLong(Dataset<Row> dataset){
@@ -73,4 +74,14 @@ public class Aggregation {
         newDF = newDF.drop("lag(utc_click_time)");
         return newDF;
     }
+
+    private Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset){
+        WindowSpec w = Window.partitionBy("ip")
+                .orderBy("utc_click_time")
+                .rangeBetween(Window.currentRow(), Window.currentRow() + 600);
+
+        Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
+                (count("utc_click_time").over(w)).minus(1)); // TODO: decide whether to count the current click itself.
+        return newDF;
+    }
 }
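Note on the new countClickInTenMinutes: the window counts, per ip, every click whose utc_click_time falls within 600 seconds at or after the current click, and the trailing minus(1) drops the current row itself, which is exactly what the open TODO is about. Below is a minimal standalone sketch of that window on toy data (the class name, the toy rows, and the expected values are illustrations made up here, assuming the same Spark 2.x Java API used elsewhere in this repo):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.count;

public class CountTenMinutesSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("count-ten-minutes-sketch")
                .getOrCreate();

        // toy rows: (ip, click time as seconds since epoch)
        StructType schema = new StructType()
                .add("ip", DataTypes.IntegerType)
                .add("utc_click_time", DataTypes.LongType);
        Dataset<Row> df = spark.createDataFrame(Arrays.asList(
                RowFactory.create(1, 0L),      // two later clicks within 600s
                RowFactory.create(1, 60L),     // one later click within 600s
                RowFactory.create(1, 540L),    // no later click within 600s
                RowFactory.create(1, 1200L)),  // no later click within 600s
                schema);

        // same window as countClickInTenMinutes: current click up to +600 seconds, per ip
        WindowSpec w = Window.partitionBy("ip")
                .orderBy("utc_click_time")
                .rangeBetween(Window.currentRow(), Window.currentRow() + 600);

        // minus(1) excludes the row's own click; drop it to count the current click as well
        df.withColumn("count_click_in_ten_mins",
                count("utc_click_time").over(w).minus(1))
          .show();
    }
}

For the clicks at 0, 60, 540, and 1200 seconds the new column is 2, 1, 0, and 0; removing minus(1) would shift every value up by one, so the TODO only changes the offset, not the shape of the feature.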
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.Window;
-import org.apache.spark.sql.expressions.WindowSpec;
-
-import static org.apache.spark.sql.functions.col;
-import static org.apache.spark.sql.functions.count;
-import static org.apache.spark.sql.functions.sum;
-
-
-public class AvgAdvTime {
-
-    public static void main(String[] args) throws Exception {
-
-        // Start Spark Session
-        SparkSession spark = SparkSession
-                .builder()
-                .master("local")
-                .appName("Java Spark SQL basic example")
-                .getOrCreate();
-
-        // Read SCV to DataSet
-        Dataset<Row> df = spark.read().format("csv")
-                .option("inferSchema", "true")
-                .option("header", "true")
-                .load("train_sample.csv");
-
-        // cast timestamp to long
-        Dataset<Row> newdf = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
-        newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
-        newdf = newdf.drop("click_time").drop("attributed_time");
-
-        // set Window partition by 'ip' and 'app' order by 'utc_click_time' select rows between 1st row to current row
-        WindowSpec w = Window.partitionBy("ip", "app")
-                .orderBy("utc_click_time")
-                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
-
-        // aggregation
-        newdf = newdf.withColumn("cum_count_click", count("utc_click_time").over(w));
-        newdf = newdf.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
-        newdf = newdf.withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click")));
-
-        // print example
-        newdf.where("ip == '5348' and app == '19'").show();
-        newdf.printSchema();
-
-    }
-}
\ No newline at end of file
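The removed AvgAdvTime built a running click count and a running sum of is_attributed per (ip, app) over a window from the first row to the current row, and divided the two into avg_efficient; after this refactoring that logic presumably lives in Aggregation.averageValidClickCount. As a method in the new style it would read roughly like the sketch below (the Sketch suffix marks it as a reconstruction from the deleted code above, not the actual Aggregation source, and it assumes the same static imports of col, count, and sum):

    private Dataset<Row> averageValidClickCountSketch(Dataset<Row> dataset) {
        // running window per (ip, app): first click of the pair up to the current click
        WindowSpec w = Window.partitionBy("ip", "app")
                .orderBy("utc_click_time")
                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());

        // cumulative click count, cumulative attributed count, and their ratio
        return dataset
                .withColumn("cum_count_click", count("utc_click_time").over(w))
                .withColumn("cum_sum_attributed", sum("is_attributed").over(w))
                .withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click")));
    }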
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.Window;
-import org.apache.spark.sql.expressions.WindowSpec;
-
-import static org.apache.spark.sql.functions.*;
-
-
-public class CountTen {
-
-    public static void main(String[] args) throws Exception {
-        SparkSession spark = SparkSession
-                .builder()
-                .master("local")
-                .appName("Java Spark SQL basic example")
-                .getOrCreate();
-
-        Dataset<Row> df = spark.read().format("csv")
-                .option("inferSchema", "true")
-                .option("header", "true")
-                .load("./data/train_.csv");
-
-        // cast timestamp to long
-        Dataset<Row> newdf = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
-        newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
-        newdf = newdf.drop("click_time").drop("attributed_time");
-
-        WindowSpec w = Window.partitionBy("ip")
-                .orderBy("utc_click_time")
-                .rangeBetween(Window.currentRow(), Window.currentRow() + 600);
-//                .rowsBetween(Window.currentRow(), Window.unboundedPreceding()); // Boundary end is not a valid integer: -9223372036854775808
-
-        newdf = newdf.withColumn("is_clicked_in_ten_mins",
-                (count("utc_click_time").over(w)).minus(1)); // decide whether to include the current click itself
-//        newdf = newdf.withColumn("is_clicked_in_ten_mins",
-//                (lead(col("utc_click_time"), 1).over(w).minus(col("utc_click_time")).lt((long) 600)).cast("long"));
-
-        newdf.where("ip == '117898'").show(false);
-    }
-}
\ No newline at end of file
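The removed CountTen is the origin of countClickInTenMinutes above; only the count-based variant survived. The commented-out alternative computed a per-row flag (does the same ip click again within 600 seconds?) rather than a count. If that flag is ever wanted back, a sketch of it with the column names used in this repo (the method name is hypothetical and not part of this PR; it assumes the functions.* static import, and lead() is applied over a plain ordered window without the explicit rangeBetween frame used for the count):

    // hypothetical helper, not part of this PR
    private Dataset<Row> flagNextClickInTenMinutes(Dataset<Row> dataset) {
        // plain ordered window per ip; lead() expects no explicit rowsBetween/rangeBetween frame here
        WindowSpec w = Window.partitionBy("ip").orderBy("utc_click_time");

        // 1 if the next click from the same ip is within 600 seconds, 0 if not, null for the ip's last click
        return dataset.withColumn("is_clicked_in_ten_mins",
                lead(col("utc_click_time"), 1).over(w)
                        .minus(col("utc_click_time"))
                        .lt(600L)
                        .cast("long"));
    }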
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.expressions.Window;
-import org.apache.spark.sql.expressions.WindowSpec;
-
-import javax.xml.crypto.Data;
-
-import static org.apache.spark.sql.functions.*;
-
-public class calForwardTimeDelta {
-    static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
-    static JavaSparkContext sc = new JavaSparkContext(conf);
-
-    public static void main(String[] args) throws Exception {
-        // Create Session
-        SparkSession spark = SparkSession
-                .builder()
-                .appName("Detecting Fraud Clicks")
-                .getOrCreate();
-
-        // run methods here
-        calcDelta(spark);
-    }
-
-    private static void calcDelta(SparkSession spark) {
-        // path of the file to work with
-        String filepath = "train_sample.csv";
-
-        // create Dataset from files
-        Dataset<Row> logDF = spark.read()
-                .format("csv")
-                .option("inferSchema", "true")
-                .option("header", "true")
-                .load(filepath);
-
-        // cast timestamp (click_time, attributed_time) type to long type
-
-        // add column for long(click_time)
-        Dataset<Row> newDF = logDF.withColumn("utc_click_time", logDF.col("click_time").cast("long"));
-        // add column for long(attributed_time)
-        newDF = newDF.withColumn("utc_attributed_time", logDF.col("attributed_time").cast("long"));
-        // drop timestamp type columns
-        newDF = newDF.drop("click_time").drop("attributed_time");
-        newDF.createOrReplaceTempView("logs");
-
-        WindowSpec w = Window.partitionBy("ip")
-                .orderBy("utc_click_time");
-
-        newDF = newDF.withColumn("lag(utc_click_time)", lag("utc_click_time", 1).over(w));
-        newDF.where("ip=10").show();
-        newDF = newDF.withColumn("delta", when(col("lag(utc_click_time)").isNull(), lit(0)).otherwise(col("utc_click_time")).minus(when(col("lag(utc_click_time)").isNull(), lit(0)).otherwise(col("lag(utc_click_time)"))));
-        //newDF = newDF.withColumn("delta", datediff());
-        newDF = newDF.drop("lag(utc_click_time)");
-        newDF = newDF.orderBy("ip");
-
-        newDF.show();
-    }
-
-}
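The removed calForwardTimeDelta computed, per ip, the gap in seconds to the previous click, zeroing both operands on the ip's first click so that its delta is 0; presumably this is what Aggregation.clickTimeDelta now does. The same result can be written more compactly with coalesce, sketched here (the Sketch suffix marks the method as an illustration, not the actual contents of Aggregation; it assumes the functions.* static import used in the deleted file):

    // hypothetical rewrite of the removed delta logic, not copied from Aggregation
    private Dataset<Row> clickTimeDeltaSketch(Dataset<Row> dataset) {
        WindowSpec w = Window.partitionBy("ip").orderBy("utc_click_time");

        // seconds since the previous click from the same ip; 0 for the ip's first click
        return dataset.withColumn("delta",
                coalesce(col("utc_click_time").minus(lag("utc_click_time", 1).over(w)), lit(0)));
    }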