The upgrade guide describes the changes you are *required* to make in order to install 0.3.0. This guide describes the changes you *should* make in order to use the latest capabilities. The new concepts take some getting used to, but they are quite powerful.

Resources
---------

In 0.2.0 the notion of resources was relatively informal. This is no longer true: resources are now an officially supported abstraction that breaks context creation apart into composable, reusable chunks of software.

**Defining a Resource**

Let's take the unittest context in the allscripts_fileload pipeline as an example.

```
def define_unittest_context():
    return PipelineContextDefinition(
        config_field=Field(
            Dict(
                {
                    'data_source_run_id': _data_source_run_id_field(),
                    'conf': _conf_field(),
                    'log_level': _log_level_field(),
                    'cleanup_files': _cleanup_field(),
                },
            )
        ),
        context_fn=create_allscripts_fileload_unittest_context,
        description='''
Context for use in unit tests. It does not allow for any interaction with aws
or s3, and can only be used for a subset of the pipeline that can execute on a
local machine.

This context does not log to file and also has a configurable log_level.
'''
    )


def create_allscripts_fileload_unittest_context(info):
    data_source_run_id = info.config['data_source_run_id']
    log_level = level_from_string(info.config['log_level'])
    pipeline_run_id = str(uuid.uuid4())

    resources = AllscriptsFileloadResources(
        aws=None,
        redshift=None,
        bucket_path=None,
        local_fs=LocalFsHandleResource.for_pipeline_run(pipeline_run_id),
        sa=None,
        cooper_pair=None,
        pipeline_guid=data_source_run_id,
    )

    yield ExecutionContext(
        loggers=[define_colored_console_logger('dagster', log_level)],
        resources=resources,
        context_stack={
            'data_source_run_id': data_source_run_id,
            'data_source': 'allscripts',
            'pipeline_run_id': pipeline_run_id,
        },
    )
```

That's quite the ball of wax for what should be relatively straightforward, and it doesn't even include the boilerplate `AllscriptsFileloadResources` class. We're going to break this apart and eliminate the need for that class.

The only truly reusable resource here is the `LocalFsHandleResource`, so let's break that out into its own resource.

```
def define_local_fs_resource():
    def _create_resource(info):
        resource = LocalFsHandleResource.for_pipeline_run(info.run_id)
        yield resource
        if info.config['cleanup_files']:
            LocalFsHandleResource.clean_up_dir(info.run_id)

    return ResourceDefinition(
        resource_fn=_create_resource,
        config_field=Field(
            Dict({'cleanup_files': Field(Bool, is_optional=True, default_value=True)})
        ),
    )
```

This is now a self-contained piece that can be reused in other contexts as well.

Aside: we now guarantee a system-generated run_id, so the manually created pipeline_guid resource is no longer relevant.

The rest of the "resources" in the unittest context are None, and we have a special helper to create "none" resources.

Let's put it all together:

```
def define_unittest_context():
    return PipelineContextDefinition(
        config_field=Field(Dict({
            'log_level': _log_level_field(),
            'data_source_run_id': _data_source_run_id_field(),
        })),
        resources={
            'local_fs': define_local_fs_resource(),
            'aws': ResourceDefinition.none_resource(),
            'redshift': ResourceDefinition.none_resource(),
            'bucket_path': ResourceDefinition.none_resource(),
            'sa': ResourceDefinition.none_resource(),
            'cooper_pair': ResourceDefinition.none_resource(),
        },
        context_fn=create_allscripts_fileload_unittest_context,
        description='''
Context for use in unit tests. It does not allow for any interaction with aws
or s3, and can only be used for a subset of the pipeline that can execute on a
local machine.

This context does not log to file and also has a configurable log_level.
'''
    )


def create_allscripts_fileload_unittest_context(info):
    data_source_run_id = info.config['data_source_run_id']
    log_level = level_from_string(info.config['log_level'])

    yield ExecutionContext(
        loggers=[define_colored_console_logger('dagster', log_level)],
        context_stack={
            'data_source_run_id': data_source_run_id,
            'data_source': 'allscripts',
        },
    )
```

Notice a few things. The bulk of the context creation function is now gone. Instead of manually constructing an `AllscriptsFileloadResources`, the system synthesizes a class (a `namedtuple`) with one field per resource. The pipeline-code-facing API is the same; it just requires less boilerplate within the pipeline infrastructure.

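To make the "pipeline-code-facing API is the same" point concrete, here is a rough sketch of consuming code. It is illustrative only: `stage_input_file` is a made-up name, and the exact attribute path from pipeline code to the resources object is an assumption rather than something this guide prescribes.

```
def stage_input_file(context, zipped_file):
    # Hypothetical pipeline code. Whether `context.resources` is the handwritten
    # AllscriptsFileloadResources namedtuple (0.2.0 style) or the namedtuple the
    # system synthesizes from the `resources` dict (0.3.0 style), the attribute
    # access looks the same: one field per resource, keyed by resource name.
    local_fs = context.resources.local_fs
    redshift = context.resources.redshift  # None under the unittest context

    # ... real work with local_fs / redshift would go here ...
    return local_fs
```
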
**Configuring a Resource**

The configuration schema also changes, as each resource now has its own section.

Before:

```
environment = {
    'context': {
        'unittest': {
            'config': {
                'data_source_run_id': str(uuid.uuid4()),
                'conf': CONF,
                'log_level': 'ERROR',
                'cleanup_files': False,
            }
        }
    },
    'solids': {
        'unzip_file': {
            'config': {
                'zipped_file': ZIP_FILE_PATH,
            }
        }
    }
}
```

In particular, we need to move `cleanup_files` into the new `resources` section of the config.

After:

```
environment = {
    'context': {
        'unittest': {
            'config': {
                'data_source_run_id': str(uuid.uuid4()),
                'log_level': 'ERROR',
            },
            'resources': {
                'local_fs': {
                    'config': {
                        'cleanup_files': False,
                    }
                }
            }
        }
    },
    'solids': {
        'unzip_file': {
            'config': {
                'zipped_file': ZIP_FILE_PATH,
            }
        }
    }
}
```

While slightly more verbose, this buys you more consistent configuration between pipelines as you reuse resources, and you can even share resource configuration *between* pipelines using the configuration-file merging feature of 0.3.0.

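As a hypothetical sketch of that sharing (the names below are made up, and the actual merge mechanics belong to the 0.3.0 config-file feature rather than this snippet): because resource config is keyed by resource name instead of being baked into one context config blob, the same fragment can be factored out and combined with each pipeline's own config.

```
# Hypothetical: a resource config fragment shared by several pipelines.
SHARED_RESOURCES = {
    'local_fs': {
        'config': {
            'cleanup_files': False,
        }
    },
}

# Each pipeline combines the shared fragment with its own context and solid config.
environment = {
    'context': {
        'unittest': {
            'config': {
                'data_source_run_id': str(uuid.uuid4()),
                'log_level': 'ERROR',
            },
            'resources': SHARED_RESOURCES,
        }
    },
    'solids': {
        'unzip_file': {
            'config': {
                'zipped_file': ZIP_FILE_PATH,
            }
        }
    },
}
```
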
Resource Libraries
------------------

The real promise of resources is the ability to build a library of reusable, composable resources.

For example, here is a resource that creates a Redshift connection.

```
def define_redshift_sa_resource():
    def _create_resource(info):
        user = info.config['user']
        password = info.config['password']
        host = info.config['host']
        port = info.config['port']
        dbname = info.config['dbname']
        return sa.create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')

    return ResourceDefinition(
        resource_fn=_create_resource,
        config_field=Field(
            Dict(
                {
                    'user': Field(String),
                    'password': Field(String),
                    'host': Field(String),
                    'port': Field(Int),
                    'dbname': Field(String),
                }
            )
        ),
    )
```

This could be used -- unmodified -- across all of your pipelines. It will also make it easier to write reusable solids, since they can rely on interacting with the same kind of resource. Indeed, we may formalize this in subsequent releases, allowing solids to formally declare their dependencies on specific resource types.
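
For instance, two otherwise unrelated pipelines could wire in the exact same definition. The context and `context_fn` names below are hypothetical; only `define_redshift_sa_resource` and `define_local_fs_resource` come from the snippets above.

```
def define_fileload_prod_context():
    # Hypothetical production context for the allscripts_fileload pipeline.
    return PipelineContextDefinition(
        resources={
            'redshift': define_redshift_sa_resource(),
            'local_fs': define_local_fs_resource(),
        },
        context_fn=create_fileload_prod_context,  # assumed to be defined elsewhere
    )


def define_reporting_prod_context():
    # Hypothetical context for a second pipeline reusing the same redshift
    # resource definition without modification.
    return PipelineContextDefinition(
        resources={
            'redshift': define_redshift_sa_resource(),
        },
        context_fn=create_reporting_prod_context,  # assumed to be defined elsewhere
    )
```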