신은섭(Shin Eun Seop)

add comment

...@@ -12,13 +12,15 @@ import static org.apache.spark.sql.functions.sum; ...@@ -12,13 +12,15 @@ import static org.apache.spark.sql.functions.sum;
12 public class AvgAdvTime { 12 public class AvgAdvTime {
13 13
14 public static void main(String[] args) throws Exception { 14 public static void main(String[] args) throws Exception {
15 - 15 +
16 + // Start Spark Session
16 SparkSession spark = SparkSession 17 SparkSession spark = SparkSession
17 .builder() 18 .builder()
18 .master("local") 19 .master("local")
19 .appName("Java Spark SQL basic example") 20 .appName("Java Spark SQL basic example")
20 .getOrCreate(); 21 .getOrCreate();
21 22
23 + // Read CSV into a Dataset
22 Dataset<Row> df = spark.read().format("csv") 24 Dataset<Row> df = spark.read().format("csv")
23 .option("inferSchema", "true") 25 .option("inferSchema", "true")
24 .option("header", "true") 26 .option("header", "true")
...@@ -29,13 +31,17 @@ public class AvgAdvTime { ...@@ -29,13 +31,17 @@ public class AvgAdvTime {
29 newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long")); 31 newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
30 newdf = newdf.drop("click_time").drop("attributed_time"); 32 newdf = newdf.drop("click_time").drop("attributed_time");
31 33
34 + // Define a window partitioned by 'ip' and 'app', ordered by 'utc_click_time', spanning from the first row up to the current row
32 WindowSpec w = Window.partitionBy("ip", "app") 35 WindowSpec w = Window.partitionBy("ip", "app")
33 .orderBy("utc_click_time") 36 .orderBy("utc_click_time")
34 .rowsBetween(Window.unboundedPreceding(), Window.currentRow()); 37 .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
35 38
39 + // Cumulative aggregations over the window
36 newdf = newdf.withColumn("cum_count_click", count("utc_click_time").over(w)); 40 newdf = newdf.withColumn("cum_count_click", count("utc_click_time").over(w));
37 newdf = newdf.withColumn("cum_sum_attributed", sum("is_attributed").over(w)); 41 newdf = newdf.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
38 newdf = newdf.withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click"))); 42 newdf = newdf.withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click")));
43 +
44 + // Show sample rows for one (ip, app) pair and print the schema
39 newdf.where("ip == '5348' and app == '19'").show(); 45 newdf.where("ip == '5348' and app == '19'").show();
40 newdf.printSchema(); 46 newdf.printSchema();
41 47
......