Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# BATCH-NORMALIZED FEATURE MATRICES
# Both inputs are Avro files read through the Databricks spark-avro connector.
_AVRO = "com.databricks.spark.avro"
dataset = reader.read.format(_AVRO).load("data/collisions_feature_matrices_batch_normalized.avro")
# Read the collisions dataset to obtain meta-information about the tracks.
collisions = reader.read.format(_AVRO).load("data/collisions.avro")
def extract_track_types(iterator):
    """Yield the ``track_type`` of every track in every row of *iterator*.

    Intended for ``mapPartitions``: *iterator* yields row-like mappings
    that each carry a ``'tracks'`` sequence.
    """
    for record in iterator:
        yield from (track['track_type'] for track in record['tracks'])
# Obtain the files from which we extracted the collisions.
# (Collects the distinct set of track types across every partition.)
track_types_rdd = collisions.mapPartitions(extract_track_types)
files = track_types_rdd.distinct().collect()
def construct_output_vector(row):
    """Build a multi-hot label vector for one collision row.

    Relies on the module-level ``mapping`` (track_type -> index) and
    ``num_types`` (vector length) defined elsewhere in this script.

    Returns a ``Row`` with the collision ``id`` and a ``y`` list holding
    1.0 at the index of every track type present in the collision.
    """
    collision_id = row['id']
    # A set comprehension deduplicates track types directly. The original
    # used a local list named `files` (shadowing the module-level `files`)
    # with an O(n) membership test, and a loop variable named `file`
    # (shadowing the builtin).
    track_types = {t['track_type'] for t in row['tracks']}
    # Construct the output vector.
    y = np.zeros(num_types)
    for track_type in track_types:
        y[mapping[track_type]] = 1.0
    return Row(**{'id': collision_id, 'y': y.tolist()})
# From this, construct a feature vector which represents the track types for every collision-id.
output_vectors = collisions \
    .map(construct_output_vector) \
    .toDF()
def flatten(row):
    """Flatten one row's 'front' and 'side' feature matrices into 1-D lists.

    Keeps the collision id so the result can later be joined against the
    output vectors.
    """
    front_flat = np.asarray(row['front']).flatten().tolist()
    side_flat = np.asarray(row['side']).flatten().tolist()
    return Row(**{'collision_id': row['collision_id'],
                  'front': front_flat,
                  'side': side_flat})
# Flatten the feature matrices and attach the label vector per collision id.
training_set = dataset.map(flatten).toDF()
joined = training_set.join(output_vectors,
                           training_set.collision_id == output_vectors.id)
training_set = joined.select("collision_id", "front", "side", "y")
# Cache the result: it is consulted more than once below (schema + count)
# and presumably again by downstream training code.
training_set.persist(StorageLevel.MEMORY_AND_DISK)
training_set.printSchema()
print("Number of training samples: " + str(training_set.count()))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement