hyungyun.Moon

change to window function

@@ -21,7 +21,17 @@
     <artifactId>spark-sql_2.11</artifactId>
     <version>2.2.0</version>
   </dependency>
+  <dependency>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.11</artifactId>
+    <version>2.3.0</version>
+  </dependency>
+  <dependency>
+    <groupId>com.databricks</groupId>
+    <artifactId>spark-csv_2.11</artifactId>
+    <version>1.5.0</version>
+  </dependency>
 
 </dependencies>
-
+
 </project>
\ No newline at end of file
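Note that this hunk leaves spark-sql_2.11 declared twice (2.2.0 in the context above, 2.3.0 in the addition); Maven warns on duplicate dependency declarations in the same POM and resolves only one of the two versions, so consolidating to a single <version> would be safer. The com.databricks:spark-csv artifact is also redundant on Spark 2.x, where CSV support is built into spark-sql.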
@@ -0,0 +1,46 @@
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
+
+import static org.apache.spark.sql.functions.*;
+
+
+public class CountTen {
+
+    public static void main(String[] args) throws Exception {
+        SparkSession spark = SparkSession
+                .builder()
+                .master("local")
+                .appName("Java Spark SQL basic example")
+                .getOrCreate();
+
+        Dataset<Row> df = spark.read().format("csv")
+                .option("inferSchema", "true")
+                .option("header", "true")
+                .load("./data/train.csv");
+
+        // cast the timestamp columns to epoch seconds (long)
+        Dataset<Row> newdf = df.withColumn("utc_click_time", df.col("click_time").cast("long"));
+        newdf = newdf.withColumn("utc_attributed_time", df.col("attributed_time").cast("long"));
+        newdf = newdf.drop("click_time").drop("attributed_time");
+
+        // per-ip window, ordered by click time
+        WindowSpec w = Window.partitionBy("ip")
+                .orderBy("utc_click_time");
+        // .rowsBetween(Window.currentRow(), Window.unboundedPreceding()) fails with
+        // "Boundary end is not a valid integer: -9223372036854775808": the bounds
+        // are reversed; the frame start must come first, as in
+        // .rowsBetween(Window.unboundedPreceding(), Window.currentRow())
+
+        // flag clicks followed by another click from the same ip within 600 s (10 min)
+        newdf = newdf.withColumn("is_clicked_in_ten_mins",
+                lead(col("utc_click_time"), 1).over(w)
+                        .minus(col("utc_click_time")).lt(600L).cast("long"));
+
+        newdf.where("ip == '117898'").show(false);
+
+        spark.stop();
+    }
+}
\ No newline at end of file
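The commented-out frame above hints at an intended cumulative count. A minimal sketch of both frame variants, assuming the Spark 2.3 Java API and the newdf produced by this patch (the WindowFrameSketch class name, withClickCounts method, and the clicks_* column names are illustrative only):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

import static org.apache.spark.sql.functions.*;

public class WindowFrameSketch {

    static Dataset<Row> withClickCounts(Dataset<Row> newdf) {
        // unboundedPreceding() (Long.MIN_VALUE) is only valid as the frame *start*;
        // passing it as the end is what raised
        // "Boundary end is not a valid integer: -9223372036854775808".
        WindowSpec cumulative = Window.partitionBy("ip")
                .orderBy("utc_click_time")
                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());

        // Range frame over the ordering value itself: every row whose
        // utc_click_time lies within [current, current + 600] seconds.
        WindowSpec nextTenMin = Window.partitionBy("ip")
                .orderBy("utc_click_time")
                .rangeBetween(Window.currentRow(), 600L);

        return newdf
                .withColumn("clicks_so_far",
                        count(col("utc_click_time")).over(cumulative))
                .withColumn("clicks_in_ten_mins",
                        count(col("utc_click_time")).over(nextTenMin).minus(1)); // exclude the current click
    }
}

rowsBetween counts physical rows, while rangeBetween compares the ordering value itself; the 600-second offset therefore only works because utc_click_time has already been cast to a numeric column.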
@@ -70,7 +70,7 @@ public class MapExample {
     static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
     static JavaSparkContext sc = new JavaSparkContext(conf);
     static SQLContext sqlContext = new SQLContext(sc);
-
+
     public static void main(String[] args) throws Exception {
         JavaRDD<String> file = sc.textFile("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv", 1);
 
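For comparison, a hedged sketch of reading the same train.csv through the SQL API instead of sc.textFile: with the Spark 2.x spark-sql artifact on the classpath, the built-in CSV reader replaces both the manual line splitting in MapExample and the spark-csv dependency (the train variable name is illustrative):

Dataset<Row> train = sqlContext.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv");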