Advertisement
Guest User

Untitled

a guest
May 1st, 2016
51
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 21.87 KB | None | 0 0
  1. import os, pymongo, csv, time, json, random, fcntl, collections
  2.  
  3. from datetime import datetime
  4. from locust import Locust, TaskSet, task, events
  5. #from mongo_connection import MongoDbClient
  6.  
  7. from bson.code import Code
  8. from pymongo import MongoClient, GEO2D
  9.  
  10. import resource
  11.  
  12. resource.setrlimit(resource.RLIMIT_NOFILE, (999999, 999999))
  13.  
  14. split3 = '/home/segment_3/'
  15. split5 = '/home/segment_5/'
  16. split10 = '/home/segment_10/'
  17.  
  18. # DEFINE THE USED SPLIT METHOD
  19. split = split10
  20.  
  21. # Paths
  22. newestLocation = split + 'newest_location/'
  23. locationsForUserInInterval = split + 'locations_for_user_in_interval/'
  24. newestUserLocationsInArea = split + 'newest_user_locations_in_area/'
  25. locationsInAreaInInterval = split + 'locations_in_area_in_interval/'
  26. locationsInRadius = split + 'locations_in_radius/'
  27.  
  28. ### READ TEST FOR MONGO DB ###
  29. class UserBehavior(TaskSet):
  30. # Query arrays
  31. newestLocationQueries = []
  32. locationsForUserInIntervalQueries = []
  33. newestUserLocationsInAreaQueries = []
  34. locationsInAreaInIntervalQueries = []
  35. locationsInRadiusQueries = []
  36. # Query iterators
  37. newestLocationIterator = None
  38. locationsForUserInIntervalIterator = None
  39. newestUserLocationsInAreaIterator = None
  40. locationsInAreaInIntervalIterator = None
  41. locationsInRadiusIterator = None
  42.  
  43. # Query files used
  44. newestLocationDataFile = None
  45. locationsForUserInIntervalDataFile = None
  46. newestUserLocationsInAreaDataFile = None
  47. locationsInAreaInIntervalDataFile = None
  48. locationsInRadiusDataFile = None
  49.  
  50. newestLocationFiles = []
  51. locationsForUserInIntervalFiles = []
  52. newestUserLocationsInAreaFiles = []
  53. locationsInAreaInIntervalFiles = []
  54. locationsInRadiusFiles = []
  55.  
  56. mongo = None
  57.  
  58. def on_start(self):
  59. if (UserBehavior.mongo == None):
  60. print "init mongo"
  61. UserBehavior.mongo = MongoDbClient()
  62.  
  63. for fn in os.listdir(newestLocation):
  64. filePath = newestLocation + fn
  65. if os.path.isfile(filePath):
  66. self.newestLocationFiles.append(filePath)
  67.  
  68. for fn in os.listdir(locationsForUserInInterval):
  69. filePath = locationsForUserInInterval + fn
  70. if os.path.isfile(filePath):
  71. self.locationsForUserInIntervalFiles.append(filePath)
  72.  
  73. for fn in os.listdir(newestUserLocationsInArea):
  74. filePath = newestUserLocationsInArea + fn
  75. if os.path.isfile(filePath):
  76. self.newestUserLocationsInAreaFiles.append(filePath)
  77.  
  78. for fn in os.listdir(locationsInAreaInInterval):
  79. filePath = locationsInAreaInInterval + fn
  80. if os.path.isfile(filePath):
  81. self.locationsInAreaInIntervalFiles.append(filePath)
  82.  
  83. for fn in os.listdir(locationsInRadius):
  84. filePath = locationsInRadius + fn
  85. if os.path.isfile(filePath):
  86. self.locationsInRadiusFiles.append(filePath)
  87.  
  88. self.getNewestLocationQueriesData()
  89. self.getLocationsForUserInIntervalQueriesData()
  90. self.getNewestUserLocationsInAreaQueriesData()
  91. self.getLocationsInAreaInIntervalQueriesData()
  92. self.getLocationsInRadiusQueriesData()
  93.  
  94. @task(10)
  95. def getNewestLocation(self):
  96. try:
  97. queryData = self.newestLocationIterator.next()
  98. except StopIteration:
  99. self.getNewestLocationQueriesData()
  100. queryData = self.newestLocationIterator.next()
  101.  
  102. startTime = time.time()
  103. try:
  104. result = UserBehavior.mongo.getNewestLocation(queryData['userId'])
  105. doc = None
  106. for d in result:
  107. doc = d
  108. result.close()
  109. except pymongo.errors.OperationFailure, e:
  110. totalTime = int((time.time() - startTime) * 1000)
  111. events.request_failure.fire(request_type='mongodb', name='getNewestLocation', response_time=totalTime, exception=e)
  112. else:
  113. totalTime = int((time.time() - startTime) * 1000)
  114. events.request_success.fire(request_type='mongodb', name='getNewestLocation', response_time=totalTime, response_length=0)
  115.  
  116. @task(1)
  117. def getNewestLocationsInArea(self):
  118. try:
  119. queryData = self.newestUserLocationsInAreaIterator.next()
  120. except StopIteration:
  121. self.getNewestUserLocationsInAreaQueriesData()
  122. queryData = self.newestUserLocationsInAreaIterator.next()
  123.  
  124. startTime = time.time()
  125. try:
  126. result = UserBehavior.mongo.getNewestLocationsInArea(queryData['lon1'], queryData['lat1'], queryData['lon2'], queryData['lat2'], queryData['lon3'], queryData['lat3'], queryData['lon4'], queryData['lat4'])
  127. doc = None
  128. for d in result:
  129. doc = d
  130. result.close()
  131. except pymongo.errors.OperationFailure, e:
  132. totalTime = int((time.time() - startTime) * 1000)
  133. events.request_failure.fire(request_type='mongodb', name='getNewestLocationsInArea', response_time=totalTime, exception=e)
  134. else:
  135. totalTime = int((time.time() - startTime) * 1000)
  136. events.request_success.fire(request_type='mongodb', name='getNewestLocationsInArea', response_time=totalTime, response_length=0)
  137.  
  138. @task(7)
  139. def getLocationsAtTimeInterval(self):
  140. try:
  141. queryData = self.locationsForUserInIntervalIterator.next()
  142. except StopIteration:
  143. self.getLocationsForUserInIntervalQueriesData()
  144. queryData = self.locationsForUserInIntervalIterator.next()
  145.  
  146. startTime = time.time()
  147. try:
  148. result = UserBehavior.mongo.getLocationsAtTimeInterval(queryData['userId'], queryData['timestamp1'], queryData['timestamp2'])
  149. doc = None
  150. for d in result:
  151. doc = d
  152. result.close()
  153. except pymongo.errors.OperationFailure, e:
  154. totalTime = int((time.time() - startTime) * 1000)
  155. events.request_failure.fire(request_type='mongodb', name='getLocationsAtTimeInterval', response_time=totalTime, exception=e)
  156. else:
  157. totalTime = int((time.time() - startTime) * 1000)
  158. events.request_success.fire(request_type='mongodb', name='getLocationsAtTimeInterval', response_time=totalTime, response_length=0)
  159.  
  160. @task(5)
  161. def getLocationsInAreaAtTimeInterval(self):
  162. try:
  163. queryData = self.locationsInAreaInIntervalIterator.next()
  164. except StopIteration:
  165. self.getLocationsInAreaInIntervalQueriesData()
  166. queryData = self.locationsInAreaInIntervalIterator.next()
  167.  
  168. startTime = time.time()
  169. try:
  170. result = UserBehavior.mongo.getLocationsInAreaAtTimeInterval(queryData['lon1'], queryData['lat1'], queryData['lon2'], queryData['lat2'], queryData['lon3'], queryData['lat3'], queryData['lon4'], queryData['lat4'], queryData['timestamp1'], queryData['timestamp2'])
  171. doc = None
  172. for d in result:
  173. doc = d
  174. result.close()
  175. except pymongo.errors.OperationFailure, e:
  176. totalTime = int((time.time() - startTime) * 1000)
  177. events.request_failure.fire(request_type='mongodb', name='getLocationsInAreaAtTimeInterval', response_time=totalTime, exception=e)
  178. else:
  179. totalTime = int((time.time() - startTime) * 1000)
  180. events.request_success.fire(request_type='mongodb', name='getLocationsInAreaAtTimeInterval', response_time=totalTime, response_length=0)
  181.  
  182. @task(5)
  183. def getAllLocationsInRadius(self):
  184. try:
  185. queryData = self.locationsInRadiusIterator.next()
  186. except StopIteration:
  187. self.getLocationsInRadiusQueriesData()
  188. queryData = self.locationsInRadiusIterator.next()
  189.  
  190. startTime = time.time()
  191. try:
  192. result = UserBehavior.mongo.getAllLocationsInRadius(queryData['lon'], queryData['lat'], queryData['radius'])
  193. doc = None
  194. for d in result:
  195. doc = d
  196. result.close()
  197. except pymongo.errors.OperationFailure, e:
  198. totalTime = int((time.time() - startTime) * 1000)
  199. events.request_failure.fire(request_type='mongodb', name='getAllLocationsInRadius', response_time=totalTime, exception=e)
  200. else:
  201. totalTime = int((time.time() - startTime) * 1000)
  202. events.request_success.fire(request_type='mongodb', name='getAllLocationsInRadius', response_time=totalTime, response_length=0)
  203.  
  204.  
  205. ### Parser functions
  206. def getNewestLocationQueriesData(self):
  207. self.newestLocationQueries = []
  208.  
  209. if self.newestLocationDataFile != None:
  210. fcntl.lockf(self.newestLocationDataFile, fcntl.LOCK_UN)
  211. self.newestLocationDataFile.close()
  212.  
  213. self.newestLocationDataFile = None
  214. startIdx = random.randint(0, len(self.newestLocationFiles) - 1)
  215. idx = startIdx
  216. while self.newestLocationDataFile == None:
  217. try:
  218. chosenFile = self.newestLocationFiles[idx]
  219. self.newestLocationDataFile = open(chosenFile, "r+")
  220. fcntl.lockf(self.newestLocationDataFile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  221. except IOError as e:
  222. self.newestLocationDataFile = None
  223. idx = idx + 1
  224.  
  225. if idx == len(self.newestLocationFiles):
  226. idx = 0
  227. elif idx == startIdx:
  228. print "no more files"
  229. raise StopLocust()
  230.  
  231. time.sleep(0.1)
  232. continue
  233.  
  234. reader = csv.reader(self.newestLocationDataFile)
  235. for row in reader:
  236. # Parse data
  237. userId = row[0]
  238. self.newestLocationQueries.append({'userId':userId})
  239.  
  240. self.newestLocationIterator = self.newestLocationQueries.__iter__()
  241.  
  242. def getLocationsForUserInIntervalQueriesData(self):
  243. self.locationsForUserInIntervalQueries = []
  244.  
  245. if self.locationsForUserInIntervalDataFile != None:
  246. fcntl.lockf(self.locationsForUserInIntervalDataFile, fcntl.LOCK_UN)
  247. self.locationsForUserInIntervalDataFile.close()
  248.  
  249. self.locationsForUserInIntervalDataFile = None
  250. startIdx = random.randint(0, len(self.locationsForUserInIntervalFiles) - 1)
  251. idx = startIdx
  252. while self.locationsForUserInIntervalDataFile == None:
  253. try:
  254. chosenFile = self.locationsForUserInIntervalFiles[idx]
  255. self.locationsForUserInIntervalDataFile = open(chosenFile, "r+")
  256. fcntl.lockf(self.locationsForUserInIntervalDataFile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  257. except IOError as e:
  258. self.locationsForUserInIntervalDataFile = None
  259. idx = idx + 1
  260.  
  261. if idx == len(self.locationsForUserInIntervalFiles):
  262. idx = 0
  263. elif idx == startIdx:
  264. print "no more files"
  265. raise StopLocust()
  266.  
  267. time.sleep(0.1)
  268. continue
  269.  
  270. reader = csv.reader(self.locationsForUserInIntervalDataFile)
  271. for row in reader:
  272. # Parse data
  273. userId = row[0]
  274. timeString1 = row[1][:19]
  275. year = timeString1[:4]
  276. month = timeString1[5:7]
  277. day = timeString1[8:10]
  278. hour = timeString1[11:13]
  279. minutes = timeString1[14:16]
  280. seconds = timeString1[17:19]
  281.  
  282. timeString2 = row[2][:19]
  283. year2 = timeString2[:4]
  284. month2 = timeString2[5:7]
  285. day2 = timeString2[8:10]
  286. hour2 = timeString2[11:13]
  287. minutes2 = timeString2[14:16]
  288. seconds2 = timeString2[17:19]
  289.  
  290. dateTimeObject1 = datetime(int(year),int(month),int(day), int(hour), int(minutes), int(seconds))
  291. dateTimeObject2 = datetime(int(year2),int(month2),int(day2), int(hour2), int(minutes2), int(seconds2))
  292.  
  293. self.locationsForUserInIntervalQueries.append({'userId':userId, 'timestamp1':dateTimeObject1, 'timestamp2':dateTimeObject2})
  294.  
  295. self.locationsForUserInIntervalIterator = self.locationsForUserInIntervalQueries.__iter__()
  296.  
  297. def getNewestUserLocationsInAreaQueriesData(self):
  298. self.newestUserLocationsInAreaQueries = []
  299.  
  300. if self.newestUserLocationsInAreaDataFile != None:
  301. fcntl.lockf(self.newestUserLocationsInAreaDataFile, fcntl.LOCK_UN)
  302. self.newestUserLocationsInAreaDataFile.close()
  303.  
  304. self.newestUserLocationsInAreaDataFile = None
  305. startIdx = random.randint(0, len(self.newestUserLocationsInAreaFiles) - 1)
  306. idx = startIdx
  307. while self.newestUserLocationsInAreaDataFile == None:
  308. try:
  309. chosenFile = self.newestUserLocationsInAreaFiles[idx]
  310. self.newestUserLocationsInAreaDataFile = open(chosenFile, "r+")
  311. fcntl.lockf(self.newestUserLocationsInAreaDataFile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  312. except IOError as e:
  313. self.newestUserLocationsInAreaDataFile = None
  314. idx = idx + 1
  315.  
  316. if idx == len(self.newestUserLocationsInAreaFiles):
  317. idx = 0
  318. elif idx == startIdx:
  319. print "no more files"
  320. raise StopLocust()
  321.  
  322. time.sleep(0.1)
  323. continue
  324.  
  325. reader = csv.reader(self.newestUserLocationsInAreaDataFile)
  326. for row in reader:
  327. # Parse data
  328. lat1 = float(row[0])
  329. lon1 = float(row[1])
  330. lat2 = float(row[2])
  331. lon2 = float(row[3])
  332. lat3 = float(row[4])
  333. lon3 = float(row[5])
  334. lat4 = float(row[6])
  335. lon4 = float(row[7])
  336.  
  337. self.newestUserLocationsInAreaQueries.append({'lat1':lat1, 'lon1':lon1, 'lat2':lat2, 'lon2':lon2, 'lat3':lat3, 'lon3':lon3, 'lat4':lat4, 'lon4':lon4})
  338.  
  339. self.newestUserLocationsInAreaIterator = self.newestUserLocationsInAreaQueries.__iter__()
  340.  
  341. def getLocationsInAreaInIntervalQueriesData(self):
  342. self.locationsInAreaInIntervalQueries = []
  343.  
  344. if self.locationsInAreaInIntervalDataFile != None:
  345. fcntl.lockf(self.locationsInAreaInIntervalDataFile, fcntl.LOCK_UN)
  346. self.locationsInAreaInIntervalDataFile.close()
  347.  
  348. self.locationsInAreaInIntervalDataFile = None
  349. startIdx = random.randint(0, len(self.locationsInAreaInIntervalFiles) - 1)
  350. idx = startIdx
  351. while self.locationsInAreaInIntervalDataFile == None:
  352. try:
  353. chosenFile = self.locationsInAreaInIntervalFiles[idx]
  354. self.locationsInAreaInIntervalDataFile = open(chosenFile, "r+")
  355. fcntl.lockf(self.locationsInAreaInIntervalDataFile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  356. except IOError as e:
  357. self.locationsInAreaInIntervalDataFile = None
  358. idx = idx + 1
  359.  
  360. if idx == len(self.locationsInAreaInIntervalFiles):
  361. idx = 0
  362. elif idx == startIdx:
  363. print "no more files"
  364. raise StopLocust()
  365.  
  366. time.sleep(0.1)
  367. continue
  368.  
  369. reader = csv.reader(self.locationsInAreaInIntervalDataFile)
  370. for row in reader:
  371. # Parse data
  372. lat1 = float(row[0])
  373. lon1 = float(row[1])
  374. lat2 = float(row[2])
  375. lon2 = float(row[3])
  376. lat3 = float(row[4])
  377. lon3 = float(row[5])
  378. lat4 = float(row[6])
  379. lon4 = float(row[7])
  380.  
  381. timeString1 = row[8][:19]
  382. year = timeString1[:4]
  383. month = timeString1[5:7]
  384. day = timeString1[8:10]
  385. hour = timeString1[11:13]
  386. minutes = timeString1[14:16]
  387. seconds = timeString1[17:19]
  388.  
  389. timeString2 = row[9][:19]
  390. year2 = timeString2[:4]
  391. month2 = timeString2[5:7]
  392. day2 = timeString2[8:10]
  393. hour2 = timeString2[11:13]
  394. minutes2 = timeString2[14:16]
  395. seconds2 = timeString2[17:19]
  396.  
  397. dateTimeObject1 = datetime(int(year),int(month),int(day), int(hour), int(minutes), int(seconds))
  398. dateTimeObject2 = datetime(int(year2),int(month2),int(day2), int(hour2), int(minutes2), int(seconds2))
  399.  
  400. self.locationsInAreaInIntervalQueries.append({'lat1':lat1, 'lon1':lon1, 'lat2':lat2, 'lon2':lon2, 'lat3':lat3, 'lon3':lon3, 'lat4':lat4, 'lon4':lon4, 'timestamp1':dateTimeObject1, 'timestamp2':dateTimeObject2})
  401.  
  402. self.locationsInAreaInIntervalIterator = self.locationsInAreaInIntervalQueries.__iter__()
  403.  
  404. def getLocationsInRadiusQueriesData(self):
  405. self.locationsInRadiusQueries = []
  406.  
  407. if self.locationsInRadiusDataFile != None:
  408. fcntl.lockf(self.locationsInRadiusDataFile, fcntl.LOCK_UN)
  409. self.locationsInRadiusDataFile.close()
  410.  
  411. self.locationsInRadiusDataFile = None
  412. startIdx = random.randint(0, len(self.locationsInRadiusFiles) - 1)
  413. idx = startIdx
  414. while self.locationsInRadiusDataFile == None:
  415. try:
  416. chosenFile = self.locationsInRadiusFiles[idx]
  417. self.locationsInRadiusDataFile = open(chosenFile, "r+")
  418. fcntl.lockf(self.locationsInRadiusDataFile, fcntl.LOCK_EX | fcntl.LOCK_NB)
  419. except IOError as e:
  420. self.locationsInRadiusDataFile = None
  421. idx = idx + 1
  422.  
  423. if idx == len(self.locationsInRadiusFiles):
  424. idx = 0
  425. elif idx == startIdx:
  426. print "no more files"
  427. raise StopLocust()
  428.  
  429. time.sleep(0.1)
  430. continue
  431.  
  432. reader = csv.reader(self.locationsInRadiusDataFile)
  433. for row in reader:
  434. # Parse data
  435. lat = float(row[0])
  436. lon = float(row[1])
  437. radius = float(row[2]) / 1.609344 # Conversion from km to miles
  438. self.locationsInRadiusQueries.append({'lat':lat, 'lon':lon, 'radius':radius})
  439.  
  440. self.locationsInRadiusIterator = self.locationsInRadiusQueries.__iter__()
  441.  
  442. class WebsiteUser(Locust):
  443. task_set = UserBehavior
  444. min_wait = 4000
  445. max_wait = 5000
  446.  
  447. class MongoDbClient():
  448. mongoClient = None
  449. db = None
  450. coll = None
  451.  
  452. def __init__(self):
  453. MongoDbClient.mongoClient = MongoClient('172.16.0.76', 27019, maxPoolSize = 1000)
  454. MongoDbClient.db = MongoDbClient.mongoClient['sw603']
  455. MongoDbClient.coll = MongoDbClient.db['locs']
  456.  
  457. ### Queries ###
  458. def saveLocation(self, userId, timestamp, lon, lat):
  459. return MongoDbClient.coll.insert_one({'timestamp':timestamp, 'userId':userId, 'location':{'type':'Point', 'coordinates':[lon, lat]}})
  460.  
  461. def getNewestLocation(self, userId):
  462. return MongoDbClient.coll.find({'userId':userId}).sort('timestamp', pymongo.DESCENDING).limit(1)
  463.  
  464. def getNewestLocationsInArea(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4):
  465. query = {'location':{'$geoWithin':{'$geometry':{'type':'Polygon','coordinates':[[[lon1,lat1],[lon2,lat2],[lon3,lat3],[lon4,lat4],[lon1,lat1]]]}}}}
  466. sort = collections.OrderedDict([("userId", pymongo.ASCENDING), ("timestamp", pymongo.DESCENDING)])
  467.  
  468. pipeline = [
  469. {"$match": query},
  470. {"$sort":sort},
  471. {"$group": {
  472. "_id": "$userId",
  473. "timestamp": {"$first":"$timestamp"},
  474. "location": {"$first":"$location"}
  475. }}
  476. ]
  477.  
  478. return MongoDbClient.coll.aggregate(pipeline, maxTimeMS=30000)
  479.  
  480. # map
  481. # map = {}
  482. # r = MongoDbClient.coll.find(query)
  483.  
  484. # for loc in r:
  485. # try:
  486. # map[loc['userId']].append(loc)
  487. # except KeyError:
  488. # map[loc['userId']] = [loc]
  489.  
  490. # # reduce
  491. # result = {}
  492. # for userId in map.keys():
  493. # mostRecent = None
  494. # for loc in map[userId]:
  495. # if mostRecent == None:
  496. # mostRecent = loc['timestamp']
  497. # result[userId] = loc
  498. # elif mostRecent < loc['timestamp']:
  499. # mostRecent = loc['timestamp']
  500. # result[userId] = loc
  501.  
  502. # return result
  503.  
  504. def getLocationsAtTimeInterval(self, userId, time1, time2):
  505. return MongoDbClient.coll.find({'userId':userId,'timestamp':{'$gte':time1, '$lte':time2}})
  506.  
  507. def getLocationsInAreaAtTimeInterval(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4, time1, time2):
  508. return MongoDbClient.coll.find({'location':{'$geoWithin':{'$geometry':{'type':'Polygon','coordinates':[[[lon1,lat1],[lon2,lat2],[lon3,lat3],[lon4,lat4],[lon1,lat1]]]}}},'timestamp':{'$gte':time1, '$lte':time2}}).sort('userId', pymongo.ASCENDING)
  509.  
  510. def getAllLocationsInRadius(self, lon, lat, radius):
  511. return MongoDbClient.coll.find({'location': {'$geoWithin': {'$centerSphere':[[lon,lat],radius / 3963.2]}}})
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement