Advertisement
Guest User

Untitled

a guest
Jul 14th, 2016
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.52 KB | None | 0 0
  1. import luigi
  2. import luigi.postgres
  3. import psycopg2
  4. import os
  5.  
  6. testdata = ([99, 'My fake plants died because I did not pretend to water'
  7. ' them.'],
  8. [100, 'I always arrive late at the office, '
  9. 'but I make up for it by leaving early.'],
  10. [101, u'∩ ∪'])
  11.  
  12.  
  13. class PgServiceTarget(luigi.postgres.PostgresTarget):
  14. """
  15. Target for a resource in PostgreSQL, overriding the standard PostgresTarget
  16. to use a pg_service.conf service name to make the connection instead of
  17. separate connection params
  18. """
  19. def __init__(self, service, update_id, table):
  20. """
  21. Args:
  22. service (str): the name of a service defined in local
  23. pg_service.conf file
  24. update_id (str): An identifier for this data set
  25. """
  26. self.service = service
  27. self.update_id = update_id
  28. self.table = table
  29.  
  30. def connect(self):
  31. """
  32. Get a psycopg2 connection object to the database where the table is.
  33. """
  34. connection = psycopg2.connect(
  35. service=self.service)
  36. connection.set_client_encoding('utf-8')
  37. return connection
  38.  
  39.  
  40. class PgServiceQuery(luigi.postgres.PostgresQuery):
  41. """
  42. Template task for querying a PostgreSQL database, with standard output
  43. method overridden to return a PgServiceTarget
  44. """
  45. # Handle the properties expected by abstract class rdbms.Query
  46. host = None
  47. database = None
  48. user = None
  49. password = None
  50.  
  51. def output(self):
  52. return PgServiceTarget(service=self.service, table=self.table, update_id=self.update_id)
  53.  
  54.  
  55. class PgCopyToTable(luigi.postgres.CopyToTable):
  56. """
  57. Template task for inserting a data set into Postgres via PgServiceTarget
  58. """
  59. # Handle the properties expected by abstract class rdbms.Query
  60. host = None
  61. database = None
  62. user = None
  63. password = None
  64.  
  65. def output(self):
  66. return PgServiceTarget(service=self.service, table=self.table, update_id=self.update_id)
  67.  
  68.  
  69. class PgExampleQuery(PgServiceQuery):
  70. env_service = os.getenv('PGSERVICE', 'local')
  71. service = luigi.Parameter(default=env_service)
  72. table = 'whoa'
  73. query = 'create table whoa (trival_id serial, whoa_txt text)'
  74.  
  75.  
  76. class PgExampleLoad(PgCopyToTable):
  77. env_service = os.getenv('PGSERVICE', 'local')
  78. service = luigi.Parameter(default=env_service)
  79. table = 'whoa'
  80. columns = [("trival_id", "INT"),
  81. ("description", "TEXT")]
  82.  
  83. def rows(self):
  84. for record in testdata:
  85. yield record
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement