Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
./COO Design Pattern/*/.project
CPE-Lyon/JEE2/.idea/*
#CPE-Lyon/JEE2/*/target/*

CPE-Lyon/Big\ Data/connect.sh
2 changes: 2 additions & 0 deletions CPE-Lyon/Big Data/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Ajouter une étape de suppression des valeurs aberrantes
- Ajouter la géoloc
35 changes: 35 additions & 0 deletions CPE-Lyon/Big Data/create_results.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- chose schema
use formation50;
-- drop table
drop table if exists results_nbviolationstaxi;
-- create table
create table results_nbviolationstaxi
(
IssueDate date,
NbViolations double,
NbPuNorm double
)
stored as orc;
insert into results_nbviolationstaxi
-- select
-- all data matching left and left+right
select
vi.issuedate,
vi.nbviolations,
ta.nb_pu_norm
from refine2_nbviolations vi
left join refine2_nbtaxi ta
on vi.issuedate = ta.taxi_ride_date
-- all data matching only right
union
select
ta.taxi_ride_date,
vi.nbviolations,
ta.nb_pu_norm
from refine2_nbviolations vi
right join refine2_nbtaxi ta
on vi.issuedate = ta.taxi_ride_date
where vi.issuedate is null;

-- count to check
select count(1) from results_nbviolationstaxi;
53 changes: 53 additions & 0 deletions CPE-Lyon/Big Data/get_taxi_dropoffzone.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
add dropoff zone location column to the parking violations file
Command : spark-shell --master yarn --conf spark.ui.port=4050
*/

/*
|-- geometry: struct (nullable = true)
| |-- coordinates: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: array (containsNull = true)
| | | | |-- element: array (containsNull = true)
| | | | | |-- element: double (containsNull = true)
| |-- type: string (nullable = true)
|-- geometry_name: string (nullable = true)
|-- id: string (nullable = true)
|-- properties: struct (nullable = true)
| |-- bbox: array (nullable = true)
| | |-- element: double (containsNull = true)
| |-- borough: string (nullable = true)
| |-- locationid: long (nullable = true)
| |-- objectid: long (nullable = true)
| |-- shape_area: double (nullable = true)
| |-- shape_leng: double (nullable = true)
| |-- zone: string (nullable = true)
|-- type: string (nullable = true)
*/

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

/* load files */
val JSON_FILE_NAME = "taxi-zones-geojson2.json"
val CSV_FILE_NAME = "2016_yellow_trip.csv"
val csvDf = spark.read.format("csv").option("header", "true").load(CSV_FILE_NAME)
val jsonSchema = new StructType().add("geometry", StringType).add("geometry_name", StringType).add("id",StringType).add("properties", StringType).add("type", StringType)
val jsonDf = spark.read.option("type", "geojson").schema(jsonSchema).json(JSON_FILE_NAME)

// update id colum to match source id
val cleanId = udf((id: String) => {
id.replace("nyu_2451_36743.", "")
})

// create new column in dataFrame with udf
val updatedJsonDf = jsonDf.withColumn("id", cleanId($"id"))
updatedJsonDf.select($"id").show()

// join result with csv
val updatedCsv = csvDf.join(
updatedJsonDf, csvDf.col("DOLocationID") === updatedJsonDf.col("id"), "left_outer"
)

// write output csv file
updatedCsv.coalesce(1).write.mode("overwrite").csv("csv_export_taxi")
52 changes: 52 additions & 0 deletions CPE-Lyon/Big Data/get_violation_location.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
add location column to the parking violations file
Command : spark-shell --master yarn --conf spark.ui.port=4050
*/

/* open csv data file and put it in a dataFrame */
val VIOLATIONS_FILE_2016 = "Parking_Violations_Issued_-_Fiscal_Year_2016.csv"
val VIOLATIONS_FILE_2017 = "Parking_Violations_Issued_-_Fiscal_Year_2017.csv"
val df2016 = spark.read.format("csv").option("header", "true").load(VIOLATIONS_FILE_2016)
val df2017 = spark.read.format("csv").option("header", "true").load(VIOLATIONS_FILE_2017)
val refinedDf2016 = df2016.select(
$"Plate ID", $"Registration State", $"Issue Date", $"Violation Code",
$"House Number", $"Street Name", $"Intersecting Street"
)
val refinedDf2017 = df2017.select(
$"Plate ID", $"Registration State", $"Issue Date", $"Violation Code",
$"House Number", $"Street Name", $"Intersecting Street"
)
val df = refinedDf2016.unionAll(refinedDf2017)

/* map columns to a function */
def getUrlQuery(houseNumber: String = "", streetName: String = "", intersectingStreet: String = "") : String = {
s"${houseNumber.replace(" ","+").replace("&", "")}+,+${streetName.replace(" ", "+").replace("&", "")}+,+${intersectingStreet.replace(" ", "+").replace("&", "")}+,+NEW-YORK"
}
/* build url from query and limit */
def getLocationUrl(query: String = "", limit : Int = 0) : String = {
s"http://178.33.122.183:2322/api/?q=${query}&limit=${limit}"
}

