#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# tweetstream to redis motherlode
#

import tweetstream_gzip
import logging
from tornado import escape
import urllib
import cjson, zlib, sys, time, os
import struct
from time import sleep
from tornado import ioloop
from tornado import iostream
import socket, ssl, re
from urlparse import urlparse
from bs4 import BeautifulSoup
import opengraph
from lxml.html import document_fromstring
from lxml.html import fragment_fromstring
from lxml.html import fromstring
from lxml import etree
import lxml

# redis connection to motherlode
import redis
r = redis.StrictRedis(host='REDACTED', port=9999, db=0, password='REDACTED')
ps = r.pubsub()

# global debug vars
global n, mm, f, l, intime, sockcount, pcount
pcount = 0
sockcount = 0
n = 0
mm = 10000
intime = time.time()
##print time.time()
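# NOTE: illustrative sketch, not part of the original pipeline. FastScraper.finish_up()
# below publishes each processed tweet on this connection as zlib-compressed cjson on
# the 'tweet' channel; a subscriber on the motherlode side could unpack it roughly like
# this. The function name and the printed fields are assumptions (the fields come from
# the dict built by t2r() further down); nothing in this script calls it.
def example_consume_tweets():
    sub = r.pubsub()
    sub.subscribe('tweet')
    for item in sub.listen():
        if item['type'] != 'message':
            continue
        doc = cjson.decode(zlib.decompress(item['data']))
        print doc.get('screenname'), doc.get('text')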
######################
# OPEN GRAPH PARSER  #
######################

# default scrape size in bytes
size = 5000


class FastScraper(object):
    """ Scrape and find Open Graph metadata in urls """

    def __init__(self):
        self.io = ioloop.IOLoop.instance()
        self.redirect = False
        self.results = []
        self.redirect_url = ''

    def finish_up(self):
        """ Send json (self.rj) to motherlode """
        try:
            message = cjson.encode(self.rj)
            zm = zlib.compress(message)  # saving bandwidth
            r.publish('tweet', zm)  # send to motherlode
            self.rj = ''
        except Exception:
            print 'finish_up error: ', sys.exc_info()[1:3]
            pass

    def run(self, rjson):
        """ Run scraper """
        try:
            # set json
            self.rj = {}
            self.rj = rjson
            # if no url, finish early
            if not self.rj['urls']:
                self.finish_up()
                return
            # set url and scrape
            self.longurl = self.rj['urls'][0]['expanded_url'].replace('\/', '/')
            self.scrape()
        except Exception:
            print 'run error: ', sys.exc_info()[1:3]
            pass

    def scrape(self):
        # make connection
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.settimeout(2)  # 2 seconds timeout
            # streamloop.add_callback(self.connect, s)
            l_onoff = 1
            l_linger = 0
            s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))
            streamloop.add_callback(self.connect, s)  # exp. tried putting this after linger settings, no apparent effect
        except Exception:
            print 'socket error', sys.exc_info()[1:3]
            pass

    def prepare_url(self):
        try:
            # add http if missing in url
            if not 'http' in self.longurl:
                self.longurl = 'http://' + self.longurl
            # parse url
            url = urlparse(self.longurl)
            self.url = url.netloc   # eg. www.vg.no
            self.path = url.path    # eg. /artikkel.php?artid=10119923
            # catch empty url
            if self.url == '' or self.url == None:
                return False
            # determine path
            self.dieurl = self.longurl.split(self.url)
            self.dieurl.reverse()
            self.path = self.dieurl[0]
            if self.path == '':
                self.path = '/'
            # set get size in bytes
            self.size = self.setsize(url.netloc)
            # determine SSL
            if url.scheme == 'https':
                self.SSL = True
            else:
                self.SSL = False
            return True
        except Exception:
            print 'prepare_url error: ', sys.exc_info()[1:3]
            print 'url: ', self.longurl
            return False

    def connect(self, s):
        # connect
        try:
            prep = self.prepare_url()
            if not prep:
                # nothing to close on a first attempt; a stream only exists after a redirect restart
                if hasattr(self, 'stream'):
                    self.stream.close()
                return
            if self.SSL:
                ss = ssl.wrap_socket(s, do_handshake_on_connect=False)
                self.stream = iostream.SSLIOStream(ss)
                self.stream.connect((self.url, 443), self.send_ssl_request)
            else:
                self.stream = iostream.IOStream(s)
                self.stream.connect((self.url, 80), self.send_request)
            global sockcount
            sockcount += 1
            # print 'currently open sockets: ', sockcount
        except Exception:
            print 'connect error:'
            print sys.exc_info()[1:3]
            print 'url: ', self.url
            print len(self.url)
            print self.path
            self.stream.close()  # maybe not needed
            pass

    def reconnect(self):
        print 'reconnect!reconnect!reconnect!reconnect!'
        pass

    def send_request(self):
        """ Send non-SSL request """
        self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
        self.stream.read_until("\r\n\r\n", self.on_headers)  # read headers

    def send_ssl_request(self):
        """ Send SSL request """
        self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
        self.stream.read_until("\r\n\r\n", self.on_headers)  # read headers

    def setsize(self, neturl):
        """ Custom scrape sizes in bytes """
        if 'youtu' in neturl:
            return 5000
        if 'aje.me' in neturl:
            return 10000
        if 'aljazeera' in neturl:
            return 10000
        else:
            # default size
            return size

    def on_headers(self, data):
        """ Callback for headers """
        self.redirect = False
        global sockcount
        try:
            headers = {}
            for line in data.split("\r\n"):
                parts = line.split(": ")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
                if 'HTTP' in line:
                    code = line.split(' ')
                    if code[1] == '301' or code[1] == '302' or code[1] == '303':
                        # set redirect
                        self.redirect = True
            if self.redirect:
                if 'Location' in headers:
                    self.longurl = headers['Location']
                if 'location' in headers:
                    self.longurl = headers['location']
                self.stream.close()  # close
                self.io.add_callback(self.scrape)  # restart
                sockcount -= 1
                return
            if 'Content-Length' in headers:
                if self.size > int(headers['Content-Length']):
                    # make sure we're not scraping more than length of document
                    self.size = int(headers['Content-Length'])
            try:
                # go ahead and scrape the body
                self.stream.read_bytes(self.size, self.on_body)
            except Exception:
                print 'on headers ==> body error:'
                print sys.exc_info()[1:3]
                print headers
                print data
                print self.size
                print self.url
                print self.path
                sockcount -= 1
                self.stream.close()
                pass
        except Exception:
            print 'on headers error:'
            print sys.exc_info()[1:3]
            print headers
            print data
            sockcount -= 1
            self.stream.close()
            pass

    def on_body(self, data):
        """ Callback after scraping body """
        # parse the og:tags
        self.parse_og(data)
        # finish up
        self.finish_up()
        # clean up
        self.stream.close()
        global sockcount
        sockcount -= 1

    def parse_og(self, data):
        """ lxml parsing to the bone! """
        # return  # debug, still all sockets but no lxml // runs fine, aka it's lxml, not amazon aws blocking socket use for example...
        try:
            tree = etree.HTML(data)
            m = tree.xpath("//meta[@property]")
            # for i in m:
            #     # print (i.attrib['property'], i.attrib['content'])
            #     y = i.attrib['property']
            #     x = i.attrib['content']
            #     self.rj[y] = x
            tree = ''
            m = ''
            i = ''
            y = ''
            x = ''
            del tree
            del m
            del i
            del y
            del x
        except Exception:
            print 'lxml error: ', sys.exc_info()[1:3]
            print len(data)
            data = ''
            tree = ''
            m = ''
            i = ''
            x = ''
            y = ''
            pass
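# NOTE: the og:tag loop in parse_og() above is left commented out in this script.
# A minimal standalone sketch of the same extraction, using the lxml import above;
# the function name is illustrative and nothing in this file calls it.
def extract_og_tags(html):
    """ Return a dict of <meta property="..." content="..."> pairs, as the
        commented-out loop in parse_og() would have stored them in self.rj. """
    tags = {}
    try:
        tree = etree.HTML(html)
        for meta in tree.xpath("//meta[@property]"):
            tags[meta.attrib.get('property')] = meta.attrib.get('content', '')
    except Exception:
        print 'extract_og_tags error: ', sys.exc_info()[1:3]
    return tags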
####################
#   RSJON PARSER   #
####################
# todo: convert facebook, instagram, bambuser, etc. to rawger json

def t2r(tdoc):
    """ Convert Twitter JSON to Rawger JSON. """
    try:
        # if rate limit
        rjson = {}
        # general
        rjson['source'] = 'twitter'
        rjson['device'] = tdoc['source']  # todo
        # user
        rjson['userid'] = tdoc['user']['id']
        rjson['fullname'] = tdoc['user']['name']
        rjson['screenname'] = tdoc['user']['screen_name']
        rjson['avatar'] = tdoc['user']['profile_image_url_https'].replace('\/', '/')
        rjson['description'] = tdoc['user']['description']
        rjson['followers'] = tdoc['user']['followers_count']
        rjson['friends'] = tdoc['user']['friends_count']
        rjson['following'] = tdoc['user']['friends_count']
        rjson['userlocation'] = tdoc['user']['location']
        rjson['statuscount'] = tdoc['user']['statuses_count']
        rjson['hashtags'] = tdoc['entities']['hashtags']
        rjson['mentions'] = tdoc['entities']['user_mentions']
        # urls
        rjson['urls'] = tdoc['entities']['urls']
        # tweet
        rjson['id'] = tdoc['id']
        if 'lang' in tdoc:
            rjson['lang'] = tdoc['lang']
        else:
            rjson['lang'] = None
        rjson['rt'] = tdoc['retweet_count']
        try:
            rjson['text'] = escape.linkify(tdoc['text'].replace('\/', '/'), True, extra_params='target="_blank"')  # todo: get rid of &s
        except Exception:
            print 't2r escape error!', sys.exc_info()[1:3]
            pass
        # different types of geo
        # only [geo] atm, twitter places too general
        rjson['geo'] = tdoc['geo']
        # if retweet, get original timestamp instead
        if 'retweeted_status' in tdoc.keys():
            rjson['timestamp'] = tdoc['retweeted_status']['created_at']
        else:
            rjson['timestamp'] = tdoc['created_at']
        # if twitter embedded image
        if 'media' in tdoc['entities'].keys():
            for m in tdoc['entities']['media']:
                media = m['media_url_https'].replace('\/', '/')
                rjson['photo'] = media
        else:
            rjson['photo'] = None
        # if other images
        if not rjson['photo']:
            if rjson['urls']:
                for urls in rjson['urls']:
                    url = urls['expanded_url'].replace('\/', '/')
                    # youtube
                    if ('youtu' in url):
                        rjson['video'] = url
                        # vid = video_id(self, youtube_url)
                    # yfrog
                    if ('yfrog' in url):
                        rjson['photo'] = url
                    # instagram
                    if ('instagr' in url):
                        rjson['photo'] = url
                    # twitpic
                    if ('twitpic' in url):
                        arr = url.split('/')
                        rjson['photo'] = 'https://twitpic.com/show/large/'
                        rjson['photo'] += '%s' % arr[3].encode('ascii')
                    # tumblr
                    if ('media.tumblr' in url):
                        if ('.jpg' in url):
                            rjson['photo'] = '%s500.jpg' % url[:-7]
        # done!
        return rjson
    except Exception:
        print 't2r general error!'
        print sys.exc_info()[1:3]
        print tdoc
        pass
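# NOTE: the commented-out call in t2r() above references a video_id() helper that is
# not defined anywhere in this file. A minimal sketch of what such a helper might look
# like; the signature is a guess (the original call passes self, which looks like a
# leftover from a method), and nothing in this script calls it.
def video_id(youtube_url):
    """ Extract the video id from a youtube.com/watch or youtu.be URL. """
    from urlparse import parse_qs
    parsed = urlparse(youtube_url)
    if 'youtu.be' in parsed.netloc:
        return parsed.path.lstrip('/')  # e.g. https://youtu.be/<id>
    qs = parse_qs(parsed.query)
    if 'v' in qs:
        return qs['v'][0]  # e.g. https://www.youtube.com/watch?v=<id>
    return None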
def callback(message):
    """ Callback for tweetstream_gzip """
    try:
        global n, mm, f, l, intime
        n += 1
        if n == 1:
            doc = cjson.decode(message)
            f = doc['id']
            ##print '@%s: %s' % (doc['user']['screen_name'], doc['text'])
        if n == mm:
            mm += 1000
            n += 1
            print '%i tweets per second.' % (n / (time.time() - intime))
        if len(message) > 1:
            # process from json to rawger json + opengraph
            m = cjson.decode(message)
            if 'limit' in m:
                print 'limit!'
                print m
                return
            # convert to rawger json
            rj = t2r(m)
            # add scrape to ioloop
            streamloop.add_callback(fast, rj)
    except Exception:
        print 'stream.py callback error: ', sys.exc_info()
        pass


def fast(rj):
    """ IOLoop add_callback for FastScraper """
    try:
        fs = FastScraper()
        fs.run(rj)
        global sockcount
    except Exception:
        print 'fast error:', sys.exc_info()[1:3]
        pass


if __name__ == '__main__':
    # config 0
    configuration_0 = {
        "twitter_consumer_key": "sorry",
        "twitter_consumer_secret": "sorry",
        "twitter_access_token": "sorry",
        "twitter_access_token_secret": "sorry"
    }
    keywords_0 = [
        'love',
        'RT',
        'you',
    ]
    tracks_0 = ','.join(keywords_0)

    stream_0 = tweetstream_gzip.TweetStream(configuration_0, gzip=True)
    url_0 = "/1.1/statuses/filter.json?track=%s" % urllib.quote(tracks_0).encode('utf-8')
    stream_0.fetch(url_0, callback=callback)
    print 'Stream is up.'

    from tornado.ioloop import IOLoop
    streamloop = IOLoop.instance()
    streamloop.start()