Guest User

Untitled

a guest
Jan 19th, 2018
150
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.56 KB | None | 0 0
  1. //##### TEST DATABASE SETUP #####
  2. //
  3. //psql -tc "SELECT 1 FROM pg_database WHERE datname = 'notify_listen_test'" | grep -q 1 || psql -c "CREATE DATABASE notify_listen_test"
  4. //psql notify_listen_test -c 'CREATE EXTENSION hstore'
  5. //
  6. //##### MUST CREATE TRIGGER FUNCTION INTERACTIVELY #####
  7. //psql notify_listen_test
  8. //
  9. //CREATE OR REPLACE FUNCTION notify_table_changed()
  10. //RETURNS TRIGGER AS $$
  11. // DECLARE
  12. //id BIGINT;
  13. //BEGIN
  14. //IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE'
  15. //THEN
  16. //id = NEW.id;
  17. //ELSE
  18. //id = OLD.id;
  19. //END IF;
  20. //
  21. //IF TG_OP = 'UPDATE'
  22. //THEN
  23. //PERFORM pg_notify(tg_relname,
  24. // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
  25. //'old_row', row_to_json(OLD), 'new_row', row_to_json(NEW), 'changes',
  26. //hstore_to_json(hstore(NEW) - hstore(OLD))) :: TEXT);
  27. //RETURN NEW;
  28. //END IF;
  29. //
  30. //IF TG_OP = 'INSERT'
  31. //THEN
  32. //PERFORM pg_notify(tg_relname,
  33. // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
  34. //'row', row_to_json(NEW)) :: TEXT);
  35. //RETURN NEW;
  36. //END IF;
  37. //
  38. //IF TG_OP = 'DELETE'
  39. //THEN
  40. //PERFORM pg_notify(tg_relname,
  41. // json_build_object('schema', TG_TABLE_SCHEMA, 'table', TG_TABLE_NAME, 'id', id, 'type', TG_OP,
  42. //'row', row_to_json(OLD)) :: TEXT);
  43. //RETURN OLD;
  44. //END IF;
  45. //END;
  46. //$$
  47. //LANGUAGE plpgsql;
  48. //
  49. //\q
  50. //##### MUST CREATE TRIGGER FUNCTION INTERACTIVELY #####
  51. //
  52. //psql notify_listen_test -c 'CREATE TABLE IF NOT EXISTS products (id serial PRIMARY KEY, name text NOT NULL, category_id smallint NOT NULL)'
  53. //psql notify_listen_test -c 'CREATE TRIGGER products_notify_table_changed AFTER INSERT OR UPDATE OR DELETE ON products FOR EACH ROW EXECUTE PROCEDURE notify_table_changed()'
  54. //psql notify_listen_test -c 'CREATE TABLE IF NOT EXISTS orders (id serial PRIMARY KEY, product_id int NOT NULL REFERENCES products (id), quantity int NOT NULL)'
  55. //psql notify_listen_test -c 'CREATE TRIGGER orders_notify_table_changed AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE notify_table_changed()'
  56. //
  57. //##### TEST DATABASE SETUP #####
  58.  
  59. //##### START SPARK SHELL WITH REQUISITE PACKAGES #####
  60. //
  61. //spark-shell --packages "org.scalikejdbc:scalikejdbc_2.11:3.1.0,org.postgresql:postgresql:42.1.4,com.typesafe.akka:akka-actor_2.11:2.5.8"
  62. //
  63. //##### START SPARK SHELL WITH REQUISITE PACKAGES #####
  64.  
  65. //##### AFTER PASTING ALL THE SCALA CODE BELOW INTO THE SPARK-SHELL SESSION, START DOING DB STUFF TO SEE NOTIFICATIONS #####
  66. //
  67. //psql notify_listen_test -c "INSERT INTO products(name, category_id) VALUES ('coffee cup', 1), ('pencil', 2), ('shoe', 3)"
  68. //psql notify_listen_test -c "INSERT INTO orders(product_id, quantity) VALUES (3, 5), (1, 4), (2, 5)"
  69. //psql notify_listen_test -c "UPDATE orders SET quantity = 10 WHERE id = 3"
  70. //psql notify_listen_test -c "DELETE FROM orders WHERE id = 3"
  71. //
  72. //##### AFTER PASTING ALL THE SCALA CODE BELOW INTO THE SPARK-SHELL SESSION, START DOING DB STUFF TO SEE NOTIFICATIONS #####
  73. //
  74. //##### IF YOU WANT TO CLEAN UP THE PRODUCTS AND ORDERS TABLES #####
  75. //psql notify_listen_test -c 'TRUNCATE TABLE products RESTART IDENTITY CASCADE'
  76. //##### IF YOU WANT TO CLEAN UP THE PRODUCTS AND ORDERS TABLES #####
  77. //
  78. //##### IF YOU WANT TO REMOVE THE DB AND START OVER (must kill all connections first) #####
  79. //dropdb notify_listen_test
  80. //##### IF YOU WANT TO REMOVE THE DB AND START OVER (must kill all connections first) #####
  81.  
  82.  
  83. class Poller(DBUri: String, properties: java.util.Properties, channel: String, pollingInterval: Int) extends akka.actor.Actor {
  84. import java.sql.{Driver, DriverManager}
  85.  
  86. import org.postgresql.{PGNotification, PGConnection}
  87. import scalikejdbc._
  88. import scala.concurrent.duration._
  89.  
  90. // execution context for the ticks
  91. import context.dispatcher
  92.  
  93. val driver = "org.postgresql.Driver"
  94. Class.forName(driver)
  95. val connectionDriver = Class.forName(driver).newInstance().asInstanceOf[Driver]
  96. DriverManager.registerDriver(connectionDriver)
  97. val connection = DriverManager.getConnection(DBUri, properties)
  98. val pollingIntervalDuration = Duration(pollingInterval, "millis")
  99.  
  100. val db: DB = DB(connection)
  101. val tick = context.system.scheduler.schedule(500 millis, pollingIntervalDuration, self, "tick")
  102.  
  103. override def preStart() = {
  104. // make sure connection isn't closed when executing queries
  105. db.autoClose(false)
  106. db.localTx { implicit session =>
  107. SQL(s"LISTEN $channel").execute().apply()
  108. }
  109. }
  110.  
  111. override def postStop() = {
  112. tick.cancel()
  113. db.close()
  114. }
  115.  
  116. def receive = {
  117. case "tick" =>
  118. db.readOnly { implicit session =>
  119. val pgConnection = connection.unwrap(Class.forName("org.postgresql.PGConnection")).asInstanceOf[PGConnection]
  120. val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
  121.  
  122. notifications.foreach( not => {
  123. println(s"Received for: ${not.getName} from process with PID: ${not.getPID}")
  124. println(s"Received data: ${not.getParameter} ")
  125. }
  126. )
  127. }
  128. }
  129. }
  130.  
  131. import java.util.Properties
  132. import akka.actor.{Props, ActorSystem, Actor}
  133.  
  134. object PostgresNotifications extends App {
  135. // initialize the actor system
  136. val system = ActorSystem("Hello")
  137. // val a = system.actorOf(Props[Poller], "poller")
  138.  
  139. val DBUri = "jdbc:postgresql://localhost:5432/notify_listen_test"
  140. val properties = new Properties
  141.  
  142. system.actorOf(Props(new Poller(DBUri, properties, "products", 1000)), "products_poller")
  143. system.actorOf(Props(new Poller(DBUri, properties, "orders", 1000)), "orders_poller")
  144.  
  145. // wait for the user to stop the server
  146. println("Press <enter> to exit.")
  147. Console.in.read.toChar
  148. system.terminate
  149. }
  150.  
  151. PostgresNotifications.main(Array.empty[String])
Add Comment
Please, Sign In to add comment