신은섭(Shin Eun Seop)
Committed by GitHub

Merge pull request #18 from Java-Cesco/feature/#3

Aggregation
File mode changed
1 +Detecting_fraud_clicks
\ No newline at end of file
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="CompilerConfiguration">
+    <annotationProcessing>
+      <profile name="Maven default annotation processors profile" enabled="true">
+        <sourceOutputDir name="target/generated-sources/annotations" />
+        <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
+        <outputRelativeToContentRoot value="true" />
+        <module name="Detecting_fraud_clicks" />
+      </profile>
+    </annotationProcessing>
+  </component>
+</project>
\ No newline at end of file
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="MarkdownExportedFiles">
+    <htmlFiles />
+    <imageFiles />
+    <otherFiles />
+  </component>
+</project>
\ No newline at end of file
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="JavaScriptSettings">
-    <option name="languageLevel" value="ES6" />
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
   </component>
 </project>
\ No newline at end of file
-<?xml version="1.0" encoding="UTF-8"?>
-<project version="4">
-  <component name="ProjectModuleManager">
-    <modules>
-      <module fileurl="file://$PROJECT_DIR$/.idea/Detecting_fraud_clicks.iml" filepath="$PROJECT_DIR$/.idea/Detecting_fraud_clicks.iml" />
-    </modules>
-  </component>
-</project>
\ No newline at end of file
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="VcsDirectoryMappings">
-    <mapping directory="" vcs="Git" />
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
   </component>
 </project>
\ No newline at end of file
File mode changed
File mode changed
@@ -16,7 +16,30 @@
     <artifactId>spark-core_2.11</artifactId>
     <version>2.3.0</version>
   </dependency>
-
+  <dependency>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-sql_2.11</artifactId>
+    <version>2.3.0</version>
+  </dependency>
+  <dependency>
+    <groupId>com.databricks</groupId>
+    <artifactId>spark-csv_2.11</artifactId>
+    <version>1.5.0</version>
+  </dependency>
 </dependencies>
 
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.6.1</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
 </project>
\ No newline at end of file
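One note on the dependency set above: since Spark 2.0 the CSV data source ships inside spark-sql itself, so the com.databricks:spark-csv artifact is only needed on Spark 1.x. A minimal sketch exercising the built-in reader against the same sample file the commit uses (the class name NativeCsvCheck is ours, not part of the commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NativeCsvCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("native-csv-check")
                .master("local")
                .getOrCreate();

        // Spark 2.x bundles a CSV data source, so this resolves without
        // com.databricks:spark-csv on the classpath.
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("./train_sample.csv"); // same sample file the commit reads

        df.printSchema();
        spark.stop();
    }
}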
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.expressions.Window;
+import org.apache.spark.sql.expressions.WindowSpec;
+
+import static org.apache.spark.sql.functions.*;
+
+public class Aggregation {
+
+    public static void main(String[] args) throws Exception {
+
+        // Create session
+        SparkSession spark = SparkSession
+                .builder()
+                .appName("Detecting Fraud Clicks")
+                .master("local")
+                .getOrCreate();
+
+        // Aggregation
+        Aggregation agg = new Aggregation();
+
+        Dataset<Row> dataset = agg.loadCSVDataSet("./train_sample.csv", spark);
+        dataset = agg.changeTimestampToLong(dataset);
+        dataset = agg.averageValidClickCount(dataset);
+        dataset = agg.clickTimeDelta(dataset);
+        dataset = agg.countClickInTenMinutes(dataset);
+
+        // test
+        dataset.where("ip == '5348' and app == '19'").show(10);
+    }
+
+    private Dataset<Row> loadCSVDataSet(String path, SparkSession spark) {
+        // Read CSV into a Dataset, inferring the schema from the header row
+        return spark.read().format("csv")
+                .option("inferSchema", "true")
+                .option("header", "true")
+                .load(path);
+    }
+
+    private Dataset<Row> changeTimestampToLong(Dataset<Row> dataset) {
+        // Cast the timestamp columns to long (seconds since the Unix epoch)
+        Dataset<Row> newDF = dataset.withColumn("utc_click_time", dataset.col("click_time").cast("long"));
+        newDF = newDF.withColumn("utc_attributed_time", dataset.col("attributed_time").cast("long"));
+        newDF = newDF.drop("click_time").drop("attributed_time");
+        return newDF;
+    }
+
+    private Dataset<Row> averageValidClickCount(Dataset<Row> dataset) {
+        // Window partitioned by 'ip' and 'app', ordered by 'utc_click_time',
+        // spanning the rows from the first row up to the current row
+        WindowSpec w = Window.partitionBy("ip", "app")
+                .orderBy("utc_click_time")
+                .rowsBetween(Window.unboundedPreceding(), Window.currentRow());
+
+        // Running ratio of attributed (valid) clicks to all clicks so far
+        Dataset<Row> newDF = dataset.withColumn("cum_count_click", count("utc_click_time").over(w));
+        newDF = newDF.withColumn("cum_sum_attributed", sum("is_attributed").over(w));
+        newDF = newDF.withColumn("avg_valid_click_count", col("cum_sum_attributed").divide(col("cum_count_click")));
+        newDF = newDF.drop("cum_count_click", "cum_sum_attributed");
+        return newDF;
+    }
+
+    private Dataset<Row> clickTimeDelta(Dataset<Row> dataset) {
+        WindowSpec w = Window.partitionBy("ip")
+                .orderBy("utc_click_time");
+
+        // Seconds since the previous click from the same IP; 0 for the first click
+        Dataset<Row> newDF = dataset.withColumn("prev_click_time", lag("utc_click_time", 1).over(w));
+        newDF = newDF.withColumn("click_time_delta",
+                when(col("prev_click_time").isNull(), lit(0))
+                        .otherwise(col("utc_click_time").minus(col("prev_click_time"))));
+        newDF = newDF.drop("prev_click_time");
+        return newDF;
+    }
+
+    private Dataset<Row> countClickInTenMinutes(Dataset<Row> dataset) {
+        // Range frame: the current click plus every click within the next 600 seconds
+        WindowSpec w = Window.partitionBy("ip")
+                .orderBy("utc_click_time")
+                .rangeBetween(Window.currentRow(), Window.currentRow() + 600);
+
+        Dataset<Row> newDF = dataset.withColumn("count_click_in_ten_mins",
+                count("utc_click_time").over(w).minus(1)); // TODO decide whether the current click itself should be counted
+        return newDF;
+    }
+}
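The range frame in countClickInTenMinutes is easy to misread, so here is a minimal, self-contained sketch of the same windowing on three hand-made rows (the class name TenMinuteWindowSketch and the sample timestamps are ours, not part of the commit). With rangeBetween(currentRow, currentRow + 600), the click at t=0 counts the click at t=300 but not the one at t=900, and minus(1) excludes the current click itself:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.count;

public class TenMinuteWindowSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ten-minute-window-sketch")
                .master("local")
                .getOrCreate();

        // Three clicks from one IP at t = 0s, 300s, 900s (hypothetical sample data)
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("ip", DataTypes.StringType, false),
                DataTypes.createStructField("utc_click_time", DataTypes.LongType, false)});
        Dataset<Row> clicks = spark.createDataFrame(Arrays.asList(
                RowFactory.create("5348", 0L),
                RowFactory.create("5348", 300L),
                RowFactory.create("5348", 900L)), schema);

        // Range frame looks 600 seconds forward from each row's order value
        WindowSpec w = Window.partitionBy("ip")
                .orderBy("utc_click_time")
                .rangeBetween(Window.currentRow(), Window.currentRow() + 600);

        // Expected counts: 1 (t=0 sees t=300), 0 (t=300), 0 (t=900)
        clicks.withColumn("count_click_in_ten_mins",
                count("utc_click_time").over(w).minus(1)).show();

        spark.stop();
    }
}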
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class MapExample {
-
-    static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
-    static JavaSparkContext sc = new JavaSparkContext(conf);
-
-    public static void main(String[] args) throws Exception {
-
-        // Parallelized with 2 partitions
-        JavaRDD<String> x = sc.parallelize(
-                Arrays.asList("spark", "rdd", "example", "sample", "example"),
-                2);
-
-        // Word count map example
-        JavaRDD<Tuple2<String, Integer>> y1 = x.map(e -> new Tuple2<>(e, 1));
-        List<Tuple2<String, Integer>> list1 = y1.collect();
-
-        // Another example: pairing each string with its length
-        JavaRDD<Tuple2<String, Integer>> y2 = x.map(e -> new Tuple2<>(e, e.length()));
-        List<Tuple2<String, Integer>> list2 = y2.collect();
-
-        System.out.println(list1);
-    }
-}
-public class valid {
-    private int x;
-
-    valid() {
-        x = 0;
-    }
-
-    void printX() {
-        System.out.println(x);
-    }
-
-    public static void main(String[] args) {
-        valid v = new valid();
-        v.printX();
-    }
-
-}
File mode changed
(Diff not shown: file is too large.)