Advertisement
Guest User

Untitled

a guest
Mar 31st, 2015
178
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.23 KB | None | 0 0
  1. def "step-read and flush every 5 elems with manual decoding"() {
  2. given: "a TcpServer and a TcpClient"
  3. def latch = new CountDownLatch(10)
  4.  
  5. def server = NetStreams.tcpServer(port)
  6. def client = NetStreams.tcpClient("localhost", port)
  7. def codec = new JsonCodec<Pojo, Pojo>(Pojo)
  8.  
  9. when: "the client/server are prepared"
  10. server.pipeline { input ->
  11. //bounded 5 in-flight data and flush every 5 elements (capacity(5l)
  12. //the following pipeline decode json, log, re-encode and echo any passed data
  13. input
  14. .decode(codec)
  15. .log('serve')
  16. .map(codec)
  17. .capacity(5l)
  18. }
  19.  
  20. client.pipeline { input ->
  21. //unbounded client receiver
  22. input
  23. .decode(codec)
  24. .log('receive')
  25. .consume { latch.countDown() }
  26.  
  27. //flush every 10 elements with capacity(10l)
  28. Streams.range(1, 10)
  29. .map { new Pojo(name: 'test' + it) }
  30. .log('send')
  31. .map(codec)
  32. .capacity(10l)
  33. }
  34.  
  35. then: "the client/server were started"
  36. server?.start()?.flatMap { client.open() }?.awaitSuccess(5, TimeUnit.SECONDS)
  37. latch.await(10, TimeUnit.SECONDS)
  38.  
  39.  
  40. cleanup: "the client/server where stopped"
  41. client?.close()?.flatMap { server.shutdown() }?.awaitSuccess(5, TimeUnit.SECONDS)
  42. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement