class AsterixParser(object):
    """Parse ASTERIX data stored in a "final format" file into HDF5 tables.

    A final format file is read as a sequence of blocks.  Each block has an
    8 byte header (length, board number, line number, timestamp), a payload
    of ASTERIX data blocks and 4 bytes of padding.  Every ASTERIX data block
    holds records of a single category; each decoded record is appended as
    one row to the HDF5 table created for that category.
    """

    # Subfield types that have to be extracted bit by bit from the raw value
    _BITFIELD_TYPES = frozenset(('bits', 'sbits', 'octal'))

    def __init__(self, categories_nb=None, start=None, end=None):
        """Create a parser.

        categories_nb is passed to category.get_asterix_categories() to
        select the categories to decode.
        start and end are optional timestamps (same unit as the converted
        block timestamp, i.e. seconds); blocks outside [start, end] are
        skipped.  None means "no limit".
        """
        self.categories = category.get_asterix_categories(categories_nb)
        self.start = start
        self.end = end
        # category number -> HDF5 table, filled by convert()
        self.cat_tables = {}

    @staticmethod
    def _slice(data, offset):
        """Return a view of data starting at offset, without copying"""
        try:
            return buffer(data, offset)
        except NameError:
            # buffer() does not exist on this interpreter: use memoryview
            return memoryview(data)[offset:]

    def convert(self, filename, h5filename):
        """Convert a final format file to HDF5 file"""
        self.cat_tables = {}
        with tables.openFile(h5filename, mode="w", title="ASTERIX Test file") as h5file:
            group = h5file.createGroup("/", 'ASTERIX', 'ASTERIX records')
            # One table per category (cat: do not shadow the category module)
            for number, cat in self.categories.items():
                self.cat_tables[number] = h5file.createTable(group,
                                                             cat.name,
                                                             cat.create_class(),
                                                             'ASTERIX %s' % cat.name,
                                                             tables.Filters(complevel=1))
            # The tables must still be open while the rows are appended
            self._read_final_format_file(filename)

    def _read_final_format_file(self, filename):
        """Read all final format blocks of filename

        Reading stops at the first struct.error, which is how the end of
        the file is detected.  If the error occurs before the end of the
        file, a warning is logged (the remaining data is not parsed).
        """
        with open(filename, 'rb') as f:
            mapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            try:
                while True:
                    try:
                        self._read_final_format(mapped)
                    except struct.error:
                        if mapped.tell() < mapped.size():
                            logging.warning('Truncated or malformed block in %s; '
                                            'stopping at byte %d',
                                            filename, mapped.tell())
                        break
            finally:
                # Always release the mapping, even if parsing fails
                mapped.close()

    def _read_final_format(self, map):
        """Read one final format block from map

        map is the memory-mapped file, positioned at the start of a block.
        Raises struct.error if the header cannot be read entirely.
        """
        # header: 8 bytes / data: length - 8 - 4 / padding: 4 bytes
        length, _board_nb, _line_nb, timestamp = struct.unpack('>HBBI', map.read(8))
        # Convert timestamp to seconds
        timestamp = timestamp / 100.0
        data_size = length - 12
        data_blocks = map.read(data_size)
        # Only parse blocks between start and end time
        if ((self.start is None or timestamp >= self.start) and
                (self.end is None or timestamp <= self.end)):
            offset = 0
            while offset < data_size:
                offset += self._read_data_block(self._slice(data_blocks, offset), timestamp)
        # padding (always skipped, even when the block is filtered out)
        map.read(4)

    def _read_data_block(self, data, timestamp):
        """Read one data block from data

        Return the length of the data block (header included).
        Raises ValueError if the declared length is smaller than the
        header: such a block can not be skipped and would make the caller
        loop forever.
        """
        # header: 3 bytes / data: length - 3
        cat_nb, length = struct.unpack_from('>BH', data)
        if length < 3:
            raise ValueError('Invalid data block length %d' % length)
        # Check category
        if cat_nb in self.cat_tables:
            # offset is counted from the start of the block (header
            # included), so the records end at length
            offset = 3
            while offset < length:
                offset += self._read_record(self._slice(data, offset), timestamp, cat_nb)
        return length

    def _get_uap(self, data, category):
        """Return the UAP of the given category

        data starts right after the FSPEC of the record.
        """
        cat = self.categories[category]
        uap_bit_set = cat.uap_bit_set
        if uap_bit_set is not None:
            # The UAP depends on a specific bit (ex: cat001)
            # Check this bit to use the proper UAP
            byte_nb, bit_nb = uap_bit_set
            value, = struct.unpack_from('>B', data, byte_nb)
            if (value >> bit_nb) & 0x01:
                uap_type = cat.uap_type_if_bit_set
            else:
                uap_type = cat.uap_type_if_bit_not_set
        else:
            uap_type = ''
        return cat.uaps[uap_type]

    def _read_record(self, data, timestamp, category):
        """Read one record from data and append it to the category table

        Return the number of bytes read.
        Raises ValueError or KeyError (after logging) if a decoded value
        can not be stored in the table row.
        """
        fspec, offset = self._get_fspec(data)
        uap = self._get_uap(self._slice(data, offset), category)
        row = self.cat_tables[category].row
        row['ff_timestamp'] = timestamp
        for index, presence in enumerate(fspec):
            if not presence:
                continue
            # UAP is a list with FRN - 1 as index
            data_item, data_item_format = uap[index]
            # Set the valid bool to True for this data item
            row['%s_valid' % data_item] = True
            subfields, size = self._read_data_item(self._slice(data, offset), data_item_format)
            # Group the values by name: a name can appear several times
            # (extents, repetitions)
            nv_dict = collections.defaultdict(list)
            for name, value in subfields:
                nv_dict[name].append(value)
            for name, values in nv_dict.items():
                try:
                    if isinstance(row[name], np.ndarray):
                        # We can only assign an array of identical size
                        # to the table cell
                        # So use an intermediate array
                        cell = np.copy(row[name])
                        cell[0:len(values)] = values
                        row[name] = cell
                    else:
                        # Check if len(values) == 1?
                        row[name] = values[0]
                except ValueError:
                    logging.error('Error while assigning %s to %s', str(values), name)
                    if isinstance(row[name], np.ndarray) and len(values) > len(row[name]):
                        logging.error('Try to increase max number of extents to %d', len(values))
                    else:
                        logging.error('%s is not a valid enumerated value', values)
                    raise
                except KeyError:
                    logging.error('Unknown key %s', name)
                    raise
            offset += size
        row.append()
        return offset

    def _read_data_item(self, data, data_item_format):
        """Return a tuple ([(name, value)], size) for all subfields of the data item

        size is the number of bytes used by the data item.
        Raises ValueError if the type of data_item_format is unknown.
        """
        fields = data_item_format.fields
        if data_item_format.type == 'Fixed':
            # There is only one part
            subfields, size = self._read_subfields(data, fields[0])
        elif data_item_format.type == 'Variable':
            # Read the first part
            subfields, size = self._read_subfields(data, fields[0])
            last = len(fields) - 1
            idx = 1
            # FX is the last subfield
            # subfields[-1] == ('FX', FX value)
            while subfields[-1][1]:
                # Read the extents while FX == 1
                subfields_, size_ = self._read_subfields(self._slice(data, size), fields[idx])
                subfields.extend(subfields_)
                size += size_
                # Use the next extent format, or keep the last defined one
                if idx < last:
                    idx += 1
        elif data_item_format.type == 'Repetitive':
            # Read the repetition indicator field
            subfields, size = self._read_subfields(data, fields[0])
            repetition = subfields[0][1]
            while repetition:
                # Read the subfield "repetition" times
                subfields_, size_ = self._read_subfields(self._slice(data, size), fields[1])
                subfields.extend(subfields_)
                size += size_
                repetition -= 1
        elif data_item_format.type == 'Explicit':
            # Read the length indicator field
            subfields, size = self._read_subfields(data, fields[0])
            length = subfields[0][1]
            # Try to decode other subfields?
            size = length
        elif data_item_format.type == 'Compound':
            # fields is a list of DataItemFormat elements
            # The primary subfield determines the presence or absence of the
            # subsequent data subfields
            # Should we return primary_subfields?
            # To remove, we need to remove it from get_data_item_cols or
            # _get_format
            primary_subfields, size = self._read_data_item(data, fields[0])
            subfields = primary_subfields
            presence_bits, offset = self._get_fspec(data)
            # To debug
            if size != offset:
                raise ValueError('Invalid size found when reading Compound primary field')
            # presence_bits[index] tells whether data subfield index + 1 is
            # present (fields[0] is the primary subfield)
            for index, presence in enumerate(presence_bits):
                if not presence:
                    continue
                data_subfields, size_ = self._read_data_item(self._slice(data, size),
                                                             fields[index + 1])
                size += size_
                subfields.extend(data_subfields)
        else:
            raise ValueError('Unknown data item format %s' % data_item_format.type)
        return (subfields, size)

    def _read_subfields(self, data, data_item_format_field):
        """Return a tuple ([(name, value)], size) for the given subfields

        data_item_format_field is a tuple (subfields, struct_format):
        subfields is a list of namedtuple SubFieldFormat
        struct_format is a Struct object initialized with the format needed
        to read all subfields
        size is the number of bytes read; every value is multiplied by the
        scale of its subfield.
        """
        subfields, struct_format = data_item_format_field
        sizes = [subfield.size for subfield in subfields]
        bits_size = sum(sizes)
        # Sizes are given in bits: use an integer division to get bytes
        bytes_size = bits_size // 8
        types = [subfield.type for subfield in subfields]
        if not self._BITFIELD_TYPES.isdisjoint(types):
            # Get the value: the unpacked values are combined as successive
            # bytes, most significant first
            value = 0
            for part in struct_format.unpack_from(data):
                value = (value << 8) + part
            # and split it
            values = self._unpack_bitfields(value, data, bits_size, sizes, types)
        else:
            values = list(struct_format.unpack_from(data))
            for idx, sf_type in enumerate(types):
                if sf_type == '6bitschar':
                    values[idx] = base64.b64encode(values[idx])
        name_values = [(subfield.name, values[idx] * subfield.scale)
                       for idx, subfield in enumerate(subfields)]
        return (name_values, bytes_size)

    def _get_fspec(self, data):
        """Return a tuple (FSPEC, offset)

        FSPEC is a list of presence bits (0 or 1), FX bits removed: the
        bit at index i tells whether the item with FRN i + 1 is present
        offset is the number of bytes read in data
        Raises struct.error if data ends before the last FSPEC byte.
        """
        fspec = []
        offset = 0
        fx = 1
        while fx:
            value, = struct.unpack_from('>B', data, offset)
            # Add the 7 first bits value to the list (MSB first)
            fspec.extend((value >> shift) & 0x01 for shift in range(7, 0, -1))
            # Get fx (LSB)
            fx = value & 0x01
            offset += 1
        return (fspec, offset)

    def _unpack_bitfields(self, value, data, size, bf_sizes, bf_types):
        """Split value in bitfields and return the list of their values

        value is the unsigned integer holding all bitfields, size its width
        in bits.  bf_sizes and bf_types give the width in bits and the type
        of each bitfield, starting from the most significant bits:
        'octal' is returned as a zero padded string of octal digits,
        'sbits' as a signed (two's complement) integer, any other type as
        an unsigned integer.  '6bitschar' is base64 encoded from data.
        """
        offset = 0
        values = []
        for bf_size, bf_type in zip(bf_sizes, bf_types):
            if bf_type == '6bitschar':
                # NOTE(review): offset and bf_size are counted in bits but
                # are used here as byte indexes - to check
                bitfield_val = base64.b64encode(data[offset:offset + bf_size])
            else:
                mask = (1 << bf_size) - 1
                bitfield_val = (value >> (size - offset - bf_size)) & mask
                if bf_type == 'octal':
                    # Convert to octal and format on the right number of
                    # characters
                    # '%o' is used instead of oct() to only get the digits
                    # (no '0o' prefix, no 'L' suffix)
                    nb_char = bf_size // 3
                    bitfield_val = ('%o' % bitfield_val)[-nb_char:].zfill(nb_char)
                elif bf_type == 'sbits':
                    # Convert to signed value
                    if bitfield_val & (1 << (bf_size - 1)):
                        bitfield_val -= 1 << bf_size
            offset += bf_size
            values.append(bitfield_val)
        return values