/* user defined function to retrieve new column value */
val findLocation = udf((houseNumber: String, streetName: String, intersectingStreet: String) => {
scala.io.Source
.fromURL(
getLocationUrl(
getUrlQuery(
(if (houseNumber != null) houseNumber else "" ),
(if (streetName != null) streetName else ""),
(if (intersectingStreet != null) intersectingStreet else "")
) , 1
)
)
.mkString
})

/* create new column in dataFrame with udf */
val updatedDf = df.withColumn("location", findLocation(
$"House Number", $"Street Name", $"Intersecting Street"
))
updatedDf.show()

/* write output csv file */
updatedDf.coalesce(1).write.mode("overwrite").csv("csv_export_violations")
138 changes: 138 additions & 0 deletions CPE-Lyon/Big Data/pipeline_taxi.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
-- create
create external table raw_taxi (
VendorID int,
tpep_pickup_datetime string,
tpep_dropoff_datetime string,
passenger_count int,
trip_distance int,
pickup_longitude decimal,
pickup_latitude decimal,
RatecodeID int,
store_and_fwd_flag string,
dropoff_longitude decimal,
dropoff_latitude decimal,
payment_type int,
fare_amount int,
extra int,
mta_tax int,
tip_amount int,
tolls_amount int,
improvement_surcharge int,
total_amount int,
PULocationID int,
DOLocationID int
)
row format delimited fields terminated by ','
stored as textfile
location '/user/formation35/taxi/'
tblproperties ("skip.header.line.count"="1");

-- reduce the number of rows
create table refine1_taxi(
VendorID int,
tpep_pickup_datetime string,
tpep_dropoff_datetime string,
passenger_count int,
trip_distance int,
pickup_longitude decimal,
pickup_latitude decimal,
RatecodeID int,
store_and_fwd_flag string,
dropoff_longitude decimal,
dropoff_latitude decimal,
payment_type int,
fare_amount int,
extra int,
mta_tax int,
tip_amount int,
tolls_amount int,
improvement_surcharge int,
total_amount int,
PULocationID int,
DOLocationID int
)
stored as orc;
insert into table refine1_taxi
select *
from raw_taxi limit 100000;

-- reduce the number of columns
create table refine2_taxi(
tpep_pickup_datetime string,
tpep_dropoff_datetime string,
PULocationID int,
DOLocationID int
)
stored as orc;
insert into table refine2_taxi
select tpep_pickup_datetime, tpep_dropoff_datetime, PULocationID, DOLocationID
from refine1_taxi;

-- clean (remove null pk)
create table refine3_taxi(
tpep_pickup_datetime string,
tpep_dropoff_datetime string,
PULocationID int,
DOLocationID int
)
stored as orc;
insert into table refine3_taxi
select *
from refine2_taxi
where tpep_dropoff_datetime is not null;

-- format
create table refine4_taxi(
tpep_pickup_datetime date,
tpep_dropoff_datetime date,
PULocationID int,
DOLocationID int
)
stored as orc;
insert into table refine4_taxi
select
to_date(from_unixtime(UNIX_TIMESTAMP(SUBSTR(tpep_pickup_datetime,0,10), 'MM/dd/yyyy'))),
to_date(from_unixtime(UNIX_TIMESTAMP(SUBSTR(tpep_dropoff_datetime,0,10), 'MM/dd/yyyy'))),
PULocationID,
DOLocationID
from refine3_taxi;

-- agregate
create table refine1_nbtaxi(
taxi_ride_date date,
nb_pu int,
nb_do int
)
stored as orc;
insert into table refine1_nbtaxi
select
tpep_pickup_datetime,
count(tpep_pickup_datetime) as nb_pu,
count(tpep_dropoff_datetime) as nb_do
from refine4_taxi
group by tpep_pickup_datetime;

/*
// normalize [select (value-MIN) / MAX-MIN)]

// <script>
// max = select max(nb_pu) from refine1_nbtaxi | select max(nb_do) from refine1_nbtaxi
// max' = max

// min = select min(nb_pu) from refine1_nbtaxi | select min(nb_do) from refine1_nbtaxi
*/

create table refine2_nbtaxi(
taxi_ride_date date,
nb_pu_norm double,
nb_do_norm double
)
stored as orc;
insert into table refine2_nbtaxi
select
taxi_ride_date,
(nb_pu-1)/(3-1) as nb_pu_norm,
(nb_do-1)/(3-1) as nb_do_norm
from refine1_nbtaxi;

-- SELECT cast(date_format('2018-06-05 15:25:42.23','yyyy-MM-dd') as date);
Loading