Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Consumes messages from an OCI stream through a group cursor, polling until `closed` is set.
// NOTE(review): `groupName` and `streamId` are not defined in this snippet; presumably they are
// properties of the enclosing class or script binding -- confirm against the surrounding code.
ConfigFileAuthenticationDetailsProvider provider =
        new ConfigFileAuthenticationDetailsProvider('/path/to/.oci/config', 'DEFAULT')
StreamClient client = new StreamClient(provider)

// Loop flag: nothing in this snippet sets it to true, so whoever owns the consumer
// has to flip it to stop the polling loop below.
AtomicBoolean closed = new AtomicBoolean(false)

// Group cursor of type TrimHorizon with commitOnGet enabled for the given group name.
CreateGroupCursorDetails cursorDetails = CreateGroupCursorDetails.builder()
        .type(CreateGroupCursorDetails.Type.TrimHorizon)
        .commitOnGet(true)
        .groupName(this.groupName)
        .build()

CreateGroupCursorRequest groupCursorRequest = CreateGroupCursorRequest.builder()
        .streamId(streamId)
        .createGroupCursorDetails(cursorDetails)
        .build()

// Use the locally constructed client (the original declared it but then called `this.client`).
CreateGroupCursorResponse cursorResponse = client.createGroupCursor(groupCursorRequest)

// The cursor is tracked in a local and a fresh request is built for every poll,
// rather than mutating a request object that was produced by a builder.
def cursor = cursorResponse.cursor.value

while (!closed.get()) {
    GetMessagesRequest getRequest = GetMessagesRequest.builder()
            .cursor(cursor)
            .streamId(this.streamId)
            .build()

    def getResult = client.getMessages(getRequest)

    getResult.items.each { Message record ->
        // Payload bytes decoded as UTF-8 text; the snippet does nothing further with it.
        def msg = new String(record.value, "UTF-8")
    }

    // Continue from the cursor returned with this batch.
    cursor = getResult.opcNextCursor

    // Pause 500 ms between polls.
    sleep(500)
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement