#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# tweetstream to redis motherlode
#
import tweetstream_gzip
from tornado import escape
from tornado import ioloop
from tornado import iostream
import urllib
import cjson, zlib, sys, time
import struct
import socket, ssl
from urlparse import urlparse
from lxml import etree
# redis connection to motherlode
import redis
r = redis.StrictRedis(host='REDACTED', port=9999, db=0, password='REDACTED')
ps = r.pubsub() # pubsub handle (currently unused; publishing goes through r directly)
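# A minimal sketch (assumption: this is roughly what the motherlode side
# does) of a consumer for the messages published below -- they are
# zlib-compressed cjson documents on channel 'tweet':
#
#   sub = r.pubsub()
#   sub.subscribe('tweet')
#   for item in sub.listen():
#       if item['type'] == 'message':
#           doc = cjson.decode(zlib.decompress(item['data']))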
# global debug/stats counters (note: a ``global`` statement at module level
# is a no-op, so plain assignments suffice here)
pcount = 0
sockcount = 0
n = 0
mm = 10000
intime = time.time()
######################
# OPEN GRAPH PARSER #
######################
size = 5000 # default number of body bytes to scrape per page
class FastScraper(object):
""" Scrape and find Open Graph metadata in urls """
def __init__(self):
self.io = ioloop.IOLoop.instance()
self.redirect = False
self.results = []
self.redirect_url = ''
def finish_up(self):
""" Send json (self.rj) to motherlode """
try:
message = cjson.encode(self.rj)
zm = zlib.compress(message) # saving bandwidth
r.publish('tweet', zm) # send to motherlode
self.rj = ''
except Exception:
print 'finish_up error: ', sys.exc_info()[1:3]
pass
def run(self, rjson):
""" Run scraper """
try:
            # set json
            self.rj = rjson
            # if no url, finish early
            if not self.rj.get('urls'):
self.finish_up()
return
# set url and scrape
self.longurl = self.rj['urls'][0]['expanded_url'].replace('\/', '/')
self.scrape()
except Exception:
print 'run error: ', sys.exc_info()[1:3]
pass
def scrape(self):
# make connection
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.settimeout(2) # 2 seconds timeout
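            # linger(on=1, timeout=0): close() resets the connection at once
            # instead of leaving sockets in TIME_WAIT -- keeps fd usage down
            # when thousands of short-lived scrapes are opened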
l_onoff = 1
l_linger = 0
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', l_onoff, l_linger))
streamloop.add_callback(self.connect, s) # exp. tried putting this after linger settings, no apparent effect
except Exception:
print 'socket error', sys.exc_info()[1:3]
pass
    def prepare_url(self):
        """ Split self.longurl into host and path, e.g.
        'http://www.vg.no/artikkel.php?artid=10119923' ->
        self.url = 'www.vg.no', self.path = '/artikkel.php?artid=10119923' """
try:
            # add scheme if missing in url, otherwise urlparse leaves netloc empty
            if not self.longurl.startswith('http'):
                self.longurl = 'http://' + self.longurl
            # parse url
            url = urlparse(self.longurl)
self.url = url.netloc # eg. www.vg.no
self.path = url.path # eg. /artikkel.php?artid=10119923
# catch empty url
            if not self.url:
return False
            # determine path: keep everything after the host, so the query
            # string survives (url.path alone would drop it)
self.dieurl = self.longurl.split(self.url)
self.dieurl.reverse()
self.path = self.dieurl[0]
if self.path == '':
self.path = '/'
# set get size in bytes
self.size = self.setsize(url.netloc)
# determine SSL
if url.scheme == 'https':
self.SSL = True
else:
self.SSL = False
return True
except Exception:
print 'prepare_url error: ', sys.exc_info()[1:3]
print 'url: ', url
return False
def connect(self, s):
# connect
try:
prep = self.prepare_url()
            if not prep:
                # no iostream exists yet at this point, close the raw socket
                s.close()
                return
if self.SSL:
ss = ssl.wrap_socket(s, do_handshake_on_connect=False)
self.stream = iostream.SSLIOStream(ss)
self.stream.connect((self.url, 443), self.send_ssl_request)
else:
self.stream = iostream.IOStream(s)
self.stream.connect((self.url, 80), self.send_request)
global sockcount
sockcount += 1
# print 'currently open sockets: ', sockcount
except Exception:
print 'connect error:'
print sys.exc_info()[1:3]
print 'url: ', self.url
print len(self.url)
print self.path
            if hasattr(self, 'stream'):
                self.stream.close()
pass
def reconnect(self):
print 'reconnect!reconnect!reconnect!reconnect!'
pass
def send_request(self):
""" Send non-SSL request """
self.stream.write("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (self.path, self.url))
self.stream.read_until("\r\n\r\n", self.on_headers) # read headers
    def send_ssl_request(self):
        """ Send SSL request (identical on the wire to the plain one) """
        self.send_request()
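    # On the wire the request above looks like:
    #
    #   GET /artikkel.php?artid=10119923 HTTP/1.0
    #   Host: www.vg.no
    #
    # HTTP/1.0 keeps things simple: the server closes the connection after
    # one response and never uses chunked transfer-encoding.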
def setsize(self, neturl):
""" Custom scrape sizes in bytes """
        if 'youtu' in neturl:
            return 5000
        if 'aje.me' in neturl or 'aljazeera' in neturl:
            return 10000
        # default size
        return size
def on_headers(self, data):
""" Callback for headers """
self.redirect = False
global sockcount
try:
headers = {}
for line in data.split("\r\n"):
parts = line.split(": ")
if len(parts) == 2:
headers[parts[0].strip()] = parts[1].strip()
                if line.startswith('HTTP'):
                    code = line.split(' ')
                    if code[1] in ('301', '302', '303'):
                        # redirect status; follow Location below
                        self.redirect = True
if self.redirect:
if 'Location' in headers:
self.longurl = headers['Location']
if 'location' in headers:
self.longurl = headers['location']
self.stream.close() # close
self.io.add_callback(self.scrape) # restart
sockcount -= 1
return
            if 'Content-Length' in headers:
                if self.size > int(headers['Content-Length']):
                    # never request more bytes than the document actually has,
                    # or read_bytes would wait forever
                    self.size = int(headers['Content-Length'])
try:
# go ahead and scrape the body
self.stream.read_bytes(self.size, self.on_body)
except Exception:
print 'on headers ==> body error:'
print sys.exc_info()[1:3]
print headers
print data
print self.size
print self.url
print self.path
sockcount -= 1
self.stream.close()
pass
except Exception:
print 'on headers error:'
print sys.exc_info()[1:3]
print headers
print data
sockcount -= 1
self.stream.close()
pass
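    # Example of a response that takes the redirect branch above:
    #
    #   HTTP/1.0 301 Moved Permanently
    #   Location: http://www.example.com/new-path
    #
    # in which case the stream is closed and scrape() is re-entered with
    # self.longurl pointing at the Location value.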
def on_body(self, data):
""" Callback after scraping body """
# parse the og:tags
self.parse_og(data)
# finish up
self.finish_up()
# clean up
self.stream.close()
global sockcount
sockcount -= 1
    def parse_og(self, data):
        """ Extract Open Graph <meta property=...> tags with lxml
        and merge them into self.rj """
        try:
            tree = etree.HTML(data)
            # e.g. <meta property="og:title" content="Some headline">
            # becomes self.rj['og:title'] = 'Some headline'
            for i in tree.xpath("//meta[@property]"):
                if 'content' in i.attrib:
                    self.rj[i.attrib['property']] = i.attrib['content']
        except Exception:
            print 'lxml error: ', sys.exc_info()[1:3]
            print len(data)
            pass
####################
#  RJSON PARSER    #
####################
# todo: convert facebook, instagram, bambuser, etc. to rawger json
def t2r(tdoc):
""" Convert Twitter JSON to Rawger JSON. """
try:
# if rate limit
rjson = {}
# general
rjson['source'] = 'twitter'
        rjson['device'] = tdoc['source'] # todo: strip the <a> markup around the client name
# user
rjson['userid'] = tdoc['user']['id']
rjson['fullname'] = tdoc['user']['name']
rjson['screenname'] = tdoc['user']['screen_name']
rjson['avatar'] = tdoc['user']['profile_image_url_https'].replace('\/', '/')
rjson['description'] = tdoc['user']['description']
rjson['followers'] = tdoc['user']['followers_count']
rjson['friends'] = tdoc['user']['friends_count']
        rjson['following'] = tdoc['user']['friends_count'] # alias of 'friends'
rjson['userlocation'] = tdoc['user']['location']
rjson['statuscount'] = tdoc['user']['statuses_count']
rjson['hashtags'] = tdoc['entities']['hashtags']
rjson['mentions'] = tdoc['entities']['user_mentions']
# urls
rjson['urls'] = tdoc['entities']['urls']
# tweet
rjson['id'] = tdoc['id']
if 'lang' in tdoc:
rjson['lang'] = tdoc['lang']
else:
rjson['lang'] = None
rjson['rt'] = tdoc['retweet_count']
try:
rjson['text'] = escape.linkify(tdoc['text'].replace('\/', '/'), True, extra_params='target="_blank"') # todo: get rid of &s
except Exception:
print 't2r escape error!', sys.exc_info()[1:3]
pass
# different types of geo
# only [geo] atm, twitter places too general
rjson['geo'] = tdoc['geo']
# if retweet, get original timestamp instead
if 'retweeted_status' in tdoc.keys():
rjson['timestamp'] = tdoc['retweeted_status']['created_at']
else:
rjson['timestamp'] = tdoc['created_at']
# if twitter embedded image
if 'media' in tdoc['entities'].keys():
for m in tdoc['entities']['media']:
media = m['media_url_https'].replace('\/', '/')
rjson['photo'] = media
else:
rjson['photo'] = None
# if other images
if not rjson['photo']:
if rjson['urls']:
for urls in rjson['urls']:
url = urls['expanded_url'].replace('\/', '/')
#youtube
if ('youtu' in url):
rjson['video'] = url
#yfrog
if ('yfrog' in url):
rjson['photo'] = url
#instagram
if ('instagr' in url):
rjson['photo'] = url
#twitpic
if ('twitpic' in url):
                        # rewrite to twitpic's direct large-image url
                        pic_id = url.split('/')[3]
                        rjson['photo'] = 'https://twitpic.com/show/large/%s' % pic_id.encode('ascii')
#tumblr
if ('media.tumblr' in url):
if ('.jpg' in url):
                            # swap the size suffix (e.g. '250.jpg') for the 500px version
                            rjson['photo'] = '%s500.jpg' % url[:-7]
# done!
return rjson
except Exception:
print 't2r general error!'
print sys.exc_info()[1:3]
print tdoc
pass
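# A converted document roughly looks like this (illustrative values only):
#
#   {'source': 'twitter', 'id': 123456789, 'screenname': 'someuser',
#    'text': '...', 'lang': 'en', 'urls': [], 'geo': None,
#    'photo': None, 'timestamp': 'Mon Sep 24 03:35:21 +0000 2012', ...}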
def callback(message):
""" Callback for tweetstream_gzip """
try:
        global n, mm, f, intime
n += 1
        if n == 1:
            # remember the id of the first tweet seen (debug)
            doc = cjson.decode(message)
            f = doc['id']
##print '@%s: %s' % (doc['user']['screen_name'], doc['text'])
        if n == mm:
            # report throughput: first at n == 10000, then every 1000 tweets
mm += 1000
n += 1
print '%i tweets per second.' % ( n / ( time.time() - intime ))
        if len(message) > 1: # skip keep-alive newlines
# process from json to rawger json + opengraph
m = cjson.decode(message)
            if 'limit' in m:
                # rate limit notice from Twitter, e.g. {"limit": {"track": 42}},
                # meaning 42 matching tweets were dropped upstream
                print 'limit!'
                print m
                return
# convert to rawger json
rj = t2r(m)
# add scrape to ioloop
streamloop.add_callback(fast, rj)
except Exception:
print 'stream.py callback error: ', sys.exc_info()
pass
def fast(rj):
""" IOLoop add_callback for FastScraper """
try:
fs = FastScraper()
fs.run(rj)
except Exception:
print 'fast error:', sys.exc_info()[1:3]
pass
if __name__ == '__main__':
#config 0
configuration_0 = {
"twitter_consumer_key": "sorry",
"twitter_consumer_secret": "sorry",
"twitter_access_token": "sorry",
"twitter_access_token_secret": "sorry"
}
keywords_0 = [
'love',
'RT',
'you',
]
tracks_0 = ','.join(keywords_0)
stream_0 = tweetstream_gzip.TweetStream(configuration_0, gzip=True)
    # percent-encode the track list; encode to utf-8 first, since
    # urllib.quote expects a byte string
    url_0 = "/1.1/statuses/filter.json?track=%s" % urllib.quote(tracks_0.encode('utf-8'))
stream_0.fetch(url_0, callback=callback)
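    # fetch() only registers the request; tweets start flowing once
    # streamloop.start() below enters the Tornado IOLoop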
print 'Stream is up.'
    streamloop = ioloop.IOLoop.instance()
streamloop.start()