Advertisement
Guest User

Untitled

a guest
Aug 25th, 2019
205
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.37 KB | None | 0 0
  1. public static void main(String as[]) throws Exception {
  2.  
  3. String db = "identity_management_test";
  4. String slotName = db+"slot";
  5. // String wal2json = "wal2json";
  6. String wal2json = "test_decoding";
  7. LogSequenceNumber lastLSNNumber = null;//LogSequenceNumber.valueOf("EF/B79FB8");
  8. //bm_slot_ndo_ft_pune_paso_slm
  9. String url = "jdbc:postgresql://10.105.32.s:s/" + db;
  10. Properties props = new Properties();
  11. PGProperty.USER.set(props, "postgres");
  12. PGProperty.PASSWORD.set(props, "postgres");
  13. PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "10");
  14. PGProperty.REPLICATION.set(props, "database");
  15. PGProperty.PREFER_QUERY_MODE.set(props, "simple");
  16. Connection slotVerifyConnection = null;
  17. Connection slotConnection = null;
  18. try {
  19. //TODO connection pool
  20. slotVerifyConnection = connect(url, "postgres", "postgres");
  21. slotConnection = DriverManager.getConnection(url, props);
  22. Statement statement = slotVerifyConnection.createStatement();
  23. ResultSet result = statement.executeQuery("select confirmed_flush_lsn,active,active_pid from pg_replication_slots where slot_type = 'logical' and slot_name ='" + slotName + "'");
  24. while (result.next()) {
  25.  
  26. lastLSNNumber = LogSequenceNumber.valueOf(result.getString(1));
  27.  
  28. }
  29. if(lastLSNNumber==null){
  30. statement.executeQuery("select * from pg_create_logical_replication_slot('"+slotName+"', 'test_decoding');");
  31. }
  32.  
  33. result = statement.executeQuery("select confirmed_flush_lsn,active,active_pid from pg_replication_slots where slot_type = 'logical' and slot_name ='" + slotName + "'");
  34. while (result.next()) {
  35.  
  36. lastLSNNumber = LogSequenceNumber.valueOf(result.getString(1));
  37.  
  38. }
  39. result.close();
  40. statement.close();
  41. Connection con = DriverManager.getConnection(url, props);
  42. PGConnection replConnection = con.unwrap(PGConnection.class);
  43.  
  44. PGReplicationStream stream = null;
  45.  
  46.  
  47. if (lastLSNNumber != null) {
  48. stream =
  49. replConnection.getReplicationAPI()
  50. .replicationStream()
  51. .logical()
  52. .withSlotName(slotName)
  53. .withSlotOption("include-xids", true)
  54. .withSlotOption("skip-empty-xacts", true)
  55. .withStartPosition(lastLSNNumber)
  56.  
  57. // .withStatusInterval(20, TimeUnit.SECONDS)
  58. .withStatusInterval(Integer.parseInt("5"), TimeUnit.SECONDS).start();
  59. } else {
  60.  
  61. replConnection.getReplicationAPI()
  62. .createReplicationSlot()
  63. .logical()
  64. .withSlotName(slotName)
  65. .withOutputPlugin(wal2json)
  66. .make();
  67. stream =
  68. replConnection.getReplicationAPI()
  69. .replicationStream()
  70. .logical()
  71. .withSlotName(slotName)
  72. .withSlotOption("include-xids", false)
  73. .withSlotOption("skip-empty-xacts", true)
  74. .withStatusInterval(20, TimeUnit.SECONDS)
  75. .start();
  76. }
  77. while (true) {
  78. //non blocking receive message
  79. ByteBuffer msg = stream.read();
  80.  
  81. if (msg == null) {
  82. TimeUnit.MILLISECONDS.sleep(10L);
  83. continue;
  84. }
  85.  
  86. int offset = msg.arrayOffset();
  87. byte[] source = msg.array();
  88. int length = source.length - offset;
  89. System.out.println(new String(source, offset, length));
  90. String sql=new String(source, offset, length);
  91. //QueryUtils.getQueries(sql);
  92.  
  93. stream.setAppliedLSN(stream.getLastReceiveLSN());
  94. stream.setFlushedLSN(stream.getLastReceiveLSN());
  95. stream.forceUpdateStatus();
  96. }
  97.  
  98. } catch (Exception ex) {
  99. ex.printStackTrace();
  100. }
  101. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement