Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @RunWith(SpringRunner.class)
- @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
- @DirtiesContext
- public abstract class AggregatorProcessorTests {
- @Autowired
- protected Processor channels;
- @Autowired
- protected MessageCollector collector;
- @TestPropertySource(properties = {
- "spring.profiles.active=aggregator",
- "aggregator.header.enricher.jdbc.sql=SELECT " +
- "d.iot_service_code AS IOT_SERVICE_CODE," +
- "d.id AS DEVICE_ID," +
- "CASE d.device_type " +
- "WHEN 1 " +
- "THEN CONCAT(d.iot_service_code, '-', d.ipaddress) " +
- "ELSE s.phone_number END AS SIM_ID," +
- "o.storage_interval AS STORAGE_INTERVAL," +
- "o.output_format_flag AS OUTPUT_FORMAT_FLAG " +
- "FROM device AS d " +
- "INNER JOIN ios AS o ON d.iot_service_code = o.iot_service_code " +
- "LEFT JOIN sim AS s ON d.id = s.device_id " +
- "WHERE d.storage_flag IS TRUE AND o.bucket IS NOT NULL",
- "aggregator.header.enricher.jdbc.headers=storageInterval,iotServiceCode,outputFormatFlag,simId",
- "aggregator.header.enricher.jdbc.sqlKeyProperty=deviceId",
- "aggregator.timestamp.id-expression=headers.iotServiceCode",
- "aggregator.timestamp.interval=SECONDS",
- "aggregator.timestamp.header-key=storageInterval",
- "aggregator.message-store-type=simple",
- "aggregator.keyExpression=payload.id",
- "aggregator.idle-timeout=100000",
- "spring.datasource.driver-class-name=org.mariadb.jdbc.Driver",
- "spring.datasource.url=jdbc:mysql://localhost/some_db",
- "spring.datasource.username=vagrant",
- "spring.datasource.password=vagrant"
- })
- public static class AggregatorTests extends AggregatorProcessorTests {
- @Test
- public void passThroughTest() throws Exception {
- String data5 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677644,\n" +
- " \"id\":\"37FF48A93166\", \n" +
- " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
- "}";
- String data4 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677644,\n" +
- " \"id\":\"37FF48A93166\", \n" +
- " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
- "}";
- String data6 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677645,\n" +
- " \"id\":\"086674D10A39\", \n" +
- " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
- "}";
- String data2 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677645,\n" +
- " \"id\":\"5CC5C5CD9798\", \n" +
- " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
- "}";
- String data3 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000.3, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n"
- +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677646,\n" +
- " \"id\":\"B81D0F20A821\", \n" +
- " \"headers\":[{\"X-HEADER\":\"BAR\"},{\"X-Auth-Token\":\"ba2be94fc3ccdb93b983b57171fb75e9\"},{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n"
- +
- "}";
- String data1 = " {\n" +
- " \"payload\": {\"name\":\"Sensor ABC\",\"value\":20, \"map\":{\"a\":1, \"b\":\"foo\"}, \"list\":[\"a\",\"b\",\"c\"]},\n" +
- " \"serviceCode\":\"iot00000001\",\n" +
- " \"timestamp\":1473215677646,\n" +
- " \"id\":\"B81D0F20A821\", \n" +
- " \"headers\":[{\"Content-Type\":\"application/json;charset=UTF-8\"},{\"Content-Length\":100},{\"User-Agent\":\"curl-xxxx\"}]\n" +
- "}";
- Thread t = new Thread(() -> {
- HashMap<String, Object> headers = new HashMap();
- headers.put("kafka_offset", 1L);
- MessageHeaders messageHeaders = new MessageHeaders(headers);
- GenericMessage message = new GenericMessage<>(data1, messageHeaders);
- channels.input()
- .send(message);
- headers = new HashMap();
- headers.put("kafka_offset", 2L);
- messageHeaders = new MessageHeaders(headers);
- message = new GenericMessage<>(data2, messageHeaders);
- channels.input()
- .send(message);
- headers = new HashMap();
- headers.put("kafka_offset", 3L);
- messageHeaders = new MessageHeaders(headers);
- message = new GenericMessage<>(data3, messageHeaders);
- channels.input()
- .send(message);
- headers = new HashMap();
- headers.put("kafka_offset", 4L);
- messageHeaders = new MessageHeaders(headers);
- message = new GenericMessage<>(data4, messageHeaders);
- channels.input()
- .send(message);
- headers = new HashMap();
- headers.put("kafka_offset", 5L);
- messageHeaders = new MessageHeaders(headers);
- message = new GenericMessage<>(data5, messageHeaders);
- channels.input()
- .send(message);
- headers = new HashMap();
- headers.put("kafka_offset", 6L);
- messageHeaders = new MessageHeaders(headers);
- message = new GenericMessage<>(data6, messageHeaders);
- channels.input()
- .send(message);
- });
- t.start();
- BlockingQueue<Message<?>> blockingQueue = this.collector.forChannel(this.channels.output());
- StopWatch stopWatch1 = new StopWatch();
- StopWatch stopWatch2 = new StopWatch();
- stopWatch1.start();
- stopWatch2.start();
- Message<?> message1 = blockingQueue.take();
- stopWatch1.stop();
- System.out.println(stopWatch1.getTotalTimeMillis());
- assertThat(message1, notNullValue());
- assertThat(message1.getPayload(), instanceOf(List.class));
- @SuppressWarnings("unchecked")
- List result = (List) message1.getPayload();
- Assert.assertEquals(1, result.size());
- Message<?> message2 = blockingQueue.take();
- stopWatch2.stop();
- System.out.println(stopWatch2.getTotalTimeMillis());
- assertThat(message2, notNullValue());
- assertThat(message2.getPayload(), instanceOf(List.class));
- @SuppressWarnings("unchecked")
- List result2 = (List) message2.getPayload();
- Assert.assertEquals(5, result2.size());
- }
- }
- }
Add Comment
Please, Sign In to add comment