Advertisement
Guest User

Untitled

a guest
Jul 18th, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 3.75 KB | None | 0 0
  1. <?php
  /**
   * Demo Kafka consumers for the "events" topic, built on the php-rdkafka extension.
   *
   * actionKlow() drives the low-level, per-partition consumer API;
   * actionKhigh() drives the high-level KafkaConsumer API with a rebalance callback.
   * Both methods print progress to stdout.
   */
  class Test
  {
      /**
       * Consume partition 0 of "events" with the low-level consumer, starting at offset 10.
       *
       * Prints partition/offset for each message, stores the offset, then sleeps
       * two seconds. The loop ends on the first message that carries an error
       * (its error string is printed first).
       */
      public function actionKlow()
      {
          $conf = new \RdKafka\Conf();
          $conf->set('group.id', 'scorer');
          $conf->set('metadata.broker.list', 'kafka:29092');

          $rk = new \RdKafka\Consumer($conf);
          $rk->setLogLevel(LOG_DEBUG);
          $rk->addBrokers("kafka:29092");

          $topicConf = new \RdKafka\TopicConf();
          // Configuration values are given as strings rather than relying on
          // implicit float/bool-to-string coercion (which fails under strict_types).
          $topicConf->set('auto.commit.interval.ms', '1000');
          $topicConf->set('offset.store.sync.interval.ms', '60000');
          $topicConf->set('enable.auto.commit', 'true');

          $topic = $rk->newTopic('events', $topicConf);

          $partition = 0;
          $topic->consumeStart($partition, 10); // start position (offset)

          try {
              while (true) {
                  // Arguments: partition, timeout in milliseconds.
                  $msg = $topic->consume($partition, 1000);

                  // Per the php-rdkafka documentation consume() may return null
                  // when the timeout elapses without a message; poll again
                  // instead of dereferencing null.
                  if ($msg === null) {
                      continue;
                  }

                  if ($msg->err) {
                      echo $msg->errstr(), "\n";
                      break;
                  }

                  echo sprintf("partition: %d offset: %d\n", $msg->partition, $msg->offset);
                  $topic->offsetStore($partition, $msg->offset);
                  sleep(2);
              }
          } finally {
              // Always stop the consumer that consumeStart() started.
              $topic->consumeStop($partition);
          }
      }

      /**
       * Consume "events" with the high-level KafkaConsumer, committing after every message.
       *
       * Registers an offset-commit callback and a rebalance callback, subscribes
       * to the topic and loops forever. Partition EOF and timeouts are reported
       * and polling continues.
       *
       * @throws \Exception on any other consume error, or on an unexpected
       *                    rebalance event code.
       */
      public function actionKhigh()
      {
          $conf = new \RdKafka\Conf();
          $conf->set('group.id', 'scorer');
          $conf->set('metadata.broker.list', 'kafka:29092');

          $conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
              // Do not report success when the commit actually failed.
              if ($error !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                  echo "Offset commit failed: " . rd_kafka_err2str($error) . "\n";
                  return;
              }
              // Guard against an empty partition list before indexing it.
              if (empty($topicPartitions)) {
                  return;
              }
              echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
          });

          $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
              switch ($err) {
                  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                      echo "Assign: ";
                      var_dump($partitions);
                      // Force every newly assigned partition to start at offset 1.
                      foreach ($partitions as $partition) {
                          $partition->setOffset(1);
                      }
                      $kafka->assign($partitions);
                      break;

                  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                      echo "Revoke: ";
                      var_dump($partitions);
                      $kafka->assign(null);
                      break;

                  default:
                      // Use the readable error string as the message and keep
                      // the numeric code as the exception code.
                      throw new \Exception(rd_kafka_err2str($err), $err);
              }
          });

          $topicConf = new \RdKafka\TopicConf();
          $topicConf->set('offset.store.method', 'broker');
          $topicConf->set('auto.offset.reset', 'smallest');

          $conf->set('enable.auto.offset.store', 'true');
          $conf->set('queued.max.messages.kbytes', '104857');
          $conf->setDefaultTopicConf($topicConf);

          $consumer = new \RdKafka\KafkaConsumer($conf);
          $consumer->subscribe(['events']);
          echo "Waiting for partition assignment... (make take some time when\n";
          echo "quickly re-joining the group after leaving it.)\n";

          while (true) {
              // Timeout is in milliseconds (two minutes).
              $message = $consumer->consume(120 * 1000);
              switch ($message->err) {
                  case RD_KAFKA_RESP_ERR_NO_ERROR:
                      echo sprintf("partition: %d offset: %d\n", $message->partition, $message->offset);
                      $consumer->commit($message);
                      sleep(1);
                      break;

                  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                      echo "No more messages; will wait for more\n";
                      break;

                  case RD_KAFKA_RESP_ERR__TIMED_OUT:
                      echo "Timed out\n";
                      break;

                  default:
                      throw new \Exception($message->errstr(), $message->err);
              }
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement