Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import devices.depot
- import events
- import util.datadoc
- import util.logger
- import util.threads
- import numpy
- import threading
- import time
## Module-level counter used to give each DataSaver instance its own ID:
# DataSaver.__init__ copies the current value into the new instance and then
# increments this counter.
uniqueID = 0
## Records the images generated during an experiment. Each incoming image is
# stored in an in-memory array and also written straight into an MRC file;
# the file is closed once the experiment thread has finished.
class DataSaver:
    ## Size in bytes of the fixed-length header that precedes the image data
    # in the output file.
    HEADER_BYTES = 1024
    ## Seconds executeAndSave() keeps waiting for late images after the
    # experiment thread has exited.
    STRAGGLER_WAIT = .5
    ## Seconds between checks while waiting for late images.
    STRAGGLER_POLL = .01

    ## \param cameras List of CameraHandler instances for the cameras that
    #         will be generating images.
    # \param numReps How many times the experiment will be repeated.
    # \param cameraToImagesPerRep Maps camera handlers to how many images to
    #        expect for that camera in a single repeat of the experiment.
    # \param runThread Thread that is executing the experiment. When it exits,
    #        we know to stop expecting more images.
    # \param savePath Path to save the incoming data to.
    # \param pixelSizeZ Size of the Z "pixel" (i.e. distance between Z slices).
    # \param titles List of strings to insert into the MRC file's header.
    #        Per the file format, each string can be up to 80 characters long
    #        and there can be up to 10 of them.
    def __init__(self, cameras, numReps, cameraToImagesPerRep, runThread,
            savePath, pixelSizeZ, titles):
        global uniqueID
        self.cameras = cameras
        self.numReps = numReps
        self.cameraToImagesPerRep = cameraToImagesPerRep
        self.runThread = runThread
        ## Unique ID for our instance
        self.uniqueID = uniqueID
        uniqueID += 1
        ## Set to True once we're done with all saving activity.
        self.isSavingComplete = False

        # Find the maximum image size (in pixels) in X and Y. While we're at
        # it, assign a number to each camera, for indexing into our data
        # array later.
        maxWidth, maxHeight = 0, 0
        ## Maps a camera handler to the index into self.data for that camera.
        self.handleToIndex = {}
        for i, camera in enumerate(self.cameras):
            width, height = camera.getImageSize()
            maxWidth = max(width, maxWidth)
            maxHeight = max(height, maxHeight)
            self.handleToIndex[camera] = i
        ## Inverse of self.handleToIndex.
        self.indexToHandle = dict(
                (index, handle)
                for handle, index in self.handleToIndex.items())

        ## Largest per-repeat image count over all cameras; this is the
        # length of the Z axis of self.data. Computed once here because it
        # is needed for every incoming image.
        self.maxImagesPerRep = max(self.cameraToImagesPerRep.values())
        ## Array of pixel data, in WTZYX order. Because different cameras
        # may have different image sizes, we pad with zeros to fill in the
        # extra space.
        self.data = numpy.zeros(
                (len(self.cameras), self.numReps, self.maxImagesPerRep,
                 maxHeight, maxWidth),
                dtype = numpy.uint16)
        ## Number of bytes to allocate for each image in the file, derived
        # from the array's own element size.
        self.imageBytes = maxWidth * maxHeight * self.data.dtype.itemsize
        ## List of how many images we've received, on a per-camera basis.
        self.imagesReceived = [0] * len(self.cameras)
        ## List of functions that receive image data and feed it into
        # self.onImage, one per camera once startCollecting has run.
        self.lambdas = []
        ## Lock on writing to the file.
        self.fileLock = threading.Lock()

        pixelSizeXY = devices.depot.getHandlersOfType(devices.depot.OBJECTIVE)[0].getPixelSize()
        ## MRC header object for the data.
        self.header = util.datadoc.makeHeaderFor(self.data, pixelSizeXY, pixelSizeZ)
        self.header.NumTitles = len(titles)
        self.header.title[:len(titles)] = titles

        # Open the file only once everything that can fail without it has
        # succeeded, and don't leave it open if the header can't be written.
        ## Filehandle we will write the data to.
        self.filehandle = open(savePath, 'wb')
        try:
            with self.fileLock:
                util.datadoc.writeMrcHeader(self.header, self.filehandle)
        except Exception:
            self.filehandle.close()
            raise

    ## Subscribe to the new-camera-image events for the cameras we care about.
    # Save the functions we generate for handling the subscriptions, so we can
    # unsubscribe later.
    def startCollecting(self):
        for camera in self.cameras:
            # Bind camera as a default argument so each handler keeps its own
            # camera rather than the loop's final value.
            func = lambda data, camera = camera: self.onImage(self.handleToIndex[camera], data)
            self.lambdas.append(func)
            events.subscribe('new image %s' % camera.name, func)

    ## Return a list of (camera, received, expected) tuples giving, for each
    # camera, how many images have arrived and how many the whole experiment
    # should produce.
    def _imageCounts(self):
        return [(camera,
                 self.imagesReceived[self.handleToIndex[camera]],
                 self.cameraToImagesPerRep[camera] * self.numReps)
                for camera in self.cameras]

    ## Wait for the runThread to finish, then wait a bit longer in case some
    # images are laggardly, before we close our filehandle. Afterwards report
    # any camera whose image count does not match what was expected.
    def executeAndSave(self):
        self.runThread.join()
        # Check if we've received all images; if we haven't, then give them
        # a little time to arrive.
        deadline = time.time() + self.STRAGGLER_WAIT
        while time.time() < deadline:
            if all(received == expected
                    for _, received, expected in self._imageCounts()):
                break
            time.sleep(self.STRAGGLER_POLL)

        # Pair cameras with their handlers instead of indexing, so that a
        # saver that never started collecting still gets to close its file.
        for camera, func in zip(self.cameras, self.lambdas):
            events.unsubscribe('new image %s' % camera.name, func)
        with self.fileLock:
            # Finish writing.
            self.filehandle.close()

        for camera, received, expected in self._imageCounts():
            if expected != received:
                print("Only got %d out of %d images for camera %s" % (received, expected, camera.name))
                util.logger.log.warn("Missing images for camera %s: only got %d out of %d" % (camera.name, received, expected))
        self.isSavingComplete = True

    ## Receive new data, add it to our array and write it to the file.
    # \param cameraIndex Index into self.data of the camera that produced
    #        the image.
    # \param imageData 2D array of pixels, unpacked as (height, width).
    @util.threads.callInNewThread
    def onImage(self, cameraIndex, imageData):
        with self.fileLock:
            if self.filehandle.closed:
                # executeAndSave has already finished with the file; seeking
                # on it would only raise in this worker thread.
                util.logger.log.warn("Saver %d dropped a late image for camera index %d: file already closed" % (self.uniqueID, cameraIndex))
                return
            # Calculate the time and Z indices for the new image.
            imagesPerRep = self.cameraToImagesPerRep[self.indexToHandle[cameraIndex]]
            numImages = self.imagesReceived[cameraIndex]
            timepoint = numImages // imagesPerRep
            zIndex = numImages % imagesPerRep
            height, width = imageData.shape
            try:
                self.data[cameraIndex, timepoint, zIndex, :height, :width] = imageData
            except Exception:
                position = (cameraIndex, timepoint, zIndex, height, width)
                print("Bad index %s %s %s %s" % (self.uniqueID, numImages, position, self.data.shape))
                util.logger.log.error("Saver %d tried invalid index: %s compare %s" % (self.uniqueID, position, self.data.shape))
                # NOTE(review): as before, we fall through after logging. If
                # the indices were out of range the read below raises again;
                # if only the image shape was wrong a blank frame is written.
            # Determine the offset for the image in the file. Frames are laid
            # out in the same camera, timepoint, Z order as self.data, so
            # each camera owns numReps * maxImagesPerRep consecutive frames;
            # leaving out numReps would make cameras overwrite each other
            # whenever the experiment repeats.
            # NOTE(review): assumes the reader expects exactly the layout of
            # the array passed to makeHeaderFor -- confirm in util.datadoc.
            frame = ((cameraIndex * self.numReps + timepoint)
                    * self.maxImagesPerRep + zIndex)
            offset = self.HEADER_BYTES + self.imageBytes * frame
            self.filehandle.seek(offset)
            self.filehandle.write(self.data[cameraIndex, timepoint, zIndex])
            self.imagesReceived[cameraIndex] += 1

    ## Return True if saving is complete.
    def getAmDone(self):
        return self.isSavingComplete
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement