Advertisement
Guest User

Untitled

a guest
May 13th, 2016
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.04 KB | None | 0 0
  1. __author__ = 'Abhay Gupta'
  2. __version__ = 0.1
  3.  
  4. import pyhs2
  5. import time
  6. import MySQLdb
  7. import datetime
  8. import sys
  9. import pytz
  10.  
  11.  
  12. def DATE_TIME():
  13. return datetime.datetime.now(pytz.timezone('Asia/Calcutta'))
  14.  
  15. def FORMATTED_TIME():
  16. return datetime.datetime.strptime(str(DATE_TIME()).split('.')[0], '%Y-%m-%d %H:%M:%S')
  17.  
  18. #Spinner
  19. def DrawSpinner(counter):
  20. if counter % 4 == 0:
  21. sys.stdout.write("/")
  22. elif counter % 4 == 1:
  23. sys.stdout.write("-")
  24. elif counter % 4 == 2:
  25. sys.stdout.write("\\")
  26. elif counter % 4 == 3:
  27. sys.stdout.write("|")
  28. sys.stdout.flush()
  29. sys.stdout.write('\b')
  30.  
  31. #Generator
  32. def neighourhood(iterable):
  33. iterator = iter(iterable)
  34. prev = None
  35. item = iterator.next() # throws StopIteration if empty.
  36. for next in iterator:
  37. yield (prev,item,next)
  38. prev = item
  39. item = next
  40. yield (prev,item,None)
  41.  
  42.  
  43. #hive cursor
  44. def get_cursor():
  45. conn = pyhs2.connect(host='',
  46. port=10000,
  47. authMechanism="PLAIN",
  48. user='hadoop',
  49. password='',
  50. database='test')
  51. return conn.cursor()
  52.  
  53. def get_mysql_cursor():
  54. conn = MySQLdb.connect(user='db', passwd='',
  55. host='',
  56. db='test')
  57. return conn.cursor()
  58.  
  59. def get_records():
  60. cur = get_cursor()
  61. cur.execute("select * from user_location_history")
  62. #Fetch table results
  63. return cur.fetchall()
  64.  
  65. def get_user_movement():
  66. #Initializing
  67. location_dict = {}
  68. #Fetching all the records
  69. records = get_records()
  70. counter = 0
  71. for record in records:
  72. counter = counter + 1
  73. DrawSpinner(counter)
  74. if location_dict.has_key(record[1]):
  75. location_dict[record[1]].append(int(record[3]))
  76. else:
  77. location_dict[record[1]] = [int(record[3])]
  78. return location_dict
  79.  
  80. #For performance improvements use list instead of dictionary as we don't need count of the users here
  81. def prepare_movement_data():
  82. #Initializing
  83. city_movement_data_dict = {}
  84. user_location_dict = get_user_movement()
  85. counter = 0
  86. for user_id, user_movement_path in user_location_dict.iteritems():
  87. counter = counter + 1
  88. DrawSpinner(counter)
  89. if len(set(user_movement_path)) > 1:
  90. if city_movement_data_dict.has_key(tuple(user_movement_path)):
  91. city_movement_data_dict[tuple(user_movement_path)] = city_movement_data_dict[tuple(user_movement_path)] + 1
  92. else:
  93. city_movement_data_dict[tuple(user_movement_path)] = 1
  94. return city_movement_data_dict
  95.  
  96. def store_mining_results(unique_movement_map_tuple):
  97. sql_query = None
  98. insert_flag = False
  99. update_flag = False
  100. cur = get_mysql_cursor()
  101. for prev, current, next in neighourhood(unique_movement_map_tuple):
  102. #Execute query
  103. cur.execute('select * from user_flow_location')
  104. #Handling the empty table case
  105. fetched_data = cur.fetchall()
  106. if len(fetched_data) == 0 and prev is not None and current is not None and prev != current and current != next:
  107. sql_query = 'insert into user_flow_location(location_from, location_to, count)\
  108. values('+ str(prev) + ',' + str(current) + ', 1 )'
  109. cur.execute(sql_query)
  110. else:
  111. insert_flag = False
  112. update_flag = False
  113. for record in fetched_data:
  114. if record[2] == prev and record[3] == current:
  115. update_flag = True
  116. sql_query = 'update user_flow_location set count = ' + str(record[3] + 1) +\
  117. ' where id = ' + str(record[0])
  118. cur.execute(sql_query)
  119. elif prev is not None and current is not None and prev != current and current != next:
  120. insert_flag = True
  121. if update_flag == False and insert_flag == True:
  122. #Insert only if the entry doesn't exists
  123. #A quick fix. It can be improved later.
  124. cur2 = get_mysql_cursor()
  125. cur2.execute('select * from user_flow_location where location_from = ' +\
  126. str(prev) + ' and location_to = ' + str(current))
  127. if len(cur2.fetchall()) == 0:
  128. sql_query = 'insert into user_flow_location\
  129. (location_from, location_to, count)\
  130. values('+ str(prev) + ',' + str(current)+', 1)'
  131. cur.execute(sql_query)
  132. cur2.close()
  133. cur.close()
  134.  
  135. if __name__ == '__main__':
  136. start_time = time.time()
  137. #spinner = spinning_cursor()
  138. print 'Preparing data...'
  139. city_movement_data_dict = prepare_movement_data()
  140. sys.stdout.flush()
  141. sys.stdout.write('\b')
  142. print '\nData prepared for mining.'
  143. print 'Processing data and storing results...'
  144. counter = 0
  145. for unique_movement_map, unique_user_count in city_movement_data_dict.iteritems():
  146. #print str(unique_movement_map), ' : ', str(unique_user_count), ' user(s) relocated through this path'
  147. store_mining_results(unique_movement_map)
  148. counter = counter + 1
  149. DrawSpinner(counter)
  150. sys.stdout.flush()
  151. sys.stdout.write('\b')
  152. end_time = time.time()
  153. print '\nData mining complete!'
  154. print '\nTotal execution time = ', str(end_time - start_time), ' seconds\n'
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement