napiii

no multiprocessing error

Jul 19th, 2018
from __future__ import division
from datetime import datetime
from bs4 import BeautifulSoup
from coala_utils.decorators import generate_ordering
from functools import partial
from multiprocessing.pool import Pool

import logging
import random
import requests
import datetime as dt
import json

# Tweet model
@generate_ordering('timestamp', 'id', 'text', 'user', 'replies', 'retweets', 'likes')
class Tweet:
    def __init__(self, user, fullname, id, url, timestamp, text, replies, retweets, likes, html):
        self.user = user.strip('@')
        self.fullname = fullname
        self.id = id
        self.url = url
        self.timestamp = timestamp
        self.text = text
        self.replies = replies
        self.retweets = retweets
        self.likes = likes
        self.html = html

    @classmethod
    def from_soup(cls, tweet):
        # Build a Tweet from a single parsed stream item.
        return cls(
            user=tweet.find('span', 'username').text or "",
            fullname=tweet.find('strong', 'fullname').text or "",
            id=tweet['data-item-id'] or "",
            url=tweet.find('div', 'tweet')['data-permalink-path'] or "",
            timestamp=datetime.utcfromtimestamp(
                int(tweet.find('span', '_timestamp')['data-time'])),
            text=tweet.find('p', 'tweet-text').text or "",
            replies=tweet.find(
                'span', 'ProfileTweet-action--reply u-hiddenVisually').find(
                'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            retweets=tweet.find(
                'span', 'ProfileTweet-action--retweet u-hiddenVisually').find(
                'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            likes=tweet.find(
                'span', 'ProfileTweet-action--favorite u-hiddenVisually').find(
                'span', 'ProfileTweet-actionCount')['data-tweet-stat-count'] or '0',
            html=str(tweet.find('p', 'tweet-text')) or "",
        )

    @classmethod
    def from_html(cls, html):
        # Yield a Tweet for every stream item found in the page HTML.
        soup = BeautifulSoup(html, "lxml")
        tweets = soup.find_all('li', 'js-stream-item')
        if tweets:
            for tweet in tweets:
                try:
                    yield cls.from_soup(tweet)
                except AttributeError:
                    pass  # Incomplete info? Discard!

# Logging
logger = logging.getLogger('twitterscraper')

formatter = logging.Formatter('%(levelname)s: %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)

level = logging.INFO
logger.setLevel(level)

# Query constants
HEADERS_LIST = [
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; x64; fr; rv:1.9.2.13) Gecko/20101203 Firebird/3.6.13',
    'Mozilla/5.0 (compatible, MSIE 11, Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; rv:2.2) Gecko/20110201',
    'Opera/9.80 (X11; Linux i686; Ubuntu/14.10) Presto/2.12.388 Version/12.16',
    'Mozilla/5.0 (Windows NT 5.2; RW; rv:7.0a1) Gecko/20091211 SeaMonkey/9.23a1pre'
]

HEADER = {'User-Agent': random.choice(HEADERS_LIST)}

INIT_URL = 'https://twitter.com/search?f=tweets&vertical=default&q={q}&l={lang}'
RELOAD_URL = 'https://twitter.com/i/search/timeline?f=tweets&vertical=' \
             'default&include_available_features=1&include_entities=1&' \
             'reset_error_state=false&src=typd&max_position={pos}&q={q}&l={lang}'


def linspace(start, stop, n):
    """Yield n evenly spaced values from start to stop (inclusive)."""
    if n == 1:
        yield stop
        return
    h = (stop - start) / (n - 1)
    for i in range(n):
        yield start + h * i


def query_single_page(url, html_response=True, retry=10):
    """
    Returns tweets from the given URL.

    :param url: The URL to get the tweets from.
    :param html_response: False if the HTML is embedded in a JSON response.
    :param retry: Number of retries if something goes wrong.
    :return: The list of tweets and the pos argument for getting the next page.
    """
    try:
        response = requests.get(url, headers=HEADER)
        if html_response:
            html = response.text or ''
        else:
            html = ''
            try:
                json_resp = json.loads(response.text)
                html = json_resp['items_html'] or ''
            except ValueError as e:
                logger.exception('Failed to parse JSON "{}" while requesting "{}"'.format(e, url))

        tweets = list(Tweet.from_html(html))

        if not tweets:
            return [], None

        if not html_response:
            return tweets, json_resp['min_position']

        return tweets, 'TWEET-{}-{}'.format(tweets[-1].id, tweets[0].id)
    except requests.exceptions.HTTPError as e:
        logger.exception('HTTPError {} while requesting "{}"'.format(e, url))
    except requests.exceptions.ConnectionError as e:
        logger.exception('ConnectionError {} while requesting "{}"'.format(e, url))
    except requests.exceptions.Timeout as e:
        logger.exception('Timeout {} while requesting "{}"'.format(e, url))
    except json.decoder.JSONDecodeError as e:
        logger.exception('Failed to parse JSON "{}" while requesting "{}".'.format(e, url))

    if retry > 0:
        logger.info('Retrying... (Attempts left: {})'.format(retry))
        return query_single_page(url, html_response, retry - 1)

    logger.error('Giving up.')
    return [], None


def query_tweets_once_generator(query, limit=None, lang=''):
    """
    Queries twitter for all the tweets you want! It will load all pages it
    gets from twitter. However, twitter might suddenly stop serving new
    pages; in that case, use the `query_tweets` method.

    Note that this function catches KeyboardInterrupt so it can return the
    tweets gathered so far if the user decides to abort.

    :param query: Any advanced query you want to do! Compile it at
                  https://twitter.com/search-advanced and just copy the query!
    :param limit: Scraping will be stopped when at least ``limit`` number of
                  items have been fetched.
    :param lang: Language code to restrict the search to (empty for all).
    :return: A generator of (twitterscraper.Tweet, pos) tuples.
    """
    logger.info('Querying {}'.format(query))
    query = query.replace(' ', '%20').replace('#', '%23').replace(':', '%3A')
    pos = None
    num_tweets = 0
    try:
        while True:
            new_tweets, pos = query_single_page(
                INIT_URL.format(q=query, lang=lang) if pos is None
                else RELOAD_URL.format(q=query, pos=pos, lang=lang),
                pos is None
            )
            if len(new_tweets) == 0:
                logger.info('Got {} tweets for {}.'.format(num_tweets, query))
                return

            for t in new_tweets:
                yield t, pos

            num_tweets += len(new_tweets)

            if limit and num_tweets >= limit:
                logger.info('Got {} tweets for {}.'.format(num_tweets, query))
                return

    except KeyboardInterrupt:
        logger.info('Program interrupted by user. Returning tweets gathered '
                    'so far...')
    except BaseException:
        logger.exception('An unknown error occurred! Returning tweets '
                         'gathered so far.')
    logger.info('Got {} tweets for {}.'.format(num_tweets, query))


def query_tweets_once(*args, **kwargs):
    # Collect the generator's output and return only the tweets,
    # dropping the pagination positions.
    res = list(query_tweets_once_generator(*args, **kwargs))
    if res:
        tweets, positions = zip(*res)
        return tweets
    else:
        return []


def query_tweets(query, limit=None, begindate=dt.date(2006, 3, 21), enddate=dt.date.today(), poolsize=20, lang=''):
    no_days = (enddate - begindate).days
    if poolsize > no_days:
        # Since we are assigning each pool a range of dates to query,
        # the number of pools should not exceed the number of dates.
        poolsize = no_days
    dateranges = [begindate + dt.timedelta(days=elem) for elem in linspace(0, no_days, poolsize + 1)]

    if limit:
        limit_per_pool = (limit // poolsize) + 1
    else:
        limit_per_pool = None

    queries = ['{} since:{} until:{}'.format(query, since, until)
               for since, until in zip(dateranges[:-1], dateranges[1:])]

    all_tweets = []
    try:
        pool = Pool(poolsize)
        logger.info('queries: {}'.format(queries))
        try:
            for new_tweets in pool.imap_unordered(partial(query_tweets_once, limit=limit_per_pool, lang=lang), queries):
                all_tweets.extend(new_tweets)
                logger.info('Got {} tweets ({} new).'.format(
                    len(all_tweets), len(new_tweets)))
        except KeyboardInterrupt:
            logger.info('Program interrupted by user. Returning all tweets '
                        'gathered so far.')
    finally:
        pool.close()
        pool.join()

    return all_tweets


if __name__ == '__main__':
    # Guard the entry point so multiprocessing.Pool can spawn workers safely
    # (required on platforms that use the "spawn" start method).
    query_tweets('ford', 5)
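
For reference, a minimal usage sketch, assuming the script above is saved as scraper.py with requests, beautifulsoup4, lxml and coala_utils installed; the module name and the output file ford_tweets.json are illustrative, not part of the original paste:

# Minimal usage sketch (assumption: the scraper above lives in scraper.py;
# the output filename is illustrative).
import json

from scraper import query_tweets

if __name__ == '__main__':
    tweets = query_tweets('ford', limit=50)
    # Serialize the Tweet attributes defined above to a JSON file.
    with open('ford_tweets.json', 'w') as f:
        json.dump(
            [{'user': t.user,
              'timestamp': t.timestamp.isoformat(),
              'text': t.text,
              'replies': t.replies,
              'retweets': t.retweets,
              'likes': t.likes} for t in tweets],
            f, indent=2)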