abhaystoic

Track Human Movement

May 13th, 2016
46
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 6.20 KB | None | 0 0
  1. __author__ = 'Abhay Gupta'
  2. __version__ = 0.1
  3.  
  4. import pyhs2
  5. import time
  6. import MySQLdb
  7. import datetime
  8. import sys
  9. import pytz
  10.  
  11.  
  12. def DATE_TIME():
  13.     return datetime.datetime.now(pytz.timezone('Asia/Calcutta'))
  14.  
  15. def FORMATTED_TIME():
  16.     return datetime.datetime.strptime(str(DATE_TIME()).split('.')[0], '%Y-%m-%d %H:%M:%S')
  17.  
  18. #Spinner
  19. def DrawSpinner(counter):
  20.     if counter % 4 == 0:
  21.         sys.stdout.write("/")
  22.     elif counter % 4 == 1:
  23.         sys.stdout.write("-")
  24.     elif counter % 4 == 2:
  25.         sys.stdout.write("\\")
  26.     elif counter % 4 == 3:
  27.         sys.stdout.write("|")
  28.     sys.stdout.flush()
  29.     sys.stdout.write('\b')
  30.  
  31. #Generator
  32. def neighourhood(iterable):
  33.     iterator = iter(iterable)
  34.     prev = None
  35.     item = iterator.next()  # throws StopIteration if empty.
  36.     for next in iterator:
  37.         yield (prev,item,next)
  38.         prev = item
  39.         item = next
  40.     yield (prev,item,None)
  41.  
  42.  
  43. #hive cursor
  44. def get_cursor():
  45.         conn = pyhs2.connect(host='',
  46.                port=10000,
  47.                authMechanism="PLAIN",
  48.                user='hadoop',
  49.                password='',
  50.                database='test')
  51.         return conn.cursor()
  52.  
  53. def get_mysql_cursor():
  54.         conn = MySQLdb.connect(user='db', passwd='',
  55.                               host='',
  56.                               db='test')
  57.         return conn.cursor()
  58.  
  59. def get_records():
  60.         cur = get_cursor()
  61.         cur.execute("select * from user_location_history")
  62.         #Fetch table results
  63.         return cur.fetchall()
  64.  
  65. def get_user_movement():
  66.         #Initializing
  67.         location_dict = {}
  68.         #Fetching all the records
  69.         records = get_records()
  70.         counter = 0
  71.         for record in records:
  72.                 counter = counter + 1
  73.                 DrawSpinner(counter)
  74.                 if location_dict.has_key(record[1]):
  75.                         location_dict[record[1]].append(int(record[3]))
  76.                 else:
  77.                         location_dict[record[1]] = [int(record[3])]
  78.         return location_dict
  79.  
  80. #For performance improvements use list instead of dictionary as we don't need count of the users here
  81. def prepare_movement_data():
  82.         #Initializing
  83.         city_movement_data_dict = {}
  84.         user_location_dict = get_user_movement()
  85. counter = 0
  86.         for user_id, user_movement_path in user_location_dict.iteritems():
  87.                 counter = counter + 1
  88.                 DrawSpinner(counter)
  89.                 if len(set(user_movement_path)) > 1:
  90.                         if city_movement_data_dict.has_key(tuple(user_movement_path)):
  91.                                 city_movement_data_dict[tuple(user_movement_path)] = city_movement_data_dict[tuple(user_movement_path)] + 1
  92.                         else:
  93.                                 city_movement_data_dict[tuple(user_movement_path)] = 1
  94.         return city_movement_data_dict
  95.  
  96. def store_mining_results(unique_movement_map_tuple):
  97.         sql_query = None
  98.         insert_flag = False
  99.         update_flag = False
  100.         cur = get_mysql_cursor()
  101.         for prev, current, next in neighourhood(unique_movement_map_tuple):
  102.                 #Execute query
  103.                 cur.execute('select * from talent_flow_location')
  104.                 #Handling the empty table case
  105.                 fetched_data = cur.fetchall()
  106.                 if len(fetched_data) == 0 and prev is not None and current is not None and prev != current and current != next:
  107.                         sql_query = 'insert into talent_flow_location(location_from, location_to, count)\
  108.                                                values('+  str(prev) + ',' + str(current) + ', 1 )'
  109.                         cur.execute(sql_query)
  110.                 else:
  111.                         insert_flag = False
  112.                         update_flag = False
  113.                         for record in fetched_data:
  114.                                 if record[2] == prev and record[3] == current:
  115.                                         update_flag = True
  116.                                         sql_query = 'update talent_flow_location set count = ' + str(record[3] + 1) +\
  117.                                         ' where id = ' + str(record[0])
  118.                                         cur.execute(sql_query)
  119.                                 elif prev is not None and current is not None and prev != current and current != next:
  120.                                         insert_flag = True
  121.                 if update_flag == False and insert_flag == True:
  122.                         #Insert only if the entry doesn't exists
  123.                         #A quick fix. It can be improved later.
  124.                         cur2 = get_mysql_cursor()
  125.                         cur2.execute('select * from talent_flow_location where location_from = ' +\
  126.                                          str(prev) + ' and location_to = ' + str(current))
  127.                         if len(cur2.fetchall()) == 0:
  128.                                 sql_query = 'insert into talent_flow_location\
  129.                                                (location_from, location_to, count)\
  130.                                                values('+  str(prev) + ',' + str(current)+', 1)'
  131.                                 cur.execute(sql_query)
  132.                         cur2.close()
  133.         cur.close()
  134.  
  135. if __name__ == '__main__':
  136.         start_time = time.time()
  137.         #spinner = spinning_cursor()
  138.         print 'Preparing data...'
  139.         city_movement_data_dict = prepare_movement_data()
  140.         sys.stdout.flush()
  141.         sys.stdout.write('\b')
  142.         print '\nData prepared for mining.'
  143.         print 'Processing data and storing results...'
  144.         counter = 0
  145.         for unique_movement_map, unique_user_count in city_movement_data_dict.iteritems():
  146.                 #print str(unique_movement_map), ' : ', str(unique_user_count), ' user(s) relocated through this path'
  147.                 store_mining_results(unique_movement_map)
  148.                 counter = counter + 1
  149.                 DrawSpinner(counter)
  150.         sys.stdout.flush()
  151.         sys.stdout.write('\b')
  152.         end_time = time.time()
  153.         print '\nData mining complete!'
  154.         print '\nTotal execution time = ', str(end_time - start_time), ' seconds\n'
Add Comment
Please, Sign In to add comment