Guest User

Untitled

a guest
Jul 10th, 2018
156
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 8.44 KB | None | 0 0
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
  3. @DirtiesContext
  4. public abstract class AggregatorProcessorTests {
  5.  
  6. @Autowired
  7. protected Processor channels;
  8.  
  9. @Autowired
  10. protected MessageCollector collector;
  11.  
  12. @TestPropertySource(properties = {
  13. "spring.profiles.active=aggregator",
  14. "aggregator.header.enricher.jdbc.sql=SELECT " +
  15. "d.iot_service_code AS IOT_SERVICE_CODE," +
  16. "d.id AS DEVICE_ID," +
  17. "CASE d.device_type " +
  18. "WHEN 1 " +
  19. "THEN CONCAT(d.iot_service_code, '-', d.ipaddress) " +
  20. "ELSE s.phone_number END AS SIM_ID," +
  21. "o.storage_interval AS STORAGE_INTERVAL," +
  22. "o.output_format_flag AS OUTPUT_FORMAT_FLAG " +
  23. "FROM device AS d " +
  24. "INNER JOIN ios AS o ON d.iot_service_code = o.iot_service_code " +
  25. "LEFT JOIN sim AS s ON d.id = s.device_id " +
  26. "WHERE d.storage_flag IS TRUE AND o.bucket IS NOT NULL",
  27. "aggregator.header.enricher.jdbc.headers=storageInterval,iotServiceCode,outputFormatFlag,simId",
  28. "aggregator.header.enricher.jdbc.sqlKeyProperty=deviceId",
  29. "aggregator.timestamp.id-expression=headers.iotServiceCode",
  30. "aggregator.timestamp.interval=SECONDS",
  31. "aggregator.timestamp.header-key=storageInterval",
  32. "aggregator.message-store-type=simple",
  33. "aggregator.keyExpression=payload.id",
  34. "aggregator.idle-timeout=100000",
  35. "spring.datasource.driver-class-name=org.mariadb.jdbc.Driver",
  36. "spring.datasource.url=jdbc:mysql://localhost/some_db",
  37. "spring.datasource.username=vagrant",
  38. "spring.datasource.password=vagrant"
  39. })
  40. public static class AggregatorTests extends AggregatorProcessorTests {
  41.  
  42. @Test
  43. public void passThroughTest() throws Exception {
  44. String data5 = " {\n" +
  45. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
  46. " \"serviceCode\":\"iot00000001\",\n" +
  47. " \"timestamp\":1473215677644,\n" +
  48. " \"id\":\"37FF48A93166\", \n" +
  49. " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
  50. "}";
  51. String data4 = " {\n" +
  52. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
  53. " \"serviceCode\":\"iot00000001\",\n" +
  54. " \"timestamp\":1473215677644,\n" +
  55. " \"id\":\"37FF48A93166\", \n" +
  56. " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
  57. "}";
  58. String data6 = " {\n" +
  59. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
  60. " \"serviceCode\":\"iot00000001\",\n" +
  61. " \"timestamp\":1473215677645,\n" +
  62. " \"id\":\"086674D10A39\", \n" +
  63. " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
  64. "}";
  65. String data2 = " {\n" +
  66. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
  67. " \"serviceCode\":\"iot00000001\",\n" +
  68. " \"timestamp\":1473215677645,\n" +
  69. " \"id\":\"5CC5C5CD9798\", \n" +
  70. " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
  71. "}";
  72.  
  73. String data3 = " {\n" +
  74. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.3, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n"
  75. +
  76. " \"serviceCode\":\"iot00000001\",\n" +
  77. " \"timestamp\":1473215677646,\n" +
  78. " \"id\":\"B81D0F20A821\", \n" +
  79. " \"headers\":[{\"X-HEADER\":\"BAR\"},{\"X-Auth-Token\":\"ba2be94fc3ccdb93b983b57171fb75e9\"},{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n"
  80. +
  81. "}";
  82.  
  83. String data1 = " {\n" +
  84. " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
  85. " \"serviceCode\":\"iot00000001\",\n" +
  86. " \"timestamp\":1473215677646,\n" +
  87. " \"id\":\"B81D0F20A821\", \n" +
  88. " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
  89. "}";
  90.  
  91. Thread t = new Thread(() -> {
  92. HashMap<String, Object> headers = new HashMap();
  93. headers.put("kafka_offset", 1L);
  94. MessageHeaders messageHeaders = new MessageHeaders(headers);
  95. GenericMessage message = new GenericMessage<>(data1, messageHeaders);
  96. channels.input()
  97. .send(message);
  98.  
  99. headers = new HashMap();
  100. headers.put("kafka_offset", 2L);
  101. messageHeaders = new MessageHeaders(headers);
  102. message = new GenericMessage<>(data2, messageHeaders);
  103. channels.input()
  104. .send(message);
  105.  
  106. headers = new HashMap();
  107. headers.put("kafka_offset", 3L);
  108. messageHeaders = new MessageHeaders(headers);
  109. message = new GenericMessage<>(data3, messageHeaders);
  110. channels.input()
  111. .send(message);
  112.  
  113. headers = new HashMap();
  114. headers.put("kafka_offset", 4L);
  115. messageHeaders = new MessageHeaders(headers);
  116. message = new GenericMessage<>(data4, messageHeaders);
  117. channels.input()
  118. .send(message);
  119.  
  120. headers = new HashMap();
  121. headers.put("kafka_offset", 5L);
  122. messageHeaders = new MessageHeaders(headers);
  123. message = new GenericMessage<>(data5, messageHeaders);
  124. channels.input()
  125. .send(message);
  126.  
  127. headers = new HashMap();
  128. headers.put("kafka_offset", 6L);
  129. messageHeaders = new MessageHeaders(headers);
  130. message = new GenericMessage<>(data6, messageHeaders);
  131. channels.input()
  132. .send(message);
  133. });
  134. t.start();
  135. BlockingQueue<Message<?>> blockingQueue = this.collector.forChannel(this.channels.output());
  136. StopWatch stopWatch1 = new StopWatch();
  137. StopWatch stopWatch2 = new StopWatch();
  138. stopWatch1.start();
  139. stopWatch2.start();
  140.  
  141. Message<?> message1 = blockingQueue.take();
  142. stopWatch1.stop();
  143. System.out.println(stopWatch1.getTotalTimeMillis());
  144. assertThat(message1, notNullValue());
  145. assertThat(message1.getPayload(), instanceOf(List.class));
  146. @SuppressWarnings("unchecked")
  147. List result = (List) message1.getPayload();
  148. Assert.assertEquals(1, result.size());
  149. Message<?> message2 = blockingQueue.take();
  150. stopWatch2.stop();
  151. System.out.println(stopWatch2.getTotalTimeMillis());
  152. assertThat(message2, notNullValue());
  153. assertThat(message2.getPayload(), instanceOf(List.class));
  154. @SuppressWarnings("unchecked")
  155. List result2 = (List) message2.getPayload();
  156. Assert.assertEquals(5, result2.size());
  157. }
  158. }
  159. }
Add Comment
Please, Sign In to add comment