Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- '''
- Postgres automatic message queue demo
- Wherein a record-insertion trigger notifies a worker to process new table records
- Requires:
- - Python 3.6
- - Postgres 9.6
- - Psycopg2
- Usage:
- CONNECTION_ARGS="--host localhost --port 5432 --user $USER --password xxxxx'
- message_queue_demo.py $CONNECTION_ARGS --install # Create requisite tables
- message_queue_demo.py $CONNECTION_ARGS --worker # One or more workers
- message_queue_demo.py $CONNECTION_ARGS --message "Message 1" "Message 2" ...
- '''
- import argparse
- import getpass
- import json
- import psycopg2
- import sys
- import traceback
- from contextlib import closing
- from textwrap import dedent
- parser = argparse.ArgumentParser();
- parser.add_argument('--host', help='Postgres hostname', default='localhost')
- parser.add_argument('--port', help='Postgres port', default=5432)
- parser.add_argument('--user', help='Postgres username', default=getpass.getuser())
- parser.add_argument('--password', help='Postgres password')
- group = parser.add_mutually_exclusive_group();
- group.add_argument('--install', action='count')
- group.add_argument('--worker', action='count')
- group.add_argument('--message', type=str, nargs='*')
- args = parser.parse_args()
- # Configuration
- TABLE_NAME = f'my_event_queue'
- CHANNEL_NAME = f'{TABLE_NAME}_channel'
- PROC_NAME = f'notify_{CHANNEL_NAME}'
- TRIGGER_NAME = f'trigger_{CHANNEL_NAME}'
- SETUP_SCRIPT = dedent(f'''\
- DROP TRIGGER IF EXISTS {TRIGGER_NAME} on {TABLE_NAME};
- DROP FUNCTION IF EXISTS {PROC_NAME} ();
- DROP TABLE IF EXISTS {TABLE_NAME};
- CREATE TABLE {TABLE_NAME} (
- id BIGSERIAL,
- state TEXT DEFAULT 'PENDING',
- error_detail TEXT,
- type TEXT,
- job JSONB
- );
- CREATE OR REPLACE FUNCTION {PROC_NAME} ()
- RETURNS trigger
- LANGUAGE plpgsql
- AS
- $$
- BEGIN
- NOTIFY {CHANNEL_NAME};
- RETURN NEW;
- END;
- $$;
- CREATE TRIGGER {TRIGGER_NAME}
- AFTER INSERT
- ON {TABLE_NAME}
- FOR EACH STATEMENT
- EXECUTE PROCEDURE {PROC_NAME}();
- ''')
- def get_conn():
- return psycopg2.connect(host=args.host, port=args.port, user=args.user, password=args.password)
- def process_queue(cursor):
- # Process event queue.
- # Due to the isolation level we set earlier, we need to manually create a transaction
- print(f'Processing queue')
- cursor.execute('START TRANSACTION');
- try:
- # I should have used an ORM here.
- # Also, the way this works, you can only really have a single worker
- # going at a time -- each worker locks all the processable rows at time
- # of the query. A second worker could *maybe* come along and grab any new
- # events that are generated when the first is processing, but the balance
- # could by highly asymmetrical. It might be better to use LIMIT in the SELECT
- # clause to prevent a worker from claiming a bigger workload than it can
- # handle.
- cursor.execute(dedent(f'''\
- SELECT id, state, type, job
- FROM {TABLE_NAME}
- WHERE state = %s AND type = %s
- FOR UPDATE
- SKIP LOCKED
- ;
- '''), (
- 'PENDING',
- 'ECHO',
- ))
- results = cursor.fetchall()
- print(f'Processing {len(results)} items')
- for result in results:
- _id, _state, _type, _job = result
- # Mark as "PROCESSING" so we know it was at least picked up
- # should anything go wrong.
- cursor.execute(dedent(f'''\
- UPDATE {TABLE_NAME}
- SET state = %s
- WHERE id = %s
- '''), (
- 'PROCESSING',
- _id,
- ))
- # Commit early and often so we remember which records have been processed.
- cursor.execute('COMMIT')
- try:
- message = _job['message']
- assert 'cause error' not in message
- print(f'Message = \033[1;33m{message}\033[0m')
- except Exception:
- # Need to do something different for
- exception = traceback.format_exc()
- cursor.execute(dedent(f'''\
- UPDATE {TABLE_NAME}
- SET state = %s, error_detail = %s
- WHERE id = %s
- '''), (
- 'ERROR',
- exception,
- _id,
- ))
- cursor.execute('COMMIT')
- else:
- cursor.execute(dedent(f'''\
- UPDATE {TABLE_NAME}
- SET state = %s
- WHERE id = %s
- '''), (
- 'PROCESSED',
- _id,
- ))
- cursor.execute('COMMIT')
- finally:
- cursor.execute('ROLLBACK')
- ######
- if args.install:
- with closing(get_conn()) as conn:
- with closing(conn.cursor()) as cursor:
- cursor.execute(SETUP_SCRIPT)
- conn.commit()
- elif args.worker:
- with closing(get_conn()) as conn:
- conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- with closing(conn.cursor()) as cursor:
- process_queue(cursor)
- print(f'Waiting for notifications on channel {CHANNEL_NAME!r}')
- cursor.execute(f'LISTEN {CHANNEL_NAME};')
- import select # package I've never heard of -- it's a UNIX system call wrapper around select(),
- # i.e. listen for port traffic
- from psycopg2.extensions import Notify # Doesn't need to be imported; Just importing for annotation's sake
- while True:
- # Tell the kernel to sleep until the connection receives data (I guess)
- # ARBITRARY_TIMEOUT_FROM_EXAMPLE = 5
- ARBITRARY_TIMEOUT_FROM_EXAMPLE = None
- if select.select([conn], [], [], ARBITRARY_TIMEOUT_FROM_EXAMPLE) == ([], [], []):
- # print(f'Timeout after {ARBITRARY_TIMEOUT_FROM_EXAMPLE} seconds (?)')
- pass
- else:
- print(f'Polling')
- conn.poll()
- while conn.notifies:
- notify: Notify = conn.notifies.pop(0)
- print(f'Got NOTIFY: channel={notify.channel!r}, payload={notify.payload!r}, pid={notify.pid!r}')
- process_queue(cursor)
- elif args.message:
- with closing(get_conn()) as conn:
- with closing(conn.cursor()) as cursor:
- for message in args.message:
- print(f"Inserting {message} into {TABLE_NAME}")
- cursor.execute(dedent(f'''\
- INSERT INTO {TABLE_NAME} (type, job)
- VALUES (%s, %s)
- '''), (
- 'ECHO',
- json.dumps({"message": message}),
- ))
- conn.commit()
Add Comment
Please, Sign In to add comment