当前位置: 移动技术网 > IT编程>开发语言>Java > Kafka Java Producer代码实例详解

Kafka Java Producer代码实例详解

2020年06月13日  | 移动技术网IT编程  | 我要评论

王杰恩,发廊女,o2o购物节

根据业务需要可以使用kafka提供的java producer api进行产生数据,并将产生的数据发送到kafka对应topic的对应分区中,入口类为:producer

kafka的producer api主要提供下列三个方法:

  •   public void send(keyedmessage<k,v> message) 发送单条数据到kafka集群
  •   public void send(list<keyedmessage<k,v>> messages) 发送多条数据(数据集)到kafka集群
  •   public void close() 关闭kafka连接资源

一、javakafkaproducerpartitioner:自定义的数据分区器,功能是:决定输入的key/value键值对的message发送到topic的那个分区中,返回分区id,范围:[0,分区数量); 这里的实现比较简单,根据key中的数字决定分区的值。具体代码如下:

import kafka.producer.partitioner;
import kafka.utils.verifiableproperties;

/**
 * created by gerry on 12/21.
 */
public class javakafkaproducerpartitioner implements partitioner {

  /**
   * 无参构造函数
   */
  public javakafkaproducerpartitioner() {
    this(new verifiableproperties());
  }

  /**
   * 构造函数,必须给定
   *
   * @param properties 上下文
   */
  public javakafkaproducerpartitioner(verifiableproperties properties) {
    // nothings
  }

  @override
  public int partition(object key, int numpartitions) {
    int num = integer.valueof(((string) key).replaceall("key_", "").trim());
    return num % numpartitions;
  }
}

二、 javakafkaproducer:通过kafka提供的api进行数据产生操作的测试类;具体代码如下:

import kafka.javaapi.producer.producer;
import kafka.producer.keyedmessage;
import kafka.producer.producerconfig;
import org.apache.log4j.logger;

import java.util.properties;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
import java.util.concurrent.atomic.atomicboolean;
import java.util.concurrent.threadlocalrandom;

/**
 * created by gerry on 12/21.
 */
public class javakafkaproducer {
  private logger logger = logger.getlogger(javakafkaproducer.class);
  public static final string topic_name = "test";
  public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".tochararray();
  public static final int chartslength = charts.length;


  public static void main(string[] args) {
    string brokerlist = "192.168.187.149:9092";
    brokerlist = "192.168.187.149:9092,192.168.187.149:9093,192.168.187.149:9094,192.168.187.149:9095";
    brokerlist = "192.168.187.146:9092";
    properties props = new properties();
    props.put("metadata.broker.list", brokerlist);
    /**
     * 0表示不等待结果返回<br/>
     * 1表示等待至少有一个服务器返回数据接收标识<br/>
     * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
     * */
    props.put("request.required.acks", "0");
    /**
     * 内部发送数据是异步还是同步
     * sync:同步, 默认
     * async:异步
     */
    props.put("producer.type", "async");
    /**
     * 设置序列化的类
     * 可选:kafka.serializer.stringencoder
     * 默认:kafka.serializer.defaultencoder
     */
    props.put("serializer.class", "kafka.serializer.stringencoder");
    /**
     * 设置分区类
     * 根据key进行数据分区
     * 默认是:kafka.producer.defaultpartitioner ==> 安装key的hash进行分区
     * 可选:kafka.serializer.bytearraypartitioner ==> 转换为字节数组后进行hash分区
     */
    props.put("partitioner.class", "javakafkaproducerpartitioner");

    // 重试次数
    props.put("message.send.max.retries", "3");

    // 异步提交的时候(async),并发提交的记录数
    props.put("batch.num.messages", "200");

    // 设置缓冲区大小,默认10kb
    props.put("send.buffer.bytes", "102400");

    // 2. 构建kafka producer configuration上下文
    producerconfig config = new producerconfig(props);

    // 3. 构建producer对象
    final producer<string, string> producer = new producer<string, string>(config);

    // 4. 发送数据到服务器,并发线程发送
    final atomicboolean flag = new atomicboolean(true);
    int numthreads = 50;
    executorservice pool = executors.newfixedthreadpool(numthreads);
    for (int i = 0; i < 5; i++) {
      pool.submit(new thread(new runnable() {
        @override
        public void run() {
          while (flag.get()) {
            // 发送数据
            keyedmessage message = generatekeyedmessage();
            producer.send(message);
            system.out.println("发送数据:" + message);

            // 休眠一下
            try {
              int least = 10;
              int bound = 100;
              thread.sleep(threadlocalrandom.current().nextint(least, bound));
            } catch (interruptedexception e) {
              e.printstacktrace();
            }
          }

          system.out.println(thread.currentthread().getname() + " shutdown....");
        }
      }, "thread-" + i));

    }

    // 5. 等待执行完成
    long sleepmillis = 600000;
    try {
      thread.sleep(sleepmillis);
    } catch (interruptedexception e) {
      e.printstacktrace();
    }
    flag.set(false);

    // 6. 关闭资源

    pool.shutdown();
    try {
      pool.awaittermination(6, timeunit.seconds);
    } catch (interruptedexception e) {
    } finally {
      producer.close(); // 最后之后调用
    }
  }

  /**
   * 产生一个消息
   *
   * @return
   */
  private static keyedmessage<string, string> generatekeyedmessage() {
    string key = "key_" + threadlocalrandom.current().nextint(10, 99);
    stringbuilder sb = new stringbuilder();
    int num = threadlocalrandom.current().nextint(1, 5);
    for (int i = 0; i < num; i++) {
      sb.append(generatestringmessage(threadlocalrandom.current().nextint(3, 20))).append(" ");
    }
    string message = sb.tostring().trim();
    return new keyedmessage(topic_name, key, message);
  }

  /**
   * 产生一个给定长度的字符串
   *
   * @param numitems
   * @return
   */
  private static string generatestringmessage(int numitems) {
    stringbuilder sb = new stringbuilder();
    for (int i = 0; i < numitems; i++) {
      sb.append(charts[threadlocalrandom.current().nextint(chartslength)]);
    }
    return sb.tostring();
  }
}

三、pom.xml依赖配置如下

<properties>
  <kafka.version>0.8.2.1</kafka.version>
</properties>

<dependencies>
  <dependency>
    <groupid>org.apache.kafka</groupid>
    <artifactid>kafka_2.10</artifactid>
    <version>${kafka.version}</version>
  </dependency>
</dependencies>

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网