Advertisement
Ladies_Man

#HADOOP Lab4 (PIG LATIN) COMPLETE

Nov 14th, 2015
278
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.67 KB | None | 0 0
  1. -- Hadoop lab 4 (Pig Latin)
  2. -- Assignment:
  3. -- The task is exactly the same as the one in lab 3.
  4. -- Join the data sets on the arrival airport code: DEST_AIRPORT_ID
  5. -- For each airport, determine the average, minimum and maximum delay over all arriving flights.
  6.  
  7.  
  8. -- Running Pig Latin (shell commands, start the history server first):
  9. --   mr-jobhistory-daemon.sh start historyserver
  10. --   ~/pig-0.15.0/bin/pig
  11.  
  12.  
  13.  
  14.  
  15. -- Load the airport lookup table. PigStorage(',') splits each line on every comma,
  16. -- so a quoted "City, ST: Name" description presumably arrives as descr1 + descr2.
  17. airports = LOAD 'L_AIRPORT_ID.csv' USING PigStorage(',') AS (
  18.     code:chararray,
  19.     descr1:chararray,
  20.     descr2:chararray);
  21.  
  22. -- Drop rows whose first field is the literal text Code (the CSV header line).
  23. a_no_meta = FILTER airports BY code != 'Code';
  24.  
  25. -- Remove the double quotes from the code and cast it to int so it can be joined
  26. -- with the integer dest_airport_id; expose the description halves as a_city / a_name.
  27. a_new = FOREACH a_no_meta GENERATE
  28.     (INT)REPLACE(code, '\\"', '') AS code, descr1 AS a_city, descr2 AS a_name;
  29.  
  30.  
  31.  
  32.  
  33.  
  34. -- Load the on-time flight sample with a typed schema. A value that cannot be
  35. -- converted to the declared type (e.g. the text of a header row) is loaded as null.
  36. flights = LOAD '664600583_T_ONTIME_sample.csv' USING PigStorage(',') AS (
  37.     year:int,
  38.     quarter:int,
  39.     month:int,
  40.     day_of_month:int,
  41.     day_of_week:int,
  42.     fl_date:chararray,
  43.     unique_carrier:chararray,
  44.     airline_id:int,
  45.     carrier:chararray,
  46.     tail_num:chararray,
  47.     fl_num:chararray,
  48.     origin_airport_id:int,
  49.     origin_airport_seq_id:int,
  50.     origin_city_market_id:int,
  51.     dest_airport_id:int,
  52.     wheels_on:chararray,
  53.     arr_time:chararray,
  54.     arr_delay:float,
  55.     arr_delay_new:float,
  56.     cancelled:float,
  57.     cancellation_code:chararray,
  58.     air_time:float,
  59.     distance:float);
  60.  
  61. -- Header row: year is declared int, so a non-numeric header value is null here.
  62. f_no_meta = FILTER flights BY year IS NOT NULL;
  63. f_not_cancelled = FILTER f_no_meta BY cancelled == 0;
  64. f_have_airp = FILTER f_not_cancelled BY dest_airport_id IS NOT NULL;
  65. f_delayed = FILTER f_have_airp BY (arr_delay_new IS NOT NULL) AND (arr_delay_new > 0);
  66.  
  67. -- Both fields already carry their int / float types from the LOAD schema.
  68. f_new = FOREACH f_delayed GENERATE dest_airport_id, arr_delay_new;
  69.  
  70.  
  71.  
  72.  
  73.  
  74.  
  75. joined = JOIN f_new BY dest_airport_id, a_new BY code;
  76. -- Fields of joined, with an illustrative row:
  77. -- | dest_airport_id | arr_delay_new | code | a_city | a_name |
  78. -- | 123 | 13.37 | 123 | NY | JFK |
  79.  
  80. -- Keep only the two airport description fields and the delay value.
  81. joined_brief = FOREACH joined GENERATE a_city, a_name, arr_delay_new AS delay;
  82.  
  83. -- One group per distinct (a_city, a_name) pair.
  84. a_by_name = GROUP joined_brief BY (a_city, a_name);
  85.  
  86. -- Minimum, maximum and average delay within each group.
  87. final = FOREACH a_by_name GENERATE
  88.     group AS name,
  89.     MIN(joined_brief.delay) AS d_min,
  90.     MAX(joined_brief.delay) AS d_max,
  91.     AVG(joined_brief.delay) AS d_avg;
  92.  
  93. STORE final INTO 'pig_output' USING PigStorage(',');
  94. DUMP final;
  95. quit;
  96.  
  97.  
  98.  
  99.  
  100. -- Example output (columns: name tuple of a_city and a_name, d_min, d_max, d_avg):
  101. ("Cody, WY: Yellowstone Regional"),1.0,25.0,9.333333333333334
  102. ("Elko, NV: Elko Regional"),10.0,59.0,26.666666666666668
  103. ("Erie, PA: Erie International/Tom Ridge Field"),3.0,70.0,23.2
  104. ("Guam, TT: Guam International"),4.0,22.0,13.0
  105. ("Hays, KS: Hays Regional"),13.0,15.0,14.0
  106. ("Hilo, HI: Hilo International"),1.0,44.0,8.714285714285714
  107. ("Kona, HI: Kona International Airport at Keahole"),1.0,148.0,19.641025641025642
  108. ("Nome, AK: Nome Airport"),8.0,14.0,11.0
  109. ("Reno, NV: Reno/Tahoe International"),1.0,149.0,24.395833333333332
  110. ("Waco, TX: Waco Regional"),4.0,74.0,26.22222222222222
  111. ("Yuma, AZ: Yuma MCAS/Yuma International"),1.0,24.0,11.25
  112. ("Akron, OH: Akron-Canton Regional"),1.0,83.0,19.761904761904763
  113. ("Aspen, CO: Aspen Pitkin County Sardy Field"),5.0,136.0,37.74285714285714
  114.  
  115.  
  116.  
  117.  
  118.  
  119. -- Extras:
  120. -- For each <origin_airport_id, dest_airport_id> pair, find the MAX of arr_delay_new.
  121. -- NOTE(review): this follows `quit;` in the listing, so presumably it is run separately.
  122. f_new = FOREACH f_delayed GENERATE origin_airport_id, dest_airport_id, arr_delay_new;
  123. f_by_bundle = GROUP f_new BY (origin_airport_id, dest_airport_id);
  124. final2 = FOREACH f_by_bundle GENERATE group AS name, MAX(f_new.arr_delay_new) AS d_max;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement