Advertisement
Guest User

Untitled

a guest
Dec 18th, 2017
64
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.76 KB | None | 0 0
  1. ACCESS_LOG_REGEX = re.compile(r'(?P<ip>[\d\.:]+) - - \[(?P<time>\S+ [^"]+)\] "(\w+) (?P<page>[^"]+) (HTTP/[\d\.]+)" (?P<code>\d+) (?P<size>\d+) "(?P<ref>[^"]+)" "(?P<browser>[^"]+)"')
  2. ID_REGEX = re.compile(r'/(?P<id>id[\d]+)')
  3.  
  4.  
  5. def parse_row(row_str):
  6.     row_match = ACCESS_LOG_REGEX.match(row_str)
  7.     if row_match is None:
  8.         return None
  9.     row = row_match.groupdict()
  10.  
  11.     if row['code'] != '200':
  12.         return None
  13.  
  14.     try:
  15.         row['country'] = ip2country(ip2int(row['ip']))
  16.     except Exception:
  17.         return None
  18.     if row['country'] == '-':
  19.         return None
  20.  
  21.     row['time'] = row['time'][:-6]
  22.  
  23.     return row
  24.  
  25.  
  26. def is_not_none(obj):
  27.     return obj is not None
  28.  
  29.  
  30. def get_id_hour_ip(row):
  31.     id_match = ID_REGEX.match(row['page'])
  32.     if id_match is None:
  33.         return None
  34.     try:
  35.         hour = datetime.strptime(row['time'], TIME_FORMAT).hour
  36.     except Exception:
  37.         return None
  38.     else:
  39.         return (id_match.groupdict()['id'], hour), row['ip']
  40.  
  41.  
  42. def incrementer(count, _):
  43.     return count + 1
  44.  
  45.  
  46. def reformat(row):
  47.     # (id, hour), count --> id, (hour, count)
  48.     return row[0][0], (row[0][1], row[1])
  49.  
  50.  
  51. def counts_by_hour_mapper(hours_and_counts):
  52.     d = dict(hours_and_counts)
  53.     return [d.get(x, 0) for x in range(24)]
  54.  
  55.  
  56. def get_counts_by_hour(rdd):
  57.     return (
  58.         rdd
  59.         .foldByKey(0, incrementer)
  60.         .map(reformat)
  61.         .groupByKey().mapValues(counts_by_hour_mapper)
  62.     )
  63.  
  64.  
  65. def calculate_profile_stats(sc, hdfs_path):
  66.     id_hour_ip = (
  67.         sc.textFile(hdfs_path)
  68.         .map(utils.parse_row).filter(is_not_none)
  69.         .map(get_id_hour_ip).filter(is_not_none)
  70.     )
  71.     return get_counts_by_hour(id_hour_ip), get_counts_by_hour(id_hour_ip.distinct())
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement