Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //hadoop lab4 (PIG LATIN)
- //задание:
- //Задача полностью аналогична задаче в 3 лабораторной работе.
- //Требуется связать наборы данных по коду аэропорта прибытия: DEST_AIRPORT_ID
- //Для каждого аэропорта требуется определить среднее, минимальное и максимальное время задержки для всех прибывающих рейсов.
- //запуск PIG LATIN:
- //mr-jobhistory-daemon.sh start historyserver
- //~/pig-0.15.0/bin/pig
- -- Load the airport lookup table.
- -- NOTE(review): PigStorage(',') is not quote-aware, so a quoted description
- -- that itself contains a comma is split across descr1 and descr2 (and keeps
- -- its quote characters). Presumably every description has exactly one comma
- -- ("City, ST: Airport name") -- confirm against the data file.
- airports = LOAD 'L_AIRPORT_ID.csv' USING PigStorage(',') AS (
-     code:chararray,
-     descr1:chararray,
-     descr2:chararray);
- -- Drop the CSV header row.
- a_no_meta = FILTER airports BY code != 'Code';
- -- Strip the double quotes from the code and cast it to int; expose the two
- -- pieces of the description as a_city and a_name.
- a_new = FOREACH a_no_meta GENERATE
-     (INT)REPLACE(code, '\\"', '') AS code,
-     descr1 AS a_city,
-     descr2 AS a_name;
- -- Load the on-time performance sample.
- flights = LOAD '664600583_T_ONTIME_sample.csv' USING PigStorage(',') AS (
-     year:int,
-     quarter:int,
-     month:int,
-     day_of_month:int,
-     day_of_week:int,
-     fl_date:chararray,
-     unique_carrier:chararray,
-     airline_id:int,
-     carrier:chararray,
-     tail_num:chararray,
-     fl_num:chararray,
-     origin_airport_id:int,
-     origin_airport_seq_id:int,
-     origin_city_market_id:int,
-     dest_airport_id:int,
-     wheels_on:chararray,
-     arr_time:chararray,
-     arr_delay:float,
-     arr_delay_new:float,
-     cancelled:float,
-     cancellation_code:chararray,
-     air_time:float,
-     distance:float);
- -- Drop the CSV header row: its first cell is not numeric, so the cast to int
- -- implied by the schema yields null and the row is rejected here.
- f_no_meta = FILTER flights BY year IS NOT NULL;
- -- Keep only flights whose cancelled flag is 0.
- f_not_cancelled = FILTER f_no_meta BY cancelled == 0;
- -- Keep only flights with a known destination airport id.
- f_have_airp = FILTER f_not_cancelled BY dest_airport_id IS NOT NULL;
- -- Keep only flights with a strictly positive arr_delay_new.
- f_delayed = FILTER f_have_airp BY (arr_delay_new IS NOT NULL) AND (arr_delay_new > 0);
- -- Project the two fields needed for the join and the statistics. Both are
- -- already typed (int / float) by the LOAD schema, so no casts are needed.
- f_new = FOREACH f_delayed GENERATE
-     dest_airport_id,
-     arr_delay_new;
- -- Inner join: attach the airport description to each flight by destination
- -- airport code. Flights whose code has no match in a_new are dropped.
- joined = JOIN f_new BY dest_airport_id, a_new BY code;
- -- joined:
- -- | dest_airport_id | arr_delay_new | code | a_city | a_name |
- -- | 123             | 13.37         | 123  | NY     | JFK    |
- joined_brief = FOREACH joined GENERATE
-     a_city,
-     a_name,
-     arr_delay_new AS delay;
- -- One group per (a_city, a_name) pair.
- a_by_name = GROUP joined_brief BY (a_city, a_name);
- -- Per-group delay statistics; output columns are name, d_min, d_max, d_avg.
- final = FOREACH a_by_name GENERATE
-     group AS name,
-     MIN(joined_brief.delay) AS d_min,
-     MAX(joined_brief.delay) AS d_max,
-     AVG(joined_brief.delay) AS d_avg;
- -- Write the result as comma-separated text, then also print it.
- -- NOTE(review): STORE presumably fails if 'pig_output' already exists -- confirm
- -- and remove the directory between runs if so.
- STORE final INTO 'pig_output' USING PigStorage(',');
- DUMP final;
- quit;
- //Example output (columns: name, d_min, d_max, d_avg):
- ("Cody, WY: Yellowstone Regional"),1.0,25.0,9.333333333333334
- ("Elko, NV: Elko Regional"),10.0,59.0,26.666666666666668
- ("Erie, PA: Erie International/Tom Ridge Field"),3.0,70.0,23.2
- ("Guam, TT: Guam International"),4.0,22.0,13.0
- ("Hays, KS: Hays Regional"),13.0,15.0,14.0
- ("Hilo, HI: Hilo International"),1.0,44.0,8.714285714285714
- ("Kona, HI: Kona International Airport at Keahole"),1.0,148.0,19.641025641025642
- ("Nome, AK: Nome Airport"),8.0,14.0,11.0
- ("Reno, NV: Reno/Tahoe International"),1.0,149.0,24.395833333333332
- ("Waco, TX: Waco Regional"),4.0,74.0,26.22222222222222
- ("Yuma, AZ: Yuma MCAS/Yuma International"),1.0,24.0,11.25
- ("Akron, OH: Akron-Canton Regional"),1.0,83.0,19.761904761904763
- ("Aspen, CO: Aspen Pitkin County Sardy Field"),5.0,136.0,37.74285714285714
- -- Extras:
- -- For each <origin_airport_id, dest_airport_id> pair find the MAX value of arr_delay_new.
- -- NOTE: these statements come after `quit;`, so they are not reached when the
- -- file is run top to bottom, and final2 is never stored or dumped here.
- -- A separate alias (f_routes) is used instead of redefining f_new, which
- -- already names the two-column relation used for the join.
- f_routes = FOREACH f_delayed GENERATE origin_airport_id, dest_airport_id, arr_delay_new;
- f_by_bundle = GROUP f_routes BY (origin_airport_id, dest_airport_id);
- final2 = FOREACH f_by_bundle GENERATE group AS name, MAX(f_routes.arr_delay_new) AS d_max;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement