Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
/**
 * Demo Kafka consumers for the "events" topic, built on the php-rdkafka extension.
 *
 * Two variants are provided:
 *  - actionKlow():  low-level consumer reading a single, explicitly chosen partition;
 *  - actionKhigh(): high-level consumer that subscribes through the "scorer" group
 *                   and receives its partitions via a rebalance callback.
 *
 * Both methods print progress to stdout and loop until an error occurs.
 */
class Test
{
    /**
     * Read partition 0 of "events" with the low-level consumer, starting at offset 10.
     *
     * Every received message is reported on stdout, its offset is handed to
     * offsetStore(), and the loop then pauses for two seconds. The loop ends
     * (and the method returns) on the first real consumer error, after printing
     * the error string.
     */
    public function actionKlow()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', 'scorer');
        $conf->set('metadata.broker.list', 'kafka:29092');

        $rk = new \RdKafka\Consumer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers('kafka:29092');

        // Configuration values are passed as strings: that is what set() is
        // declared to take, so nothing relies on implicit scalar coercion.
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', '1000');
        $topicConf->set('offset.store.sync.interval.ms', '60000');
        $topicConf->set('enable.auto.commit', 'true');

        $topic = $rk->newTopic('events', $topicConf);

        // First argument: partition. Second argument: offset to start reading from.
        $topic->consumeStart(0, 10);

        while (true) {
            // First argument: partition (again). Second argument: timeout in ms.
            $msg = $topic->consume(0, 1000);

            // consume() yields null when the timeout elapses without a message;
            // older librdkafka builds report the end of the partition as an
            // error code instead. Neither is a failure - keep polling.
            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                continue;
            }

            if ($msg->err) {
                echo $msg->errstr(), "\n";
                break;
            }

            echo sprintf("partition: %d offset: %d\n", $msg->partition, $msg->offset);
            $topic->offsetStore(0, $msg->offset);
            sleep(2);
        }
    }

    /**
     * Consume "events" with the high-level, group-balanced consumer.
     *
     * Each successfully received message is reported on stdout and committed
     * individually, followed by a one-second pause. End-of-partition and
     * timeout events are reported and polling continues.
     *
     * @throws \Exception on any other consumer error, or when the rebalance
     *                    callback receives an event it does not handle.
     */
    public function actionKhigh()
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', 'scorer');
        $conf->set('metadata.broker.list', 'kafka:29092');

        // Report the outcome of each offset commit.
        $conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
            if ($error !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                echo "Offset commit failed: " . rd_kafka_err2str($error) . "\n";
                return;
            }

            // Guard against an empty list before indexing into it.
            if (empty($topicPartitions)) {
                return;
            }

            echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
        });

        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);

                    // Every newly assigned partition is forced to start at offset 1.
                    /** @var \RdKafka\TopicPartition $partition */
                    foreach ($partitions as $partition) {
                        $partition->setOffset(1);
                    }

                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(null);
                    break;

                default:
                    // Use the readable error text as the message and keep the
                    // numeric code available through getCode().
                    throw new \Exception(rd_kafka_err2str($err), $err);
            }
        });

        $conf->set('enable.auto.offset.store', 'true');
        $conf->set('queued.max.messages.kbytes', '104857');

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('offset.store.method', 'broker');
        $topicConf->set('auto.offset.reset', 'smallest');
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe(['events']);

        echo "Waiting for partition assignment... (make take some time when\n";
        echo "quickly re-joining the group after leaving it.)\n";

        while (true) {
            // Block for up to 120 seconds waiting for the next message or event.
            $message = $consumer->consume(120 * 1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo sprintf("partition: %d offset: %d\n", $message->partition, $message->offset);
                    $consumer->commit($message);
                    sleep(1);
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;

                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;

                default:
                    throw new \Exception($message->errstr(), $message->err);
            }
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement