Advertisement
Guest User

Untitled

a guest
Aug 16th, 2016
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 6.77 KB | None | 0 0
  1. package com.bwsw.t_sreams.hello
  2.  
  3. import java.util.UUID
  4. import java.util.concurrent.CountDownLatch
  5. import com.bwsw.tstreams.agents.consumer.Offsets.{Newest, Oldest}
  6. import com.bwsw.tstreams.agents.consumer.subscriber.{SubscribingConsumer, Callback}
  7. import com.bwsw.tstreams.agents.producer.NewTransactionProducerPolicy
  8. import com.bwsw.tstreams.env.{TStreamsFactory, TSF_Dictionary}
  9. import com.bwsw.tstreams.generator.LocalTimeUUIDGenerator
  10. import com.bwsw.tstreams.converter.{StringToArrayByteConverter, ArrayByteToStringConverter}
  11.  
  12. /**
  13.   * Created by ivan on 05.08.16.
  14.   */
  15. object HelloProducer {
  16.   def main(args: Array[String]): Unit = {
  17.  
  18.     // create factory
  19.     val f = new TStreamsFactory()
  20.     f.setProperty(TSF_Dictionary.Metadata.Cluster.NAMESPACE, "tk_1").                 // keyspace must exist in C*
  21.       setProperty(TSF_Dictionary.Data.Cluster.NAMESPACE, "test").                     // exists by default in Aerospike
  22.       setProperty(TSF_Dictionary.Producer.BIND_PORT, 18001).                          // producer will listen localhost:18001
  23.       setProperty(TSF_Dictionary.Consumer.Subscriber.BIND_PORT, 18002).               // subscriber will listen localhost:18002
  24.       setProperty(TSF_Dictionary.Consumer.Subscriber.PERSISTENT_QUEUE_PATH, "/tmp").  // subscriber will store data bursts in /tmp
  25.       setProperty(TSF_Dictionary.Stream.NAME, "test-stream-1").                          // producer and consumer will operate on "test-stream" t-stream
  26.       setProperty(TSF_Dictionary.Stream.PARTITIONS, 16).
  27.       setProperty(TSF_Dictionary.Producer.THREAD_POOL, 16)
  28.  
  29.     val l = new CountDownLatch(1)
  30.     var cntr = 0
  31.     val TOTAL_TXNS = 10000
  32.     val TOTAL_ITMS = 1
  33.  
  34.     // create producer
  35.     val producer1 = f.getProducer[String](
  36.                 name = "test_producer-1",                     // name of the producer
  37.                 txnGenerator = new LocalTimeUUIDGenerator,  // where it will get new transactions
  38.                 converter = new StringToArrayByteConverter, // converter from String to internal data presentation
  39.                 partitions = List(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),                       // active partitions
  40.                 isLowPriority = false)                      // agent can be a master
  41.  
  42.     f.setProperty(TSF_Dictionary.Producer.BIND_PORT, 18003) // producer will listen localhost:18001
  43.  
  44.     // create producer
  45.     val producer2 = f.getProducer[String](
  46.       name = "test_producer-1",                   // name of the producer
  47.       txnGenerator = new LocalTimeUUIDGenerator,  // where it will get new transactions
  48.       converter = new StringToArrayByteConverter, // converter from String to internal data presentation
  49.       partitions = List(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),                       // active partitions
  50.       isLowPriority = false)                      // agent can be a master
  51.  
  52.     val startTime = System.currentTimeMillis()
  53.  
  54.     val t1 = new Thread(new Runnable {
  55.       override def run(): Unit = (0 until TOTAL_TXNS).foreach(
  56.         i => {
  57.           val t = producer1.newTransaction(policy = NewTransactionProducerPolicy.CheckpointIfOpened) // create new transaction
  58.           (0 until TOTAL_ITMS).foreach(j => t.send(s"I: ${i}, J: ${j}"))
  59.           if (i % 100 == 0)
  60.             println(i)
  61.           t.checkpoint(false)  // checkpoint the transaction
  62.         })
  63.     })
  64.  
  65.     val t2 = new Thread(new Runnable {
  66.       override def run(): Unit = (0 until TOTAL_TXNS).foreach(
  67.         i => {
  68.           val t = producer2.newTransaction(policy = NewTransactionProducerPolicy.CheckpointIfOpened) // create new transaction
  69.           (0 until TOTAL_ITMS).foreach(j => t.send(s"I: ${i}, J: ${j}"))
  70.           if (i % 100 == 0)
  71.             println(i)
  72.           t.checkpoint(false)  // checkpoint the transaction
  73.         })
  74.     })
  75.  
  76.     t1.start()
  77.     t2.start()
  78.     t1.join()
  79.     t2.join()
  80.     val stopTime = System.currentTimeMillis()
  81.     println(s"Execution time is: ${stopTime - startTime}")
  82.     producer1.stop()   // stop operation
  83.     producer2.stop()
  84.     f.close()         // end operation
  85.     System.exit(0)
  86.   }
  87. }
  88.  
  89. /**
  90.   * Created by ivan on 05.08.16.
  91.   */
  92. object HelloSubscriber {
  93.   def main(args: Array[String]): Unit = {
  94.  
  95.     // create factory
  96.     val f = new TStreamsFactory()
  97.     f.setProperty(TSF_Dictionary.Metadata.Cluster.NAMESPACE, "tk_1").                 // keyspace must exist in C*
  98.       setProperty(TSF_Dictionary.Data.Cluster.NAMESPACE, "test").                     // exists by default in Aerospike
  99.       setProperty(TSF_Dictionary.Producer.BIND_PORT, 18001).                          // producer will listen localhost:18001
  100.       setProperty(TSF_Dictionary.Consumer.Subscriber.BIND_PORT, 18002).               // subscriber will listen localhost:18002
  101.       setProperty(TSF_Dictionary.Consumer.Subscriber.PERSISTENT_QUEUE_PATH, "/tmp").  // subscriber will store data bursts in /tmp
  102.       setProperty(TSF_Dictionary.Stream.NAME, "test-stream-1").                          // producer and consumer will operate on "test-stream" t-stream
  103.       setProperty(TSF_Dictionary.Stream.PARTITIONS, 16).
  104.       setProperty(TSF_Dictionary.Producer.THREAD_POOL, 16)                    // producer and consumer will operate on "test-stream" t-stream
  105.  
  106.     val l = new CountDownLatch(1)
  107.     var cntr = 0
  108.     val TOTAL_TXNS = 1000
  109.  
  110.     val subscriber = f.getSubscriber[String](
  111.       name          = "test_subscriber-1",              // name of the subscribing consumer
  112.       txnGenerator  = new LocalTimeUUIDGenerator,     // where it can get transaction uuids
  113.       converter     = new ArrayByteToStringConverter, // vice versa converter to string
  114.       partitions    = List(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15),                        // active partitions
  115.       offset        = Oldest,                         // it will start from newest available partitions
  116.       isUseLastOffset = true,                        // will ignore history
  117.       callback = new Callback[String] {
  118.         override def onEvent(subscriber: SubscribingConsumer[String], partition: Int, transactionUuid: UUID): Unit = {
  119.           val txn = subscriber.getTransactionById(partition, transactionUuid) // get transaction
  120.           txn.get.getAll().foreach(i => i)                           // get all information from transaction
  121.           cntr += 1
  122.           if (cntr % 100 == 0)
  123.             println(cntr)
  124.           if(cntr == TOTAL_TXNS)                                              // if the producer sent all information, then end
  125.             l.countDown()
  126.         }
  127.       })
  128.  
  129.     subscriber.start() // start subscriber to operate
  130.  
  131.     val startTime = System.currentTimeMillis()
  132.  
  133.     l.await()
  134.     subscriber.checkpoint()
  135.     subscriber.stop() // stop operation
  136.     f.close()         // end operation
  137.     System.exit(0)
  138.   }
  139. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement