Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ##########################################################################################
- #
- # Code to create a TFRecord from videos (droplet videos). The frames are inputted into a
- # VAE, and the latent variable is stored into the TFRecord (instead of the full frame).
- # Thus, for each video the TFRecord will store the latent variable that represents each
- # frame.
- # The videos must be binarised, and 400x400, in the format expected by the VAE.
- #
- # Author: Juan Manuel Parrilla Gutierrez ([email protected])
- #
- ##########################################################################################
- import glob, cv2, tqdm, sys, os, re, random, shutil
- import tensorflow as tf
- import numpy as np
- from tensorflow.keras import utils
- from src.utils.modelutils import load_vae_model
class TFRecordWriter(object):
    """
    Class handling writing the video data (through a VAE) to a TFRecord.

    Every frame of every video is encoded by the VAE and only the resulting latent
    vector is stored. Each TFRecord example holds a fixed-length sequence of latent
    vectors ('frames') plus the normalised recipe parsed from the file name ('recipe').
    Please check TFRecordReader to see how to read it back.
    """

    def __init__(self, videopaths, modelpath, dillation,
                 step_size=1, batch_size=32, train_split=0.8, seq_length=416 * 2):
        """Creates an object that will be used to save the data into a TFRecord

        Args:
            videopaths: Path to the folder where all the videos are. The videos must be
                binarised, created with the script VideoUtils.py
            modelpath: Path to the folder with the trained VAE (created with train_vae.py)
            dillation: "normal", "reverse" or "no". Check FramesLoader for more info.
                Any value other than "normal" or "reverse" applies no dilation.
            step_size: How many frames to take. For example step_3 will take 1 out 3 frames
            batch_size: Batch size of the data when we run it against the VAE encoder
            train_split: Fraction (0..1) of the videos that goes to the train set
                (the rest goes to the test set)
            seq_length: Number of latent vectors stored per TFRecord example. Longer
                videos are cut into consecutive sequences of exactly this length and
                any shorter tail is discarded. TFRecordReader must be configured with
                the same value.

        Raises:
            ValueError: If step_size or seq_length is smaller than 1.
        """
        if step_size < 1:
            raise ValueError("step_size must be >= 1, got %r" % (step_size,))
        if seq_length < 1:
            raise ValueError("seq_length must be >= 1, got %r" % (seq_length,))

        self.videopaths = videopaths
        self.modelpath = modelpath
        self.dillation = dillation
        self.step_size = step_size
        self.batch_size = batch_size
        self.train_split = train_split
        # load_vae_model returns the model and the input size it expects;
        # input_dim[0] and input_dim[1] are used as the resize target below.
        self.vae, self.input_dim = load_vae_model(self.modelpath)
        # Because the videos have different lengths, the longer ones are broken into
        # sequences of exactly seq_length latent vectors, so every stored example has
        # the same shape (seq_length, latent_size).
        # NOTE(review): the original comment says this number needs to be adjusted
        # depending on batch_size - confirm before changing either value.
        self.seq_length = seq_length

    def serialise_to_tfrecords(self):
        """Serialises all the videos into train.tfrecord and test.tfrecord.

        The '*.avi' files found in self.videopaths are shuffled and split according
        to self.train_split. Each video is encoded, cut into sequences of
        self.seq_length latent vectors and written to the matching TFRecord file.
        After being processed, the video is moved into a 'train' or 'test' sub-folder
        next to it (the folder is created when missing).
        """
        # get all the videos
        videos = glob.glob(os.path.join(self.videopaths, '*.avi'))
        random.shuffle(videos)

        db_split = int(len(videos) * self.train_split)
        subsets = (("train", videos[:db_split]), ("test", videos[db_split:]))

        for subset_name, subset in subsets:
            out_path = os.path.join(self.videopaths, subset_name + ".tfrecord")
            # the context manager closes the file even if a video fails half-way
            with tf.io.TFRecordWriter(out_path) as writer:
                for video in tqdm.tqdm(subset, unit='F'):
                    frames_latent_vectors = self.video2latentvectors(video)
                    vector_length = frames_latent_vectors.shape[0]
                    recipe = self.get_recipe(video)

                    # break down the latent vectors into full sequences of length
                    # seq_length; an incomplete trailing sequence is dropped
                    last_start = vector_length - self.seq_length
                    for start in range(0, last_start + 1, self.seq_length):
                        chunk = frames_latent_vectors[start:start + self.seq_length]
                        writer.write(self.prepare_TFRecord(chunk, recipe))

                    # move the processed file to either the train or the test folder
                    dest_folder = os.path.join(os.path.dirname(video), subset_name)
                    os.makedirs(dest_folder, exist_ok=True)
                    destination = os.path.join(dest_folder, os.path.basename(video))
                    shutil.move(video, destination)

    def get_recipe(self, videopath):
        """Extracts the normalised recipe from the file name of a video.

        For example octanoic_0_pentanol_0_octanol_9_dep_90_raw_1_bin.mp4
        means 0% octanoic, 0% pentanol, 9% octanol, 90% DEP.

        Args:
            videopath: Path (or bare file name) of the video.

        Returns:
            Numpy array with the first four numbers found in the file name,
            divided by their sum so that they add up to 1.

        Raises:
            ValueError: If the file name has fewer than four numbers, or if the
                four numbers add up to zero (they cannot be normalised).
        """
        # get the file name only, not the full path with folders, without extension
        file_name = os.path.splitext(os.path.basename(videopath))[0]
        # find the first four numbers, which are the recipe
        recipe = re.findall(r'\d+', file_name)[:4]
        if len(recipe) < 4:
            raise ValueError(
                "Expected four recipe numbers in file name %r, found %d"
                % (file_name, len(recipe)))

        # transform to int and to numpy
        recipenp = np.array([int(x) for x in recipe])
        total = np.sum(recipenp)
        if total == 0:
            raise ValueError(
                "Recipe in file name %r adds up to zero, cannot normalise" % file_name)
        # normalise and return
        return recipenp / total

    def video2latentvectors(self, videopath):
        """
        Given a video, it will use the trained vae to return the latent vectors
        for each frame.

        Returns:
            Numpy array of shape (n_batches * batch_size, latent_size). When the
            video does not contain a single full batch an empty array of shape
            (0, 0) is returned.
        """
        ds = self.video2dataset(videopath)

        # using the vae encoder get the latent vectors, batch by batch
        vectors = []
        for batch in ds:
            # the encoder returns three values and the third one is stored
            # (presumably z_mean, z_log_var, z - confirm against the VAE code)
            _, _, latent = self.vae.encoder(batch)
            vectors.append(np.asarray(latent))

        if not vectors:
            # video shorter than one batch: nothing to encode
            return np.empty((0, 0), dtype=np.float32)

        # this will go from (n_batches, batch, latent_v) to (n_batches*batch, latent_v)
        return np.concatenate(vectors, axis=0)

    def video2dataset(self, videopath):
        """
        Given a video, it will return a batched and preprocessed TF dataset with
        its frames. Frames that do not fill a complete batch are dropped.
        """
        # Get a numpy array with all the frames
        frames = self.frames_from_video_file(videopath)
        # convert the numpy array into a tf dataset
        dataset = tf.data.Dataset.from_tensor_slices(frames)
        # batch it; drop_remainder keeps every batch the same size
        dataset = dataset.batch(self.batch_size, drop_remainder=True)
        # preprocess it
        dataset = self.preprocess_dataset(dataset)
        # configure for performance
        return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    def preprocess_dataset(self, dataset):
        """
        Performs the same pre-processing that was done to train the vae: resize
        to the VAE input size, optional dilation and rescaling from 0..255 to 0..1.
        """
        normalization_layer = tf.keras.layers.Rescaling(1./255)
        dillation_layer = tf.keras.layers.MaxPool2D(pool_size=5, strides=1, padding='same')

        dataset = dataset.map(lambda x: tf.image.resize(
            x, (self.input_dim[0], self.input_dim[1])))

        if self.dillation == "normal":
            # max pooling grows the bright regions
            dataset = dataset.map(lambda x: dillation_layer(x))
        elif self.dillation == "reverse":
            # max pooling on the inverted image grows the dark regions instead
            dataset = dataset.map(lambda x: 1-dillation_layer(1-x))
        # any other value (for example "no") leaves the frames untouched

        return dataset.map(lambda x: normalization_layer(x))

    def frames_from_video_file(self, videopath):
        """
        Given a video, it will return the frames in a numpy array (RGB order).

        Only the last frame of every group of self.step_size frames is kept.

        Raises:
            ValueError: If no frame could be read from the video.
        """
        frames = []
        video_capture = cv2.VideoCapture(videopath)
        try:
            while True:
                ret, frame = False, None
                # Take one frame every step_size
                for _ in range(self.step_size):
                    ret, frame = video_capture.read()
                    if not ret:
                        break
                if not ret:
                    break
                # frames stay in 0..255 here, the normalization layer in
                # preprocess_dataset rescales them later on
                frames.append(frame)
        finally:
            # always free the decoder / file handle
            video_capture.release()

        if not frames:
            raise ValueError("No frames could be read from video: %s" % videopath)

        # last bit changes from bgr to rgb
        return np.array(frames)[..., [2, 1, 0]]

    def prepare_TFRecord(self, frames, recipe):
        """
        Builds the serialised tf.train.Example for one sequence.

        Args:
            frames: Sequence of latent vectors, stored as a serialised tensor
                under the key 'frames'.
            recipe: Recipe values, stored as a float list under the key 'recipe'.

        Returns:
            The example serialised to a byte string, ready for the TFRecord writer.
        """
        frames_feature = tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[
                tf.io.serialize_tensor(frames).numpy(),
            ])
        )
        recipe_feature = tf.train.Feature(
            float_list=tf.train.FloatList(value=recipe),
        )
        features = tf.train.Features(feature={
            'frames': frames_feature,
            'recipe': recipe_feature
        })
        example = tf.train.Example(features=features)
        return example.SerializeToString()

    def visualise_latent_reconstructions(self, latent_vectors):
        """
        Creates images of the latent vectors generated, to see if the previous
        encoding is correct. The first (up to) 32 latent vectors are decoded and
        saved in the working directory as writer_img_000.png, writer_img_001.png...
        """
        batch_size = 32
        ds = tf.data.Dataset.from_tensor_slices(latent_vectors)
        ds = ds.batch(batch_size)
        for entry in ds.take(1):
            generated_images = self.vae.decoder(entry)
            # iterate over what was actually decoded, which can be fewer than 32
            for i, image in enumerate(generated_images):
                img = utils.array_to_img(image)
                img.save("writer_img_%03d.png" % (i))
