Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import luigi
- import luigi.postgres
- import psycopg2
- import os
# Sample [id, text] records yielded by PgExampleLoad.rows(); the last record
# carries non-ASCII text.
testdata = (
    [99, 'My fake plants died because I did not pretend to water them.'],
    [100,
     'I always arrive late at the office, but I make up for it by leaving '
     'early.'],
    [101, u'∩ ∪'],
)
class PgServiceTarget(luigi.postgres.PostgresTarget):
    """
    PostgreSQL target that connects through a pg_service.conf service name.

    Replaces the standard PostgresTarget constructor and ``connect`` so that
    a single service name is used in place of separate connection params.
    """

    def __init__(self, service, update_id, table):
        """
        Store the identifiers for this target; no connection is opened here.

        Args:
            service (str): the name of a service defined in local
                pg_service.conf file
            update_id (str): An identifier for this data set
            table: the table this target refers to
        """
        # The parent constructor is intentionally not invoked here (same as
        # before): only these three attributes are set on the instance.
        self.table = table
        self.update_id = update_id
        self.service = service

    def connect(self):
        """
        Open and return a psycopg2 connection for the configured service.

        The client encoding of the new connection is set to utf-8 before it
        is handed back to the caller.
        """
        conn = psycopg2.connect(service=self.service)
        conn.set_client_encoding('utf-8')
        return conn
class PgServiceQuery(luigi.postgres.PostgresQuery):
    """
    Base task for running a query against PostgreSQL whose ``output`` is a
    PgServiceTarget instead of the standard target.

    ``output`` reads ``self.service``, ``self.table`` and ``self.update_id``;
    ``service`` and ``table`` are not defined here, so subclasses presumably
    supply them (``update_id`` is presumably inherited -- confirm).
    """

    # Placeholders for the properties expected by abstract class rdbms.Query;
    # the connection itself is made via the service name.
    host = database = user = password = None

    def output(self):
        """Return the PgServiceTarget describing this task's result."""
        target = PgServiceTarget(
            service=self.service,
            table=self.table,
            update_id=self.update_id,
        )
        return target
class PgCopyToTable(luigi.postgres.CopyToTable):
    """
    Base task for inserting a data set into Postgres, with ``output``
    returning a PgServiceTarget.

    ``output`` reads ``self.service``, ``self.table`` and ``self.update_id``;
    ``service`` and ``table`` are not defined here, so subclasses presumably
    supply them (``update_id`` is presumably inherited -- confirm).
    """

    # Placeholders for the connection properties the abstract rdbms base
    # class expects; the connection itself is made via the service name.
    host = database = user = password = None

    def output(self):
        """Return the PgServiceTarget describing this task's result."""
        target = PgServiceTarget(
            service=self.service,
            table=self.table,
            update_id=self.update_id,
        )
        return target
class PgExampleQuery(PgServiceQuery):
    """Example query task: issues the DDL that creates the ``whoa`` table."""

    # Service name defaults to $PGSERVICE, falling back to 'local'; it is
    # read once, when the class body is evaluated.
    env_service = os.environ.get('PGSERVICE', 'local')
    service = luigi.Parameter(default=env_service)

    table = 'whoa'
    query = 'create table whoa (trival_id serial, whoa_txt text)'
class PgExampleLoad(PgCopyToTable):
    """
    Example load task: copies the module-level ``testdata`` records into the
    ``whoa`` table.
    """

    # Service name defaults to $PGSERVICE, falling back to 'local'; it is
    # read once, when the class body is evaluated.
    env_service = os.getenv('PGSERVICE', 'local')
    service = luigi.Parameter(default=env_service)

    table = 'whoa'
    # Column names must match the table created by PgExampleQuery, which
    # defines ``whoa`` as (trival_id, whoa_txt). The text column was
    # previously named "description", which does not exist in that table.
    columns = [("trival_id", "INT"),
               ("whoa_txt", "TEXT")]

    def rows(self):
        """Yield each ``[id, text]`` record from ``testdata`` in order."""
        for record in testdata:
            yield record
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement