Showing 6 changed files with 113 additions and 21 deletions.
.idea/compiler.xml
0 → 100644
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="CompilerConfiguration">
+    <annotationProcessing>
+      <profile name="Maven default annotation processors profile" enabled="true">
+        <sourceOutputDir name="target/generated-sources/annotations" />
+        <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
+        <outputRelativeToContentRoot value="true" />
+      </profile>
+    </annotationProcessing>
+  </component>
+</project>
\ No newline at end of file
.idea/misc.xml
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
-  <component name="JavaScriptSettings">
-    <option name="languageLevel" value="ES6" />
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
   </component>
 </project>
\ No newline at end of file
.idea/vcs.xml
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="VcsDirectoryMappings">
-    <mapping directory="" vcs="Git" />
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
   </component>
 </project>
\ No newline at end of file
pom.xml
@@ -16,6 +16,13 @@
         <artifactId>spark-core_2.11</artifactId>
         <version>2.3.0</version>
     </dependency>
+    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
+    <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_2.11</artifactId>
+        <version>2.3.0</version>
+    </dependency>
+
 
 </dependencies>
 
MapExample.java
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import scala.Tuple2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;
 import java.util.List;
 
+// Placeholder POJO mirroring the csv columns; not used yet.
+class data {
+    private int ip;
+    private int app;
+    private int device;
+    private int os;
+    private int channel;
+    //private int date click_time;
+    //private int date a
+}
+
 public class MapExample {
 
     static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
     static JavaSparkContext sc = new JavaSparkContext(conf);
 
     public static void main(String[] args) throws Exception {
-
-        // Parallelized with 2 partitions
-        JavaRDD<String> x = sc.parallelize(
-                Arrays.asList("spark", "rdd", "example", "sample", "example"),
-                2);
-
-        // Word Count Map Example
-        JavaRDD<Tuple2<String, Integer>> y1 = x.map(e -> new Tuple2<>(e, 1));
-        List<Tuple2<String, Integer>> list1 = y1.collect();
-
-        // Another example of making tuple with string and it's length
-        JavaRDD<Tuple2<String, Integer>> y2 = x.map(e -> new Tuple2<>(e, e.length()));
-        List<Tuple2<String, Integer>> list2 = y2.collect();
-
-        System.out.println(list1);
+        SparkSession spark = SparkSession
+                .builder()
+                .appName("Java Spark SQL basic example")
+                .config("spark.some.config.option", "some-value")
+                .getOrCreate();
+
+        /*StructType schema = new StructType()
+                .add("ip", "int")
+                .add("app", "int")
+                .add("device", "int")
+                .add("os", "int")
+                .add("channel", "int")
+                .add("click_time", "timestamp")      // Spark SQL has no "datetime2" type
+                .add("attributed_time", "timestamp")
+                .add("is_attributed", "int");*/
+
+        // Infer the schema from the csv; the header row supplies column names.
+        Dataset<Row> df2 = spark.read().format("csv")
+                .option("sep", ",")
+                .option("inferSchema", "true")
+                .option("header", "true")
+                .load("train_sample.csv");
+
+        Dataset<Row> df = df2.select("ip", "click_time");
+        df.createOrReplaceTempView("data");
+
+        // Earliest click per ip.
+        Dataset<Row> mindf = spark.sql("select ip, min(click_time) as first_click_time from data group by ip order by ip");
+        mindf.createOrReplaceTempView("mindf");
+
+        // Attach each ip's first click to every row (a window-function alternative is sketched below the diff).
+        Dataset<Row> df3 = spark.sql("select * from data natural join mindf order by ip, click_time");
+        df3.createOrReplaceTempView("df3");
+
+        //df3.na().fill("2020-01-01 00:00");
+        // Casting a timestamp to long yields epoch seconds.
+        Dataset<Row> newdf = df3.withColumn("utc_click_time", df3.col("click_time").cast("long"));
+        newdf = newdf.withColumn("utc_fclick_time", df3.col("first_click_time").cast("long"));
+        newdf = newdf.drop("click_time");
+        newdf = newdf.drop("first_click_time");
+
+        // Minutes elapsed since this ip's first click.
+        newdf = newdf.withColumn("within_ten", newdf.col("utc_click_time").minus(newdf.col("utc_fclick_time")).divide(60));
+
+        // True when the click came more than ten minutes after the first click
+        // (within_ten is in minutes, so the threshold is 10, not 600).
+        newdf = newdf.withColumn("check_ten", newdf.col("within_ten").gt((long) 10));
+        Dataset<Row> newdf2 = newdf.select("ip", "check_ten");
+
+        //newdf = newdf.withColumn("check_ten",newdf.col("within_ten").notEqual((long)0));
+        //newdf = newdf.withColumn("check_ten",(newdf.col("within_ten") -> newdf.col("within_ten") < 11 ? 1 : 0));
+
+        newdf2.show();
+
+        // Scratch work kept for reference; a compiling UDF version is sketched below the diff.
+        /*Dataset<Row> df4= spark.sql("select ip, cast((convert(bigint,click_time)) as decimal) from df3");
+        spark.udf().register("timestamp_diff",new UDF1<DataTypes.LongType,DataTypes.LongType>(){
+            public long call(long arg1,long arg2) throws Exception{
+                long arg3 = arg1-arg2;
+                return arg3;
+            }
+        }, DataTypes.LongType);
+
+        df3.withColumn("term",df3.col("click_time").cast("timestamp")-df3.col("first_click_time").cast("timestamp"));
+        Dataset<Row> df4=df3.toDF();
+        df4 = df4.withColumn("Check10Min", df3.select(df4("click_time").minus(df4("first_click_time")));*/
     }
-}
+}
\ No newline at end of file
train_sample.csv
0 → 100644
This diff could not be displayed because it is too large.
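
The commit reads train_sample.csv with inferSchema, which costs an extra pass over the file. The commented-out StructType in MapExample.java points at the alternative: hand the reader an explicit schema. A minimal sketch, assuming the column names from that commented block (the class and method here are illustrative, not part of the commit):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class ExplicitSchemaRead {
    // Loads the sample with a declared schema instead of inferSchema.
    public static Dataset<Row> load(SparkSession spark) {
        StructType schema = new StructType()
                .add("ip", "int")
                .add("app", "int")
                .add("device", "int")
                .add("os", "int")
                .add("channel", "int")
                .add("click_time", "timestamp")      // Spark SQL DDL names; "datetime2" is T-SQL
                .add("attributed_time", "timestamp")
                .add("is_attributed", "int");
        return spark.read()
                .format("csv")
                .option("header", "true")
                .schema(schema)   // no inference pass over the file
                .load("train_sample.csv");
    }
}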
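
The group-by on ip followed by a natural join is one way to put first_click_time on every row; a window function does the same in a single step and avoids materializing the mindf view. A sketch against the same column names (not the committed code):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.min;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class FirstClickWindow {
    // Adds first_click_time = min(click_time) over all rows sharing the same ip.
    public static Dataset<Row> withFirstClick(Dataset<Row> clicks) {
        WindowSpec byIp = Window.partitionBy(col("ip"));
        return clicks.withColumn("first_click_time", min(col("click_time")).over(byIp));
    }
}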
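
The commented-out timestamp_diff UDF in MapExample.java would not compile as written: UDF1 is parameterized with Java boxed types rather than DataTypes singletons, and its call method takes exactly one argument, so a two-input function needs UDF2. A version that compiles, assuming two epoch-second columns like utc_click_time and utc_fclick_time:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

public class TimestampDiffUdf {
    // Registers timestamp_diff(a, b) = a - b for use from Spark SQL.
    public static void register(SparkSession spark) {
        spark.udf().register("timestamp_diff",
                (UDF2<Long, Long, Long>) (click, firstClick) -> click - firstClick,
                DataTypes.LongType);
    }
}

Once registered, something like spark.sql("select ip, timestamp_diff(utc_click_time, utc_fclick_time) as term from ...") yields the same difference the withColumn/minus chain computes.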