class TFRecordReader(object):
    """
    Class handling reading the TFRecord into a dataset to use for training.
    Please check TFRecordWriter to see how it was saved to disk.
    The TFRecord to be read must have been created with TFRecordWriter.
    """

    def __init__(self, tfrecordfile, batch_size = 64, seq_length=832, latent_dim=200):
        """Opens the TFRecord and builds the shuffled, batched dataset.

        Args:
            tfrecordfile: TFRecord file name(s), passed to tf.data.TFRecordDataset.
            batch_size: Number of sequences per batch (incomplete batches are dropped).
            seq_length: Number of latent vectors per stored sequence. It must match
                the sequence length used by TFRecordWriter when the file was created.
            latent_dim: Size of each latent vector, i.e. the latent size of the VAE
                that was used to create the file.
        """
        self.BATCH_SIZE = batch_size
        self.tfrecordfile = tfrecordfile
        # these must be set before get_dataset(), which maps decode_frames
        self.seq_length = seq_length
        self.latent_dim = latent_dim
        self.AUTOTUNE = tf.data.AUTOTUNE
        self.dataset = self.get_dataset()
        self.dataset_iter = iter(self.dataset)

    def decode_frames(self, frames):
        """Deserialises the 'frames' byte string into a float32 tensor of shape
        (seq_length, latent_dim)."""
        parsed_data = tf.io.parse_tensor(frames, tf.float32)
        # explicit size needed for TPU
        parsed_data = tf.reshape(parsed_data, [self.seq_length, self.latent_dim])
        return parsed_data

    def read_tfrecord(self, example):
        """Parses one serialised example.

        Returns:
            Tuple (latent vectors of the sequence, recipe with 4 float values).
        """
        TFREC_FORMAT = {
            "frames": tf.io.FixedLenFeature([], tf.string),  # tf.string means bytestring
            "recipe": tf.io.FixedLenFeature([4], tf.float32)
        }
        example = tf.io.parse_single_example(example, TFREC_FORMAT)
        video_latent_vectors = self.decode_frames(example['frames'])
        return video_latent_vectors, example['recipe']

    def load_dataset(self):
        """Loads the TFRecord and uses map to parse every example.

        Check https://keras.io/examples/keras_recipes/tfrecord/ "define load methods"
        because this is basically a copy paste of that code with small modifications

        Returns:
            dataset: Loaded TFRecord, yielding (latent vectors, recipe) tuples,
                neither shuffled nor batched.
        """
        ignore_order = tf.data.Options()
        ignore_order.deterministic = False  # disable order, increase speed

        dataset = tf.data.TFRecordDataset(self.tfrecordfile)
        # uses data as soon as it streams in, rather than in its original order
        dataset = dataset.with_options(ignore_order)
        dataset = dataset.map(
            self.read_tfrecord,
            num_parallel_calls=self.AUTOTUNE
        )
        # returns the dataset as loaded
        return dataset

    def get_dataset(self):
        """Loads the TFRecord, and then shuffles the data and divides it into batches.

        Returns:
            Dataset of (latent vectors, recipe) batches of size self.BATCH_SIZE.
            It is not repeated, so it is exhausted after one pass.
        """
        dataset = self.load_dataset()
        dataset = dataset.shuffle(2048)
        dataset = dataset.batch(self.BATCH_SIZE, drop_remainder=True)
        # prefetch goes last so complete batches are prepared in the background
        dataset = dataset.prefetch(buffer_size=self.AUTOTUNE)
        return dataset

    def visualise_latent_reconstructions_and_recipes(self, vaepath):
        """Decodes part of one stored sequence back into images, to check the data.

        It takes the next batch from the dataset, picks its first sequence, decodes
        the first (up to) 32 latent vectors with the VAE found in vaepath and saves
        them in the working directory as reader_img_000.png, reader_img_001.png...
        Only the latent vectors are used; the recipes are not visualised.

        Args:
            vaepath: Path to the folder with the trained VAE.
        """
        batch_size = 32
        # latent vectors of the batch: (BATCH_SIZE, seq_length, latent_dim)
        data = next(self.dataset_iter)[0]
        # first sequence of the batch: (seq_length, latent_dim)
        ds = tf.data.Dataset.from_tensor_slices(data.numpy()[0])
        ds = ds.batch(batch_size)
        vae, _ = load_vae_model(vaepath)
        for entry in ds.take(1):
            generated_images = vae.decoder(entry)
            # iterate over what was actually decoded, which can be fewer than 32
            for i, image in enumerate(generated_images):
                img = utils.array_to_img(image)
                img.save("reader_img_%03d.png" % (i))
if __name__ == "__main__":
    # Command line entry point: encodes every video found in <videos_folder> with
    # the VAE stored in <vae_model_folder> and writes train/test TFRecord files.
    if len(sys.argv) < 3:
        # fail with a readable message instead of a bare IndexError
        sys.exit("Usage: python %s <videos_folder> <vae_model_folder>" % sys.argv[0])

    videopaths = sys.argv[1]
    modelpath = sys.argv[2]

    tfrecord_writer = TFRecordWriter(videopaths, modelpath, dillation="reverse")
    tfrecord_writer.serialise_to_tfrecords()

    # To check a generated file, build a TFRecordReader on the .tfrecord file
    # (for example with batch_size=32) and call
    # visualise_latent_reconstructions_and_recipes(modelpath) on it.
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement