Advertisement
Guest User

Untitled

a guest
May 1st, 2016
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 20.40 KB | None | 0 0
  1. import os, pymongo, csv, time, json, random, fcntl, collections
  2.  
  3. from datetime import datetime
  4. from locust import Locust, TaskSet, task, events
  5. #from mongo_connection import MongoDbClient
  6.  
  7. from bson.code import Code
  8. from pymongo import MongoClient, GEO2D
  9.  
  10. import resource
  11.  
  12. resource.setrlimit(resource.RLIMIT_NOFILE, (999999, 999999))
  13.  
  14. split3 = '/home/segment_3/'
  15. split5 = '/home/segment_5/'
  16. split10 = '/home/segment_10/'
  17.  
  18. # DEFINE THE USED SPLIT METHOD
  19. split = split10
  20.  
  21. # Paths
  22. newestLocation = split + 'newest_location/'
  23. locationsForUserInInterval = split + 'locations_for_user_in_interval/'
  24. newestUserLocationsInArea = split + 'newest_user_locations_in_area/'
  25. locationsInAreaInInterval = split + 'locations_in_area_in_interval/'
  26. locationsInRadius = split + 'locations_in_radius/'
  27.  
  28. ### READ TEST FOR MONGO DB ###
  29. class UserBehavior(TaskSet):
  30. # Query arrays
  31. newestLocationQueries = []
  32. locationsForUserInIntervalQueries = []
  33. newestUserLocationsInAreaQueries = []
  34. locationsInAreaInIntervalQueries = []
  35. locationsInRadiusQueries = []
  36. # Query iterators
  37. newestLocationIterator = None
  38. locationsForUserInIntervalIterator = None
  39. newestUserLocationsInAreaIterator = None
  40. locationsInAreaInIntervalIterator = None
  41. locationsInRadiusIterator = None
  42.  
  43. # Query files used
  44. newestLocationDataFile = None
  45. locationsForUserInIntervalDataFile = None
  46. newestUserLocationsInAreaDataFile = None
  47. locationsInAreaInIntervalDataFile = None
  48. locationsInRadiusDataFile = None
  49.  
  50. newestLocationFiles = []
  51. locationsForUserInIntervalFiles = []
  52. newestUserLocationsInAreaFiles = []
  53. locationsInAreaInIntervalFiles = []
  54. locationsInRadiusFiles = []
  55.  
  56. mongo = None
  57.  
  58. def on_start(self):
  59. if (UserBehavior.mongo == None):
  60. print "init mongo"
  61. UserBehavior.mongo = MongoDbClient()
  62.  
  63. for fn in os.listdir(newestLocation):
  64. filePath = newestLocation + fn
  65. if os.path.isfile(filePath):
  66. self.newestLocationFiles.append(filePath)
  67.  
  68. for fn in os.listdir(locationsForUserInInterval):
  69. filePath = locationsForUserInInterval + fn
  70. if os.path.isfile(filePath):
  71. self.locationsForUserInIntervalFiles.append(filePath)
  72.  
  73. for fn in os.listdir(newestUserLocationsInArea):
  74. filePath = newestUserLocationsInArea + fn
  75. if os.path.isfile(filePath):
  76. self.newestUserLocationsInAreaFiles.append(filePath)
  77.  
  78. for fn in os.listdir(locationsInAreaInInterval):
  79. filePath = locationsInAreaInInterval + fn
  80. if os.path.isfile(filePath):
  81. self.locationsInAreaInIntervalFiles.append(filePath)
  82.  
  83. for fn in os.listdir(locationsInRadius):
  84. filePath = locationsInRadius + fn
  85. if os.path.isfile(filePath):
  86. self.locationsInRadiusFiles.append(filePath)
  87.  
  88. self.getNewestLocationQueriesData()
  89. self.getLocationsForUserInIntervalQueriesData()
  90. self.getNewestUserLocationsInAreaQueriesData()
  91. self.getLocationsInAreaInIntervalQueriesData()
  92. self.getLocationsInRadiusQueriesData()
  93.  
  94. @task(10)
  95. def getNewestLocation(self):
  96. try:
  97. queryData = self.newestLocationIterator.next()
  98. except StopIteration:
  99. self.getNewestLocationQueriesData()
  100. queryData = self.newestLocationIterator.next()
  101.  
  102. startTime = time.time()
  103. try:
  104. result = UserBehavior.mongo.getNewestLocation(queryData['userId'])
  105. except pymongo.errors.OperationFailure, e:
  106. totalTime = int((time.time() - startTime) * 1000)
  107. events.request_failure.fire(request_type='mongodb', name='getNewestLocation', response_time=totalTime, exception=e)
  108. else:
  109. totalTime = int((time.time() - startTime) * 1000)
  110. events.request_success.fire(request_type='mongodb', name='getNewestLocation', response_time=totalTime, response_length=0)
  111.  
  112. @task(10)
  113. def getNewestLocationsInArea(self):
  114. try:
  115. queryData = self.newestUserLocationsInAreaIterator.next()
  116. except StopIteration:
  117. self.getNewestUserLocationsInAreaQueriesData()
  118. queryData = self.newestUserLocationsInAreaIterator.next()
  119.  
  120. startTime = time.time()
  121. try:
  122. result = UserBehavior.mongo.getNewestLocationsInArea(queryData['lon1'], queryData['lat1'], queryData['lon2'], queryData['lat2'], queryData['lon3'], queryData['lat3'], queryData['lon4'], queryData['lat4'])
  123. result.close()
  124. except pymongo.errors.OperationFailure, e:
  125. totalTime = int((time.time() - startTime) * 1000)
  126. events.request_failure.fire(request_type='mongodb', name='getNewestLocationsInArea', response_time=totalTime, exception=e)
  127. else:
  128. totalTime = int((time.time() - startTime) * 1000)
  129. events.request_success.fire(request_type='mongodb', name='getNewestLocationsInArea', response_time=totalTime, response_length=0)
  130.  
  131. @task(7)
  132. def getLocationsAtTimeInterval(self):
  133. try:
  134. queryData = self.locationsForUserInIntervalIterator.next()
  135. except StopIteration:
  136. self.getLocationsForUserInIntervalQueriesData()
  137. queryData = self.locationsForUserInIntervalIterator.next()
  138.  
  139. startTime = time.time()
  140. try:
  141. result = UserBehavior.mongo.getLocationsAtTimeInterval(queryData['userId'], queryData['timestamp1'], queryData['timestamp2'])
  142. except pymongo.errors.OperationFailure, e:
  143. totalTime = int((time.time() - startTime) * 1000)
  144. events.request_failure.fire(request_type='mongodb', name='getLocationsAtTimeInterval', response_time=totalTime, exception=e)
  145. else:
  146. totalTime = int((time.time() - startTime) * 1000)
  147. events.request_success.fire(request_type='mongodb', name='getLocationsAtTimeInterval', response_time=totalTime, response_length=0)
  148.  
  149. @task(5)
  150. def getLocationsInAreaAtTimeInterval(self):
  151. try:
  152. queryData = self.locationsInAreaInIntervalIterator.next()
  153. except StopIteration:
  154. self.getLocationsInAreaInIntervalQueriesData()
  155. queryData = self.locationsInAreaInIntervalIterator.next()
  156.  
  157. startTime = time.time()
  158. try:
  159. result = UserBehavior.mongo.getLocationsInAreaAtTimeInterval(queryData['lon1'], queryData['lat1'], queryData['lon2'], queryData['lat2'], queryData['lon3'], queryData['lat3'], queryData['lon4'], queryData['lat4'], queryData['timestamp1'], queryData['timestamp2'])
  160. except pymongo.errors.OperationFailure, e:
  161. totalTime = int((time.time() - startTime) * 1000)
  162. events.request_failure.fire(request_type='mongodb', name='getLocationsInAreaAtTimeInterval', response_time=totalTime, exception=e)
  163. else:
  164. totalTime = int((time.time() - startTime) * 1000)
  165. events.request_success.fire(request_type='mongodb', name='getLocationsInAreaAtTimeInterval', response_time=totalTime, response_length=0)
  166.  
  167. @task(5)
  168. def getAllLocationsInRadius(self):
  169. try:
  170. queryData = self.locationsInRadiusIterator.next()
  171. except StopIteration:
  172. self.getLocationsInRadiusQueriesData()
  173. queryData = self.locationsInRadiusIterator.next()
  174.  
  175. startTime = time.time()
  176. try:
  177. result = UserBehavior.mongo.getAllLocationsInRadius(queryData['lon'], queryData['lat'], queryData['radius'])
  178. except pymongo.errors.OperationFailure, e:
  179. totalTime = int((time.time() - startTime) * 1000)
  180. events.request_failure.fire(request_type='mongodb', name='getAllLocationsInRadius', response_time=totalTime, exception=e)
  181. else:
  182. totalTime = int((time.time() - startTime) * 1000)
  183. events.request_success.fire(request_type='mongodb', name='getAllLocationsInRadius', response_time=totalTime, response_length=0)
  184.  
  185.  
  186. ### Parser functions
  187. def getNewestLocationQueriesData(self):
  188. self.newestLocationQueries = []
  189.  
  190. if self.newestLocationDataFile != None:
  191. fcntl.lockf(self.newestLocationDataFile, fcntl.LOCK_UN)
  192. self.newestLocationDataFile.close()
  193.  
  194. f = None
  195. startIdx = random.randint(0, len(self.newestLocationFiles) - 1)
  196. idx = startIdx
  197. while f == None:
  198. try:
  199. chosenFile = self.newestLocationFiles[idx]
  200. f = open(chosenFile, "r+")
  201. fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
  202. except IOError as e:
  203. f = None
  204. idx = idx + 1
  205.  
  206. if idx == len(self.newestLocationFiles):
  207. idx = 0
  208. elif idx == startIdx:
  209. print "no more files"
  210. raise StopLocust()
  211.  
  212. time.sleep(0.1)
  213. continue
  214.  
  215. reader = csv.reader(f)
  216. for row in reader:
  217. # Parse data
  218. userId = row[0]
  219. self.newestLocationQueries.append({'userId':userId})
  220.  
  221. self.newestLocationIterator = self.newestLocationQueries.__iter__()
  222.  
  223. def getLocationsForUserInIntervalQueriesData(self):
  224. self.locationsForUserInIntervalQueries = []
  225.  
  226. if self.locationsForUserInIntervalDataFile != None:
  227. fcntl.lockf(self.locationsForUserInIntervalDataFile, fcntl.LOCK_UN)
  228. self.locationsForUserInIntervalDataFile.close()
  229.  
  230. f = None
  231. startIdx = random.randint(0, len(self.locationsForUserInIntervalFiles) - 1)
  232. idx = startIdx
  233. while f == None:
  234. try:
  235. chosenFile = self.locationsForUserInIntervalFiles[idx]
  236. f = open(chosenFile, "r+")
  237. fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
  238. except IOError as e:
  239. f = None
  240. idx = idx + 1
  241.  
  242. if idx == len(self.locationsForUserInIntervalFiles):
  243. idx = 0
  244. elif idx == startIdx:
  245. print "no more files"
  246. raise StopLocust()
  247.  
  248. time.sleep(0.1)
  249. continue
  250.  
  251. reader = csv.reader(f)
  252. for row in reader:
  253. # Parse data
  254. userId = row[0]
  255. timeString1 = row[1][:19]
  256. year = timeString1[:4]
  257. month = timeString1[5:7]
  258. day = timeString1[8:10]
  259. hour = timeString1[11:13]
  260. minutes = timeString1[14:16]
  261. seconds = timeString1[17:19]
  262.  
  263. timeString2 = row[2][:19]
  264. year2 = timeString2[:4]
  265. month2 = timeString2[5:7]
  266. day2 = timeString2[8:10]
  267. hour2 = timeString2[11:13]
  268. minutes2 = timeString2[14:16]
  269. seconds2 = timeString2[17:19]
  270.  
  271. dateTimeObject1 = datetime(int(year),int(month),int(day), int(hour), int(minutes), int(seconds))
  272. dateTimeObject2 = datetime(int(year2),int(month2),int(day2), int(hour2), int(minutes2), int(seconds2))
  273.  
  274. self.locationsForUserInIntervalQueries.append({'userId':userId, 'timestamp1':dateTimeObject1, 'timestamp2':dateTimeObject2})
  275.  
  276. self.locationsForUserInIntervalIterator = self.locationsForUserInIntervalQueries.__iter__()
  277.  
  278. def getNewestUserLocationsInAreaQueriesData(self):
  279. self.newestUserLocationsInAreaQueries = []
  280.  
  281. if self.newestUserLocationsInAreaDataFile != None:
  282. fcntl.lockf(self.newestUserLocationsInAreaDataFile, fcntl.LOCK_UN)
  283. self.newestUserLocationsInAreaDataFile.close()
  284.  
  285. f = None
  286. startIdx = random.randint(0, len(self.newestUserLocationsInAreaFiles) - 1)
  287. idx = startIdx
  288. while f == None:
  289. try:
  290. chosenFile = self.newestUserLocationsInAreaFiles[idx]
  291. f = open(chosenFile, "r+")
  292. fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
  293. except IOError as e:
  294. f = None
  295. idx = idx + 1
  296.  
  297. if idx == len(self.newestUserLocationsInAreaFiles):
  298. idx = 0
  299. elif idx == startIdx:
  300. print "no more files"
  301. raise StopLocust()
  302.  
  303. time.sleep(0.1)
  304. continue
  305.  
  306. reader = csv.reader(f)
  307. for row in reader:
  308. # Parse data
  309. lat1 = float(row[0])
  310. lon1 = float(row[1])
  311. lat2 = float(row[2])
  312. lon2 = float(row[3])
  313. lat3 = float(row[4])
  314. lon3 = float(row[5])
  315. lat4 = float(row[6])
  316. lon4 = float(row[7])
  317.  
  318. self.newestUserLocationsInAreaQueries.append({'lat1':lat1, 'lon1':lon1, 'lat2':lat2, 'lon2':lon2, 'lat3':lat3, 'lon3':lon3, 'lat4':lat4, 'lon4':lon4})
  319.  
  320. self.newestUserLocationsInAreaIterator = self.newestUserLocationsInAreaQueries.__iter__()
  321.  
  322. def getLocationsInAreaInIntervalQueriesData(self):
  323. self.locationsInAreaInIntervalQueries = []
  324.  
  325. if self.locationsInAreaInIntervalDataFile != None:
  326. fcntl.lockf(self.locationsInAreaInIntervalDataFile, fcntl.LOCK_UN)
  327. self.locationsInAreaInIntervalDataFile.close()
  328.  
  329. f = None
  330. startIdx = random.randint(0, len(self.locationsInAreaInIntervalFiles) - 1)
  331. idx = startIdx
  332. while f == None:
  333. try:
  334. chosenFile = self.locationsInAreaInIntervalFiles[idx]
  335. f = open(chosenFile, "r+")
  336. fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
  337. except IOError as e:
  338. f = None
  339. idx = idx + 1
  340.  
  341. if idx == len(self.locationsInAreaInIntervalFiles):
  342. idx = 0
  343. elif idx == startIdx:
  344. print "no more files"
  345. raise StopLocust()
  346.  
  347. time.sleep(0.1)
  348. continue
  349.  
  350. reader = csv.reader(f)
  351. for row in reader:
  352. # Parse data
  353. lat1 = float(row[0])
  354. lon1 = float(row[1])
  355. lat2 = float(row[2])
  356. lon2 = float(row[3])
  357. lat3 = float(row[4])
  358. lon3 = float(row[5])
  359. lat4 = float(row[6])
  360. lon4 = float(row[7])
  361.  
  362. timeString1 = row[8][:19]
  363. year = timeString1[:4]
  364. month = timeString1[5:7]
  365. day = timeString1[8:10]
  366. hour = timeString1[11:13]
  367. minutes = timeString1[14:16]
  368. seconds = timeString1[17:19]
  369.  
  370. timeString2 = row[9][:19]
  371. year2 = timeString2[:4]
  372. month2 = timeString2[5:7]
  373. day2 = timeString2[8:10]
  374. hour2 = timeString2[11:13]
  375. minutes2 = timeString2[14:16]
  376. seconds2 = timeString2[17:19]
  377.  
  378. dateTimeObject1 = datetime(int(year),int(month),int(day), int(hour), int(minutes), int(seconds))
  379. dateTimeObject2 = datetime(int(year2),int(month2),int(day2), int(hour2), int(minutes2), int(seconds2))
  380.  
  381. self.locationsInAreaInIntervalQueries.append({'lat1':lat1, 'lon1':lon1, 'lat2':lat2, 'lon2':lon2, 'lat3':lat3, 'lon3':lon3, 'lat4':lat4, 'lon4':lon4, 'timestamp1':dateTimeObject1, 'timestamp2':dateTimeObject2})
  382.  
  383. self.locationsInAreaInIntervalIterator = self.locationsInAreaInIntervalQueries.__iter__()
  384.  
  385. def getLocationsInRadiusQueriesData(self):
  386. self.locationsInRadiusQueries = []
  387.  
  388. if self.locationsInRadiusDataFile != None:
  389. fcntl.lockf(self.locationsInRadiusDataFile, fcntl.LOCK_UN)
  390. self.locationsInRadiusDataFile.close()
  391.  
  392. f = None
  393. startIdx = random.randint(0, len(self.locationsInRadiusFiles) - 1)
  394. idx = startIdx
  395. while f == None:
  396. try:
  397. chosenFile = self.locationsInRadiusFiles[idx]
  398. f = open(chosenFile, "r+")
  399. fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
  400. except IOError as e:
  401. f = None
  402. idx = idx + 1
  403.  
  404. if idx == len(self.locationsInRadiusFiles):
  405. idx = 0
  406. elif idx == startIdx:
  407. print "no more files"
  408. raise StopLocust()
  409.  
  410. time.sleep(0.1)
  411. continue
  412.  
  413. reader = csv.reader(f)
  414. for row in reader:
  415. # Parse data
  416. lat = float(row[0])
  417. lon = float(row[1])
  418. radius = float(row[2]) / 1.609344 # Conversion from km to miles
  419. self.locationsInRadiusQueries.append({'lat':lat, 'lon':lon, 'radius':radius})
  420.  
  421. self.locationsInRadiusIterator = self.locationsInRadiusQueries.__iter__()
  422.  
  423. class WebsiteUser(Locust):
  424. task_set = UserBehavior
  425. min_wait = 4000
  426. max_wait = 5000
  427.  
  428. class MongoDbClient():
  429. mongoClient = None
  430. db = None
  431. coll = None
  432.  
  433. def __init__(self):
  434. MongoDbClient.mongoClient = MongoClient('172.16.0.76', 27019, maxPoolSize = 1000)
  435. MongoDbClient.db = MongoDbClient.mongoClient['sw603']
  436. MongoDbClient.coll = MongoDbClient.db['locs']
  437.  
  438. ### Queries ###
  439. def saveLocation(self, userId, timestamp, lon, lat):
  440. return MongoDbClient.coll.insert_one({'timestamp':timestamp, 'userId':userId, 'location':{'type':'Point', 'coordinates':[lon, lat]}})
  441.  
  442. def getNewestLocation(self, userId):
  443. return MongoDbClient.coll.find({'userId':userId}).sort('timestamp', pymongo.DESCENDING).limit(1)
  444.  
  445. def getNewestLocationsInArea(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4):
  446. query = {'location':{'$geoWithin':{'$geometry':{'type':'Polygon','coordinates':[[[lon1,lat1],[lon2,lat2],[lon3,lat3],[lon4,lat4],[lon1,lat1]]]}}}}
  447. #sort = collections.OrderedDict([("userId", pymongo.ASCENDING), ("timestamp", pymongo.DESCENDING)])
  448.  
  449. #pipeline = [
  450. #{"$match": query},
  451. #{"$sort":sort},
  452. # {"$group": {
  453. # "_id": "$userId",
  454. # "userId": {"$first":"$userId"},
  455. # "timestamp": {"$first":"$timestamp"},
  456. # "location": {"$first":"$location"}
  457. # }}
  458. ]
  459.  
  460. #return MongoDbClient.coll.aggregate(pipeline, maxTimeMS=5000)
  461.  
  462. # map
  463. map = {}
  464. for loc in MongoDbClient.coll.find(query):
  465. try:
  466. map[loc['userId']].append(loc)
  467. except KeyError:
  468. map[loc['userId']] = [loc]
  469.  
  470. # reduce
  471. result = {}
  472. for userId in map.keys():
  473. mostRecent = None
  474. for loc in map[userId]:
  475. if mostRecent == None:
  476. mostRecent = loc['timestamp']
  477. result[userId] = loc
  478. elif mostRecent < loc['timestamp']:
  479. mostRecent = loc['timestamp']
  480. result[userId] = loc
  481.  
  482. return result
  483.  
  484. def getLocationsAtTimeInterval(self, userId, time1, time2):
  485. return MongoDbClient.coll.find({'userId':userId,'timestamp':{'$gte':time1, '$lte':time2}})
  486.  
  487. def getLocationsInAreaAtTimeInterval(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4, time1, time2):
  488. return MongoDbClient.coll.find({'location':{'$geoWithin':{'$geometry':{'type':'Polygon','coordinates':[[[lon1,lat1],[lon2,lat2],[lon3,lat3],[lon4,lat4],[lon1,lat1]]]}}},'timestamp':{'$gte':time1, '$lte':time2}}).sort('userId', pymongo.ASCENDING)
  489.  
  490. def getAllLocationsInRadius(self, lon, lat, radius):
  491. return MongoDbClient.coll.find({'location': {'$geoWithin': {'$centerSphere':[[lon,lat],radius / 3963.2]}}})
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement