Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- //##### TEST DATABASE SETUP #####
- //
- //psql -tc "SELECT 1 FROM pg_database WHERE datname = 'notify_listen_test'" | grep -q 1 || psql -c "CREATE DATABASE notify_listen_test"
- //psql notify_listen_test -c 'CREATE EXTENSION hstore'
- //
- //##### MUST CREATE TRIGGER FUNCTION INTERACTIVELY #####
- //psql notify_listen_test
- //
- //CREATE OR REPLACE FUNCTION notify_table_changed()
- //RETURNS TRIGGER AS $$
- // DECLARE
- //id BIGINT;
- //BEGIN
- //IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE'
- //THEN
- //id = NEW.id;
- //ELSE
- //id = OLD.id;
- //END IF;
- //
- //IF TG_OP = 'UPDATE'
- //THEN
- //PERFORM pg_notify(tg_relname,
- // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
- //'old_row', row_to_json(OLD), 'new_row', row_to_json(NEW), 'changes',
- //hstore_to_json(hstore(NEW) - hstore(OLD))) :: TEXT);
- //RETURN NEW;
- //END IF;
- //
- //IF TG_OP = 'INSERT'
- //THEN
- //PERFORM pg_notify(tg_relname,
- // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
- //'row', row_to_json(NEW)) :: TEXT);
- //RETURN NEW;
- //END IF;
- //
- //IF TG_OP = 'DELETE'
- //THEN
- //PERFORM pg_notify(tg_relname,
- // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
- //'row', row_to_json(OLD)) :: TEXT);
- //RETURN OLD;
- //END IF;
- //END;
- //$$
- //LANGUAGE plpgsql;
- //
- //\q
- //##### MUST CREATE TRIGGER FUNCTION INTERACTIVELY #####
- //
- //psql notify_listen_test -c 'CREATE TABLE IF NOT EXISTS products (id serial PRIMARY KEY, name text NOT NULL, category_id smallint NOT NULL)'
- //psql notify_listen_test -c 'CREATE TRIGGER products_notify_table_changed AFTER INSERT OR UPDATE OR DELETE ON products FOR EACH ROW EXECUTE PROCEDURE notify_table_changed()'
- //psql notify_listen_test -c 'CREATE TABLE IF NOT EXISTS orders (id serial PRIMARY KEY, product_id int NOT NULL REFERENCES products (id), quantity int NOT NULL)'
- //psql notify_listen_test -c 'CREATE TRIGGER orders_notify_table_changed AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE notify_table_changed()'
- //
- //##### TEST DATABASE SETUP #####
- //##### START SPARK SHELL WITH REQUISITE PACKAGES #####
- //
- //spark-shell --packages "org.scalikejdbc:scalikejdbc_2.11:3.1.0,org.postgresql:postgresql:42.1.4,com.typesafe.akka:akka-actor_2.11:2.5.8"
- //
- //##### START SPARK SHELL WITH REQUISITE PACKAGES #####
- //##### AFTER PASTING ALL THE SCALA CODE BELOW INTO THE SPARK-SHELL SESSION, START DOING DB STUFF TO SEE NOTIFICATIONS #####
- //
- //psql notify_listen_test -c "INSERT INTO products(name, category_id) VALUES ('coffee cup', 1), ('pencil', 2), ('shoe', 3)"
- //psql notify_listen_test -c "INSERT INTO orders(product_id, quantity) VALUES (3, 5), (1, 4), (2, 5)"
- //psql notify_listen_test -c "UPDATE orders SET quantity = 10 WHERE id = 3"
- //psql notify_listen_test -c "DELETE FROM orders WHERE id = 3"
- //
- //##### AFTER PASTING ALL THE SCALA CODE BELOW INTO THE SPARK-SHELL SESSION, START DOING DB STUFF TO SEE NOTIFICATIONS #####
- //
- //##### IF YOU WANT TO CLEAN UP THE PRODUCTS AND ORDERS TABLES #####
- //psql notify_listen_test -c 'TRUNCATE TABLE products RESTART IDENTITY CASCADE'
- //##### IF YOU WANT TO CLEAN UP THE PRODUCTS AND ORDERS TABLES #####
- //
- //##### IF YOU WANT TO REMOVE THE DB AND START OVER (must kill all connections first) #####
- //dropdb notify_listen_test
- //##### IF YOU WANT TO REMOVE THE DB AND START OVER (must kill all connections first) #####
/** Actor that listens on one PostgreSQL notification channel and polls it on a timer.
  *
  * On start it issues `LISTEN <channel>` on its own JDBC connection. On every
  * `"tick"` message it asks the PostgreSQL connection for notifications and
  * prints each one to standard output. The timer and the connection are
  * released in `postStop`.
  *
  * NOTE: the JDBC calls are synchronous and run on the actor's own thread.
  *
  * @param DBUri           JDBC URL handed to `DriverManager.getConnection`
  * @param properties      connection properties handed to `DriverManager.getConnection`
  * @param channel         channel name; it is interpolated verbatim into the
  *                        `LISTEN` statement, so it must be a trusted, valid
  *                        SQL identifier
  * @param pollingInterval delay between two polls, in milliseconds
  */
