当前位置: 移动技术网 > IT编程>开发语言>Java > springboot中实现kafa指定offset消费

springboot中实现kafa指定offset消费

2019年12月14日  | 移动技术网IT编程  | 我要评论

kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。

首先创建kafka消费服务

@service
@slf4j
//实现commandlinerunner接口,在springboot启动时自动运行其run方法。
public class tsplogbookanalysisservice implements commandlinerunner {
    @override
    public void run(string... args) {
        //do something
    }
}

kafka消费模型建立

kafka server中每个主题存在多个分区(partition),每个分区自己维护一个偏移量(offset),我们的目标是实现kafka consumer指定offset消费。

在这里使用consumer-->partition一对一的消费模型,每个consumer各自管理自己的partition。

kafka consumer partition

@service
@slf4j
public class tsplogbookanalysisservice implements commandlinerunner {
    //声明kafka分区数相等的消费线程数,一个分区对应一个消费线程
    private  static final int consumethreadnum = 9;
    //特殊指定每个分区开始消费的offset
    private list<long> partitionoffsets = lists.newarraylist(1111,1112,1113,1114,1115,1116,1117,1118,1119);
   
    private executorservice executorservice = executors.newfixedthreadpool(consumethreadnum);

    @override
    public void run(string... args) {
        //循环遍历创建消费线程
        intstream.range(0, consumethreadnum)
                .foreach(partitionindex -> executorservice.submit(() -> startconsume(partitionindex)));
    }
}

kafka consumer对offset的处理

声明kafka consumer的配置类

private properties buildkafkaconfig() {
    properties kafkaconfiguration = new properties();
    kafkaconfiguration.put(consumerconfig.bootstrap_servers_config, "");
    kafkaconfiguration.put(consumerconfig.group_id_config, "");
    kafkaconfiguration.put(consumerconfig.max_poll_records_config, "");
    kafkaconfiguration.put(consumerconfig.auto_commit_interval_ms_config, "");
    kafkaconfiguration.put(consumerconfig.key_deserializer_class_config, "");
    kafkaconfiguration.put(consumerconfig.value_deserializer_class_config, "");
    kafkaconfiguration.put(consumerconfig.auto_offset_reset_config,"");
    kafkaconfiguration.put(consumerconfig.enable_auto_commit_config, "");
    ...更多配置项

    return kafkaconfiguration;
}

创建kafka consumer,处理offset,开始消费数据任务

private void startconsume(int partitionindex) {
    //创建kafka consumer
    kafkaconsumer<string, byte[]> consumer = new kafkaconsumer<>(buildkafkaconfig());

    try {
        //指定该consumer对应的消费分区
        topicpartition partition = new topicpartition(kafkaproperties.getkafkatopic(), partitionindex);
        consumer.assign(lists.newarraylist(partition));

        //consumer的offset处理
        if (collectionutils.isnotempty(partitionoffsets)  &&  partitionoffsets.size() == consumethreadnum) {
            long seekoffset = partitionoffsets.get(partitionindex);
            log.info("partition:{} , offset seek from {}", partition, seekoffset);
            consumer.seek(partition, seekoffset);
        }
        
        //开始消费数据任务
        kafkarecordconsume(consumer, partition);
    } catch (exception e) {
        log.error("kafka consume error:{}", exceptionutils.getfullstacktrace(e));
    } finally {
        try {
            consumer.commitsync();
        } finally {
            consumer.close();
        }
    }
}

消费数据逻辑,offset操作

private void kafkarecordconsume(kafkaconsumer<string, byte[]> consumer, topicpartition partition) {
    while (true) {
        try {
            consumerrecords<string, byte[]> records = consumer.poll(tsplogbookconstants.poll_timeout);
            //具体的处理流程
            records.foreach((k) -> handlekafkainput(k.key(), k.value()));

            //

                    

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网