Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- """
- Concatenate spectra from different sensors.
- """
from io import StringIO
import logging
import sys

import click
import coloredlogs
import numpy as np
import pandas as pd
# Configure coloredlogs once at import time: DEBUG verbosity, records laid out
# as "time module[pid] LEVEL message" with a time-only (HH:MM:SS) timestamp.
coloredlogs.install(
    level='DEBUG',
    fmt='%(asctime)s %(module)s[%(process)d] %(levelname)s %(message)s',
    datefmt='%H:%M:%S',
)

# Module-level logger used by the functions defined in this file.
logger = logging.getLogger(__name__)
def load_file(path, offset=22):
    """
    Load a tab-separated, two-column spectrum file into a DataFrame.

    The whole file is read into memory; when ``offset`` is truthy only
    ``lines[offset:]`` is kept (by default the first 22 lines are dropped,
    presumably an instrument header -- confirm against the file format), and
    the remaining text is handed to pandas.

    :param path: path of the text file to read.
    :param offset: number of leading lines to discard before parsing; a falsy
        value (``0`` or ``None``) keeps every line.
    :return: DataFrame indexed by ``wavelength`` with one ``intensity`` column.
    """
    with open(path, 'r') as handle:
        raw_lines = handle.readlines()

    kept_lines = raw_lines[offset:] if offset else raw_lines

    # Parse from an in-memory buffer so pandas only ever sees the data rows.
    table = pd.read_csv(
        StringIO(''.join(kept_lines)),
        sep='\t',
        names=['wavelength', 'intensity'],
        index_col=False,
    )
    return table.set_index('wavelength')
def concat_data(x, y, mode='lstsq'):
    """
    Concatenate two spectra X and Y, rescaling Y to match X on their overlap.

    The rows whose index labels appear in both frames are used as the
    reference region. Y is rescaled according to ``mode`` and the result is
    ``x.combine_first(y_rescaled)``, so values from X win wherever both
    frames have data.

    :param x: DataFrame with an ``intensity`` column; its values are kept
        as-is.
    :param y: DataFrame with an ``intensity`` column; it is rescaled before
        merging. The caller's object is never modified.
    :param mode: ``'lstsq'`` fits ``y = m * x + c`` over the overlap by least
        squares and applies ``(y - c) / m``; ``'last'`` divides Y by the Y/X
        ratio of the last overlapping row.
    :return: the combined DataFrame.
    :raises ValueError: if ``mode`` is unknown, or if X and Y share no index
        labels to use as reference.
    """
    if mode not in ('lstsq', 'last'):
        raise ValueError("unknown concatenation method")

    # Use an explicit set intersection: the `&` operator on Index objects no
    # longer means "intersection" in current pandas releases.
    overlap = x.index.intersection(y.index)
    if overlap.empty:
        raise ValueError("spectra do not overlap, cannot determine scaling")

    xo, yo = x.loc[overlap], y.loc[overlap]

    if mode == 'lstsq':
        # Solve yo ~= m * xo + c, then invert the fit to bring Y onto X's scale.
        design = np.vstack([xo['intensity'].to_numpy(), np.ones(len(xo))]).T
        m, c = np.linalg.lstsq(design, yo['intensity'].to_numpy(), rcond=None)[0]
        y_scaled = (y - c) / m
    else:
        # Build a new frame instead of `y /= ...` so the caller's Y is untouched.
        y_scaled = y / (yo.iloc[-1] / xo.iloc[-1])

    return x.combine_first(y_scaled)
def normalize_peak(df, lpf=500.):
    """
    Scale a spectrum so the peak found above ``lpf`` becomes 1.

    The column-wise maximum is taken over the rows whose index is strictly
    greater than ``lpf`` and every row of ``df`` is divided by it.

    :param df: DataFrame indexed by wavelength; it is modified in place.
    :param lpf: cutoff; only rows with index greater than this value are
        searched for the maximum.
    :return: the same ``df`` object, normalized.
    :raises ValueError: if no row lies above ``lpf``; dividing by the
        resulting NaN maximum would otherwise blank out the whole spectrum.
    """
    logger.info("using data after %.2f nm to normalize", lpf)
    tail = df[df.index > lpf]
    if tail.empty:
        raise ValueError(
            "no data above {:.2f} nm to normalize against".format(lpf)
        )
    # In-place division keeps the original contract: the caller's frame is
    # updated and also returned.
    df /= tail.max()
    return df
# Command-line entry point. The function docstring below doubles as the
# `--help` text shown by click, so it is kept verbatim.
@click.command()
@click.argument('short', type=click.Path(exists=True))
@click.argument('long', type=click.Path(exists=True))
@click.argument('output')
@click.option('--mode', type=click.Choice(['lstsq', 'last']), default='lstsq',
              help='Method to determine concatenation ratio.')
@click.option('--no-norm', 'norm', is_flag=True, default=False,
              help='Do not normalize the result to [0, 1].')
# Parse the cutoff as a double: a float32 would round values such as 532.1
# and shift the strict "index > lpf" comparison at the boundary.
@click.option('--lpf', type=float, default=500.,
              help='Long-pass frequency used in normalization, default 500.0 nm.')
def main(short, long, output, mode, norm, lpf):
    """
    This script concat spectrum from SHORT and LONG file and save the resolved
    result in OUTPUT. Spectrum from SHORT is favored over LONG.
    """
    sh_data = load_file(short)
    ln_data = load_file(long)

    df = concat_data(sh_data, ln_data, mode)

    # `norm` is True when --no-norm was given, i.e. normalization is skipped.
    if not norm:
        df = normalize_peak(df, lpf)

    df.to_csv(output, index_label='wavelength', header=True, float_format='%.6g')
    logger.info('result saved to "%s"', output)
if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        # Errors click does not handle itself end up here. Log the traceback
        # and exit non-zero so shells and scripts can detect the failure.
        logger.exception(str(e))
        sys.exit(1)
Add Comment
Please, Sign In to add comment