diff --git a/.gitignore b/.gitignore index 66791da..f168b6b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ ./COO Design Pattern/*/.project CPE-Lyon/JEE2/.idea/* #CPE-Lyon/JEE2/*/target/* + +CPE-Lyon/Big\ Data/connect.sh diff --git a/CPE-Lyon/Big Data/TODO.md b/CPE-Lyon/Big Data/TODO.md new file mode 100644 index 0000000..1c5087d --- /dev/null +++ b/CPE-Lyon/Big Data/TODO.md @@ -0,0 +1,2 @@ +- Ajouter une étape de suppression des valeurs aberrantes +- Ajouter la géoloc diff --git a/CPE-Lyon/Big Data/create_results.sql b/CPE-Lyon/Big Data/create_results.sql new file mode 100644 index 0000000..56d898a --- /dev/null +++ b/CPE-Lyon/Big Data/create_results.sql @@ -0,0 +1,35 @@ +-- choose schema +use formation50; +-- drop the previous results table so the script can be re-run +drop table if exists results_nbviolationstaxi; +-- create the results table, stored as ORC +create table results_nbviolationstaxi +( + IssueDate date, + NbViolations double, + NbPuNorm double +) +stored as orc; +insert into results_nbviolationstaxi +-- emulate a full outer join on the date in two steps +-- step 1: every violation date, with taxi figures when a ride date matches (left join) +select + vi.issuedate, + vi.nbviolations, + ta.nb_pu_norm +from refine2_nbviolations vi + left join refine2_nbtaxi ta + on vi.issuedate = ta.taxi_ride_date +-- step 2: taxi ride dates without any matching violation date (right join kept only where the violation side is null) +union +select + ta.taxi_ride_date, + vi.nbviolations, + ta.nb_pu_norm +from refine2_nbviolations vi + right join refine2_nbtaxi ta + on vi.issuedate = ta.taxi_ride_date +where vi.issuedate is null; + +-- row count of the results table, as a quick check +select count(1) from results_nbviolationstaxi; diff --git a/CPE-Lyon/Big Data/get_taxi_dropoffzone.scala b/CPE-Lyon/Big Data/get_taxi_dropoffzone.scala new file mode 100644 index 0000000..62ec95e --- /dev/null +++ b/CPE-Lyon/Big Data/get_taxi_dropoffzone.scala @@ -0,0 +1,53 @@ +/** + add dropoff zone location column to the yellow taxi trips file + Command : spark-shell --master yarn --conf spark.ui.port=4050 +*/ + +/* +|-- geometry: struct (nullable = true) +| |-- coordinates: array (nullable = true) +| | |-- element: array (containsNull = true) +| | | |-- element: 
array (containsNull = true)
+| | | | |-- element: array (containsNull = true)
+| | | | | |-- element: double (containsNull = true)
+| |-- type: string (nullable = true)
+|-- geometry_name: string (nullable = true)
+|-- id: string (nullable = true)
+|-- properties: struct (nullable = true)
+| |-- bbox: array (nullable = true)
+| | |-- element: double (containsNull = true)
+| |-- borough: string (nullable = true)
+| |-- locationid: long (nullable = true)
+| |-- objectid: long (nullable = true)
+| |-- shape_area: double (nullable = true)
+| |-- shape_leng: double (nullable = true)
+| |-- zone: string (nullable = true)
+|-- type: string (nullable = true)
+*/
+
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+
+/* load the trip CSV (with header) and the zones JSON, reading every top-level JSON field as a plain string */
+val JSON_FILE_NAME = "taxi-zones-geojson2.json"
+val CSV_FILE_NAME = "2016_yellow_trip.csv"
+val csvDf = spark.read.format("csv").option("header", "true").load(CSV_FILE_NAME)
+val jsonSchema = new StructType().add("geometry", StringType).add("geometry_name", StringType).add("id",StringType).add("properties", StringType).add("type", StringType)
+val jsonDf = spark.read.option("type", "geojson").schema(jsonSchema).json(JSON_FILE_NAME)
+
+// strip the "nyu_2451_36743." prefix from the zone id; a null id stays null instead of throwing a NullPointerException
+val cleanId = udf((id: String) => {
+  Option(id).map(_.replace("nyu_2451_36743.", "")).orNull
+})
+
+// overwrite the id column with its cleaned value and preview the result
+val updatedJsonDf = jsonDf.withColumn("id", cleanId($"id"))
+updatedJsonDf.select($"id").show()
+
+// left outer join: keep every trip and attach the zone whose id equals the trip's DOLocationID
+val updatedCsv = csvDf.join(
+  updatedJsonDf, csvDf.col("DOLocationID") === updatedJsonDf.col("id"), "left_outer"
+)
+
+// write the joined result as a single CSV part file, replacing any previous export
+updatedCsv.coalesce(1).write.mode("overwrite").csv("csv_export_taxi")
diff --git a/CPE-Lyon/Big Data/get_violation_location.scala b/CPE-Lyon/Big Data/get_violation_location.scala
new file mode 100644
index 0000000..c1a8841
--- /dev/null
+++ b/CPE-Lyon/Big Data/get_violation_location.scala
@@ -0,0 +1,52 @@
+/**
+  add location column to the parking violations
file
+  Command : spark-shell --master yarn --conf spark.ui.port=4050
+*/
+
+/* load both fiscal-year CSV files (with header) and keep only the columns needed for geocoding */
+val VIOLATIONS_FILE_2016 = "Parking_Violations_Issued_-_Fiscal_Year_2016.csv"
+val VIOLATIONS_FILE_2017 = "Parking_Violations_Issued_-_Fiscal_Year_2017.csv"
+val df2016 = spark.read.format("csv").option("header", "true").load(VIOLATIONS_FILE_2016)
+val df2017 = spark.read.format("csv").option("header", "true").load(VIOLATIONS_FILE_2017)
+val refinedDf2016 = df2016.select(
+  $"Plate ID", $"Registration State", $"Issue Date", $"Violation Code",
+  $"House Number", $"Street Name", $"Intersecting Street"
+)
+val refinedDf2017 = df2017.select(
+  $"Plate ID", $"Registration State", $"Issue Date", $"Violation Code",
+  $"House Number", $"Street Name", $"Intersecting Street"
+)
+val df = refinedDf2016.union(refinedDf2017)
+
+/* build the query "house+,+street+,+intersection+,+NEW-YORK"; each part has "&" removed and is then URL-encoded (spaces become "+") */
+def getUrlQuery(houseNumber: String = "", streetName: String = "", intersectingStreet: String = "") : String = {
+  Seq(houseNumber, streetName, intersectingStreet).map(part => java.net.URLEncoder.encode(part.replace("&", ""), "UTF-8")).mkString("", "+,+", "+,+NEW-YORK")
+}
+/* build the geocoder request URL from an already-encoded query and a result limit */
+def getLocationUrl(query: String = "", limit : Int = 0) : String = {
+  s"http://178.33.122.183:2322/api/?q=${query}&limit=${limit}"
+}
+
+/* UDF: return the geocoder response body as a string for one address; null columns are sent as empty strings */
+val findLocation = udf((houseNumber: String, streetName: String, intersectingStreet: String) => {
+  val url = getLocationUrl(
+    getUrlQuery(
+      Option(houseNumber).getOrElse(""),
+      Option(streetName).getOrElse(""),
+      Option(intersectingStreet).getOrElse("")
+    ), 1
+  )
+  val response = scala.io.Source.fromURL(url)
+  // one HTTP request is made per row, so the stream has to be closed every time
+  try response.mkString
+  finally response.close()
+})
+
+/* add a "location" column holding the raw geocoder response for each row */
+val updatedDf = df.withColumn("location", findLocation(
+  $"House Number", $"Street Name", $"Intersecting Street"
+))
+updatedDf.show()
+
+/* write output csv file */
+updatedDf.coalesce(1).write.mode("overwrite").csv("csv_export_violations")
diff --git a/CPE-Lyon/Big Data/pipeline_taxi.sql b/CPE-Lyon/Big Data/pipeline_taxi.sql
new file mode 100644
index 0000000..d637464
--- /dev/null
+++ b/CPE-Lyon/Big Data/pipeline_taxi.sql
@@ -0,0 +1,138 @@
+-- external table over the raw comma-separated taxi files (first line of each file is skipped as header)
+create external table raw_taxi (
+  VendorID int,
+  tpep_pickup_datetime string,
+  tpep_dropoff_datetime string,
+  passenger_count int,
+  trip_distance double, -- was int; distances are presumably fractional in the CSV - TODO confirm
+  pickup_longitude double, -- was bare decimal, i.e. decimal(10,0) in Hive, which drops the fraction of a coordinate
+  pickup_latitude double,
+  RatecodeID int,
+  store_and_fwd_flag string,
+  dropoff_longitude double,
+  dropoff_latitude double,
+  payment_type int,
+  fare_amount decimal(10,2), -- amounts were int; presumably they carry cents in the CSV - TODO confirm
+  extra decimal(10,2),
+  mta_tax decimal(10,2),
+  tip_amount decimal(10,2),
+  tolls_amount decimal(10,2),
+  improvement_surcharge decimal(10,2),
+  total_amount decimal(10,2),
+  PULocationID int,
+  DOLocationID int
+)
+row format delimited fields terminated by ','
+stored as textfile
+location '/user/formation35/taxi/'
+tblproperties ("skip.header.line.count"="1");
+
+-- reduce the number of rows: copy 100000 rows (no ordering, so which rows is arbitrary) into ORC; same column types as raw_taxi
+create table refine1_taxi(
+  VendorID int,
+  tpep_pickup_datetime string,
+  tpep_dropoff_datetime string,
+  passenger_count int,
+  trip_distance double,
+  pickup_longitude double,
+  pickup_latitude double,
+  RatecodeID int,
+  store_and_fwd_flag string,
+  dropoff_longitude double,
+  dropoff_latitude double,
+  payment_type int,
+  fare_amount decimal(10,2),
+  extra decimal(10,2),
+  mta_tax decimal(10,2),
+  tip_amount decimal(10,2),
+  tolls_amount decimal(10,2),
+  improvement_surcharge decimal(10,2),
+  total_amount decimal(10,2),
+  PULocationID int,
+  DOLocationID int
+)
+stored as orc;
+insert into table refine1_taxi
+select *
+from raw_taxi limit 100000;
+
+-- reduce the number of columns: keep only the two timestamps and the two location ids
+create table refine2_taxi(
+  tpep_pickup_datetime string,
+  tpep_dropoff_datetime string,
+  PULocationID int,
+  DOLocationID int
+)
+stored as orc;
+insert into table refine2_taxi
+select tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID, DOLocationID
+from refine1_taxi;
+
+-- clean (remove null pk)
+create table refine3_taxi(
+  tpep_pickup_datetime string,
+  
tpep_dropoff_datetime string,
+  PULocationID int,
+  DOLocationID int
+)
+stored as orc;
+insert into table refine3_taxi
+select *
+from refine2_taxi
+where tpep_pickup_datetime is not null and tpep_dropoff_datetime is not null;
+
+-- format: turn the first 10 characters (MM/dd/yyyy) of each timestamp string into a date; substr positions are 1-based
+create table refine4_taxi(
+  tpep_pickup_datetime date,
+  tpep_dropoff_datetime date,
+  PULocationID int,
+  DOLocationID int
+)
+stored as orc;
+insert into table refine4_taxi
+select
+  to_date(from_unixtime(UNIX_TIMESTAMP(SUBSTR(tpep_pickup_datetime,1,10), 'MM/dd/yyyy'))),
+  to_date(from_unixtime(UNIX_TIMESTAMP(SUBSTR(tpep_dropoff_datetime,1,10), 'MM/dd/yyyy'))),
+  PULocationID,
+  DOLocationID
+from refine3_taxi;
+
+-- aggregate: one row per pickup date; both counts are taken over the rides picked up on that date
+create table refine1_nbtaxi(
+  taxi_ride_date date,
+  nb_pu int,
+  nb_do int
+)
+stored as orc;
+insert into table refine1_nbtaxi
+select
+  tpep_pickup_datetime,
+  count(tpep_pickup_datetime) as nb_pu,
+  count(tpep_dropoff_datetime) as nb_do
+from refine4_taxi
+group by tpep_pickup_datetime;
+
+/*
+// normalize [select (value-MIN) / MAX-MIN)]
+
+// 