Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # encoding: utf-8
- from urllib.request import urlopen
- from urllib.parse import urlparse, urlencode
- from http.client import HTTPException
- from datetime import datetime
- from sys import stdout, argv
- from os import getenv
- import logging
- import json
# Tumblr API key: read from the TUMBLR_KEY environment variable, with a
# hard-coded key baked in as the fallback.
API_KEY = getenv("TUMBLR_KEY", "8YUsKJvcJxo2MDwmWMDiXZGuMuIbeCwuQGP5ZHSEA4jBJPMnJT")
# Pre-encoded query string ("api_key=...") appended to every API request URL.
PARAMS = urlencode({ "api_key": API_KEY })
# strftime pattern producing an RFC 3339-style UTC timestamp.
RFC3339_STRING = "%Y-%m-%dT%H:%M:%S.%fZ"
def get_logger(name, level=logging.INFO):
    """Return a logger named *name* that writes to stdout.

    Args:
        name: logger name (shown in each record's ``(%(name)s)`` field).
        level: threshold for the stdout handler (the logger itself stays
            at DEBUG so the handler level is what filters).

    Returns:
        The ``logging.Logger`` for *name*.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    # BUG FIX: logging.getLogger returns the same object for the same name,
    # so unconditionally adding a handler on every call stacked duplicate
    # handlers and printed each record multiple times. Attach the stdout
    # handler only on first use.
    if not logger.handlers:
        std_output = logging.StreamHandler(stdout)
        std_output.setFormatter(
            logging.Formatter('[%(asctime)s] (%(name)s) %(levelname)s: %(message)s')
        )
        std_output.setLevel(level)
        logger.addHandler(std_output)
    return logger
- # Source file parsing
def parse_list(filename):
    """Read one URL or hostname per line from *filename*.

    Blank lines are skipped.  A line that parses to a full URL contributes
    its netloc; a bare hostname (empty netloc) is kept verbatim.

    Returns:
        frozenset of de-duplicated hostname strings.
    """
    with open(filename, 'r', encoding="UTF-8") as handle:
        entries = (raw.strip() for raw in handle)
        return frozenset(
            urlparse(entry).netloc or entry
            for entry in entries
            if entry
        )
def parse_blog(blog_url):
    """Fetch blog info for *blog_url* from the Tumblr v2 API.

    Args:
        blog_url: blog hostname, e.g. ``"example.tumblr.com"``.

    Returns:
        A CSV line ``"url,post_count,last_updated"`` (last_updated as an
        RFC 3339-style UTC timestamp) when the blog is reachable and has at
        least one post; ``None`` on any fetch/parse/API error or when the
        blog has no posts.
    """
    log = get_logger(blog_url)
    request = "https://api.tumblr.com/v2/blog/{uri}/info?{params}".format(
        params = PARAMS,
        uri = blog_url
    )
    try:
        response = urlopen(request)
    except (HTTPException, OSError) as err:
        log.error("Error fetching blog info: {err}".format(err=err))
        return None
    content_type = response.info().get_content_type()
    if content_type != "application/json":
        log.error("Unexpected Content-Type: {type}".format(type=content_type))
        return None
    try:
        data = json.load(response)
    # BUG FIX: bare `JSONDecodeError` was an undefined name (NameError the
    # moment a parse error occurred); it lives in the json module.
    except json.JSONDecodeError as err:
        log.error("Error parsing blog info: {err}".format(err=err))
        return None
    # Prefer the API's own status field; fall back to the HTTP status code.
    status = data.get("meta", {}).get("status", response.getcode())
    if status != 200:
        # BUG FIX: the original formatted with `url=url`, but no local `url`
        # exists in this function — the parameter is `blog_url`.
        log.error("Error processing {url}: {data}".format(url=blog_url, data=data))
        return None
    info = data.get("response", {}).get("blog", {})
    post_count = info.get("posts", 0)
    if post_count < 1:
        return None
    return ','.join([
        blog_url,
        str(post_count),
        datetime.utcfromtimestamp(int(info.get("updated", 0))).strftime(RFC3339_STRING)
    ])
if __name__ == "__main__":
    # Positional arguments: [source_file [dest_file]], each with a default.
    try:
        source_file = argv[1]
    except IndexError:
        source_file = "tumblrs_raw.txt"
    try:
        dest_file = argv[2]
    except IndexError:
        dest_file = "tumblrs_alive.txt"
    # Check every blog from the source list and append live ones as CSV.
    with open(dest_file, 'a', encoding="UTF-8") as out_file:
        # NOTE(review): append mode means the header line repeats if the
        # output file already exists — presumably deliberate resume
        # behavior; confirm before changing to 'w'.
        print("url,post_count,last_updated", file=out_file)
        for url in sorted(parse_list(source_file)):
            if url:
                csv_line = parse_blog(url)
                if csv_line:
                    # BUG FIX: the original printed the undefined name
                    # `blog_url` (NameError on the first live blog) instead
                    # of the CSV line just built.
                    print(csv_line, file=out_file)
Advertisement
Add Comment
Please, Sign In to add comment