Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from fbp import outport, inport, state, EOS
- # Decorators
@inport('filename', type=str, description='the file path to be read')
@outport(
    'line',
    type=str,
    description='a single line read from the file, END-OF-STREAM when the content is over',
)
@state  # similar to click's @pass_context
async def cat(filename, line, state):
    """Stream a text file to the ``line`` port, one line per push.

    The path comes from a single ``filename.pop()``. Reading starts at the
    offset stored under ``state['fpos']`` (0 when the key is absent), and
    that entry is refreshed after every pushed line. ``EOS`` is pushed once
    the file has been fully consumed.
    """
    path = await filename.pop()
    async with aiofiles.open(path, mode='r') as handle:
        # Resume from the previously recorded offset, if there is one.
        await handle.seek(state.get('fpos', 0))
        async for text_line in handle:
            await line.push(text_line)
            # Checkpoint the position reached after this line.
            state['fpos'] = await handle.tell()
    await line.push(EOS)
@component
async def cat(filename, line, state):
    """ Read a file one line at a time.
    Parameters
    ----------
    filename: inport(type=str)
        the file path to be read
    line: outport(type=str)
        a single line read from the file, END-OF-STREAM when the content is over
    state: state
    """
    # NOTE(review): the docstring above presumably doubles as the port/state
    # declaration consumed by @component, so its wording is kept verbatim --
    # confirm against the decorator before editing it.
    source_path = await filename.pop()
    async with aiofiles.open(source_path, mode='r') as reader:
        # Start from the offset recorded earlier, or from the beginning.
        await reader.seek(state.get('fpos', 0))
        async for chunk in reader:
            await line.push(chunk)
            # Remember how far we got so the read can be resumed.
            state['fpos'] = await reader.tell()
    await line.push(EOS)
Add Comment
Please sign in to add a comment.