Guest User

Untitled

a guest
Nov 10th, 2017
107
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.85 KB | None | 0 0
  1. '''
  2. Postgres automatic message queue demo
  3. Wherein a record-insertion trigger notifies a worker to process new table records
  4.  
  5. Requires:
  6. - Python 3.6
  7. - Postgres 9.6
  8. - Psycopg2
  9.  
  10. Usage:
  11. CONNECTION_ARGS="--host localhost --port 5432 --user $USER --password xxxxx'
  12. message_queue_demo.py $CONNECTION_ARGS --install # Create requisite tables
  13. message_queue_demo.py $CONNECTION_ARGS --worker # One or more workers
  14. message_queue_demo.py $CONNECTION_ARGS --message "Message 1" "Message 2" ...
  15. '''
  16.  
  17. import argparse
  18. import getpass
  19. import json
  20. import psycopg2
  21. import sys
  22. import traceback
  23.  
  24. from contextlib import closing
  25. from textwrap import dedent
  26.  
  27. parser = argparse.ArgumentParser();
  28.  
  29. parser.add_argument('--host', help='Postgres hostname', default='localhost')
  30. parser.add_argument('--port', help='Postgres port', default=5432)
  31. parser.add_argument('--user', help='Postgres username', default=getpass.getuser())
  32. parser.add_argument('--password', help='Postgres password')
  33.  
  34. group = parser.add_mutually_exclusive_group();
  35. group.add_argument('--install', action='count')
  36. group.add_argument('--worker', action='count')
  37. group.add_argument('--message', type=str, nargs='*')
  38. args = parser.parse_args()
  39.  
  40. # Configuration
  41. TABLE_NAME = f'my_event_queue'
  42. CHANNEL_NAME = f'{TABLE_NAME}_channel'
  43. PROC_NAME = f'notify_{CHANNEL_NAME}'
  44. TRIGGER_NAME = f'trigger_{CHANNEL_NAME}'
  45.  
  46. SETUP_SCRIPT = dedent(f'''\
  47. DROP TRIGGER IF EXISTS {TRIGGER_NAME} on {TABLE_NAME};
  48. DROP FUNCTION IF EXISTS {PROC_NAME} ();
  49. DROP TABLE IF EXISTS {TABLE_NAME};
  50.  
  51. CREATE TABLE {TABLE_NAME} (
  52. id BIGSERIAL,
  53. state TEXT DEFAULT 'PENDING',
  54. error_detail TEXT,
  55. type TEXT,
  56. job JSONB
  57. );
  58.  
  59. CREATE OR REPLACE FUNCTION {PROC_NAME} ()
  60. RETURNS trigger
  61. LANGUAGE plpgsql
  62. AS
  63. $$
  64. BEGIN
  65. NOTIFY {CHANNEL_NAME};
  66. RETURN NEW;
  67. END;
  68. $$;
  69.  
  70. CREATE TRIGGER {TRIGGER_NAME}
  71. AFTER INSERT
  72. ON {TABLE_NAME}
  73. FOR EACH STATEMENT
  74. EXECUTE PROCEDURE {PROC_NAME}();
  75. ''')
  76.  
  77.  
  78. def get_conn():
  79. return psycopg2.connect(host=args.host, port=args.port, user=args.user, password=args.password)
  80.  
  81.  
  82. def process_queue(cursor):
  83. # Process event queue.
  84. # Due to the isolation level we set earlier, we need to manually create a transaction
  85. print(f'Processing queue')
  86. cursor.execute('START TRANSACTION');
  87. try:
  88. # I should have used an ORM here.
  89.  
  90. # Also, the way this works, you can only really have a single worker
  91. # going at a time -- each worker locks all the processable rows at time
  92. # of the query. A second worker could *maybe* come along and grab any new
  93. # events that are generated when the first is processing, but the balance
  94. # could by highly asymmetrical. It might be better to use LIMIT in the SELECT
  95. # clause to prevent a worker from claiming a bigger workload than it can
  96. # handle.
  97.  
  98. cursor.execute(dedent(f'''\
  99. SELECT id, state, type, job
  100. FROM {TABLE_NAME}
  101. WHERE state = %s AND type = %s
  102. FOR UPDATE
  103. SKIP LOCKED
  104. ;
  105. '''), (
  106. 'PENDING',
  107. 'ECHO',
  108. ))
  109.  
  110. results = cursor.fetchall()
  111. print(f'Processing {len(results)} items')
  112. for result in results:
  113. _id, _state, _type, _job = result
  114.  
  115. # Mark as "PROCESSING" so we know it was at least picked up
  116. # should anything go wrong.
  117. cursor.execute(dedent(f'''\
  118. UPDATE {TABLE_NAME}
  119. SET state = %s
  120. WHERE id = %s
  121. '''), (
  122. 'PROCESSING',
  123. _id,
  124. ))
  125. # Commit early and often so we remember which records have been processed.
  126. cursor.execute('COMMIT')
  127.  
  128. try:
  129. message = _job['message']
  130. assert 'cause error' not in message
  131. print(f'Message = \033[1;33m{message}\033[0m')
  132. except Exception:
  133. # Need to do something different for
  134. exception = traceback.format_exc()
  135.  
  136. cursor.execute(dedent(f'''\
  137. UPDATE {TABLE_NAME}
  138. SET state = %s, error_detail = %s
  139. WHERE id = %s
  140. '''), (
  141. 'ERROR',
  142. exception,
  143. _id,
  144. ))
  145. cursor.execute('COMMIT')
  146. else:
  147. cursor.execute(dedent(f'''\
  148. UPDATE {TABLE_NAME}
  149. SET state = %s
  150. WHERE id = %s
  151. '''), (
  152. 'PROCESSED',
  153. _id,
  154. ))
  155. cursor.execute('COMMIT')
  156. finally:
  157. cursor.execute('ROLLBACK')
  158.  
  159. ######
  160.  
  161. if args.install:
  162. with closing(get_conn()) as conn:
  163. with closing(conn.cursor()) as cursor:
  164. cursor.execute(SETUP_SCRIPT)
  165. conn.commit()
  166.  
  167. elif args.worker:
  168. with closing(get_conn()) as conn:
  169.  
  170. conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
  171.  
  172. with closing(conn.cursor()) as cursor:
  173. process_queue(cursor)
  174.  
  175. print(f'Waiting for notifications on channel {CHANNEL_NAME!r}')
  176. cursor.execute(f'LISTEN {CHANNEL_NAME};')
  177.  
  178. import select # package I've never heard of -- it's a UNIX system call wrapper around select(),
  179. # i.e. listen for port traffic
  180.  
  181. from psycopg2.extensions import Notify # Doesn't need to be imported; Just importing for annotation's sake
  182.  
  183. while True:
  184. # Tell the kernel to sleep until the connection receives data (I guess)
  185. # ARBITRARY_TIMEOUT_FROM_EXAMPLE = 5
  186. ARBITRARY_TIMEOUT_FROM_EXAMPLE = None
  187. if select.select([conn], [], [], ARBITRARY_TIMEOUT_FROM_EXAMPLE) == ([], [], []):
  188. # print(f'Timeout after {ARBITRARY_TIMEOUT_FROM_EXAMPLE} seconds (?)')
  189. pass
  190. else:
  191.  
  192. print(f'Polling')
  193. conn.poll()
  194. while conn.notifies:
  195. notify: Notify = conn.notifies.pop(0)
  196. print(f'Got NOTIFY: channel={notify.channel!r}, payload={notify.payload!r}, pid={notify.pid!r}')
  197.  
  198. process_queue(cursor)
  199.  
  200.  
  201. elif args.message:
  202. with closing(get_conn()) as conn:
  203. with closing(conn.cursor()) as cursor:
  204. for message in args.message:
  205. print(f"Inserting {message} into {TABLE_NAME}")
  206. cursor.execute(dedent(f'''\
  207. INSERT INTO {TABLE_NAME} (type, job)
  208. VALUES (%s, %s)
  209. '''), (
  210. 'ECHO',
  211. json.dumps({"message": message}),
  212. ))
  213. conn.commit()
Add Comment
Please, Sign In to add comment