#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# tweetstream to redis motherlode
#
import socket
import ssl
import struct
import sys
import time
import urllib
from urlparse import urlparse

import cjson
import zlib

import tweetstream_gzip
from tornado import escape
from tornado import ioloop
from tornado import iostream
from lxml import etree

# redis connection to motherlode
import redis
r = redis.StrictRedis(host='REDACTED', port=9999, db=0, password='REDACTED')
ps = r.pubsub()
# module-level debug counters (rebound via ``global`` in the functions below)
pcount = 0
sockcount = 0
n = 0
mm = 10000
intime = time.time()

######################
# OPEN GRAPH PARSER  #
######################
size = 5000  # default number of body bytes to read per scrape
class FastScraper(object):
    """ Scrape and find Open Graph metadata in urls """

    def __init__(self):
        self.io = ioloop.IOLoop.instance()
        self.redirect = False
        self.results = []
        self.redirect_url = ''

    def finish_up(self):
        """ Send json (self.rj) to motherlode """
        try:
            message = cjson.encode(self.rj)
            zm = zlib.compress(message)  # saving bandwidth
            r.publish('tweet', zm)  # send to motherlode
            self.rj = ''
        except Exception:
            print 'finish_up error: ', sys.exc_info()[1:3]

    def run(self, rjson):
        """ Run scraper """
        try:
            # set json
            self.rj = rjson
            # if no url, finish early
            if not self.rj['urls']:
                self.finish_up()
                return
            # set url and scrape
            self.longurl = self.rj['urls'][0]['expanded_url'].replace('\/', '/')
            self.scrape()
        except Exception:
            print 'run error: ', sys.exc_info()[1:3]

    def scrape(self):
        # make connection
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.settimeout(2)  # 2 seconds timeout
            # SO_LINGER with a zero timeout makes close() drop the socket
            # immediately instead of leaving it in TIME_WAIT
            l_onoff = 1
            l_linger = 0
            s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))
            streamloop.add_callback(self.connect, s)  # tried putting this after the linger settings; no apparent effect
        except Exception:
            print 'socket error', sys.exc_info()[1:3]

    def prepare_url(self):
        try:
            # add http if missing in url
            if 'http' not in self.longurl:
                self.longurl = 'http://' + self.longurl
            # parse url
            url = urlparse(self.longurl)
            self.url = url.netloc  # eg. www.vg.no
            # catch empty url
            if not self.url:
                return False
            # determine path (everything after the netloc, incl. query string)
            # eg. /artikkel.php?artid=10119923
            self.dieurl = self.longurl.split(self.url)
            self.dieurl.reverse()
            self.path = self.dieurl[0]
            if self.path == '':
                self.path = '/'
            # set get size in bytes
            self.size = self.setsize(url.netloc)
            # determine SSL
            self.SSL = (url.scheme == 'https')
            return True
        except Exception:
            print 'prepare_url error: ', sys.exc_info()[1:3]
            print 'url: ', self.longurl
            return False

    def connect(self, s):
        # connect
        try:
            if not self.prepare_url():
                s.close()
                return
            if self.SSL:
                ss = ssl.wrap_socket(s, do_handshake_on_connect=False)
                self.stream = iostream.SSLIOStream(ss)
                self.stream.connect((self.url, 443), self.send_ssl_request)
            else:
                self.stream = iostream.IOStream(s)
                self.stream.connect((self.url, 80), self.send_request)
            global sockcount
            sockcount += 1
            # print 'currently open sockets: ', sockcount
        except Exception:
            print 'connect error:'
            print sys.exc_info()[1:3]
            print 'url: ', self.url
            if hasattr(self, 'stream'):
                self.stream.close()  # maybe not needed

    def reconnect(self):
        print 'reconnect!reconnect!reconnect!reconnect!'

    def send_request(self):
        """ Send non-SSL request """
        self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
        self.stream.read_until("\r\n\r\n", self.on_headers)  # read headers

    def send_ssl_request(self):
        """ Send SSL request """
        self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
        self.stream.read_until("\r\n\r\n", self.on_headers)  # read headers
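
    # Hedged illustration of the request framing above: with
    # self.url = 'example.com' and self.path = '/', both methods write
    #   GET / HTTP/1.0\r\nHost: example.com\r\n\r\n
    # HTTP/1.0 responses are normally not chunked, so the read_until /
    # read_bytes callbacks below can parse them without a chunked decoder.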

    def setsize(self, neturl):
        """ Custom scrape sizes in bytes """
        if 'youtu' in neturl:
            return 5000
        elif 'aje.me' in neturl:
            return 10000
        elif 'aljazeera' in neturl:
            return 10000
        else:
            # default size
            return size

    def on_headers(self, data):
        """ Callback for headers """
        self.redirect = False
        global sockcount
        try:
            headers = {}
            for line in data.split("\r\n"):
                parts = line.split(": ")
                if len(parts) == 2:
                    headers[parts[0].strip()] = parts[1].strip()
                if 'HTTP' in line:
                    code = line.split(' ')
                    if code[1] in ('301', '302', '303'):
                        # set redirect
                        self.redirect = True
            if self.redirect:
                if 'Location' in headers:
                    self.longurl = headers['Location']
                if 'location' in headers:
                    self.longurl = headers['location']
                self.stream.close()  # close
                self.io.add_callback(self.scrape)  # restart
                sockcount -= 1
                return
            if 'Content-Length' in headers:
                if self.size > int(headers['Content-Length']):
                    # make sure we're not scraping more than length of document
                    self.size = int(headers['Content-Length'])
            try:
                # go ahead and scrape the body
                self.stream.read_bytes(self.size, self.on_body)
            except Exception:
                print 'on headers ==> body error:'
                print sys.exc_info()[1:3]
                print headers
                print data
                print self.size
                print self.url
                print self.path
                sockcount -= 1
                self.stream.close()
        except Exception:
            print 'on headers error:'
            print sys.exc_info()[1:3]
            print headers
            print data
            sockcount -= 1
            self.stream.close()

    def on_body(self, data):
        """ Callback after scraping body """
        # parse the og:tags
        self.parse_og(data)
        # finish up
        self.finish_up()
        # clean up
        self.stream.close()
        global sockcount
        sockcount -= 1

    def parse_og(self, data):
        """ lxml parsing to the bone! """
        try:
            tree = etree.HTML(data)
            for i in tree.xpath("//meta[@property]"):
                # copy each <meta property="..." content="..."> into the json
                self.rj[i.attrib['property']] = i.attrib.get('content')
        except Exception:
            print 'lxml error: ', sys.exc_info()[1:3]
            print len(data)
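
# A minimal sketch of what parse_og() extracts, on invented HTML. The
# helper below is illustrative only and is never called by the stream.
def _og_demo():
    html = ('<html><head>'
            '<meta property="og:title" content="Hello"/>'
            '<meta property="og:type" content="article"/>'
            '</head></html>')
    tree = etree.HTML(html)
    return dict((i.attrib['property'], i.attrib.get('content'))
                for i in tree.xpath("//meta[@property]"))
# _og_demo() -> {'og:title': 'Hello', 'og:type': 'article'}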

####################
#  RJSON PARSER    #
####################
# todo: convert facebook, instagram, bambuser, etc. to rawger json
def t2r(tdoc):
    """ Convert Twitter JSON to Rawger JSON. """
    try:
        rjson = {}
        # general
        rjson['source'] = 'twitter'
        rjson['device'] = tdoc['source']  # todo
        # user
        rjson['userid'] = tdoc['user']['id']
        rjson['fullname'] = tdoc['user']['name']
        rjson['screenname'] = tdoc['user']['screen_name']
        rjson['avatar'] = tdoc['user']['profile_image_url_https'].replace('\/', '/')
        rjson['description'] = tdoc['user']['description']
        rjson['followers'] = tdoc['user']['followers_count']
        rjson['friends'] = tdoc['user']['friends_count']
        rjson['following'] = tdoc['user']['friends_count']
        rjson['userlocation'] = tdoc['user']['location']
        rjson['statuscount'] = tdoc['user']['statuses_count']
        rjson['hashtags'] = tdoc['entities']['hashtags']
        rjson['mentions'] = tdoc['entities']['user_mentions']
        # urls
        rjson['urls'] = tdoc['entities']['urls']
        # tweet
        rjson['id'] = tdoc['id']
        if 'lang' in tdoc:
            rjson['lang'] = tdoc['lang']
        else:
            rjson['lang'] = None
        rjson['rt'] = tdoc['retweet_count']
        try:
            rjson['text'] = escape.linkify(tdoc['text'].replace('\/', '/'), True, extra_params='target="_blank"')  # todo: get rid of &s
        except Exception:
            print 't2r escape error!', sys.exc_info()[1:3]
        # different types of geo
        # only [geo] atm, twitter places too general
        rjson['geo'] = tdoc['geo']
        # if retweet, get original timestamp instead
        if 'retweeted_status' in tdoc:
            rjson['timestamp'] = tdoc['retweeted_status']['created_at']
        else:
            rjson['timestamp'] = tdoc['created_at']
        # if twitter embedded image
        if 'media' in tdoc['entities']:
            for m in tdoc['entities']['media']:
                rjson['photo'] = m['media_url_https'].replace('\/', '/')
        else:
            rjson['photo'] = None
        # if other images
        if not rjson['photo']:
            if rjson['urls']:
                for urls in rjson['urls']:
                    url = urls['expanded_url'].replace('\/', '/')
                    # youtube
                    if 'youtu' in url:
                        rjson['video'] = url
                    # yfrog
                    if 'yfrog' in url:
                        rjson['photo'] = url
                    # instagram
                    if 'instagr' in url:
                        rjson['photo'] = url
                    # twitpic
                    if 'twitpic' in url:
                        arr = url.split('/')
                        rjson['photo'] = 'https://twitpic.com/show/large/'
                        rjson['photo'] += '%s' % arr[3].encode('ascii')
                    # tumblr
                    if 'media.tumblr' in url:
                        if '.jpg' in url:
                            rjson['photo'] = '%s500.jpg' % url[:-7]
        # done!
        return rjson
    except Exception:
        print 't2r general error!'
        print sys.exc_info()[1:3]
        print tdoc
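
# Hedged example of the twitpic rewrite inside t2r() (url invented):
#   'http://twitpic.com/abc123'.split('/')  ->  arr[3] == 'abc123'
#   rjson['photo'] == 'https://twitpic.com/show/large/abc123'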

def callback(message):
    """ Callback for tweetstream_gzip """
    try:
        global n, mm, f, l, intime
        n += 1
        if n == 1:
            doc = cjson.decode(message)
            f = doc['id']
        if n == mm:
            # report throughput every 1000 tweets
            mm += 1000
            n += 1
            print '%i tweets per second.' % (n / (time.time() - intime))
        if len(message) > 1:
            # process from json to rawger json + opengraph
            m = cjson.decode(message)
            if 'limit' in m:
                print 'limit!'
                print m
                return
            # convert to rawger json
            rj = t2r(m)
            # add scrape to ioloop
            streamloop.add_callback(fast, rj)
    except Exception:
        print 'stream.py callback error: ', sys.exc_info()

def fast(rj):
    """ IOLoop add_callback for FastScraper """
    try:
        fs = FastScraper()
        fs.run(rj)
    except Exception:
        print 'fast error:', sys.exc_info()[1:3]

if __name__ == '__main__':
    # config 0
    configuration_0 = {
        "twitter_consumer_key": "sorry",
        "twitter_consumer_secret": "sorry",
        "twitter_access_token": "sorry",
        "twitter_access_token_secret": "sorry"
    }
    keywords_0 = [
        'love',
        'RT',
        'you',
    ]
    tracks_0 = ','.join(keywords_0)
    stream_0 = tweetstream_gzip.TweetStream(configuration_0, gzip=True)
    url_0 = "/1.1/statuses/filter.json?track=%s" % urllib.quote(tracks_0).encode('utf-8')
    stream_0.fetch(url_0, callback=callback)
    print 'Stream is up.'
    streamloop = ioloop.IOLoop.instance()
    streamloop.start()
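
# ----------------------------------------------------------------------
# Hedged sketch of the receiving ("motherlode") side: a separate process
# that subscribes to the 'tweet' channel and reverses the zlib + cjson
# framing used in FastScraper.finish_up(). Host/port/password are
# placeholders matching the REDACTED connection above; run this outside
# this script, since streamloop.start() blocks.
#
#   import redis, zlib, cjson
#   sub = redis.StrictRedis(host='REDACTED', port=9999, db=0,
#                           password='REDACTED').pubsub()
#   sub.subscribe('tweet')
#   for item in sub.listen():
#       if item['type'] == 'message':
#           rj = cjson.decode(zlib.decompress(item['data']))
#           print rj.get('screenname'), rj.get('text')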