Advertisement
Guest User

Untitled

a guest
Mar 8th, 2013
251
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.81 KB | None | 0 0
  1. #!/usr/bin/python
  2. # -*- coding: utf-8 -*-
  3. #
  4. # tweetstream to redis motherlode
  5. #
  6. import tweetstream_gzip
  7. import logging
  8. from tornado import escape
  9. import urllib
  10. import cjson, zlib, sys, time, os
  11. import struct
  12. from time import sleep
  13.  
  14. from tornado import ioloop
  15. from tornado import iostream
  16. import socket, ssl, sys, re
  17. from urlparse import urlparse
  18. from bs4 import BeautifulSoup
  19. import opengraph
  20. from lxml.html import document_fromstring
  21. from lxml.html import fragment_fromstring
  22. from lxml.html import fromstring
  23.  
  24. from lxml import etree
  25. import lxml
  26.  
  27. # redis connection to motherlode
  28. import redis
  29. r = redis.StrictRedis(host='REDACTED', port=9999, db=0, password='REDACTED')
  30. ps = r.pubsub()
  31.  
  32.  
  33.  
  34.  
  35. # global debug vars
  36. global n, mm, f, l, intime, sockcount, pcount
  37. pcount = 0
  38. sockcount = 0
  39. n = 0
  40. mm = 10000
  41. intime = time.time()
  42. ##print time.time()
  43.  
  44.  
  45.  
  46.  
  47. ######################
  48. #  OPEN GRAPH PARSER #
  49. ######################
  50.  
  51. size = 5000
  52.  
  53.  
  54.  
  55.  
  56.  
  57. class FastScraper(object):
  58.     """ Scrape and find Open Graph metadata in urls """
  59.     def __init__(self):
  60.         self.io = ioloop.IOLoop.instance()
  61.         self.redirect = False
  62.         self.results = []
  63.         self.redirect_url = ''
  64.  
  65.  
  66.     def finish_up(self):
  67.         """ Send json (self.rj) to motherlode """
  68.        
  69.         try:
  70.             message = cjson.encode(self.rj)
  71.             zm = zlib.compress(message) # saving bandwidth
  72.             r.publish('tweet', zm)      # send to motherlode
  73.        
  74.             self.rj = ''
  75.            
  76.         except Exception:
  77.             print 'finish_up error: ', sys.exc_info()[1:3]
  78.             pass
  79.  
  80.  
  81.  
  82.     def run(self, rjson):
  83.         """ Run scraper """
  84.         try:   
  85.             # set json
  86.             self.rj = {}
  87.             self.rj = rjson
  88.    
  89.             # if no url, finish early
  90.             if not self.rj['urls']:
  91.                 self.finish_up()
  92.                 return
  93.    
  94.             # set url and scrape
  95.             self.longurl = self.rj['urls'][0]['expanded_url'].replace('\/', '/')
  96.             self.scrape()
  97.         except Exception:
  98.             print 'run error: ', sys.exc_info()[1:3]
  99.             pass
  100.  
  101.    
  102.  
  103.  
  104.    
  105.     def scrape(self):
  106.    
  107.         # make connection
  108.         try:
  109.             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  110.             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  111.             s.settimeout(2) # 2 seconds timeout
  112.            
  113.             # streamloop.add_callback(self.connect, s)
  114.             l_onoff = 1
  115.             l_linger = 0
  116.             s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))
  117.             streamloop.add_callback(self.connect, s)    # exp. tried putting this after linger settings, no apparent effect
  118.            
  119.         except Exception:
  120.             print 'socket error', sys.exc_info()[1:3]
  121.             pass
  122.    
  123.    
  124.     def prepare_url(self):
  125.         try:
  126.             # add http if missing in url
  127.             if not 'http' in self.longurl:
  128.                 url = 'http://' + self.longurl
  129.            
  130.             # parse url
  131.             url = urlparse(self.longurl)
  132.             self.url = url.netloc   # eg. www.vg.no
  133.             self.path = url.path    # eg. /artikkel.php?artid=10119923
  134.    
  135.             # catch empty url
  136.             if self.url == '' or self.url == None:
  137.                 return False
  138.                
  139.             # determine path
  140.             self.dieurl = self.longurl.split(self.url)
  141.             self.dieurl.reverse()
  142.             self.path = self.dieurl[0]
  143.             if self.path == '':
  144.                 self.path = '/'
  145.            
  146.             # set get size in bytes
  147.             self.size = self.setsize(url.netloc)
  148.            
  149.             # determine SSL
  150.             if url.scheme == 'https':
  151.                 self.SSL = True
  152.             else:
  153.                 self.SSL = False
  154.        
  155.             return True
  156.         except Exception:
  157.             print 'prepare_url error: ', sys.exc_info()[1:3]
  158.             print 'url: ', url
  159.             return False
  160.            
  161.    
  162.  
  163.    
  164.    
  165.     def connect(self, s):
  166.         # connect      
  167.        
  168.         try:
  169.             prep = self.prepare_url()
  170.    
  171.             if not prep:
  172.                 self.stream.close()
  173.                 return
  174.            
  175.             if self.SSL:
  176.                 ss = ssl.wrap_socket(s, do_handshake_on_connect=False)
  177.                 self.stream = iostream.SSLIOStream(ss)
  178.                 self.stream.connect((self.url, 443), self.send_ssl_request)
  179.             else:
  180.                 self.stream = iostream.IOStream(s)
  181.                 self.stream.connect((self.url, 80), self.send_request)
  182.            
  183.            
  184.             global sockcount
  185.             sockcount += 1
  186. #           print 'currently open sockets: ', sockcount
  187.         except Exception:
  188.             print 'connect error:'
  189.             print sys.exc_info()[1:3]
  190.             print 'url: ', self.url
  191.             print len(self.url)
  192.             print self.path
  193.             self.stream.close() # maybe not needed
  194.             pass
  195.    
  196.     def reconnect(self):
  197.         print 'reconnect!reconnect!reconnect!reconnect!'
  198.         pass
  199.    
  200.    
  201.    
  202.    
  203.     def send_request(self):
  204.        
  205.         """ Send non-SSL request """
  206.         self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
  207.         self.stream.read_until("\r\n\r\n", self.on_headers) # read headers
  208.        
  209.        
  210.  
  211.     def send_ssl_request(self):
  212.         """ Send SSL request """
  213.         self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
  214.         self.stream.read_until("\r\n\r\n", self.on_headers) # read headers
  215.        
  216.  
  217.  
  218.     def setsize(self, neturl):
  219.         """ Custom scrape sizes in bytes """
  220.         if 'youtu' in neturl:
  221.             return 5000
  222.         if 'aje.me' in neturl:
  223.             return 10000
  224.         if 'aljazeera' in neturl:
  225.             return 10000
  226.         else:
  227.             # default size
  228.             return size
  229.  
  230.    
  231.  
  232.  
  233.    
  234.            
  235.     def on_headers(self, data):
  236.         """ Callback for headers """
  237.         self.redirect = False
  238.         global sockcount
  239.         try:
  240.             headers = {}
  241.             for line in data.split("\r\n"):
  242.                 parts = line.split(": ")
  243.                 if len(parts) == 2:
  244.                     headers[parts[0].strip()] = parts[1].strip()
  245.                
  246.                 if 'HTTP' in line:
  247.                     code = line.split(' ')
  248.                     if code[1] == '301' or code[1] == '302' or code[1] == '303':
  249.                         # set redirect
  250.                         self.redirect = True
  251.    
  252.             if self.redirect:
  253.                 if 'Location' in headers:
  254.                     self.longurl = headers['Location']
  255.                 if 'location' in headers:
  256.                     self.longurl = headers['location']
  257.                
  258.                 self.stream.close()                 # close            
  259.                 self.io.add_callback(self.scrape)   # restart
  260.  
  261.                 sockcount -= 1
  262.                 return
  263.                
  264.            
  265.            
  266.             if 'Content-Length' in headers:
  267.                 if size > int(headers['Content-Length']):
  268.                     # make sure we're not scraping more than length of document
  269.                     self.size = int(headers['Content-Length'])
  270.  
  271.             try:
  272.                 # go ahead and scrape the body
  273.                 self.stream.read_bytes(self.size, self.on_body)
  274.             except Exception:
  275.                 print 'on headers ==> body error:'
  276.                 print sys.exc_info()[1:3]
  277.                 print headers
  278.                 print data
  279.                 print self.size
  280.                 print self.url
  281.                 print self.path
  282.                 sockcount -= 1
  283.                 self.stream.close()
  284.                 pass
  285.  
  286.         except Exception:
  287.             print 'on headers error:'
  288.             print sys.exc_info()[1:3]
  289.             print headers
  290.             print data
  291.             sockcount -= 1
  292.             self.stream.close()
  293.             pass
  294.  
  295.  
  296.  
  297.  
  298.  
  299.            
  300.                        
  301.    
  302.     def on_body(self, data):
  303.         """ Callback after scraping body """
  304.        
  305.         # parse the og:tags
  306.         self.parse_og(data)    
  307.        
  308.         # finish up
  309.         self.finish_up()
  310.  
  311.         # clean up
  312.         self.stream.close()
  313.         global sockcount
  314.         sockcount -= 1
  315.  
  316.  
  317.  
  318.    
  319.    
  320.    
  321.    
  322.     def parse_og(self, data):
  323.         """ lxml parsing to the bone! """
  324.        
  325. #       return  # debug, still all sockets but no lxml  // runs fine, aka it's lxml, not amazon aws blocking socket use for example...
  326.        
  327.         try:
  328.             tree = etree.HTML( data )
  329.             m = tree.xpath("//meta[@property]")
  330.            
  331. #           for i in m:
  332. # #                 print (i.attrib['property'], i.attrib['content'])      
  333. #               y = i.attrib['property']
  334. #               x = i.attrib['content']
  335. #               self.rj[y] = x
  336.        
  337.        
  338.             tree = ''
  339.             m = ''
  340.             i = ''
  341.             y = ''
  342.             x = ''
  343.            
  344.             del tree
  345.             del m
  346.             del i
  347.             del y
  348.             del x
  349.  
  350.            
  351.        
  352.         except Exception:
  353.             print 'lxml error: ', sys.exc_info()[1:3]
  354.             print len(data)
  355.             data = ''
  356.             tree = ''
  357.             m = ''
  358.             i = ''
  359.             x = ''
  360.             y = ''
  361.             pass
  362.    
  363.    
  364.  
  365.  
  366.  
  367.  
  368.  
  369.  
  370.  
  371. ####################
  372. #  RSJON PARSER    #
  373. ####################
  374.  
  375.  
  376. # todo: convert facebook, instagram, bambuser, etc. to rawger json
  377.  
  378.  
  379. def t2r(tdoc):
  380.     """ Convert Twitter JSON to Rawger JSON. """
  381.    
  382.     try:
  383.        
  384.  
  385.         # if rate limit
  386.  
  387.         rjson = {}
  388.        
  389.         # general
  390.         rjson['source'] = 'twitter'
  391.         rjson['device'] = tdoc['source']    # todo
  392.    
  393.         # user
  394.         rjson['userid'] = tdoc['user']['id']
  395.         rjson['fullname'] = tdoc['user']['name']
  396.         rjson['screenname'] = tdoc['user']['screen_name']
  397.         rjson['avatar'] = tdoc['user']['profile_image_url_https'].replace('\/', '/')
  398.         rjson['description'] = tdoc['user']['description']
  399.         rjson['followers'] = tdoc['user']['followers_count']
  400.         rjson['friends'] = tdoc['user']['friends_count']
  401.         rjson['following'] = tdoc['user']['friends_count']
  402.         rjson['userlocation'] = tdoc['user']['location']
  403.         rjson['statuscount'] = tdoc['user']['statuses_count']
  404.         rjson['hashtags'] = tdoc['entities']['hashtags']
  405.         rjson['mentions'] = tdoc['entities']['user_mentions']
  406.        
  407.         # urls
  408.         rjson['urls'] = tdoc['entities']['urls']
  409.  
  410.         # tweet
  411.         rjson['id'] = tdoc['id']
  412.         if 'lang' in tdoc:
  413.             rjson['lang'] = tdoc['lang']
  414.         else:
  415.             rjson['lang'] = None
  416.        
  417.         rjson['rt'] = tdoc['retweet_count']
  418.        
  419.         try:
  420.             rjson['text'] = escape.linkify(tdoc['text'].replace('\/', '/'), True, extra_params='target="_blank"')   # todo: get rid of &amps
  421.         except Exception:
  422.             print 't2r escape error!', sys.exc_info()[1:3]
  423.             pass
  424.  
  425.  
  426.         # different types of geo
  427.         # only [geo] atm, twitter places too general
  428.         rjson['geo'] = tdoc['geo']
  429.  
  430.         # if retweet, get original timestamp instead
  431.         if 'retweeted_status' in tdoc.keys():
  432.             rjson['timestamp'] = tdoc['retweeted_status']['created_at']
  433.         else:
  434.             rjson['timestamp'] = tdoc['created_at']
  435.  
  436.  
  437.         # if twitter embedded image
  438.         if 'media' in tdoc['entities'].keys():
  439.             for m in tdoc['entities']['media']:
  440.                 media = m['media_url_https'].replace('\/', '/')
  441.             rjson['photo'] = media
  442.         else:
  443.             rjson['photo'] = None
  444.            
  445.            
  446.         # if other images
  447.         if not rjson['photo']:
  448.             if rjson['urls']:
  449.            
  450.            
  451.                 for urls in rjson['urls']:
  452.                     url = urls['expanded_url'].replace('\/', '/')
  453.  
  454.                     #youtube
  455.                     if ('youtu' in url):
  456.                         rjson['video'] = url
  457.                         #vid = video_id(self, youtube_url)
  458.    
  459.                     #yfrog
  460.                     if ('yfrog' in url):
  461.                         rjson['photo'] = url
  462.                        
  463.                     #instagram
  464.                     if ('instagr' in url):
  465.                         rjson['photo'] = url
  466.                    
  467.                     #twitpic
  468.                     if ('twitpic' in url):
  469.                         arr = url.split('/')
  470.                         rjson['photo'] = 'https://twitpic.com/show/large/'
  471.                         rjson['photo'] += '%s' % arr[3].encode('ascii')
  472.                    
  473.                     #tumblr
  474.                     if ('media.tumblr' in url):
  475.                         if ('.jpg' in url):
  476.                             rjson['photo'] = '%s500.jpg' % url[:-7]
  477.                
  478.         # done!
  479.         return rjson
  480.  
  481.  
  482.     except Exception:
  483.         print 't2r general error!'
  484.         print sys.exc_info()[1:3]
  485.         print tdoc
  486.         pass
  487.  
  488.  
  489.  
  490.  
  491.  
  492.  
  493.  
  494.  
  495.  
  496.  
  497.  
  498.  
  499.  
  500.  
  501.  
  502.  
  503.  
  504.  
  505.  
  506. def callback(message):
  507.     """ Callback for tweetstream_gzip """
  508.     try:
  509.         global n, mm, f, l, intime
  510.         n += 1
  511.         if n == 1:
  512.             doc = cjson.decode(message)
  513.             f = doc['id']
  514.             ##print '@%s: %s' % (doc['user']['screen_name'], doc['text'])
  515.    
  516.         if n == mm:
  517.             mm += 1000
  518.             n += 1
  519.             print '%i tweets per second.' % ( n / ( time.time() - intime ))        
  520.    
  521.    
  522.    
  523.         if len(message) > 1:
  524.             # process from json to rawger json + opengraph
  525.             m = cjson.decode(message)
  526.            
  527.             if 'limit' in m:
  528.                 print 'limit!'
  529.                 print m
  530.                 return
  531.  
  532.             # convert to rawger json
  533.             rj = t2r(m)
  534.            
  535.             # add scrape to ioloop
  536.             streamloop.add_callback(fast, rj)
  537.    
  538.     except Exception:
  539.         print 'stream.py callback error: ', sys.exc_info()
  540.         pass
  541.  
  542.  
  543.  
  544.  
  545. def fast(rj):
  546.     """ IOLoop add_callback for FastScraper """
  547.     try:
  548.         fs = FastScraper()
  549.         fs.run(rj)     
  550.         global sockcount
  551.  
  552.     except Exception:
  553.         print 'fast error:', sys.exc_info()[1:3]
  554.         pass
  555.    
  556.    
  557.    
  558.  
  559.  
  560.  
  561.  
  562.  
  563.  
  564.  
  565.  
  566.  
  567. if __name__ == '__main__':
  568.  
  569.    
  570.  
  571.  
  572.  
  573.     #config 0
  574.     configuration_0 = {
  575.         "twitter_consumer_key": "sorry",
  576.         "twitter_consumer_secret": "sorry",
  577.         "twitter_access_token": "sorry",
  578.         "twitter_access_token_secret": "sorry"
  579.     }
  580.  
  581.     keywords_0 = [
  582.         'love',
  583.         'RT',
  584.         'you',
  585.        
  586.     ]
  587.  
  588.     tracks_0 = ','.join(keywords_0)
  589.     stream_0 = tweetstream_gzip.TweetStream(configuration_0, gzip=True)
  590.     url_0 = "/1.1/statuses/filter.json?track=%s" % urllib.quote(tracks_0).encode('utf-8')
  591.  
  592.     stream_0.fetch(url_0, callback=callback)
  593.  
  594.  
  595.     print 'Stream is up.'
  596.    
  597.  
  598.  
  599.     from tornado.ioloop import IOLoop
  600.     streamloop = IOLoop.instance()
  601.     streamloop.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement