当前位置: 移动技术网 > IT编程>开发语言>Java > Java使用kafka发送和生产消息的示例

Java使用Kafka生产和消费消息的示例

2019年07月19日  | 移动技术网IT编程  | 我要评论

耽美漫画在线,桐俊,强上黑老大全文阅读

1. Maven依赖包

<!-- Kafka Java client library (provides the producer and consumer APIs used below). -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

2. 生产者代码

package com.lnho.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example.
 *
 * <p>Connects to the broker at {@code master:9092} and sends 100 records to
 * {@code topic1}. Each record's key and value are both the decimal string of
 * the loop index (0..99).
 */
public class KafkaProducerExample {

    /**
     * Builds the producer configuration, sends the records and closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        // Serializer class names are case-sensitive: they are loaded by name at runtime.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send() throws.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
            }
        }
    }
}

3. 消费者代码

package com.lnho.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer example.
 *
 * <p>Connects to the broker at {@code master:9092} as a member of consumer
 * group {@code test}, subscribes to {@code topic1} and prints the offset, key
 * and value of every record it receives. The poll loop never terminates on
 * its own; stop the process to end it.
 */
public class KafkaConsumerExample {

    /**
     * Builds the consumer configuration and polls {@code topic1} forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        // Deserializer class names are case-sensitive: they are loaded by name at runtime.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("topic1"));
            while (true) {
                // The argument is the poll timeout passed to KafkaConsumer.poll(long).
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // The loop only exits via an exception; release the consumer in that case.
            consumer.close();
        }
    }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网