class Poller(DBUri: String, properties: java.util.Properties, channel: String, pollingInterval: Int) extends akka.actor.Actor {
  import java.sql.{Connection, Driver, DriverManager}
  import org.postgresql.{PGConnection, PGNotification}
  import scalikejdbc._
  import scala.collection.JavaConverters.enumerationAsScalaIteratorConverter
  import scala.concurrent.duration._
  // execution context for the ticks
  import context.dispatcher

  val driver: String = "org.postgresql.Driver"

  // Reuses an already registered PostgreSQL driver when there is one, so that
  // creating (or restarting) pollers does not pile up driver registrations.
  val connectionDriver: Driver = ensureDriverRegistered()

  val connection: Connection = DriverManager.getConnection(DBUri, properties)
  val pollingIntervalDuration: FiniteDuration = pollingInterval.millis
  val db: DB = DB(connection)

  // Resolved once: the PostgreSQL-specific view of `connection` that exposes notifications.
  private val pgConnection: PGConnection = connection.unwrap(classOf[PGConnection])

  // First tick after 500 ms, then one every `pollingIntervalDuration`.
  val tick: akka.actor.Cancellable =
    context.system.scheduler.schedule(500.millis, pollingIntervalDuration, self, "tick")

  /** Loads the driver class and returns a driver instance known to `DriverManager`.
    *
    * A new instance is created and registered only when no instance of the
    * driver class is visible through `DriverManager.getDrivers`.
    */
  private def ensureDriverRegistered(): Driver = {
    val driverClass = Class.forName(driver).asSubclass(classOf[Driver])
    DriverManager.getDrivers.asScala
      .find(registered => driverClass.isInstance(registered))
      .getOrElse {
        val fresh = driverClass.getDeclaredConstructor().newInstance()
        DriverManager.registerDriver(fresh)
        fresh
      }
  }

  /** Subscribes the connection to `channel`. */
  override def preStart(): Unit = {
    // make sure connection isn't closed when executing queries
    db.autoClose(false)
    db.localTx { implicit session =>
      SQL(s"LISTEN $channel").execute().apply()
    }
  }

  /** Stops the timer and closes the database handle. */
  override def postStop(): Unit = {
    tick.cancel()
    db.close()
  }

  /** Handles `"tick"`: prints every notification currently returned by the connection. */
  def receive: Receive = {
    case "tick" =>
      // The session argument is not used; the read-only block is kept so the
      // connection is touched through scalikejdbc exactly as before.
      db.readOnly { _ =>
        // `getNotifications` may return null, which is treated as "nothing to report".
        Option(pgConnection.getNotifications()).foreach { notifications =>
          notifications.foreach { notification: PGNotification =>
            println(s"Received for: ${notification.getName} from process with PID: ${notification.getPID}")
            println(s"Received data: ${notification.getParameter} ")
          }
        }
      }
  }
}
- import java.util.Properties
- import akka.actor.{Props, ActorSystem, Actor}
/** Demo entry point: starts one `Poller` actor per channel ("products" and
  * "orders") against the local `notify_listen_test` database, then waits for
  * input on standard input before terminating the actor system.
  */
object PostgresNotifications extends App {
  // initialize the actor system
  val system = ActorSystem("Hello")
  val DBUri = "jdbc:postgresql://localhost:5432/notify_listen_test"
  val properties = new Properties

  // one poller per notification channel, each polling every 1000 ms
  Seq("products", "orders").foreach { channel =>
    system.actorOf(Props(new Poller(DBUri, properties, channel, 1000)), s"${channel}_poller")
  }

  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read()
  system.terminate()
}
// Script-style launch: run the demo immediately once everything above has been pasted.
PostgresNotifications.main(Array[String]())
Add Comment
Please, Sign In to add comment