当前位置: 移动技术网 > IT编程>开发语言>C/C++ > c++使用librdkafka库实现kafka的消费实例

c++使用librdkafka库实现kafka的消费实例

2018年12月26日  | 移动技术网IT编程  | 我要评论

谢娜娜写年华,toto坐便器,奥巴马什么时候卸任

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的消费

librdkafka在c语言api的基础上封装了一层c++的api,可以实现kafka的消费操作,基本操作步骤如下

1、创建kafka 配置

rdkafka::conf *conf = nullptr;
conf = rdkafka::conf::create(rdkafka::conf::conf_global);

2、设置kafka各项参数

/*设置broker list*/
conf->set("bootstrap.servers", brokers_, errstr); 

/*设置consumer group*/
conf->set("group.id", groupid_, errstr);

/*每次从单个分区中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);

3、创建kafka topic配置

rdkafka::conf *tconf = nullptr;
tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);

4、设置kafka topic参数

if(tconf->set("auto.offset.reset", "smallest", errstr)

5、创建kafka consumer实例

kafka_consumer_ = rdkafka::consumer::create(conf, errstr);

6、创建kafka topic

rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);

7、启动kafka consumer实例

rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);

8、消费kafka

kafka_consumer_->consume(topic_, partition_, timeout_ms);

9、处理事件回调(poll(0)不会阻塞;阻塞等待消息由上一步consume()的timeout_ms完成)

kafka_consumer_->poll(0);

10、停止消费

kafka_consumer_->stop(topic_, partition_);

11、销毁consumer实例(先delete topic和consumer对象,再调用wait_destroyed等待librdkafka内部资源释放完毕)

rdkafka::wait_destroyed(5000);


完整代码

my_consumer_cpp.h:

#include <getopt.h>

#include <csignal>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>

#include "../src-cpp/rdkafkacpp.h"

class kafka_consumer_client{
public:
	kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset);
	//kafka_consumer_client();
	virtual ~kafka_consumer_client();

	bool initclient();
	bool consume(int timeout_ms);
	void finalize();
private:
	void consumer(rdkafka::message *msg, void *opt);

	std::string brokers_;
	std::string topics_;
	std::string groupid_;

	int64_t last_offset_ = 0;
	rdkafka::consumer *kafka_consumer_ = nullptr;	
	rdkafka::topic    *topic_ 		   = nullptr;
	int64_t 		  offset_ 		   = rdkafka::topic::offset_beginning;
	int32_t		      partition_       = 0;
	
};

my_consumer.cpp
#include "my_consumer_cpp.h"


/* Loop flag shared by the signal handler and the consume loop.
   volatile sig_atomic_t is the object type a signal handler may portably
   write; a plain bool gives no such guarantee. */
volatile std::sig_atomic_t run_ = 1;

/* Signal handler: ask the consume loop to stop. The signal number is unused. */
static void sigterm (int /*sig*/) {
  run_ = 0;
}

kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset)
:brokers_(brokers),
 topics_(topics),
 groupid_(groupid),
 offset_(offset){
 }

//kafka_consumer_client::kafka_consumer_client(){}

/* Releases any handles the object still owns. consume() deletes and nulls
   both pointers before it returns, so this only does work when the client
   was initialised but consume() never ran to completion. */
kafka_consumer_client::~kafka_consumer_client(){
	if(kafka_consumer_ && topic_){
		/* Harmless if the partition was never started: stop() just reports an error. */
		kafka_consumer_->stop(topic_, partition_);
	}
	delete topic_;
	topic_ = nullptr;
	delete kafka_consumer_;
	kafka_consumer_ = nullptr;
}

