  1. diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java
  2. new file mode 100644
  3. index 0000000..4e80ba2
  4. --- /dev/null
  5. +++ b/samza-test/src/main/java/org/apache/samza/test/integration/SimpleWindowTestTask.java
  6. @@ -0,0 +1,188 @@
  7. +package org.apache.samza.test.integration;
  8. +
  9. +import java.util.HashMap;
  10. +import java.util.Map;
  11. +import java.util.Set;
  12. +import java.util.Iterator;
  13. +import java.io.StringWriter;
  14. +// import org.json.simple.JSONValue;
  15. +import org.codehaus.jackson.JsonGenerator;
  16. +import kafka.producer.KeyedMessage;
  17. +import org.codehaus.jackson.JsonNode;
  18. +import org.codehaus.jackson.JsonParseException;
  19. +import org.apache.samza.config.Config;
  20. +import org.apache.samza.system.IncomingMessageEnvelope;
  21. +import org.apache.samza.system.OutgoingMessageEnvelope;
  22. +import org.apache.samza.system.SystemStream;
  23. +import org.apache.samza.task.MessageCollector;
  24. +import org.apache.samza.task.StreamTask;
  25. +import org.apache.samza.task.TaskCoordinator;
  26. +import org.apache.samza.task.TaskContext;
  27. +import org.codehaus.jackson.map.JsonMappingException;
  28. +import org.codehaus.jackson.map.ObjectMapper;
  29. +
  30. +import org.apache.samza.storage.kv.KeyValueStore;
  31. +import org.apache.samza.storage.kv.KeyValueIterator;
  32. +import org.apache.samza.storage.kv.Entry;
  33. +import org.apache.samza.task.InitableTask;
  34. +import org.apache.samza.task.WindowableTask;
  35. +import org.slf4j.Logger;
  36. +import org.slf4j.LoggerFactory;
  37. +import org.codehaus.jackson.JsonParser;
  38. +
  39. +// import redis.clients.jedis.Jedis;
  40. +
  41. +//import com.google.gson.Gson;
  42. +import java.io.FileNotFoundException;
  43. +import java.io.IOException;
  44. +import java.io.FileInputStream;
  45. +import java.util.Properties;
  48. +
  49. +
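+/**
+ * A Samza StreamTask that counts incoming events per application_name.
+ *
+ * For every incoming JSON message the task reads event.application_name,
+ * increments a per-application counter kept in the "store" key-value store,
+ * and on every window() call it logs the accumulated counts and clears the
+ * store. An optional property file can be loaded at init time when the
+ * "load.property" config flag is set to true.
+ */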
  50. +public class SimpleWindowTestTask implements StreamTask, InitableTask, WindowableTask {
  51. +
  52. +  private KeyValueStore<String, String> store;
  53. +  Config config;
  54. +  TaskContext context;
  55. +  String val;
  56. +  String outTopic;
  57. +  HashMap<String, String> propVals;
  58. +  String useProp;
  59. +  private int eventsSeen = 0;
  60. +  private String appName;
  61. +  public SystemStream OUTPUT_STREAM;
  62. +  // private Jedis jedis;
  63. +  // private String redis;
  64. +  private HashMap hm = new HashMap();
  65. +  Map<String, String> appMap = new HashMap<String, String>();
  66. +
  67. +  
  68. +  private static final Logger log = LoggerFactory.getLogger(SimpleWindowTestTask.class);
  69. +  
  71. +
  72. +
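+  /**
+   * Loads ./__package/config/abc.properties into a HashMap. Each key/value pair
+   * in the file is copied verbatim, so a (hypothetical) line such as
+   *
+   *   some.key=some-value
+   *
+   * becomes the entry ("some.key" -> "some-value"). If the file cannot be read,
+   * the exception is logged and an empty map is returned.
+   */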
  73. +  public HashMap<String, String> readPropertyFile() {
  74. +   log.info("Reading the property file ...");
  75. +   Properties prop = new Properties();
  76. +   FileInputStream input;
  77. +   HashMap<String, String> propvals = new HashMap<String, String>();
  78. +   try {
  79. +       //adjust this to that of remote server
  80. +       input = new FileInputStream("./__package/config/abc.properties");
  81. +       prop.load(input);
  82. +       log.info("Property file loaded succesfully.");
  83. +       Set<String> propertyNames = prop.stringPropertyNames();
  84. +       log.debug("Property file content: ");
  85. +       for (String property : propertyNames) {
  86. +           log.debug(property + ":" + prop.getProperty(property));
  87. +           propvals.put(property, prop.getProperty(property));
  88. +       }
  89. +       log.debug("HashMap generated: " + propvals);
  90. +   } catch (FileNotFoundException e) {
  91. +       log.warn(e.getMessage());
  92. +       e.printStackTrace();
  93. +   } catch (IOException e) {
  94. +       log.warn(e.getMessage());
  95. +       e.printStackTrace();
  96. +   } catch (Exception e) {
  97. +       log.warn(e.getMessage());
  98. +       e.printStackTrace();
  99. +   }
  100. +   return propvals;
  101. +  }
  102. +  
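+  /**
+   * Reads the task configuration: "task.outputs" names the Kafka output topic,
+   * "load.property" toggles loading of the property file above, and the
+   * key-value store registered under the name "store" backs the per-application
+   * counters (the integration test below wires it up as a RocksDB store with a
+   * kafka.mystoreChangelog changelog).
+   */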
  103. +  public void init(Config config, TaskContext context) {
  104. +   log.info("INIT SimpleWindowTestTask");
  105. +    this.outTopic = config.get("task.outputs");
  106. +    this.store = (KeyValueStore<String, String>) context.getStore("store");
  107. +    OUTPUT_STREAM = new SystemStream("kafka", outTopic);
  108. +    this.useProp = config.get("load.property");
  109. +    //populate the propVals from Property File else pass in an empty HashMap
  110. +    if (Boolean.parseBoolean(useProp)) propVals = readPropertyFile();
  111. +    else propVals = new HashMap<String, String>();
  112. +   /**
  113. +   log.info("Connecting to redis at " + redis + "...");
  114. +   jedis = new Jedis(redis);
  115. +   try{
  116. +       jedis.connect();
  117. +   }catch (Exception e){
  118. +       log.warn ("Unable to connect to redis. Try checking with service-now directly (not implemented yet)");
  119. +       e.printStackTrace();
  120. +   }
  121. +   log.info("Connected to redis");
  122. +   **/
  123. +  }
  124. +  @SuppressWarnings("unchecked")
  125. +  @Override
  126. +  //public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException, JsonParseException, JsonMappingException, IOException {
  127. +  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) throws java.lang.InterruptedException {
  128. +
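+   // The integration test in this patch feeds messages shaped like:
+   //   {"event": {"application_name": "abc"}, "seqno": 0}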
  129. +   ObjectMapper mapper = new ObjectMapper();
  130. +   log.info("Envelope Key : " + envelope.getKey());
  131. +   mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); 
  132. +   JsonNode node;
  133. +   try {
  134. +       //log.info("Raw message: " + envelope.toString());
  135. +       node = mapper.readValue(envelope.getMessage().toString(), JsonNode.class);
  136. +       //log.info("Raw json: " + node.get("event").get("application_name").toString());
  137. +       //String appName=node.findValues("application_name").get(0).toString();
  138. +       String appName = node.get("event").get("application_name").getTextValue(); // text value, without surrounding JSON quotes
  139. +
  140. +       try{
  141. +           eventsSeen=Integer.parseInt(store.get(appName));
  142. +          
  143. +           // eventsSeen=Integer.parseInt(store.get(appName));
  144. +           // eventsSeen=Integer.parseInt(appMap.get(uniqueKey));
  145. +           eventsSeen++;
  146. +           log.info("Application Name: " + appName + ", eventsSeen: " + eventsSeen);
  147. +           /**
  148. +           try{
  149. +               //hm.put(appName,eventsSeen);
  150. +               appMap.put(uniqueKey ,  eventsSeen + "");
  151. +           }catch (Exception e){
  152. +               log.info("Error while adding elements to local HashMap " + e.getMessage());
  153. +               e.printStackTrace();
  154. +           }
  155. +           **/
  156. +           try{
  157. +               store.put(appName, String.valueOf(eventsSeen)); // Needs to be (string,string)
  158. +               log.info("Added new value " + store.get(appName));
  159. +           }catch (Exception e){
  160. +               log.warn("Error while updating the kv store: " + e.getMessage());
  161. +               e.printStackTrace();
  162. +           }
  163. +       }catch (Exception e){          
  164. +           log.warn("No value found for application in kv store: " + e.getMessage());
  165. +           // appMap.put(uniqueKey, "0");
  166. +           store.put(appName, "1");
  167. +           //e.printStackTrace();
  168. +       }
  169. +
  170. +   } catch (JsonParseException e1) {
  171. +       log.warn("Could not parse incoming message as JSON: " + e1.getMessage());
  172. +       e1.printStackTrace();
  173. +   } catch (JsonMappingException e1) {
  174. +       log.warn("Could not map incoming message: " + e1.getMessage());
  175. +       e1.printStackTrace();
  176. +   } catch (IOException e1) {
  177. +       log.warn("I/O error while reading incoming message: " + e1.getMessage());
  178. +       e1.printStackTrace();
  179. +   }
  180. +  }
  181. +  
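+  /**
+   * Called every task.window.ms (10 ms in the test config): logs each
+   * <application_name, count> entry currently in the store, then deletes the
+   * entries and resets the in-memory counters so the next window starts fresh.
+   */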
  182. +  public void window(MessageCollector collector,
  183. +      TaskCoordinator coordinator) {
  184. +    KeyValueIterator<String, String> iter = store.all();
  185. +    while(iter.hasNext()) {
  186. +      Entry<String, String> entry = iter.next();
  187. +      log.info(String.format("Iterating through store entry: <%s, %s>", entry.getKey(), entry.getValue()));
  188. +      store.delete(entry.getKey());
  189. +    }
  190. +    iter.close();
  191. +    appMap.clear();
  192. +    eventsSeen = 0;
  193. +  }
  194. +}
  195. diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala
  196. new file mode 100644
  197. index 0000000..5d7f804
  198. --- /dev/null
  199. +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestSimpleWindowTask.scala
  200. @@ -0,0 +1,395 @@
  201. +/*
  202. + * Licensed to the Apache Software Foundation (ASF) under one
  203. + * or more contributor license agreements.  See the NOTICE file
  204. + * distributed with this work for additional information
  205. + * regarding copyright ownership.  The ASF licenses this file
  206. + * to you under the Apache License, Version 2.0 (the
  207. + * "License"); you may not use this file except in compliance
  208. + * with the License.  You may obtain a copy of the License at
  209. + *
  210. + *   http://www.apache.org/licenses/LICENSE-2.0
  211. + *
  212. + * Unless required by applicable law or agreed to in writing,
  213. + * software distributed under the License is distributed on an
  214. + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  215. + * KIND, either express or implied.  See the License for the
  216. + * specific language governing permissions and limitations
  217. + * under the License.
  218. + */
  219. +
  220. +package org.apache.samza.test.integration
  221. +
  222. +import java.util.Properties
  223. +import java.util.concurrent.CountDownLatch
  224. +import java.util.concurrent.TimeUnit
  225. +import java.util
  226. +
  227. +import kafka.admin.AdminUtils
  228. +import kafka.consumer.Consumer
  229. +import kafka.consumer.ConsumerConfig
  230. +import kafka.message.MessageAndMetadata
  231. +import kafka.server.KafkaConfig
  232. +import kafka.server.KafkaServer
  233. +import kafka.utils.TestUtils
  234. +import kafka.utils.TestZKUtils
  235. +import kafka.utils.Utils
  236. +import kafka.utils.ZKStringSerializer
  237. +import kafka.zk.EmbeddedZookeeper
  238. +import org.I0Itec.zkclient.ZkClient
  239. +import org.apache.samza.Partition
  240. +import org.apache.samza.checkpoint.Checkpoint
  241. +import org.apache.samza.config.Config
  242. +import org.apache.samza.job.local.ThreadJobFactory
  243. +import org.apache.samza.config.MapConfig
  244. +import org.apache.samza.container.TaskName
  245. +import org.apache.samza.job.ApplicationStatus
  246. +import org.apache.samza.job.StreamJob
  247. +import org.apache.samza.storage.kv.KeyValueStore
  248. +import org.apache.samza.system.kafka.TopicMetadataCache
  249. +import org.apache.samza.system.{SystemStreamPartition, IncomingMessageEnvelope}
  250. +import org.apache.samza.config.KafkaProducerConfig
  251. +import org.apache.samza.task.InitableTask
  252. +import org.apache.samza.task.MessageCollector
  253. +import org.apache.samza.task.StreamTask
  254. +import org.apache.samza.task.TaskContext
  255. +import org.apache.samza.task.TaskCoordinator
  256. +import org.apache.samza.task.TaskCoordinator.RequestScope
  257. +import org.apache.samza.util.ClientUtilTopicMetadataStore
  258. +import org.apache.samza.util.TopicMetadataStore
  259. +import org.apache.samza.job.JobRunner
  260. +import org.junit.Assert._
  261. +import org.junit.{BeforeClass, AfterClass, Test}
  262. +import scala.collection.JavaConversions._
  263. +import scala.collection.mutable.ArrayBuffer
  264. +import scala.collection.mutable.HashMap
  265. +import scala.collection.mutable.SynchronizedMap
  267. +import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
  268. +import org.apache.samza.util.KafkaUtil
  269. +
  270. +object TestSimpleWindowTask {
  271. +  val INPUT_TOPIC = "input"
  272. +  val STORE_NAME = "store"
  273. +  val STATE_TOPIC_STREAM = "mystoreChangelog"
  274. +  val TOTAL_TASK_NAMES = 1
  275. +  val REPLICATION_FACTOR = 3
  276. +
  277. +  val zkConnect: String = TestZKUtils.zookeeperConnect
  278. +  var zkClient: ZkClient = null
  279. +  val zkConnectionTimeout = 6000
  280. +  val zkSessionTimeout = 6000
  281. +
  282. +  val brokerId1 = 0
  283. +  val brokerId2 = 1
  284. +  val brokerId3 = 2
  285. +  val ports = TestUtils.choosePorts(3)
  286. +  val (port1, port2, port3) = (ports(0), ports(1), ports(2))
  287. +
  288. +  val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
  289. +  val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
  290. +  val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
  291. +  props1.setProperty("auto.create.topics.enable","false")
  292. +  props2.setProperty("auto.create.topics.enable","false")
  293. +  props3.setProperty("auto.create.topics.enable","false")
  294. +
  295. +  val config = new util.HashMap[String, Object]()
  296. +  val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
  297. +  config.put("bootstrap.servers", brokers)
  298. +  config.put("request.required.acks", "-1")
  299. +  config.put("serializer.class", "kafka.serializer.StringEncoder")
  300. +  config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
  301. +  config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(Integer.MAX_VALUE-1)).toString())
  302. +  val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
  303. +  var producer: Producer[Array[Byte], Array[Byte]] = null
  304. +  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
  305. +  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
  306. +  var zookeeper: EmbeddedZookeeper = null
  307. +  var server1: KafkaServer = null
  308. +  var server2: KafkaServer = null
  309. +  var server3: KafkaServer = null
  310. +  var metadataStore: TopicMetadataStore = null
  311. +
  312. +  @BeforeClass
  313. +  def beforeSetupServers {
  314. +    zookeeper = new EmbeddedZookeeper(zkConnect)
  315. +    server1 = TestUtils.createServer(new KafkaConfig(props1))
  316. +    server2 = TestUtils.createServer(new KafkaConfig(props2))
  317. +    server3 = TestUtils.createServer(new KafkaConfig(props3))
  318. +    zkClient = new ZkClient(zkConnect + "/", 6000, 6000, ZKStringSerializer)
  319. +    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
  320. +    metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
  321. +
  322. +    createTopics
  323. +    validateTopics
  324. +  }
  325. +
  326. +  def createTopics {
  327. +    AdminUtils.createTopic(
  328. +      zkClient,
  329. +      INPUT_TOPIC,
  330. +      TOTAL_TASK_NAMES,
  331. +      REPLICATION_FACTOR)
  332. +  }
  333. +
  334. +  def validateTopics {
  335. +    val topics = Set(INPUT_TOPIC)
  336. +    var done = false
  337. +    var retries = 0
  338. +
  339. +    while (!done && retries < 100) {
  340. +      try {
  341. +        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(topics, "kafka", metadataStore.getTopicInfo)
  342. +
  343. +        topics.foreach(topic => {
  344. +          val topicMetadata = topicMetadataMap(topic)
  345. +          val errorCode = topicMetadata.errorCode
  346. +
  347. +          KafkaUtil.maybeThrowException(errorCode)
  348. +        })
  349. +
  350. +        done = true
  351. +      } catch {
  352. +        case e: Exception =>
  353. +          System.err.println("Got exception while validating test topics. Waiting and retrying: " + e)
  354. +          retries += 1
  355. +          Thread.sleep(500)
  356. +      }
  357. +    }
  358. +
  359. +    if (retries >= 100) {
  360. +      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
  361. +    }
  362. +  }
  363. +
  364. +  @AfterClass
  365. +  def afterCleanLogDirs {
  366. +    producer.close()
  367. +    server1.shutdown
  368. +    server1.awaitShutdown()
  369. +    server2.shutdown
  370. +    server2.awaitShutdown()
  371. +    server3.shutdown
  372. +    server3.awaitShutdown()
  373. +    Utils.rm(server1.config.logDirs)
  374. +    Utils.rm(server2.config.logDirs)
  375. +    Utils.rm(server3.config.logDirs)
  376. +    zkClient.close
  377. +    zookeeper.shutdown
  378. +  }
  379. +}
  380. +
  381. +/**
  382. + * Test that does the following:
  383. + *
  384. + * 1. Starts ZK and 3 Kafka brokers.
  385. + * 2. Creates the input topic.
  386. + * 3. Validates that the topic was created successfully and has a leader.
  387. + * 4. Starts a single partition of TestWindowTask using ThreadJobFactory.
  388. + * 5. Sends six JSON events for application "abc" to the input topic and
  389. + *    validates that TestWindowTask receives each of them.
  390. + * 6. Validates that six messages end up in the mystoreChangelog store
  391. + *    stream, i.e. that the task called store.put() once per event.
  392. + * 7. Waits for at least one window() call, which logs the per-application
  393. + *    counts and then clears the store.
  394. + * 8. Kills the job and waits for an (expected) unsuccessful finish, since
  395. + *    kill() interrupts the ThreadJob.
  396. + */
  397. +class TestSimpleWindowTask {
  398. +  import TestSimpleWindowTask._
  399. +
  400. +  val INPUT_TOPIC = "input"
  401. +  val STORE_NAME = "store"
  402. +  val STATE_TOPIC_STREAM = "mystoreChangelog"
  403. +
  404. +  val jobConfig = Map(
  405. +    "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
  406. +    "job.name" -> "hello-stateful-world",
  407. +    "task.class" -> "org.apache.samza.test.integration.TestWindowTask",
  408. +    "task.inputs" -> "kafka.input",
  409. +    "serializers.registry.string.class" -> "org.apache.samza.serializers.StringSerdeFactory",
  410. +    "stores.store.factory" -> "org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory",
  411. +    "stores.store.key.serde" -> "string",
  412. +    "stores.store.msg.serde" -> "string",
  413. +    "stores.store.write.batch.size" -> "1",
  414. +    "stores.store.changelog" -> "kafka.mystoreChangelog",
  415. +    "stores.store.changelog.replication.factor" -> "1",
  416. +    "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
  417. +    // Always start consuming at offset 0. This avoids a race condition between
  418. +    // the producer and the consumer in this test (SAMZA-166, SAMZA-224).
  419. +    "systems.kafka.samza.offset.default" -> "oldest", // applies to a nonempty topic
  420. +    "systems.kafka.consumer.auto.offset.reset" -> "smallest", // applies to an empty topic
  421. +    "systems.kafka.samza.msg.serde" -> "string",
  422. +    "systems.kafka.consumer.zookeeper.connect" -> zkConnect,
  423. +    "systems.kafka.producer.bootstrap.servers" -> ("localhost:%s" format port1),
  424. +    // Since using state, need a checkpoint manager
  425. +    "task.checkpoint.factory" -> "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory",
  426. +    "task.checkpoint.system" -> "kafka",
  427. +    "task.checkpoint.replication.factor" -> "1",
  428. +    "task.window.ms" -> "10",
  429. +    "systems.kafka.streams.input.samza.reset.offset" -> "false")
  430. +
  431. +  @Test
  432. +  def testShouldStartAndRestore {
  433. +    // Have to do this in one test to guarantee ordering.
  434. +    testSimpleWindowStateStore
  435. +  }
  436. +
  437. +  def testSimpleWindowStateStore {
  438. +    val (job, task) = startJob
  439. +
  440. +    // Validate that the task finished initializing.
  441. +    assertEquals(0, task.initFinished.getCount)
  442. +    // assertEquals(0, task.restored.size)
  443. +    // assertEquals(0, task.received.size)
  444. +
  445. +    // Send some messages to input stream.
  446. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 0}")
  447. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 1}")
  448. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 2}")
  449. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 3}")
  450. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 4}")
  451. +    send(task, "{\"event\": {\"application_name\": \"abc\"}, \"seqno\": 5}")
  452. +
  453. +    // Validate that messages appear in store stream.
  454. +    val messages = readAll(STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime")
  455. +
  456. +    assertEquals(6, messages.length)
  457. +//    assertEquals("1", messages(0))
  458. +//    assertEquals("2", messages(1))
  459. +//    assertEquals("3", messages(2))
  460. +//    assertEquals("2", messages(3))
  461. +//    assertEquals("99", messages(4))
  462. +//    assertNull(messages(5))
  463. +
  464. +    task.awaitWindow
  465. +
  466. +    stopJob(job)
  467. +  }
  468. +
  469. +  /**
  470. +   * Start a job for TestWindowTask, and do some basic sanity checks around startup
  471. +   * time, number of partitions, etc.
  472. +   */
  473. +  def startJob = {
  474. +    // Start task.
  475. +    val job = new JobRunner(new MapConfig(jobConfig)).run
  476. +    assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
  477. +    TestWindowTask.awaitTaskRegistered
  478. +    val tasks = TestWindowTask.tasks
  479. +    assertEquals("Should only have a single partition in this task", 1, tasks.size)
  480. +    val task = tasks.values.toList.head
  481. +    task.initFinished.await(60, TimeUnit.SECONDS)
  482. +    assertEquals(0, task.initFinished.getCount)
  483. +    (job, task)
  484. +  }
  485. +
  486. +  /**
  487. +   * Kill a job, and wait for an unsuccessful finish (since this throws an
  488. +   * interrupt, which is forwarded on to ThreadJob, and marked as a failure).
  489. +   */
  490. +  def stopJob(job: StreamJob) {
  491. +    // Shutdown task.
  492. +    job.kill
  493. +    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(60000))
  494. +  }
  495. +
  496. +  /**
  497. +   * Send a message to the input topic, and validate that it gets to the test task.
  498. +   */
  499. +  def send(task: TestWindowTask, msg: String) {
  500. +    producer.send(new ProducerRecord(INPUT_TOPIC, msg.getBytes)).get()
  501. +    task.awaitMessage
  502. +  }
  503. +
  504. +  /**
  505. +   * Read all messages from a topic starting from last saved offset for group.
  506. +   * To read all from offset 0, specify a unique, new group string.
  507. +   */
  508. +  def readAll(topic: String, maxOffsetInclusive: Int, group: String): List[String] = {
  509. +    val props = new Properties
  510. +
  511. +    props.put("zookeeper.connect", zkConnect)
  512. +    props.put("group.id", group)
  513. +    props.put("auto.offset.reset", "smallest")
  514. +
  515. +    val consumerConfig = new ConsumerConfig(props)
  516. +    val consumerConnector = Consumer.create(consumerConfig)
  517. +    var stream = consumerConnector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0).iterator
  518. +    var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null
  519. +    var messages = ArrayBuffer[String]()
  520. +
  521. +    while (message == null || message.offset < maxOffsetInclusive) {
  522. +      message = stream.next
  523. +      if (message.message == null) {
  524. +        messages += null
  525. +      } else {
  526. +        messages += new String(message.message, "UTF-8")
  527. +      }
  528. +      System.err.println("TestSimpleWindowTask.readAll(): offset=%s, message=%s" format (message.offset, messages.last))
  529. +    }
  530. +
  531. +    System.err.println("TestSimpleWindowTask.readAll(): complete the read loop")
  532. +
  533. +    consumerConnector.shutdown
  534. +
  535. +    messages.toList
  536. +  }
  537. +}
  538. +
  539. +object TestWindowTask {
  540. +  val tasks = new HashMap[TaskName, TestWindowTask] with SynchronizedMap[TaskName, TestWindowTask]
  541. +  @volatile var allTasksRegistered = new CountDownLatch(TestSimpleWindowTask.TOTAL_TASK_NAMES)
  542. +
  543. +  /**
  544. +   * Static method that tasks can use to register themselves with. Useful so
  545. +   * we don't have to sneak into the ThreadJob/SamzaContainer to get our test
  546. +   * tasks.
  547. +   */
  548. +  def register(taskName: TaskName, task: TestWindowTask) {
  549. +    tasks += taskName -> task
  550. +    allTasksRegistered.countDown
  551. +  }
  552. +
  553. +  def awaitTaskRegistered {
  554. +    allTasksRegistered.await(60, TimeUnit.SECONDS)
  555. +    assertEquals(0, allTasksRegistered.getCount)
  556. +    assertEquals(TestSimpleWindowTask.TOTAL_TASK_NAMES, tasks.size)
  557. +    // Reset the registered latch, so we can use it again every time we start a new job.
  558. +    TestWindowTask.allTasksRegistered = new CountDownLatch(TestSimpleWindowTask.TOTAL_TASK_NAMES)
  559. +  }
  560. +}
  561. +
  562. +class TestWindowTask extends SimpleWindowTestTask {
  563. +  val initFinished = new CountDownLatch(1)
  564. +  var gotMessage = new CountDownLatch(1)
  565. +  var gotWindow = new CountDownLatch(1)
  566. +
  567. +  override def init(config: Config, context: TaskContext) {
  568. +    TestWindowTask.register(context.getTaskName, this)
  569. +    super.init(config, context)
  570. +    initFinished.countDown()
  571. +  }
  572. +
  573. +  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) {
  574. +    super.process(envelope, collector, coordinator)
  575. +    // Notify sender that we got a message.
  576. +    gotMessage.countDown
  577. +  }
  578. +
  579. +  override def window(collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
  580. +    super.window(collector, coordinator)
  581. +    gotWindow.countDown()
  582. +  }
  583. +
  584. +  def awaitMessage {
  585. +    assertTrue("Timed out waiting for a message.", gotMessage.await(60, TimeUnit.SECONDS))
  586. +    assertEquals(0, gotMessage.getCount)
  587. +    gotMessage = new CountDownLatch(1)
  588. +  }
  589. +
  590. +  def awaitWindow: Unit = {
  591. +    assertTrue("Timed out waiting for a window() call.", gotWindow.await(60, TimeUnit.SECONDS))
  592. +    assertEquals(0, gotWindow.getCount)
  593. +    gotWindow = new CountDownLatch(1)
  594. +  }
  595. +}