当前位置: 移动技术网 > IT编程>数据库>其他数据库 > Kafka的接口回调 +自定义分区、拦截器

Kafka的接口回调 +自定义分区、拦截器

2019年03月01日  | 移动技术网IT编程  | 我要评论

一、接口回调+自定义分区

  1.接口回调:在使用消费者的send方法时添加callback回调

 

producer.send(new producerrecord<string, string>("xinnian", "20" + i + "年新年好!"), new callback() {
public void oncompletion(recordmetadata recordmetadata, exception e) {
if (recordmetadata!=null){
system.out.println(recordmetadata.topic()+"-----"+recordmetadata.offset()+"-----"+recordmetadata.partition());
}
}
 2.自定义分区:定义类实现patitioner接口,实现接口的方法:
   设置configure、分区逻辑partition(return 1;)、释放资源close、在生产者的配置过程中添加入分区属性。
 在定义生产者属性时添加分区的属性即可
/**
 * @author: princesshug
 * @date: 2019/2/28, 16:24
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class partitiondemo implements partitioner {
    public int partition(string s, object o, byte[] bytes, object o1, byte[] bytes1, cluster cluster) {
        return 1;
    }

    public void close() {

    }

    public void configure(map<string, ?> map) {

    }
}

public class producerdemo {
    public static void main(string[] args) {
        properties prop = new properties();

        //参数配置
        //kafka节点的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //发送消息是否等待应答
        prop.put("acks", "all");
        //配置发送消息失败重试
        prop.put("retries", "0");
        //配置批量处理消息大小
        prop.put("batch.size", "10241");
        //配置批量处理数据延迟
        prop.put("linger.ms","5");
        //配置内存缓冲大小
        prop.put("buffer.memory", "12341235");
        //消息在发送前必须序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
        prop.put("partitioner.class", "partitiondemo");

        kafkaproducer<string, string> producer = new kafkaproducer<string, string>(prop);

        for (int i=10;i<100;i++){
            producer.send(new producerrecord<string, string>("xinnian", "20" + i + "年新年好!"), new callback() {
                public void oncompletion(recordmetadata recordmetadata, exception e) {
                    if (recordmetadata!=null){
                        system.out.println(recordmetadata.topic()+"-----"+recordmetadata.offset()+"-----"+recordmetadata.partition());
                    }
                }
            });
        }
        producer.close();
    }
}

  注意:在自定义分区后,你的消费者会收不到消息,因为消费者默认接收的分区为0。

 

二、拦截器

  1)创建生产者类;
     2)创建自定义拦截器类实现producerinterceptor接口,重写抽象方法;
     3)在业务逻辑方法producerrecord方法中,修改返回值,
        return new producerrecord<string,string>(
        record.topic(),
        record.partiiton(),
        record.key(),
        system.currenttimemillis() + "-" + record.value() + "-" + record.topic());
     4)在生产者类中将自定义拦截器生效
       prop.put(producerconfig.interceptor_classea_config,"com.wyh.com.wyh.kafka.interceptor.timeinterceptor");
     5)运行生产者main方法,或者在linux端用shell测试。

/**
 * @author: princesshug
 * @date: 2019/2/28, 20:59
 * @blog: https://www.cnblogs.com/hellobigtable/
 */
public class timeinterceptor implements producerinterceptor<string, string> {

    //业务逻辑
    public producerrecord<string, string> onsend(producerrecord<string, string> producerrecord) {
        return new producerrecord<string,string>(
                producerrecord.topic(),
                producerrecord.partition(),
                producerrecord.key(),
                system.currenttimemillis()+"--"+producerrecord.value()
        );
    }

    //发送失败调用
    public void onacknowledgement(recordmetadata recordmetadata, exception e) {

    }

    //释放资源
    public void close() {

    }

    //获取配置信息
    public void configure(map<string, ?> map) {

    }
}

public class itctorproducer {
    public static void main(string[] args) {
        //配置生产者属性
        properties prop = new properties();
        //kafka节点的地址
        prop.put("bootstrap.servers", "192.168.126.128:9092");
        //发送消息是否等待应答
        prop.put("acks", "all");
        //配置发送消息失败重试
        prop.put("retries", "0");
        //配置批量处理消息大小
        prop.put("batch.size", "1024");
        //配置批量处理数据延迟
        prop.put("linger.ms","5");
        //配置内存缓冲大小
        prop.put("buffer.memory", "12341235");
        //消息在发送前必须序列化
        prop.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

        //添加拦截器
        arraylist<string> inlist = new arraylist<string>();
        inlist.add("interceptor.timeinterceptor");
        prop.put(producerconfig.interceptor_classes_config,inlist);

        //实例化producer
        kafkaproducer<string, string> producer = new kafkaproducer<string, string>(prop);

        //发送消息
        for (int i=0;i<99;i++){
            producer.send(new producerrecord<string, string>("xinnian","you are genius!"+i));
        }

        //释放资源
        producer.close();
        
    }
}

 


  

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网