Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Matches one line of a "combined" web-server access log (Apache/Nginx style),
# capturing: source ip, timestamp (with offset), request path, status code,
# response size, referrer and user-agent. Lines with a "-" size or empty
# quoted fields will NOT match and are dropped by callers.
ACCESS_LOG_REGEX = re.compile(r'(?P<ip>[\d\.:]+) - - \[(?P<time>\S+ [^"]+)\] "(\w+) (?P<page>[^"]+) (HTTP/[\d\.]+)" (?P<code>\d+) (?P<size>\d+) "(?P<ref>[^"]+)" "(?P<browser>[^"]+)"')
# Extracts a profile identifier of the form "id<digits>" from the start of a
# request path, e.g. "/id12345" -> "id12345".
ID_REGEX = re.compile(r'/(?P<id>id[\d]+)')
def parse_row(row_str):
    """Parse one access-log line into a dict, or return None if unusable.

    A row is kept only when it matches the access-log pattern, has HTTP
    status 200, and its IP resolves to a known country (ip2country result
    other than '-'). The last six characters of the timestamp are stripped
    (presumably a " -0400"-style offset — confirm against TIME_FORMAT).
    """
    match = ACCESS_LOG_REGEX.match(row_str)
    if not match:
        return None
    row = match.groupdict()
    if row['code'] != '200':
        return None
    try:
        country = ip2country(ip2int(row['ip']))
    except Exception:
        # Treat any resolution failure as an unusable row.
        return None
    if country == '-':
        return None
    row['country'] = country
    row['time'] = row['time'][:-6]
    return row
def is_not_none(obj):
    """Predicate for RDD.filter(): True unless obj is None."""
    return not (obj is None)
def get_id_hour_ip(row):
    """Map a parsed log row to ((profile_id, hour), ip).

    Returns None when the requested page is not a profile page (no
    "/id<digits>" prefix) or the timestamp does not parse with TIME_FORMAT.
    """
    id_match = ID_REGEX.match(row['page'])
    if not id_match:
        return None
    try:
        parsed_time = datetime.strptime(row['time'], TIME_FORMAT)
    except Exception:
        return None
    return (id_match.group('id'), parsed_time.hour), row['ip']
def incrementer(count, _):
    """foldByKey step: bump the running count by one, ignoring the value."""
    return 1 + count
def reformat(row):
    """Reshape ((id, hour), count) into (id, (hour, count)) for groupByKey."""
    (profile_id, hour), count = row
    return profile_id, (hour, count)
def counts_by_hour_mapper(hours_and_counts):
    """Expand (hour, count) pairs into a 24-slot list indexed by hour.

    Hours absent from the input get a count of 0.
    """
    lookup = dict(hours_and_counts)
    return [lookup[hour] if hour in lookup else 0 for hour in range(24)]
def get_counts_by_hour(rdd):
    """From ((id, hour), ip) pairs, build (id, [24 hourly counts]) pairs.

    Counts occurrences per (id, hour) key, regroups by id, and expands the
    per-id (hour, count) pairs into a fixed 24-element list.
    """
    counted = rdd.foldByKey(0, incrementer)
    by_profile = counted.map(reformat).groupByKey()
    return by_profile.mapValues(counts_by_hour_mapper)
def calculate_profile_stats(sc, hdfs_path):
    """Compute per-profile hourly traffic from an access log on HDFS.

    Returns a pair of RDDs of (profile_id, [24 hourly counts]):
      1. total page views per profile per hour, and
      2. unique-IP views per profile per hour (duplicates removed via
         distinct() on the ((id, hour), ip) records).
    """
    # NOTE(review): this calls utils.parse_row while a parse_row is also
    # defined in this module — confirm which implementation is intended.
    id_hour_ip = (
        sc.textFile(hdfs_path)
        .map(utils.parse_row).filter(is_not_none)
        .map(get_id_hour_ip).filter(is_not_none)
        # Cache: this RDD feeds two downstream aggregations; without it,
        # Spark re-reads and re-parses the whole file for the second one.
        .cache()
    )
    total_views = get_counts_by_hour(id_hour_ip)
    unique_views = get_counts_by_hour(id_hour_ip.distinct())
    return total_views, unique_views
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement