Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java
- new file mode 100644
- index 0000000..4e80ba2
- --- /dev/null
- +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java
- @@ -0,0 +1,188 @@
- +package org.apache.samza.test.integration;
- +
- +import java.util.HashMap;
- +import java.util.Map;
- +import java.util.Set;
- +import java.util.Iterator;
- +import java.io.StringWriter;
- +// import org.json.simple.JSONValue;
- +import org.codehaus.jackson.JsonGenerator;
- +import kafka.producer.KeyedMessage;
- +import org.codehaus.jackson.JsonNode;
- +import org.codehaus.jackson.JsonParseException;
- +import org.apache.samza.config.Config;
- +import org.apache.samza.system.IncomingMessageEnvelope;
- +import org.apache.samza.system.OutgoingMessageEnvelope;
- +import org.apache.samza.system.SystemStream;
- +import org.apache.samza.task.MessageCollector;
- +import org.apache.samza.task.StreamTask;
- +import org.apache.samza.task.TaskCoordinator;
- +import org.apache.samza.task.TaskContext;
- +import org.codehaus.jackson.map.JsonMappingException;
- +import org.codehaus.jackson.map.ObjectMapper;
- +
- +import org.apache.samza.storage.kv.KeyValueStore;
- +import org.apache.samza.storage.kv.KeyValueIterator;
- +import org.apache.samza.storage.kv.Entry;
- +import org.apache.samza.task.InitableTask;
- +import org.apache.samza.task.WindowableTask;
- +import org.slf4j.Logger;
- +import org.slf4j.LoggerFactory;
- +import org.codehaus.jackson.JsonParser;
- +
- +// import redis.clients.jedis.Jedis;
- +
- +//import com.google.gson.Gson;
- +import java.io.FileNotFoundException;
- +import java.io.IOException;
- +import java.io.FileInputStream;
- +import java.util.Properties;
- +import java.util.Set;
- +//import java.util.Map;
- +
- +
- +public class SimpleWindowTestTask implements StreamTask, InitableTask, WindowableTask {
- +
- + private KeyValueStore<String, String> store;
- + Config config;
- + TaskContext context;
- + String val;
- + String outTopic;
- + HashMap propVals;
- + String useProp;
- + private int eventsSeen = 0;
- + private String appName;
- + public SystemStream OUTPUT_STREAM;
- + // private Jedis jedis;
- + // private String redis;
- + private HashMap hm = new HashMap();
- + Map<String, String> appMap = new HashMap();
- +
- +
- + private static final Logger log = LoggerFactory.getLogger(SimpleWindowTestTask.class);
- +
- + /* Constructor */
- +
- +
- + public HashMap readPropertyFile() {
- + log.info("Reading the property file ...");
- + Properties prop = new Properties();
- + FileInputStream input;
- + HashMap<String, String> propvals = new HashMap<String, String>();
- + try {
- + //adjust this to that of remote server
- + input = new FileInputStream("./__package/config/abc.properties");
- + prop.load(input);
- + log.info("Property file loaded succesfully.");
- + Set<String> propertyNames = prop.stringPropertyNames();
- + log.debug("Property file content: ");
- + for (String Property : propertyNames) {
- + log.debug(Property + ":" + prop.getProperty(Property));
- + propvals.put(Property, prop.getProperty(Property));
- + }
- + log.debug("HashMap generated: " + propvals);
- + } catch (FileNotFoundException e) {
- + log.warn(e.getMessage());
- + e.printStackTrace();
- + } catch (IOException e) {
- + log.warn(e.getMessage());
- + e.printStackTrace();
- + } catch (Exception e) {
- + log.warn(e.getMessage());
- + e.printStackTrace();
- + }
- + return propvals;
- + }
- +
- + public void init(Config config, TaskContext context) {
- + log.info("INIT SimpleWindowTestTask");
- + this.outTopic = config.get("task.outputs");
- + this.store = (KeyValueStore<String, String>) context.getStore("store");
- + OUTPUT_STREAM = new SystemStream("kafka", outTopic);
- + this.useProp = config.get("load.property");
- + //populate the propVals from Property File else pass in an empty HashMap
- + if (Boolean.parseBoolean(useProp)) propVals = readPropertyFile();
- + else propVals = new HashMap<String, String>();
- + /**
- + log.info("Connecting to redis at " + redis + "...");
- + jedis = new Jedis(redis);
- + try{
- + jedis.connect();
- + }catch (Exception e){
- + log.warn ("Unable to connect to redis. Try checking with service-now directly (not implemented yet)");
- + e.printStackTrace();
- + }
- + log.info("Connected to redis");
- + **/
- + }
- + @SuppressWarnings("unchecked")
- + @Override
- + //public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException, JsonParseException, JsonMappingException, IOException {
- + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException {
- +
- + ObjectMapper mapper = new ObjectMapper();
- + log.info("Envelope Key : " + envelope.getKey());
- + mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
- + JsonNode node;
- + try {
- + //log.info("Raw message: " + envelope.toString());
- + node = mapper.readValue(envelope.getMessage().toString(), JsonNode.class);
- + //log.info("Raw json: " + node.get("event").get("application_name").toString());
- + //String appName=node.findValues("application_name").get(0).toString();
- + String appName=node.get("event").get("application_name").toString();
- +
- + try{
- + eventsSeen=Integer.parseInt(store.get(appName));
- +
- + // eventsSeen=Integer.parseInt(store.get(appName));
- + // eventsSeen=Integer.parseInt(appMap.get(uniqueKey));
- + eventsSeen++;
- + log.info("Application Name: " + appName + "eventSeen " + eventsSeen);
- + /**
- + try{
- + //hm.put(appName,eventsSeen);
- + appMap.put(uniqueKey , eventsSeen + "");
- + }catch (Exception e){
- + log.info("Error while adding elements to locah Hash " + e.getMessage());
- + e.printStackTrace();
- + }
- + **/
- + try{
- + store.put(appName, String.valueOf(eventsSeen)); // Needs to be (string,string)
- + log.info("Added new value " + store.get(appName));
- + }catch (Exception e){
- + log.info("No value found for application in kv store: " + e.getMessage());
- + e.printStackTrace();
- + }
- + }catch (Exception e){
- + log.warn("No value found for application in kv store: " + e.getMessage());
- + // appMap.put(uniqueKey, "0");
- + store.put(appName, "1");
- + //e.printStackTrace();
- + }
- +
- + } catch (JsonParseException e1) {
- + // TODO Auto-generated catch block
- + e1.printStackTrace();
- + } catch (JsonMappingException e1) {
- + // TODO Auto-generated catch block
- + e1.printStackTrace();
- + } catch (IOException e1) {
- + // TODO Auto-generated catch block
- + e1.printStackTrace();
- + }
- + }
- +
- + public void window(MessageCollector collector,
- + TaskCoordinator coordinator) {
- + KeyValueIterator<String, String> iter = store.all();
- + while(iter.hasNext()) {
- + Entry<String, String> entry = iter.next();
- + log.info(String.format("Iterating through store entry: <%s, %s>", entry.getKey(), entry.getValue()));
- + store.delete(entry.getKey());
- + }
- + iter.close();
- + appMap.clear();
- + eventsSeen = 0;
- + }
- +}
- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala
- new file mode 100644
- index 0000000..5d7f804
- --- /dev/null
- +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala
- @@ -0,0 +1,395 @@
- +/*
- + * Licensed to the Apache Software Foundation (ASF) under one
- + * or more contributor license agreements. See the NOTICE file
- + * distributed with this work for additional information
- + * regarding copyright ownership. The ASF licenses this file
- + * to you under the Apache License, Version 2.0 (the
- + * "License"); you may not use this file except in compliance
- + * with the License. You may obtain a copy of the License at
- + *
- + * http://www.apache.org/licenses/LICENSE-2.0
- + *
- + * Unless required by applicable law or agreed to in writing,
- + * software distributed under the License is distributed on an
- + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- + * KIND, either express or implied. See the License for the
- + * specific language governing permissions and limitations
- + * under the License.
- + */
- +
- +package org.apache.samza.test.integration
- +
- +import java.util.Properties
- +import java.util.concurrent.CountDownLatch
- +import java.util.concurrent.TimeUnit
- +import java.util
- +
- +import kafka.admin.AdminUtils
- +import kafka.consumer.Consumer
- +import kafka.consumer.ConsumerConfig
- +import kafka.message.MessageAndMetadata
- +import kafka.server.KafkaConfig
- +import kafka.server.KafkaServer
- +import kafka.utils.TestUtils
- +import kafka.utils.TestZKUtils
- +import kafka.utils.Utils
- +import kafka.utils.ZKStringSerializer
- +import kafka.zk.EmbeddedZookeeper
- +import org.I0Itec.zkclient.ZkClient
- +import org.apache.samza.Partition
- +import org.apache.samza.checkpoint.Checkpoint
- +import org.apache.samza.config.Config
- +import org.apache.samza.job.local.ThreadJobFactory
- +import org.apache.samza.config.MapConfig
- +import org.apache.samza.container.TaskName
- +import org.apache.samza.job.ApplicationStatus
- +import org.apache.samza.job.StreamJob
- +import org.apache.samza.storage.kv.KeyValueStore
- +import org.apache.samza.system.kafka.TopicMetadataCache
- +import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
- +import org.apache.samza.config.KafkaProducerConfig
- +import org.apache.samza.task.InitableTask
- +import org.apache.samza.task.MessageCollector
- +import org.apache.samza.task.StreamTask
- +import org.apache.samza.task.TaskContext
- +import org.apache.samza.task.TaskCoordinator
- +import org.apache.samza.task.TaskCoordinator.RequestScope
- +import org.apache.samza.util.ClientUtilTopicMetadataStore
- +import org.apache.samza.util.TopicMetadataStore
- +import org.apache.samza.job.JobRunner
- +import org.junit.Assert._
- +import org.junit.{BeforeClass, AfterClass, Test}
- +import scala.collection.JavaConversions._
- +import scala.collection.mutable.ArrayBuffer
- +import scala.collection.mutable.HashMap
- +import scala.collection.mutable.SynchronizedMap
- +import org.apache.samza.job.JobRunner
- +import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
- +import org.apache.samza.util.KafkaUtil
- +
- +/** Shared fixture for TestSimpleWindowTask: embedded ZooKeeper, three Kafka brokers, a producer and the input topic. */
- +object TestSimpleWindowTask {
- +  val INPUT_TOPIC = "input"
- +  val STORE_NAME = "store"
- +  val STATE_TOPIC_STREAM = "mystoreChangelog"
- +  val TOTAL_TASK_NAMES = 1
- +  val REPLICATION_FACTOR = 3
- +
- +  val zkConnect: String = TestZKUtils.zookeeperConnect
- +  var zkClient: ZkClient = null
- +  val zkConnectionTimeout = 6000
- +  val zkSessionTimeout = 6000
- +
- +  val brokerId1 = 0
- +  val brokerId2 = 1
- +  val brokerId3 = 2
- +  val ports = TestUtils.choosePorts(3)
- +  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
- +  // Broker configs; topic auto-creation is disabled, the test creates the input topic itself.
- +  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- +  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- +  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- +  props1.setProperty("auto.create.topics.enable","false")
- +  props2.setProperty("auto.create.topics.enable","false")
- +  props3.setProperty("auto.create.topics.enable","false")
- +
- +  val config = new util.HashMap[String, Object]()
- +  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- +  config.put("bootstrap.servers", brokers)
- +  config.put("request.required.acks", "-1")
- +  config.put("serializer.class", "kafka.serializer.StringEncoder")
- +  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
- +  config.put(ProducerConfig.RETRIES_CONFIG, (Integer.MAX_VALUE - 1).toString)
- +  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
- +  var producer: Producer[Array[Byte], Array[Byte]] = null
- +  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
- +  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
- +  var zookeeper: EmbeddedZookeeper = null
- +  var server1: KafkaServer = null
- +  var server2: KafkaServer = null
- +  var server3: KafkaServer = null
- +  var metadataStore: TopicMetadataStore = null
- +
- +  /** Starts ZooKeeper and the brokers, creates the clients, then creates and validates the topics. */
- +  @BeforeClass
- +  def beforeSetupServers {
- +    zookeeper = new EmbeddedZookeeper(zkConnect)
- +    server1 = TestUtils.createServer(new KafkaConfig(props1))
- +    server2 = TestUtils.createServer(new KafkaConfig(props2))
- +    server3 = TestUtils.createServer(new KafkaConfig(props3))
- +    zkClient = new ZkClient(zkConnect + "/", zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
- +    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
- +    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
- +    createTopics
- +    validateTopics
- +  }
- +
- +  /** Creates the input topic, passing TOTAL_TASK_NAMES and REPLICATION_FACTOR to AdminUtils. */
- +  def createTopics {
- +    AdminUtils.createTopic(
- +      zkClient,
- +      INPUT_TOPIC,
- +      TOTAL_TASK_NAMES,
- +      REPLICATION_FACTOR)
- +  }
- +
- +  /** Polls the topic metadata every 500 ms until it is error-free; fails the test after 100 attempts. */
- +  def validateTopics {
- +    val topics = Set(INPUT_TOPIC)
- +    var done = false
- +    var retries = 0
- +    while (!done && retries < 100) {
- +      try {
- +        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(topics, "kafka", metadataStore.getTopicInfo)
- +        topics.foreach(topic => {
- +          val topicMetadata = topicMetadataMap(topic)
- +          val errorCode = topicMetadata.errorCode
- +          KafkaUtil.maybeThrowException(errorCode)
- +        })
- +        done = true
- +      } catch {
- +        case e: Exception =>
- +          System.err.println("Got exception while validating test topics. Waiting and retrying. " + e)
- +          retries += 1
- +          Thread.sleep(500)
- +      }
- +    }
- +
- +    if (retries >= 100) {
- +      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
- +    }
- +  }
- +
- +  /** Closes the producer, stops the brokers, deletes their log dirs, then closes the ZooKeeper client and server. */
- +  @AfterClass
- +  def afterCleanLogDirs {
- +    producer.close()
- +    server1.shutdown
- +    server1.awaitShutdown()
- +    server2.shutdown
- +    server2.awaitShutdown()
- +    server3.shutdown
- +    server3.awaitShutdown()
- +    Utils.rm(server1.config.logDirs)
- +    Utils.rm(server2.config.logDirs)
- +    Utils.rm(server3.config.logDirs)
- +    zkClient.close
- +    zookeeper.shutdown
- +  }
- +}
- +
- +/**
- + * Test that does the following:
- + *
- + * 1. Starts ZK, and 3 kafka brokers.
- + * 2. Creates the input topic.
- + * 3. Validates that topic metadata for input can be fetched without an error code.
- + * 4. Starts a single partition of TestWindowTask using ThreadJobFactory.
- + * 5. Sends six JSON events for application "abc" to input, waiting after each send
- + *    until TestWindowTask has processed the message.
- + * 6. Reads the store changelog topic (mystoreChangelog) up to offset 5 and validates
- + *    that six messages were read from it.
- + * 7. Waits for TestWindowTask to complete a window() call.
- + * 8. Kills the job and validates that it reports an unsuccessful finish.
- + *
- + * The job is configured with task.window.ms = 10.
- + */
- +class TestSimpleWindowTask {
- +  import TestSimpleWindowTask._
- +
- +  // These repeat, with identical values, the companion-object constants imported above.
- +  val INPUT_TOPIC = "input"
- +  val STORE_NAME = "store"
- +  val STATE_TOPIC_STREAM = "mystoreChangelog"
- +
- +  /** Job config: TestWindowTask reading kafka.input, with a RocksDB store "store" whose changelog is kafka.mystoreChangelog. */
- +  val jobConfig = Map(
- +    "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
- +    "job.name" -> "hello-stateful-world",
- +    "task.class" -> "org.apache.samza.test.integration.TestWindowTask",
- +    "task.inputs" -> "kafka.input",
- +    "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
- +    "stores.store.factory" -> "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory",
- +    "stores.store.key.serde" -> "string",
- +    "stores.store.msg.serde" -> "string",
- +    "stores.store.write.batch.size" -> "1",
- +    "stores.store.changelog" -> "kafka.mystoreChangelog",
- +    "stores.store.changelog.replication.factor" -> "1",
- +    "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
- +    // Always start consuming at offset 0. This avoids a race condition between
- +    // the producer and the consumer in this test (SAMZA-166, SAMZA-224).
- +    "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
- +    "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
- +    "systems.kafka.samza.msg.serde" -> "string",
- +    "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
- +    "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
- +    // Since using state, need a checkpoint manager
- +    "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
- +    "task.checkpoint.system" -> "kafka",
- +    "task.checkpoint.replication.factor" -> "1",
- +    "task.window.ms" -> "10",
- +    "systems.kafka.streams.input.samza.reset.offset" -> "false")
- +
- +  @Test
- +  def testShouldStartAndRestore {
- +    // Have to do this in one test to guarantee ordering.
- +    testSimpleWindowStateStore
- +  }
- +
- +  /**
- +   * Sends six events for one application, reads six messages back from the store
- +   * changelog, waits for a window() call, and then kills the job.
- +   */
- +  def testSimpleWindowStateStore {
- +    val (job, task) = startJob
- +
- +    // Validate that the task has finished init().
- +    assertEquals(0, task.initFinished.getCount)
- +
- +    // Send some messages to input stream.
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 0}")
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 1}")
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 2}")
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 3}")
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 4}")
- +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 5}")
- +
- +    // Validate that messages appear in store stream.
- +    val messages = readAll(STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
- +
- +    assertEquals(6, messages.length)
- +
- +    task.awaitWindow
- +
- +    stopJob(job)
- +  }
- +
- +  /**
- +   * Start a job for TestWindowTask, and do some basic sanity checks around startup
- +   * time, number of partitions, etc.
- +   */
- +  def startJob = {
- +    // Start task.
- +    val job = new JobRunner(new MapConfig(jobConfig)).run
- +    assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
- +    TestWindowTask.awaitTaskRegistered
- +    val tasks = TestWindowTask.tasks
- +    assertEquals("Should only have a single partition in this task", 1, tasks.size)
- +    val task = tasks.values.toList.head
- +    task.initFinished.await(60, TimeUnit.SECONDS)
- +    assertEquals(0, task.initFinished.getCount)
- +    (job, task)
- +  }
- +
- +  /**
- +   * Kill a job, and wait for an unsuccessful finish (since this throws an
- +   * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
- +   */
- +  def stopJob(job: StreamJob) {
- +    // Shutdown task.
- +    job.kill
- +    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(60000))
- +  }
- +
- +  /**
- +   * Send a message to the input topic, and validate that it gets to the test task.
- +   */
- +  def send(task: TestWindowTask, msg: String) {
- +    producer.send(new ProducerRecord(INPUT_TOPIC, msg.getBytes)).get()
- +    task.awaitMessage
- +  }
- +
- +  /**
- +   * Read all messages from a topic starting from last saved offset for group.
- +   * To read all from offset 0, specify a unique, new group string.
- +   */
- +  def readAll(topic: String, maxOffsetInclusive: Int, group: String): List[String] = {
- +    val props = new Properties
- +
- +    props.put("zookeeper.connect", zkConnect)
- +    props.put("group.id", group)
- +    props.put("auto.offset.reset", "smallest")
- +
- +    val consumerConfig = new ConsumerConfig(props)
- +    val consumerConnector = Consumer.create(consumerConfig)
- +    val messages = ArrayBuffer[String]()
- +
- +    try {
- +      val stream = consumerConnector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0).iterator
- +      var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null
- +      while (message == null || message.offset < maxOffsetInclusive) {
- +        message = stream.next
- +        if (message.message == null) {
- +          messages += null
- +        } else {
- +          messages += new String(message.message, "UTF-8")
- +        }
- +        System.err.println("TestSimpleWindowTask.readAll(): offset=%s, message=%s" format (message.offset, messages.last))
- +      }
- +      System.err.println("TestSimpleWindowTask.readAll(): complete the read loop")
- +    } finally {
- +      // Shut the consumer down even if reading fails, so it is not leaked.
- +      consumerConnector.shutdown
- +    }
- +
- +    messages.toList
- +  }
- +}
- +
- +/** Registry through which TestWindowTask instances register themselves so the test can reach them. */
- +object TestWindowTask {
- +  val tasks = new HashMap[TaskName, TestWindowTask] with SynchronizedMap[TaskName, TestWindowTask]
- +  @volatile var allTasksRegistered = new CountDownLatch(TestSimpleWindowTask.TOTAL_TASK_NAMES)
- +  /**
- +   * Static method that tasks can use to register themselves with. Useful so
- +   * we don't have to sneak into the ThreadJob/SamzaContainer to get our test
- +   * tasks.
- +   */
- +  def register(taskName: TaskName, task: TestWindowTask) {
- +    tasks += taskName -> task
- +    allTasksRegistered.countDown
- +  }
- +  /** Waits up to 60 seconds for every expected task to register, then re-arms the latch. */
- +  def awaitTaskRegistered {
- +    allTasksRegistered.await(60, TimeUnit.SECONDS)
- +    assertEquals(0, allTasksRegistered.getCount)
- +    assertEquals(TestSimpleWindowTask.TOTAL_TASK_NAMES, tasks.size)
- +    // Reset the registered latch, so we can use it again every time we start a new job.
- +    TestWindowTask.allTasksRegistered = new CountDownLatch(TestSimpleWindowTask.TOTAL_TASK_NAMES)
- +  }
- +}
- +
- +/** SimpleWindowTestTask that registers itself with the TestWindowTask object and signals init, message and window events through latches. */
- +class TestWindowTask extends SimpleWindowTestTask {
- +  val initFinished = new CountDownLatch(1)
- +  @volatile var gotMessage = new CountDownLatch(1) // @volatile: swapped by the awaiting caller, counted down in process()
- +  @volatile var gotWindow = new CountDownLatch(1) // @volatile: swapped by the awaiting caller, counted down in window()
- +  override def init(config: Config, context: TaskContext) {
- +    TestWindowTask.register(context.getTaskName, this)
- +    super.init(config, context)
- +    initFinished.countDown()
- +  }
- +
- +  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
- +    super.process(envelope, collector, coordinator)
- +    // Notify sender that we got a message.
- +    gotMessage.countDown
- +  }
- +
- +  override def window(collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
- +    super.window(collector, coordinator)
- +    gotWindow.countDown()
- +  }
- +  /** Waits up to 60 seconds for the next process() call to finish, then re-arms the latch. */
- +  def awaitMessage {
- +    assertTrue("Timed out of waiting for message rather than received one.", gotMessage.await(60, TimeUnit.SECONDS))
- +    assertEquals(0, gotMessage.getCount)
- +    gotMessage = new CountDownLatch(1)
- +  }
- +  /** Waits up to 60 seconds for a window() call to finish, then re-arms the latch. */
- +  def awaitWindow: Unit = {
- +    assertTrue("Timed out of waiting for window operation.", gotWindow.await(60, TimeUnit.SECONDS))
- +    assertEquals(0, gotWindow.getCount)
- +    gotWindow = new CountDownLatch(1)
- +  }
- +}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement