Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import collections
import csv
import fcntl
import json
import os
import random
import resource
import time
from datetime import datetime

import pymongo
from bson.code import Code
from locust import Locust, TaskSet, task, events
from locust.exception import StopLocust
from pymongo import MongoClient, GEO2D

# MongoDbClient is defined at the bottom of this file instead of being imported:
#from mongo_connection import MongoDbClient
def raiseOpenFileLimit(target=999999):
    """Raise the open-file limit (RLIMIT_NOFILE) towards `target`, best effort.

    First try to set both the soft and the hard limit to `target`. Raising a
    hard limit needs privileges, so an unprivileged run gets a ValueError. In
    that case only the soft limit is raised, as far as the current hard limit
    allows. If even that is refused, a message is printed instead of aborting
    the import of this module.

    Returns the (soft, hard) limits in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        resource.setrlimit(resource.RLIMIT_NOFILE, (target, target))
    except (ValueError, OSError):
        # RLIM_INFINITY must not go through min(): it may be represented as -1.
        newSoft = target if hard == resource.RLIM_INFINITY else min(target, hard)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (newSoft, hard))
        except (ValueError, OSError) as e:
            print("could not raise the open file limit (soft=%s, hard=%s): %s" % (soft, hard, e))
    return resource.getrlimit(resource.RLIMIT_NOFILE)


raiseOpenFileLimit()
# Root directories of the pre-generated query files, one per segmentation
# of the data set.
split3, split5, split10 = '/home/segment_3/', '/home/segment_5/', '/home/segment_10/'

# Segmentation used by this run: switch to split3 / split5 / split10 here.
split = split10

# Directory holding the query files of each query type (every path ends
# with '/').
(newestLocation,
 locationsForUserInInterval,
 newestUserLocationsInArea,
 locationsInAreaInInterval,
 locationsInRadius) = [split + subDir for subDir in ('newest_location/',
                                                     'locations_for_user_in_interval/',
                                                     'newest_user_locations_in_area/',
                                                     'locations_in_area_in_interval/',
                                                     'locations_in_radius/')]
### READ TEST FOR MONGO DB ###
class UserBehavior(TaskSet):
    """Locust task set that replays pre-generated MongoDB read queries.

    Every query type reads its parameters from CSV files in its own directory
    (the module-level path constants). A user takes one file per query type,
    holds an exclusive non-blocking ``fcntl.lockf`` lock on it while replaying
    its rows, and switches to another file once the rows are used up. Each
    query is timed and reported to Locust as a ``mongodb`` request.

    NOTE: lockf locks are owned by the process, so they keep *other
    processes* (e.g. other Locust slaves) off a file; users running inside
    the same process are not excluded from each other by them.
    """

    # Parsed queries of the file currently in use, per query type.
    newestLocationQueries = []
    locationsForUserInIntervalQueries = []
    newestUserLocationsInAreaQueries = []
    locationsInAreaInIntervalQueries = []
    locationsInRadiusQueries = []
    # Iterators over the query lists above.
    newestLocationIterator = None
    locationsForUserInIntervalIterator = None
    newestUserLocationsInAreaIterator = None
    locationsInAreaInIntervalIterator = None
    locationsInRadiusIterator = None
    # Open, locked file currently supplying each query type (None until loaded).
    newestLocationDataFile = None
    locationsForUserInIntervalDataFile = None
    newestUserLocationsInAreaDataFile = None
    locationsInAreaInIntervalDataFile = None
    locationsInRadiusDataFile = None
    # Candidate query files per query type; replaced by per-user lists in on_start.
    newestLocationFiles = []
    locationsForUserInIntervalFiles = []
    newestUserLocationsInAreaFiles = []
    locationsInAreaInIntervalFiles = []
    locationsInRadiusFiles = []
    # MongoDbClient shared by all users; created by the first on_start.
    mongo = None

    def on_start(self):
        """Create the shared MongoDB client once, then load a query file of every type."""
        if UserBehavior.mongo is None:
            print("init mongo")
            UserBehavior.mongo = MongoDbClient()
        # Assign fresh per-user lists. Appending to the shared class-level
        # defaults would re-add every file for each new user.
        self.newestLocationFiles = self._listQueryFiles(newestLocation)
        self.locationsForUserInIntervalFiles = self._listQueryFiles(locationsForUserInInterval)
        self.newestUserLocationsInAreaFiles = self._listQueryFiles(newestUserLocationsInArea)
        self.locationsInAreaInIntervalFiles = self._listQueryFiles(locationsInAreaInInterval)
        self.locationsInRadiusFiles = self._listQueryFiles(locationsInRadius)
        self.getNewestLocationQueriesData()
        self.getLocationsForUserInIntervalQueriesData()
        self.getNewestUserLocationsInAreaQueriesData()
        self.getLocationsInAreaInIntervalQueriesData()
        self.getLocationsInRadiusQueriesData()

    ### Tasks

    @task(10)
    def getNewestLocation(self):
        """Time MongoDbClient.getNewestLocation for the next queried userId."""
        q = self._nextQuery('newestLocationIterator', self.getNewestLocationQueriesData)
        self._timedQuery('getNewestLocation', UserBehavior.mongo.getNewestLocation, q['userId'])

    @task(10)
    def getNewestLocationsInArea(self):
        """Time MongoDbClient.getNewestLocationsInArea for the next queried polygon."""
        q = self._nextQuery('newestUserLocationsInAreaIterator',
                            self.getNewestUserLocationsInAreaQueriesData)
        self._timedQuery('getNewestLocationsInArea', UserBehavior.mongo.getNewestLocationsInArea,
                         q['lon1'], q['lat1'], q['lon2'], q['lat2'],
                         q['lon3'], q['lat3'], q['lon4'], q['lat4'])

    @task(7)
    def getLocationsAtTimeInterval(self):
        """Time MongoDbClient.getLocationsAtTimeInterval for the next (userId, interval)."""
        q = self._nextQuery('locationsForUserInIntervalIterator',
                            self.getLocationsForUserInIntervalQueriesData)
        self._timedQuery('getLocationsAtTimeInterval', UserBehavior.mongo.getLocationsAtTimeInterval,
                         q['userId'], q['timestamp1'], q['timestamp2'])

    @task(5)
    def getLocationsInAreaAtTimeInterval(self):
        """Time MongoDbClient.getLocationsInAreaAtTimeInterval for the next (polygon, interval)."""
        q = self._nextQuery('locationsInAreaInIntervalIterator',
                            self.getLocationsInAreaInIntervalQueriesData)
        self._timedQuery('getLocationsInAreaAtTimeInterval',
                         UserBehavior.mongo.getLocationsInAreaAtTimeInterval,
                         q['lon1'], q['lat1'], q['lon2'], q['lat2'],
                         q['lon3'], q['lat3'], q['lon4'], q['lat4'],
                         q['timestamp1'], q['timestamp2'])

    @task(5)
    def getAllLocationsInRadius(self):
        """Time MongoDbClient.getAllLocationsInRadius for the next (point, radius)."""
        q = self._nextQuery('locationsInRadiusIterator', self.getLocationsInRadiusQueriesData)
        self._timedQuery('getAllLocationsInRadius', UserBehavior.mongo.getAllLocationsInRadius,
                         q['lon'], q['lat'], q['radius'])

    ### Task helpers

    def _nextQuery(self, iteratorName, reload):
        """Return the next query from the iterator attribute `iteratorName`.

        When it is exhausted, `reload()` switches to a new query file and the
        first query of that file is returned. If the new file has no rows,
        StopIteration propagates to the caller.
        """
        try:
            return next(getattr(self, iteratorName))
        except StopIteration:
            reload()
            return next(getattr(self, iteratorName))

    def _timedQuery(self, name, query, *args):
        """Run query(*args), fully consume its result, and report the timing to Locust.

        pymongo's find() returns a lazy cursor: nothing is sent to the server
        until it is iterated. The result is therefore drained inside the
        timed section; iterating a dict result counts its keys. The number of
        items is reported as response_length. Any PyMongoError (not only
        OperationFailure) is reported as a request failure.
        """
        startTime = time.time()
        try:
            result = query(*args)
            itemCount = sum(1 for _ in result)
        except pymongo.errors.PyMongoError as e:
            totalTime = int((time.time() - startTime) * 1000)
            events.request_failure.fire(request_type='mongodb', name=name,
                                        response_time=totalTime, exception=e)
        else:
            totalTime = int((time.time() - startTime) * 1000)
            events.request_success.fire(request_type='mongodb', name=name,
                                        response_time=totalTime, response_length=itemCount)

    ### Parser functions

    def getNewestLocationQueriesData(self):
        """Switch to a new newest_location file; each row is: userId."""
        self.newestLocationDataFile = self._swapQueryFile(self.newestLocationDataFile,
                                                          self.newestLocationFiles)
        self.newestLocationQueries = [{'userId': row[0]}
                                      for row in csv.reader(self.newestLocationDataFile) if row]
        self.newestLocationIterator = iter(self.newestLocationQueries)

    def getLocationsForUserInIntervalQueriesData(self):
        """Switch to a new locations_for_user_in_interval file; each row is: userId, t1, t2."""
        self.locationsForUserInIntervalDataFile = self._swapQueryFile(
            self.locationsForUserInIntervalDataFile, self.locationsForUserInIntervalFiles)
        self.locationsForUserInIntervalQueries = [
            {'userId': row[0],
             'timestamp1': self._parseTimestamp(row[1]),
             'timestamp2': self._parseTimestamp(row[2])}
            for row in csv.reader(self.locationsForUserInIntervalDataFile) if row]
        self.locationsForUserInIntervalIterator = iter(self.locationsForUserInIntervalQueries)

    def getNewestUserLocationsInAreaQueriesData(self):
        """Switch to a new newest_user_locations_in_area file; each row is: lat1, lon1, ..., lat4, lon4."""
        self.newestUserLocationsInAreaDataFile = self._swapQueryFile(
            self.newestUserLocationsInAreaDataFile, self.newestUserLocationsInAreaFiles)
        self.newestUserLocationsInAreaQueries = [
            self._parsePolygonRow(row)
            for row in csv.reader(self.newestUserLocationsInAreaDataFile) if row]
        self.newestUserLocationsInAreaIterator = iter(self.newestUserLocationsInAreaQueries)

    def getLocationsInAreaInIntervalQueriesData(self):
        """Switch to a new locations_in_area_in_interval file; each row is: 8 polygon columns, t1, t2."""
        self.locationsInAreaInIntervalDataFile = self._swapQueryFile(
            self.locationsInAreaInIntervalDataFile, self.locationsInAreaInIntervalFiles)
        queries = []
        for row in csv.reader(self.locationsInAreaInIntervalDataFile):
            if not row:
                continue
            query = self._parsePolygonRow(row)
            query['timestamp1'] = self._parseTimestamp(row[8])
            query['timestamp2'] = self._parseTimestamp(row[9])
            queries.append(query)
        self.locationsInAreaInIntervalQueries = queries
        self.locationsInAreaInIntervalIterator = iter(queries)

    def getLocationsInRadiusQueriesData(self):
        """Switch to a new locations_in_radius file; each row is: lat, lon, radius in km."""
        self.locationsInRadiusDataFile = self._swapQueryFile(self.locationsInRadiusDataFile,
                                                             self.locationsInRadiusFiles)
        self.locationsInRadiusQueries = [
            {'lat': float(row[0]),
             'lon': float(row[1]),
             'radius': float(row[2]) / 1.609344}  # Conversion from km to miles
            for row in csv.reader(self.locationsInRadiusDataFile) if row]
        self.locationsInRadiusIterator = iter(self.locationsInRadiusQueries)

    ### File helpers

    @staticmethod
    def _listQueryFiles(directory):
        """Return the paths of the regular files directly inside `directory`."""
        paths = (os.path.join(directory, name) for name in os.listdir(directory))
        return [path for path in paths if os.path.isfile(path)]

    def _swapQueryFile(self, currentFile, candidates):
        """Unlock and close `currentFile` (if any), then open and lock another candidate.

        The search starts at a random candidate and tries each one once,
        pausing 0.1 s after every failed attempt. It returns the file opened
        "r+" with an exclusive lock held. StopLocust is raised when there are
        no candidates or none could be opened and locked.
        """
        if currentFile is not None:
            fcntl.lockf(currentFile, fcntl.LOCK_UN)
            currentFile.close()
        if not candidates:
            print("no more files")
            raise StopLocust()
        startIdx = random.randint(0, len(candidates) - 1)
        idx = startIdx
        while True:
            f = None
            try:
                # "r+": POSIX requires a descriptor open for writing for an exclusive lock.
                f = open(candidates[idx], "r+")
                fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return f
            except IOError:
                if f is not None:
                    f.close()
            # Wrap around and stop after trying every candidate once, including
            # when startIdx is 0 (a case the previous wrap check missed, so it
            # retried forever).
            idx = (idx + 1) % len(candidates)
            if idx == startIdx:
                print("no more files")
                raise StopLocust()
            time.sleep(0.1)

    @staticmethod
    def _parseTimestamp(text):
        """Parse the leading 'YYYY-MM-DD hh:mm:ss' part of `text` into a naive datetime.

        Only the digit positions are read, so any separator characters are
        accepted and everything after the first 19 characters is ignored.
        """
        s = text[:19]
        return datetime(int(s[0:4]), int(s[5:7]), int(s[8:10]),
                        int(s[11:13]), int(s[14:16]), int(s[17:19]))

    @staticmethod
    def _parsePolygonRow(row):
        """Parse columns 0-7 (lat1, lon1, ..., lat4, lon4) into a dict of floats keyed 'lat1'..'lon4'."""
        corners = {}
        for corner in range(4):
            corners['lat%d' % (corner + 1)] = float(row[2 * corner])
            corners['lon%d' % (corner + 1)] = float(row[2 * corner + 1])
        return corners
class WebsiteUser(Locust):
    """Locust user class whose behaviour is the UserBehavior MongoDB read task set."""

    # Pause between tasks, drawn between these bounds (milliseconds, assuming
    # the pre-1.0 Locust wait-time API).
    min_wait, max_wait = 4000, 5000
    task_set = UserBehavior
class MongoDbClient(object):
    """Wrapper around the location collection exposing the queries used by the load test.

    The client, database and collection handles are stored on the class, so
    every instance shares them. Constructing another instance replaces them
    with a new connection; the previous client is not closed.
    """

    mongoClient = None  # shared pymongo MongoClient
    db = None           # shared database handle
    coll = None         # shared collection of location documents

    def __init__(self, host='172.16.0.76', port=27019, dbName='sw603', collName='locs',
                 maxPoolSize=1000):
        """Connect to MongoDB and select the collection.

        The defaults are the server and namespace this benchmark was written
        against; they are parameters so other deployments can be targeted.
        """
        MongoDbClient.mongoClient = MongoClient(host, port, maxPoolSize=maxPoolSize)
        MongoDbClient.db = MongoDbClient.mongoClient[dbName]
        MongoDbClient.coll = MongoDbClient.db[collName]

    @staticmethod
    def _polygon(lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4):
        """Return a GeoJSON Polygon for the four corners, closing the ring on the first corner."""
        return {'type': 'Polygon',
                'coordinates': [[[lon1, lat1], [lon2, lat2], [lon3, lat3], [lon4, lat4],
                                 [lon1, lat1]]]}

    @staticmethod
    def _newestPerUser(locations):
        """Return {userId: document} keeping, per userId, the document with the greatest 'timestamp'.

        When timestamps tie, the document seen first is kept.
        """
        newest = {}
        for loc in locations:
            current = newest.get(loc['userId'])
            if current is None or current['timestamp'] < loc['timestamp']:
                newest[loc['userId']] = loc
        return newest

    ### Queries ###
    def saveLocation(self, userId, timestamp, lon, lat):
        """Insert one location document whose 'location' is a GeoJSON Point [lon, lat]."""
        return MongoDbClient.coll.insert_one({'timestamp': timestamp, 'userId': userId,
                                              'location': {'type': 'Point',
                                                           'coordinates': [lon, lat]}})

    def getNewestLocation(self, userId):
        """Return a cursor over the newest location document of `userId` (at most one).

        The returned pymongo cursor is lazy: the query runs when it is iterated.
        """
        return MongoDbClient.coll.find({'userId': userId}).sort('timestamp', pymongo.DESCENDING).limit(1)

    def getNewestLocationsInArea(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4):
        """Return {userId: newest location document} for all locations inside the polygon.

        Every matching document is fetched and grouped client-side. A
        $match/$sort/$group aggregation could do the grouping on the server
        instead.
        """
        polygon = MongoDbClient._polygon(lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4)
        query = {'location': {'$geoWithin': {'$geometry': polygon}}}
        return MongoDbClient._newestPerUser(MongoDbClient.coll.find(query))

    def getLocationsAtTimeInterval(self, userId, time1, time2):
        """Return a (lazy) cursor over `userId`'s locations with time1 <= timestamp <= time2."""
        return MongoDbClient.coll.find({'userId': userId, 'timestamp': {'$gte': time1, '$lte': time2}})

    def getLocationsInAreaAtTimeInterval(self, lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4, time1, time2):
        """Return a (lazy) cursor over locations inside the polygon within [time1, time2], sorted by userId."""
        polygon = MongoDbClient._polygon(lon1, lat1, lon2, lat2, lon3, lat3, lon4, lat4)
        return MongoDbClient.coll.find({'location': {'$geoWithin': {'$geometry': polygon}},
                                        'timestamp': {'$gte': time1, '$lte': time2}}).sort('userId', pymongo.ASCENDING)

    def getAllLocationsInRadius(self, lon, lat, radius):
        """Return a (lazy) cursor over locations within `radius` of (lon, lat).

        $centerSphere takes its radius in radians. Dividing by 3963.2 (the
        Earth's radius in miles) means `radius` is expected in miles.
        """
        return MongoDbClient.coll.find({'location': {'$geoWithin': {'$centerSphere': [[lon, lat], radius / 3963.2]}}})
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement