신은섭(Shin Eun Seop)

add average ad efficiency field

closed Java-Cesco/Detecting_fraud_clicks#3
@@ -12,6 +12,8 @@
     </content>
     <orderEntry type="inheritedJdk" />
     <orderEntry type="sourceFolder" forTests="false" />
+    <orderEntry type="library" name="Maven: com.databricks:spark-csv_2.11:1.5.0" level="project" />
+    <orderEntry type="library" name="Maven: org.apache.commons:commons-csv:1.1" level="project" />
     <orderEntry type="library" name="Maven: org.apache.spark:spark-core_2.11:2.3.0" level="project" />
     <orderEntry type="library" name="Maven: org.apache.avro:avro:1.7.7" level="project" />
     <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.9.13" level="project" />
...
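The matching pom.xml change is not part of this diff. Based on the two new orderEntry lines above, the dependency declarations would look roughly like this (a sketch only: coordinates are copied from the library names, and placement inside the project's existing <dependencies> block is assumed):

<!-- Assumed pom.xml counterpart of the new .iml library entries above. -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.11</artifactId>
    <version>1.5.0</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.1</version>
</dependency>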
@@ -3,6 +3,13 @@
   <component name="JavaScriptSettings">
     <option name="languageLevel" value="ES6" />
   </component>
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
   <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" default="false" project-jdk-name="1.8" project-jdk-type="JavaSDK">
     <output url="file:///tmp" />
   </component>
...
@@ -1,18 +1,12 @@
-import org.apache.commons.net.ntp.TimeStamp;
-import org.apache.spark.Aggregator;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.IntegerType;
-import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
 
-import java.io.Serializable;
-import java.sql.Time;
-import java.sql.Timestamp;
-
-import static org.apache.spark.sql.functions.unix_timestamp;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.count;
+import static org.apache.spark.sql.functions.sum;
 
 
 public class AvgAdvTime {
@@ -29,11 +23,21 @@ public class AvgAdvTime {
                 .option("inferSchema", "true")
                 .option("header", "true")
                 .load("train_sample.csv");
-        df.printSchema();
 
         // cast timestamp to long
         Dataset<Row> newdf = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
         newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
-        newdf.show();
+        newdf = newdf.drop("click_time").drop("attributed_time");
+
+        WindowSpec w = Window.partitionBy("ip", "app")
+                .orderBy("utc_click_time")
+                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
+
+        newdf = newdf.withColumn("cum_count_click", count("utc_click_time").over(w));
+        newdf = newdf.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
+        newdf = newdf.withColumn("avg_efficient", col("cum_sum_attributed").divide(col("cum_count_click")));
+        newdf.where("ip == '5348' and app == '19'").show();
+        newdf.printSchema();
+
     }
 }
\ No newline at end of file
...
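For reference, the computation this commit introduces reads as a whole roughly as follows. This is a minimal, self-contained sketch, not the project's actual file: the SparkSession setup, the class name AvgAdvTimeSketch, and the "csv" format name are assumptions (the diff elides everything before .option("inferSchema", ...)), while the column names follow the TalkingData train_sample.csv schema implied above (ip, app, click_time, attributed_time, is_attributed).

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

public class AvgAdvTimeSketch {
    public static void main(String[] args) {
        // Local session for experimentation; the project presumably
        // configures this elsewhere (not shown in the diff).
        SparkSession spark = SparkSession.builder()
                .appName("AvgAdvTimeSketch")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read()
                .format("csv")  // assumption; the diff elides the format call
                .option("inferSchema", "true")
                .option("header", "true")
                .load("train_sample.csv");

        // Cast the timestamp columns to epoch seconds, drop the originals.
        Dataset<Row> newdf = df
                .withColumn("utc_click_time", df.col("click_time").cast("long"))
                .withColumn("utc_attributed_time", df.col("attributed_time").cast("long"))
                .drop("click_time", "attributed_time");

        // Running window per (ip, app): all rows from the first click
        // up to and including the current one, ordered by click time.
        WindowSpec w = Window.partitionBy("ip", "app")
                .orderBy("utc_click_time")
                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());

        // Cumulative clicks, cumulative conversions, and their ratio:
        // the average ad efficiency field added by this commit.
        newdf = newdf
                .withColumn("cum_count_click", count("utc_click_time").over(w))
                .withColumn("cum_sum_attributed", sum("is_attributed").over(w))
                .withColumn("avg_efficient",
                        col("cum_sum_attributed").divide(col("cum_count_click")));

        newdf.where("ip == '5348' and app == '19'").show();
        spark.stop();
    }
}

The rowsBetween(Window.unboundedPreceding(), Window.currentRow()) frame is what makes count and sum cumulative rather than per-group totals: for each row, avg_efficient is the running conversion rate (attributed downloads per click) of that ip/app pair up to the current click.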