Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pathlib import Path
- from flytekit import workflow, task, FlyteFile, ContainerTask, kwtypes
- import logging
- logging.basicConfig(level=logging.INFO)
# Raw-container task that rewrites every "Hello" in the input file to
# "Goodbye" using sed inside an ubuntu image.
#
# The command reads the input from <input_data_dir>/<input name> and must
# write its result to <output_data_dir>/<output name>. The declared output is
# called ``output_file``, so the result is written to
# /var/outputs/output_file (the previous /var/outputs/result did not match
# any declared output).
translate = ContainerTask(
    name="Translate",
    image="ubuntu:latest",
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs=kwtypes(input_file=FlyteFile),
    outputs=kwtypes(output_file=FlyteFile),
    command=[
        "/bin/sh",
        "-c",
        "sed -r 's/Hello/Goodbye/g' /var/inputs/input_file > /var/outputs/output_file",
    ],
)
@task
def read_file() -> FlyteFile:
    """Return a ``FlyteFile`` that points at the relative path ``hello_world.txt``."""
    source_path = Path('hello_world.txt')
    return FlyteFile(path=source_path)
@task
def translate_file(data: FlyteFile) -> FlyteFile:
    """Log the contents of *data*, then run the ``translate`` task on it.

    Args:
        data: File whose text is logged and then passed to ``translate`` as
            its ``input_file`` input.

    Returns:
        Whatever ``translate`` returns for the given input.
    """
    with open(data, 'r') as f:
        logging.info(f'Input: {f.read()}')
    # NOTE(review): this invokes a ContainerTask from inside a Python task
    # body; confirm that nesting is supported by the flytekit version in use,
    # or call ``translate`` directly from the workflow instead.
    out = translate(input_file=data)
    # Return the value itself: the former ``return out,`` built a 1-tuple,
    # which does not match the declared ``FlyteFile`` return type.
    return out
@task
def log_result(data: FlyteFile) -> None:
    """Read *data* as text and log what it contains at INFO level."""
    with open(data, 'r') as handle:
        contents = handle.read()
    logging.info(f'Result: {contents}')
@workflow
def main():
    """Chain the tasks: read the file, translate it, then log the result."""
    # Tasks are called with keyword arguments, which flytekit workflows
    # accept in every release (older releases reject positional arguments).
    source = read_file()
    outcome = translate_file(data=source)
    # ``translate_file`` has a single output, so its result is passed on
    # directly; there is no ``out_file`` output to look up.
    log_result(data=outcome)
# Run the workflow only when this file is executed as a script.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement