hyungyun.Moon

read data

<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
<module type="JAVA_MODULE" version="4" />
\ No newline at end of file
......
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/Detecting_fraud_clicks.iml" filepath="$PROJECT_DIR$/.idea/Detecting_fraud_clicks.iml" />
</modules>
</component>
</project>
\ No newline at end of file
......@@ -16,6 +16,11 @@
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
......
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
/**
* Calendar 객체 관련 기능들을 모아놓은 유틸리티 클래스
*
* @author croute
* @since 2011.02.10
*/
public class DateUtil
{
/**
* 캘린더 객체를 yyyy-MM-dd HH:mm:ss 형태의 문자열로 변환합니다.
*
* @param cal 캘린더 객체
* @return 변환된 문자열
*/
public static String StringFromCalendar(Calendar cal)
{
// 날짜를 통신용 문자열로 변경
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return formatter.format(cal.getTime());
}
/**
* 캘린더 객체를 yyyy-MM-dd형태의 문자열로 변환합니다.
*
* @param cal 캘린더 객체
* @return 변환된 문자열
*/
public static String StringSimpleFromCalendar(Calendar cal)
{
// 날짜를 통신용 문자열로 변경
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
return formatter.format(cal.getTime());
}
/**
* yyyy-MM-dd HH:mm:ss 형태의 문자열을 캘린더 객체로 변환합니다.
* 만약 변환에 실패할 경우 오늘 날짜를 반환합니다.
*
* @param date 날짜를 나타내는 문자열
* @return 변환된 캘린더 객체
*/
public static Calendar CalendarFromString(String date)
{
if (date.length() == 0)
return null;
Calendar cal = Calendar.getInstance();
try
{
//String oldstring = "2011-01-18 00:00:00.0";
// Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").parse(oldstring);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
cal.setTime(formatter.parse(date));
}
catch(ParseException e)
{
e.printStackTrace();
}
return cal;
}
/**
* yyyy-MM-dd 형태의 문자열을 캘린더 객체로 변환합니다.
* 만약 변환에 실패할 경우 오늘 날짜를 반환합니다.
*
* @param date 날짜를 나타내는 문자열
* @return 변환된 캘린더 객체
*/
public static Calendar CalendarFromStringSimple(String date)
{
Calendar cal = Calendar.getInstance();
try
{
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
cal.setTime(formatter.parse(date));
}
catch(ParseException e)
{
e.printStackTrace();
}
return cal;
}
}
\ No newline at end of file
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
import scala.Serializable;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Calendar;
import java.util.List;
//ip,app,device,os,channel,click_time,attributed_time,is_attributed
//87540,12,1,13,497,2017-11-07 09:30:38,,0
class Record implements Serializable {
int ip;
int app;
int device;
int os;
int channel;
Calendar clickTime;
Calendar attributedTime;
boolean isAttributed;
// constructor , getters and setters
public Record(int pIp, int pApp, int pDevice, int pOs, int pChannel, Calendar pClickTime, Calendar pAttributedTime, boolean pIsAttributed) {
ip = pIp;
app = pApp;
device = pDevice;
os = pOs;
channel = pChannel;
clickTime = pClickTime;
attributedTime = pAttributedTime;
isAttributed = pIsAttributed;
}
}
public class MapExample {
static SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Cesco");
static JavaSparkContext sc = new JavaSparkContext(conf);
static SQLContext sqlContext = new SQLContext(sc);
public static void main(String[] args) throws Exception {
JavaRDD<String> file = sc.textFile("/Users/hyeongyunmun/Dropbox/DetectFraudClick/data/train.csv");
// Parallelized with 2 partitions
JavaRDD<String> x = sc.parallelize(
Arrays.asList("spark", "rdd", "example", "sample", "example"),
2);
// Word Count Map Example
JavaRDD<Tuple2<String, Integer>> y1 = x.map(e -> new Tuple2<>(e, 1));
List<Tuple2<String, Integer>> list1 = y1.collect();
// Another example of making tuple with string and it's length
JavaRDD<Tuple2<String, Integer>> y2 = x.map(e -> new Tuple2<>(e, e.length()));
List<Tuple2<String, Integer>> list2 = y2.collect();
final String header = file.first();
JavaRDD<String> data = file.filter(line -> !line.equalsIgnoreCase(header));
System.out.println(list1);
JavaRDD<Record> records = data.map((line) -> {
String[] fields = line.split(",");
Record sd = new Record(Integer.parseInt(fields[0]), Integer.parseInt(fields[1]), Integer.parseInt(fields[2]), Integer.parseInt(fields[3]), Integer.parseInt(fields[4]), DateUtil.CalendarFromString(fields[5]), DateUtil.CalendarFromString(fields[6]), "1".equalsIgnoreCase(fields[7].trim()));
return sd;
});
}
}
......