napiii

multiprocessing error

Jul 19th, 2018
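"""Scrape tweets from Twitter's HTML search pages, fanning the work out over
date ranges with multiprocessing.Pool. The core scraping logic appears to be
adapted from the open-source twitterscraper project; the endpoints and page
markup reflect Twitter as of mid-2018.
"""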
from __future__ import division
from datetime import datetime
from bs4 import BeautifulSoup
from coala_utils.decorators import generate_ordering
from functools import partial
from multiprocessing.pool import Pool
from textblob import TextBlob            # unused in this snippet
from pyTweetCleaner import TweetCleaner  # unused in this snippet

import pandas as pd  # unused in this snippet
import string        # unused in this snippet
import logging
import random
import requests
import datetime as dt
import json

# Tweet model
@generate_ordering('timestamp', 'id', 'text', 'user', 'replies', 'retweets', 'likes')
class Tweet:
    def __init__(self, user, fullname, id, url, timestamp, text, replies, retweets, likes, html):
        self.user = user.strip('@')  # was strip('\@'): an invalid escape that also stripped backslashes
        self.fullname = fullname
        self.id = id
        self.url = url
        self.timestamp = timestamp
        self.text = text
        self.replies = replies
        self.retweets = retweets
        self.likes = likes
        self.html = html

    @classmethod
    def from_soup(cls, tweet):
        return cls(
            user=tweet.find('span', 'username').text or "",
            fullname=tweet.find('strong', 'fullname').text or "",
            id=tweet['data-item-id'] or "",
            url=tweet.find('div', 'tweet')['data-permalink-path'] or "",
            timestamp=datetime.utcfromtimestamp(
                int(tweet.find('span', '_timestamp')['data-time'])),
            text=tweet.find('p', 'tweet-text').text or "",
            replies=tweet.find(
                'span', 'ProfileTweet-action--reply u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            retweets=tweet.find(
                'span', 'ProfileTweet-action--retweet u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            likes=tweet.find(
                'span', 'ProfileTweet-action--favorite u-hiddenVisually').find(
                    'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            html=str(tweet.find('p', 'tweet-text')) or "",
        )

    @classmethod
    def from_html(cls, html):
        soup = BeautifulSoup(html, "lxml")
        for tweet in soup.find_all('li', 'js-stream-item'):
            try:
                yield cls.from_soup(tweet)
            except AttributeError:
                pass  # Incomplete info? Discard!

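# A hypothetical usage sketch for the parser above, assuming `timeline.html`
# holds a saved search-results page:
#
#     with open('timeline.html') as f:
#         for tweet in Tweet.from_html(f.read()):
#             print(tweet.user, tweet.text)
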
# Logging
logger = logging.getLogger('twitterscraper')

formatter = logging.Formatter('%(levelname)s: %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

logger.setLevel(logging.INFO)
  76.  
# Query endpoints and request headers
HEADERS_LIST = [
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; x64; fr; rv:1.9.2.13) Gecko/20101203 Firebird/3.6.13',
    'Mozilla/5.0 (compatible, MSIE 11, Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201',
    'Opera/9.80 (X11; Linux i686; Ubuntu/14.10) Presto/2.12.388 Version/12.16',
    'Mozilla/5.0 (Windows NT 5.2; RW; rv:7.0a1) Gecko/20091211 SeaMonkey/9.23a1pre',
]

# Note: one user agent is chosen at random per process, not per request.
HEADER = {'User-Agent': random.choice(HEADERS_LIST)}

INIT_URL = 'https://twitter.com/search?f=tweets&vertical=default&q={q}&l={lang}'
RELOAD_URL = 'https://twitter.com/i/search/timeline?f=tweets&vertical=' \
             'default&include_available_features=1&include_entities=1&' \
             'reset_error_state=false&src=typd&max_position={pos}&q={q}&l={lang}'


def linspace(start, stop, n):
    """Yield n evenly spaced values from start to stop, inclusive."""
    if n == 1:
        yield stop
        return
    h = (stop - start) / (n - 1)
    for i in range(n):
        yield start + h * i

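# For example, list(linspace(0, 10, 3)) == [0.0, 5.0, 10.0]; query_tweets()
# uses this to split the full date range into equal chunks, one per worker.
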

def query_single_page(url, html_response=True, retry=10):
    """
    Returns tweets from the given URL.

    :param url: The URL to get the tweets from.
    :param html_response: False if the HTML is embedded in a JSON response.
    :param retry: Number of retries if something goes wrong.
    :return: The list of tweets, plus the pos argument for the next page.
    """
    try:
        # Without an explicit timeout, requests never raises Timeout and the
        # handler below could never fire.
        response = requests.get(url, headers=HEADER, timeout=30)
        if html_response:
            html = response.text or ''
        else:
            html = ''
            try:
                json_resp = json.loads(response.text)
                html = json_resp['items_html'] or ''
            except ValueError as e:
                # json.JSONDecodeError is a subclass of ValueError, so this
                # also covers malformed JSON responses.
                logger.exception('Failed to parse JSON "{}" while requesting "{}"'.format(e, url))

        tweets = list(Tweet.from_html(html))

        if not tweets:
            return [], None

        if not html_response:
            return tweets, json_resp['min_position']

        return tweets, 'TWEET-{}-{}'.format(tweets[-1].id, tweets[0].id)
    except requests.exceptions.HTTPError as e:
        logger.exception('HTTPError {} while requesting "{}"'.format(e, url))
    except requests.exceptions.ConnectionError as e:
        logger.exception('ConnectionError {} while requesting "{}"'.format(e, url))
    except requests.exceptions.Timeout as e:
        logger.exception('Timeout {} while requesting "{}"'.format(e, url))

    if retry > 0:
        logger.info('Retrying... (Attempts left: {})'.format(retry))
        return query_single_page(url, html_response, retry - 1)

    logger.error('Giving up.')
    return [], None

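# Hypothetical single-page call, assuming the 2018-era endpoints still respond:
#
#     tweets, pos = query_single_page(INIT_URL.format(q='ford', lang='en'))
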

def query_tweets_once_generator(query, limit=None, lang=''):
    """
    Queries twitter for all the tweets you want! It will load all pages it
    gets from twitter. However, twitter might all of a sudden stop serving
    new pages; in that case, use the `query_tweets` method.

    Note that this function catches KeyboardInterrupt so it can return
    tweets on incomplete queries if the user decides to abort.

    :param query: Any advanced query you want to do! Compile it at
                  https://twitter.com/search-advanced and just copy the query!
    :param limit: Scraping stops once at least ``limit`` items are fetched.
    :param lang: Language code to restrict the search to.
    :return: A generator of (twitterscraper.Tweet, pos) tuples.
    """
    logger.info('Querying {}'.format(query))
    query = query.replace(' ', '%20').replace('#', '%23').replace(':', '%3A')
    pos = None
    num_tweets = 0
    try:
        while True:
            new_tweets, pos = query_single_page(
                INIT_URL.format(q=query, lang=lang) if pos is None
                else RELOAD_URL.format(q=query, pos=pos, lang=lang),
                pos is None
            )
            if len(new_tweets) == 0:
                logger.info('Got {} tweets for {}.'.format(num_tweets, query))
                return

            for t in new_tweets:
                yield t, pos

            num_tweets += len(new_tweets)

            if limit and num_tweets >= limit:
                logger.info('Got {} tweets for {}.'.format(num_tweets, query))
                return

    except KeyboardInterrupt:
        logger.info('Program interrupted by user. Returning tweets gathered '
                    'so far...')
    except BaseException:
        logger.exception('An unknown error occurred! Returning tweets '
                         'gathered so far.')
    logger.info('Got {} tweets for {}.'.format(num_tweets, query))


def query_tweets_once(*args, **kwargs):
    res = list(query_tweets_once_generator(*args, **kwargs))
    if res:
        tweets, positions = zip(*res)
        return tweets
    return []


def query_tweets(query, limit=None, begindate=dt.date(2006, 3, 21), enddate=dt.date.today(), poolsize=20, lang=''):
    no_days = (enddate - begindate).days
    if poolsize > no_days:
        # Since we are assigning each pool a range of dates to query,
        # the number of pools should not exceed the number of dates.
        poolsize = max(no_days, 1)  # Pool(0) raises ValueError
    dateranges = [begindate + dt.timedelta(days=elem) for elem in linspace(0, no_days, poolsize + 1)]

    if limit:
        limit_per_pool = (limit // poolsize) + 1
    else:
        limit_per_pool = None

    queries = ['{} since:{} until:{}'.format(query, since, until)
               for since, until in zip(dateranges[:-1], dateranges[1:])]

    all_tweets = []
    # Create the pool before entering the try block; otherwise a failure in
    # Pool() would leave `pool` unbound when the finally clause runs.
    pool = Pool(poolsize)
    try:
        logger.info('queries: {}'.format(queries))
        try:
            for new_tweets in pool.imap_unordered(partial(query_tweets_once, limit=limit_per_pool, lang=lang), queries):
                all_tweets.extend(new_tweets)
                logger.info('Got {} tweets ({} new).'.format(
                    len(all_tweets), len(new_tweets)))
        except KeyboardInterrupt:
            logger.info('Program interrupted by user. Returning all tweets '
                        'gathered so far.')
    finally:
        pool.close()
        pool.join()

    return all_tweets

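# On platforms that spawn worker processes (Windows, and macOS on recent
# Pythons), multiprocessing re-imports this module in every worker. A bare
# top-level query_tweets() call is then re-executed by each worker, raising
# the "An attempt has been made to start a new process before the current
# process has finished its bootstrapping phase" RuntimeError, which is the
# likely "multiprocessing error" this paste is named after. Guarding the
# call fixes it.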
if __name__ == '__main__':
    query_tweets('ford', limit=5)