bool kafka_consumer_client::initclient(){
	rdkafka::conf *conf = nullptr;
	conf = rdkafka::conf::create(rdkafka::conf::conf_global);
	if(!conf){
		fprintf(stderr, "rdkafka create global conf failed\n");
		return false;
	}

	std::string errstr;
	/*设置broker list*/
	if (conf->set("bootstrap.servers", brokers_, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set brokerlist failed : %s\n", errstr.c_str());
	}

	/*设置consumer group*/
	if (conf->set("group.id", groupid_, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set group.id failed : %s\n", errstr.c_str());
	}

	std::string strfetch_num = "10240000";
	/*每次从单个分区中拉取消息的最大尺寸*/
	if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set max.partition failed : %s\n", errstr.c_str());
	}

	/*创建kafka consumer实例*/
	kafka_consumer_ = rdkafka::consumer::create(conf, errstr);
	if(!kafka_consumer_){
		fprintf(stderr, "failed to ceate consumer\n");
	}
	delete conf;

	rdkafka::conf *tconf = nullptr;
	/*创建kafka topic的配置*/
	tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);
	if(!tconf){
		fprintf(stderr, "rdkafka create topic conf failed\n");
		return false;
	}

	/*kafka + zookeeper,当消息被消费时,会想zk提交当前groupid的consumer消费的offset信息,
	当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
	consumerconfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
	有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupid下的消费者,
	在zk中没有offset值时(比如新的groupid,或者是zk数据被清空),consumer应该从哪个offset开始
	消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
	开始位置消费所有消息.*/
	if(tconf->set("auto.offset.reset", "smallest", errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
	}

	topic_ = rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);
	if(!topic_){
		fprintf(stderr, "rdkafka create topic failed : %s\n", errstr.c_str());
	}
	delete tconf;

	rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);
	if (resp != rdkafka::err_no_error){
		fprintf(stderr, "failed to start consumer : %s\n", rdkafka::err2str(resp).c_str());
	}

	return true;
}

void kafka_consumer_client::consumer(rdkafka::message *message, void *opt){
	switch(message->err()){
		case rdkafka::err__timed_out:
			break;
		case rdkafka::err_no_error:
			printf("%.*s\n", 
				static_cast(message->len()),
				static_cast(message->payload()));
			last_offset_ = message->offset();
			break;
		case rdkafka::err__partition_eof:
			std::cerr << "%% reached the end of the queue, offset: " << last_offset_ << std::endl;
			break;
		case rdkafka::err__unknown_topic:
		case rdkafka::err__unknown_partition:
			std::cerr << "consume failed: " << message->errstr() << std::endl;
			run_ = false;
			break;
		default:
			std::cerr << "consume failed: " << message->errstr() << std::endl;
			run_ = false;
			break;
	}
}

bool kafka_consumer_client::consume(int timeout_ms){
	rdkafka::message *msg = nullptr;

	while(run_){
		msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
		consumer(msg, nullptr);
		kafka_consumer_->poll(0);
		delete msg;
	}

	kafka_consumer_->stop(topic_, partition_);
	if(topic_){
		delete topic_;
		topic_ = nullptr;
	}
	if(kafka_consumer_){
		delete kafka_consumer_;
		kafka_consumer_ = nullptr;
	}

	/*销毁kafka实例*/
	rdkafka::wait_destroyed(5000);
	return true;
}

int main(int argc, char **argv){
	int opt;
	//std::vector topics;
	std::string topics;
	std::string brokers = "localhost:9092";
	std::string group = "1";

	while ((opt = getopt(argc, argv, "g:b:t:qd:ex:as:do")) != -1){
	    switch (opt) {
		    case 'b':
		      brokers = optarg;
		      break;
		    case 'g':
		      group = optarg;
		      break;
		    case 't':
		      topics = optarg;
		      break;
		    default:
		      break;
	  	}
  	}

  	/*for (; optind < argc ; optind++)
    topics.push_back(std::string(argv[optind]));*/

	signal(sigint, sigterm);
  	signal(sigterm, sigterm);

  	std::shared_ptr kafka_consumer_client_ = std::make_shared(brokers, topics, group, 0);
  	//std::shared_ptr kafka_consumer_client_ = std::make_shared();
  	if (!kafka_consumer_client_->initclient()){
  		fprintf(stderr, "kafka server initialize error\n");
  	}else{
  		printf("start kafka consumer\n");
  		kafka_consumer_client_->consume(1000);
  	}
  	
  	fprintf(stderr, "kafka consume exit! \n");

  	return 0;
}

编译:
g++ my_consumer.cpp -o my_consumer_cpp -std=c++11  -lrdkafka++ -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries: xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录,然后执行ldconfig使配置生效


在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test”的topic

启动consumer:

\

开启kafka 自带的producer,并发送消息“hello world”

\

consumer处收到的消息:

\

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网