Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * TCP echo round-trip with manual JSON decoding.
 *
 * The client sends Pojos encoded through the codec; the server decodes, logs,
 * re-encodes and echoes whatever it receives; the client decodes each reply and
 * counts the latch down once per decoded element.
 */
def "step-read and flush every 5 elems with manual decoding"() {
    given: "a TcpServer and a TcpClient"
    def latch = new CountDownLatch(10)
    def server = NetStreams.tcpServer(port)
    def client = NetStreams.tcpClient("localhost", port)
    def codec = new JsonCodec<Pojo, Pojo>(Pojo)

    when: "the client/server are prepared"
    server.pipeline { channel ->
        // Server side: decode json, log, re-encode and echo any passed data.
        def decoded = channel.decode(codec)
        def reencoded = decoded.log('serve').map(codec)
        // capacity(5l): bounded to 5 in-flight elements, flushing every 5 elements.
        reencoded.capacity(5l)
    }
    client.pipeline { channel ->
        // Inbound: unbounded client receiver, one latch count per decoded reply.
        def replies = channel.decode(codec).log('receive')
        replies.consume { latch.countDown() }

        // Outbound (the closure's result): Pojos named 'test' + n, logged and
        // encoded; capacity(10l) flushes every 10 elements.
        def outgoing = Streams.range(1, 10).map { n -> new Pojo(name: 'test' + n) }
        outgoing.log('send').map(codec).capacity(10l)
    }

    then: "the client/server were started"
    // Both statements are top-level expressions in a then: block, so Spock
    // evaluates each of them as a condition.
    server?.start()?.flatMap { client.open() }?.awaitSuccess(5, TimeUnit.SECONDS)
    latch.await(10, TimeUnit.SECONDS)

    cleanup: "the client/server where stopped"
    client?.close()?.flatMap { server.shutdown() }?.awaitSuccess(5, TimeUnit.SECONDS)
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement