当前位置: 移动技术网 > IT编程>开发语言>Java > spring boot与kafka集成的简单实例

spring boot与kafka集成的简单实例

2019年07月19日  | 移动技术网IT编程  | 我要评论

本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下:

引入相关依赖

<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter</artifactid>
</dependency>

<dependency>
  <groupid>org.springframework.kafka</groupid>
  <artifactid>spring-kafka</artifactid>
  <version>1.1.1.release</version>
</dependency>

从依赖项的引入即可看出,当前spring boot(1.4.2)还不支持完全以配置项的配置来实现与kafka的无缝集成。也就意味着必须通过java config的方式进行手工配置。

定义kafka基础配置

与redistemplate及jdbctemplate等类似。spring同样提供了org.springframework.kafka.core.kafkatemplate作为kafka相关api操作的入口。

import java.util.hashmap;
import java.util.map;

import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;

@configuration
@enablekafka
public class kafkaproducerconfig {

  public map<string, object> producerconfigs() {
    map<string, object> props = new hashmap<>();
    props.put(producerconfig.bootstrap_servers_config, "192.168.179.200:9092");
    props.put(producerconfig.retries_config, 0);
    props.put(producerconfig.batch_size_config, 4096);
    props.put(producerconfig.linger_ms_config, 1);
    props.put(producerconfig.buffer_memory_config, 40960);
    props.put(producerconfig.key_serializer_class_config, stringserializer.class);
    props.put(producerconfig.value_serializer_class_config, stringserializer.class);
    return props;
  }

  public producerfactory<string, string> producerfactory() {
    return new defaultkafkaproducerfactory<>(producerconfigs());
  }

  @bean
  public kafkatemplate<string, string> kafkatemplate() {
    return new kafkatemplate<string, string>(producerfactory());
  }
}

kafkatemplate依赖于producerfactory,而创建producerfactory时则通过一个map指定kafka相关配置参数。通过kafkatemplate对象即可实现消息发送。

kafkatemplate.send("test-topic", "hello");
or
kafkatemplate.send("test-topic", "key-1", "hello");

监听消息配置

import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.config.kafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
import org.springframework.kafka.listener.concurrentmessagelistenercontainer;

import java.util.hashmap;
import java.util.map;

@configuration
@enablekafka
public class kafkaconsumerconfig {

  @bean
  public kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory() {
    concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>();
    factory.setconsumerfactory(consumerfactory());
    factory.setconcurrency(3);
    factory.getcontainerproperties().setpolltimeout(3000);
    return factory;
  }

  public consumerfactory<string, string> consumerfactory() {
    return new defaultkafkaconsumerfactory<>(consumerconfigs());
  }


  public map<string, object> consumerconfigs() {
    map<string, object> propsmap = new hashmap<>();
    propsmap.put(consumerconfig.bootstrap_servers_config, "192.168.179.200:9092");
    propsmap.put(consumerconfig.enable_auto_commit_config, false);
    propsmap.put(consumerconfig.auto_commit_interval_ms_config, "100");
    propsmap.put(consumerconfig.session_timeout_ms_config, "15000");
    propsmap.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
    propsmap.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);
    propsmap.put(consumerconfig.group_id_config, "test-group");
    propsmap.put(consumerconfig.auto_offset_reset_config, "latest");
    return propsmap;
  }

  @bean
  public listener listener() {
    return new listener();
  }
}

实现消息监听的最终目标是得到监听器对象。该监听器对象自行实现。

import org.apache.kafka.clients.consumer.consumerrecord;
  import org.springframework.kafka.annotation.kafkalistener;

  import java.util.optional;

  public class listener {

  @kafkalistener(topics = {"test-topic"})
  public void listen(consumerrecord<?, ?> record) {
    optional<?> kafkamessage = optional.ofnullable(record.value());
    if (kafkamessage.ispresent()) {
      object message = kafkamessage.get();
      system.out.println("listen1 " + message);
    }
  }
}

只需用@kafkalistener指定哪个方法处理消息即可。同时指定该方法用于监听kafka中哪些topic。

注意事项

定义监听消息配置时,group_id_config配置项的值用于指定消费者组的名称,如果同组中存在多个监听器对象则只有一个监听器对象能收到消息。

@kafkalistener中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkatemplate在发送消息时指定。

key_deserializer_class_config与value_deserializer_class_config指定key和value的编码、解码策略。kafka用key值确定value存放在哪个分区中。

后记

时间是解决问题的有效手段之一。

在spring boot 1.5版本中即可实现spring boot与kafka auto-configuration

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网