Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public static void main(String as[]) throws Exception {
- String db = "identity_management_test";
- String slotName = db+"slot";
- // String wal2json = "wal2json";
- String wal2json = "test_decoding";
- LogSequenceNumber lastLSNNumber = null;//LogSequenceNumber.valueOf("EF/B79FB8");
- //bm_slot_ndo_ft_pune_paso_slm
- String url = "jdbc:postgresql://10.105.32.s:s/" + db;
- Properties props = new Properties();
- PGProperty.USER.set(props, "postgres");
- PGProperty.PASSWORD.set(props, "postgres");
- PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "10");
- PGProperty.REPLICATION.set(props, "database");
- PGProperty.PREFER_QUERY_MODE.set(props, "simple");
- Connection slotVerifyConnection = null;
- Connection slotConnection = null;
- try {
- //TODO connection pool
- slotVerifyConnection = connect(url, "postgres", "postgres");
- slotConnection = DriverManager.getConnection(url, props);
- Statement statement = slotVerifyConnection.createStatement();
- ResultSet result = statement.executeQuery("select confirmed_flush_lsn,active,active_pid from pg_replication_slots where slot_type = 'logical' and slot_name ='" + slotName + "'");
- while (result.next()) {
- lastLSNNumber = LogSequenceNumber.valueOf(result.getString(1));
- }
- if(lastLSNNumber==null){
- statement.executeQuery("select * from pg_create_logical_replication_slot('"+slotName+"', 'test_decoding');");
- }
- result = statement.executeQuery("select confirmed_flush_lsn,active,active_pid from pg_replication_slots where slot_type = 'logical' and slot_name ='" + slotName + "'");
- while (result.next()) {
- lastLSNNumber = LogSequenceNumber.valueOf(result.getString(1));
- }
- result.close();
- statement.close();
- Connection con = DriverManager.getConnection(url, props);
- PGConnection replConnection = con.unwrap(PGConnection.class);
- PGReplicationStream stream = null;
- if (lastLSNNumber != null) {
- stream =
- replConnection.getReplicationAPI()
- .replicationStream()
- .logical()
- .withSlotName(slotName)
- .withSlotOption("include-xids", true)
- .withSlotOption("skip-empty-xacts", true)
- .withStartPosition(lastLSNNumber)
- // .withStatusInterval(20, TimeUnit.SECONDS)
- .withStatusInterval(Integer.parseInt("5"), TimeUnit.SECONDS).start();
- } else {
- replConnection.getReplicationAPI()
- .createReplicationSlot()
- .logical()
- .withSlotName(slotName)
- .withOutputPlugin(wal2json)
- .make();
- stream =
- replConnection.getReplicationAPI()
- .replicationStream()
- .logical()
- .withSlotName(slotName)
- .withSlotOption("include-xids", false)
- .withSlotOption("skip-empty-xacts", true)
- .withStatusInterval(20, TimeUnit.SECONDS)
- .start();
- }
- while (true) {
- //non blocking receive message
- ByteBuffer msg = stream.read();
- if (msg == null) {
- TimeUnit.MILLISECONDS.sleep(10L);
- continue;
- }
- int offset = msg.arrayOffset();
- byte[] source = msg.array();
- int length = source.length - offset;
- System.out.println(new String(source, offset, length));
- String sql=new String(source, offset, length);
- //QueryUtils.getQueries(sql);
- stream.setAppliedLSN(stream.getLastReceiveLSN());
- stream.setFlushedLSN(stream.getLastReceiveLSN());
- stream.forceUpdateStatus();
- }
- } catch (Exception ex) {
- ex.printStackTrace();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement