sobach

4SQ-Collector

Mar 31st, 2013
365
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 10.86 KB | None | 0 0
  1. # -*- coding: utf-8 -*-
  2. #-------------------------------------------------------------------------------
  3. # Name:        Foursquare checkins crawler-processor
  4. # Author:      Alexander Tolmach
  5. # Created:     15.03.2013
  6. # Licence:     CC BY-SA
  7. #-------------------------------------------------------------------------------
  8. #!/usr/bin/env python
  9.  
  10. import re
  11.  
  12. import time
  13. import datetime
  14.  
  15. import urlparse
  16. from urllib2 import urlopen
  17.  
  18. import json
  19.  
  20. import PySQLPool
  21. import MySQLdb
  22. import tweetstream
  23. import threading
  24.  
  25. #TWITTER
  26. twitter_login = 'TWITTER_LOGIN'
  27. twitter_password = 'TWITTER_PASSWORD'
  28. twitter_locations = ['37.286,55.438', '37.984,55.99'] # Moscow region coordinates
  29.  
  30. #BITLY
  31. bitly_login = 'BITLY_LOGIN'
  32. bitly_apikey = 'BITLY_API_KEY'
  33.  
  34. #FOURSQUARE
  35. fsq_clientid = 'FOURSQUARE_CLINET_ID'
  36. fsq_clientsecret = 'FOURSQUARE_CLIENT_SECRET'
  37.  
  38. #REKOGNITION
  39. rek_key = 'REKOGNITION_KEY'
  40. rek_secret = 'REKOGNITION_SECRET'
  41.  
  42. #MYSQL
  43. PySQLPool.getNewPool().maxActiveConnections = 1
  44. db_connection = PySQLPool.getNewConnection(username='DB_USER', password='DB_PASSWORD', host='DB_SERVER', db='DB_NAME')
  45.  
  46.  
  47. class TwitterThread(threading.Thread):
  48.     def run(self):
  49.         passtime = 60
  50.         while True:
  51.             try:
  52.                 stream = tweetstream.FilterStream(twitter_login, twitter_password, locations=twitter_locations)
  53.                 passtime = 60
  54.                 for tweet in stream:
  55.                     if tweet['entities']['urls']:
  56.                         for element in tweet['entities']['urls']:
  57.                             if '4sq.com' in element['expanded_url']:
  58.                                 query = PySQLPool.getNewQuery(db_connection, commitOnEnd=True)
  59.                                 query.Query('INSERT INTO checkins(twitlink) VALUES ("{0}");'.format(element['expanded_url']))
  60.             except:
  61.                 print 'ERROR'
  62.                 time.sleep(passtime)
  63.                 passtime *= 2
  64.                 pass
  65.  
  66.  
  67. class BitlyThread(threading.Thread):
  68.     def run(self):
  69.         while True:
  70.             query = PySQLPool.getNewQuery(db_connection, commitOnEnd=True)
  71.             query.Query('SELECT * FROM checkins WHERE twitunboxed=0 LIMIT 1;')
  72.             try:
  73.                 dbrow = query.record[0]
  74.             except IndexError:
  75.                 time.sleep(60)
  76.             else:
  77.                 checkinid, signature = expandlink(dbrow['twitlink'])
  78.                 if checkinid == 'error':
  79.                     twitstat = 9
  80.                 else:
  81.                     twitstat = 1
  82.                 query.Query('UPDATE checkins SET twitunboxed={0}, checkinid="{1}", signature="{2}" WHERE twitid={3};'.format(
  83.                     twitstat,
  84.                     checkinid,
  85.                     signature,
  86.                     dbrow['twitid']))
  87.                 time.sleep(3)
  88.  
  89.  
  90. class FoursquareThread(threading.Thread):
  91.     def run(self):
  92.         while True:
  93.             query = PySQLPool.getNewQuery(db_connection, commitOnEnd=True)
  94.             query.Query('SELECT * FROM checkins WHERE twitunboxed=1 LIMIT 1;')
  95.             try:
  96.                 dbrow = query.record[0]
  97.             except IndexError:
  98.                 time.sleep(60)
  99.             else:
  100.                 startt = time.clock()
  101.                 checkindict, userdict, venuedict = checkfoursquare(dbrow['checkinid'], dbrow['signature'])
  102.  
  103.                 #checkindict = {'time':dt,'venueid':venueid,'userid':userid}
  104.                 #userdict = {'id':userid, 'photo':userphoto}
  105.                 #venuedict = {'id':venueid, 'name':venuename, 'lat':venuelat, 'lng':venuelng, 'cat1':cat1, 'cat2':cat2}
  106.  
  107.                 if venuedict['id'] == 'error':
  108.                     twitstat = 9
  109.                 else:
  110.                     twitstat = 2
  111.                 query.Query('UPDATE checkins SET twitunboxed={0}, userid="{1}", venueid="{2}", chekintime="{3}" WHERE twitid={4};'.format(twitstat, checkindict['userid'], checkindict['venueid'], checkindict['time'], dbrow['twitid']))
  112.  
  113.                 query.Query('SELECT * FROM venues WHERE venueid="{0}";'.format(venuedict['id']))
  114.                 if len(query.record) == 0:
  115.                     query.Query('INSERT INTO venues(venueid, name, lat, lng, cat1, cat2) VALUES ("{0}", "{1}", {2}, {3}, "{4}", "{5}");'.format(
  116.                         venuedict['id'],
  117.                         MySQLdb.escape_string(venuedict['name'].encode('utf-8', 'replace')),
  118.                         venuedict['lat'],
  119.                         venuedict['lng'],
  120.                         venuedict['cat1'],
  121.                         venuedict['cat2'],
  122.                     ))
  123.  
  124.                 query.Query('SELECT * FROM people WHERE userid="{0}";'.format(userdict['id']))
  125.                 if len(query.record) == 0:
  126.                     query.Query('INSERT INTO people(userid, photo) VALUES ("{0}", "{1}");'.format(
  127.                         userdict['id'],
  128.                         userdict['photo'],
  129.                     ))
  130.  
  131.                 if (time.clock() - startt) < 8:
  132.                     time.sleep(8 - (time.clock() - startt))
  133.  
  134.  
  135. class MonitorThread(threading.Thread):
  136.     def run(self):
  137.         while True:
  138.             counterr = counttwi = countbit = countfsq = 0
  139.             query = PySQLPool.getNewQuery(db_connection, commitOnEnd=True)
  140.             query.Query('SELECT twitunboxed, COUNT(*) FROM checkins GROUP BY twitunboxed;')
  141.             for line in query.record:
  142.                 if line['twitunboxed'] == 0: counttwi = line['COUNT(*)']
  143.                 elif line['twitunboxed'] == 1: countbit = line['COUNT(*)']
  144.                 elif line['twitunboxed'] == 2: countfsq = line['COUNT(*)']
  145.                 elif line['twitunboxed'] == 9: counterr = line['COUNT(*)']
  146.             query.Query('SELECT COUNT(*) FROM venues;')
  147.             countvenues = query.record[0]['COUNT(*)']
  148.             query.Query('SELECT COUNT(*) FROM people;')
  149.             countusers = query.record[0]['COUNT(*)']
  150.             timer = time.strftime('%d.%m.%Y %H:%M:%S', time.gmtime(time.time() + (4 * 60 * 60)))
  151.             query.Query('INSERT INTO stats(timer,checkins,users,venues,waiting,errors) VALUES ("{0}","{1}","{2}","{3}","{4}","{5}");'.format(
  152.                 timer,
  153.                 countfsq,
  154.                 countusers,
  155.                 countvenues,
  156.                 countbit + counttwi,
  157.                 counterr))
  158.             time.sleep(60 * 60)
  159.  
  160.  
  161. class ReKognitionThread(threading.Thread):
  162.     def run(self):
  163.         while True:
  164.             query = PySQLPool.getNewQuery(db_connection, commitOnEnd=True)
  165.             query.Query('SELECT * FROM people WHERE age_prob is NULL and not(photo = "error") LIMIT 1;')
  166.             try: dbrow = query.record[0]
  167.             except IndexError: time.sleep(60 * 3)
  168.             else:
  169.                 facedict = facerekognition('https://is1.4sqi.net/userpix/{0}'.format(dbrow['photo']))
  170.                 query.Query('UPDATE people SET age={0}, age_prob={1}, gender={2}, gender_prob={3} WHERE userid={4};'.format(
  171.                     facedict['age'],
  172.                     facedict['age_prob'],
  173.                     facedict['sex'],
  174.                     facedict['sex_prob'],
  175.                     dbrow['userid']
  176.                 ))
  177.                 time.sleep(60 * 3)
  178.  
  179.  
  180. def expandlink(link):
  181.     json_tx = jsonfromurl('https://api-ssl.bitly.com/v3/expand?shortUrl={0}&login={1}&apiKey={2}'.format(link, bitly_login, bitly_apikey))
  182.     try:
  183.         retval1 = urlparse.urlparse(json_tx['data']['expand'][0]['long_url']).path.split('/')[-1]
  184.         retval2 = urlparse.parse_qs(urlparse.urlparse(json_tx['data']['expand'][0]['long_url']).query)['s'][0]
  185.     except:
  186.         retval1 = retval2 = 'error'
  187.     return retval1, retval2
  188.  
  189.  
  190. def checkfoursquare(check, code):
  191.     json_ch = jsonfromurl('https://api.foursquare.com/v2/checkins/{0}?signature={1}&client_id={2}&client_secret={3}&v={4}'.format(
  192.         check,
  193.         code,
  194.         fsq_clientid,
  195.         fsq_clientsecret,
  196.         datetime.datetime.now().strftime('%Y%m%d')))
  197.  
  198.     try: dt = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(int(json_ch['response']['checkin']['createdAt']) + (4 * 60 * 60)))
  199.     except: dt = '1900-01-01 00:00:00'
  200.  
  201.     try: venueid = json_ch['response']['checkin']['venue']['id']
  202.     except: venueid = 'error'
  203.  
  204.     try: userid = json_ch['response']['checkin']['user']['id']
  205.     except: userid = 'error'
  206.  
  207.     try:
  208.         userphoto = json_ch['response']['checkin']['user']['photo']['suffix']
  209.         if 'blank' in userphoto: userphoto='error'
  210.     except: userphoto = 'error'
  211.  
  212.     try: venuename = json_ch['response']['checkin']['venue']['name']
  213.     except: venuename = 'error'
  214.  
  215.     try:
  216.         venuelat = json_ch['response']['checkin']['venue']['location']['lat']
  217.         venuelng = json_ch['response']['checkin']['venue']['location']['lng']
  218.     except: venuelat = venuelng = 0.
  219.  
  220.     try:
  221.         cat1 = 'error'
  222.         for category in json_ch['response']['checkin']['venue']['categories']:
  223.             if 'primary' in category and category['primary'] is True:
  224.                 cat1 = category['id']
  225.                 break
  226.     except:
  227.         pass
  228.  
  229.     try:
  230.         cat2 = re.sub('https://foursquare.com/img/categories_v2/', '', json_ch['response']['checkin']['venue']['categories'][0]['icon']['prefix'])
  231.         if cat2[-1:] == '_': cat2=cat2[:-1]
  232.     except: cat2 = 'error'
  233.  
  234.  
  235.     checkindict = {'time': dt, 'venueid': venueid, 'userid': userid}
  236.     userdict = {'id': userid, 'photo': userphoto}
  237.     venuedict = {'id': venueid, 'name': venuename, 'lat': venuelat, 'lng': venuelng, 'cat1': cat1, 'cat2': cat2}
  238.  
  239.     return checkindict, userdict, venuedict
  240.  
  241.  
  242. def facerekognition(url):
  243.     json_fc = jsonfromurl('http://rekognition.com/func/api/?api_key={0}&api_secret={1}&jobs=face_gender_age&urls={2}'.format(
  244.         rek_key,
  245.         rek_secret,
  246.         url
  247.     ))
  248.     if 'face_detection' not in json_fc or len(json_fc['face_detection']) == 0:
  249.         return {'sex': 9, 'age': 0, 'age_prob': 9, 'sex_prob': 9}
  250.     else:
  251.         face = sorted(json_fc['face_detection'], key=lambda x: x['confidence'], reverse=True)[0]
  252.         try:
  253.             sexprob = abs(.5 - face['sex']) * 2
  254.             sex = round(face['sex'], 0)
  255.         except:
  256.             sexprob = sex = 9
  257.  
  258.         try: age = face['age']
  259.         except: age = 0
  260.  
  261.         return {'sex': sex, 'sex_prob': sexprob, 'age': age, 'age_prob': face['confidence']}
  262.  
  263.  
  264. def jsonfromurl(url, attempts=5):
  265.     errorcount = 0
  266.     while True:
  267.         try:
  268.             ret_json = json.loads(urlopen(url).read())
  269.             break
  270.         except:
  271.             errorcount += 1
  272.             if errorcount < attempts:
  273.                 time.sleep(10)
  274.                 pass
  275.             else:
  276.                 ret_json = {}
  277.                 break
  278.     return ret_json
  279.  
  280.  
  281. TwitterThread().start()
  282. time.sleep(60)
  283. BitlyThread().start()
  284. time.sleep(60)
  285. FoursquareThread().start()
  286. time.sleep(60)
  287. ReKognitionThread().start()
  288. time.sleep(60)
  289. MonitorThread().start()
Add Comment
Please, Sign In to add comment