当前位置: 移动技术网 > IT编程>开发语言>.net > 笔记 -- kafka

笔记 -- kafka

2020年07月17日  | 移动技术网IT编程  | 我要评论

kafka

概念

分布式消息 发布/订阅 系统

高吞吐量、内置分区、冗余及容错性(每秒处理几十万消息)

在kafka集群中,没有中心节点的概念,集群中所有的服务器都是对等的,可以再不做任何配置更改的情况下实现服务器的添加和删除。

架构组成

Producer

将消息push到broker

	1.producer将消息封装到producerRecord类实例中。
	2.producerRecord类序列化后,再发送到内存缓冲区。
	3.由sender线程负责将缓冲区中的消息封装到一个批次中发送给broker。

Broker

Consumer

监听broker,主动从broker pull消息消费。

	Consumer.poll() 获取数据消费,但实际是通过发起fetch请求执行,并将从partition获取的数据放在本地缓存。
	Consumer.poll() 需要循环调用,如果长时间不出发fetch请求,心跳连接仍在,consumer会被认为处于livelock状态,从而被broker从consumer group中剔除。
	
	并不是每次poll都会发起fetch请求。
	原因: 在满足max.partition.fetch.bytes限制的情况下,假如一次fetch请求到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么Consumer需要执行7次才能将这100个record消费完毕。

Zookeeper

管理协调Producer、Broker、Consumer的请求

代码示例

KafkaProducerDemo.class

public class KafkaProducerDemo extends Thread{
    private final KafkaProducer<Integer,String> producer;
    private final String topic;
    private final boolean isAysnc;

    public KafkaProducerDemo(String topic,boolean isAysnc){
       Properties properties=new Properties();
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"49.235.16.28:9092");		//kafka集群地址
       properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaConsumerDemo2");	//这是客户端ID
       properties.put(ProducerConfig.ACKS_CONFIG,"0"); 	//0:无需broker确认。1:m节点broker确认即可。All(-1):需要所有节点确认
       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        												 "org.apache.kafka.common.serialization.IntegerSerializer"); //设置key序列化
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       													"org.apache.kafka.common.serialization.StringSerializer");	//设置value序列化
        producer=new KafkaProducer<Integer, String>(properties);
        this.topic=topic;
        this.isAysnc=isAysnc;
    }

    @Override
    public void run() {
        int num=0;
        while(num<10){
            String message="message_"+num;
            System.out.println("begin send message:"+message);
            if(isAysnc){//异步发送
     		 producer.send(new ProducerRecord<Integer, String>(topic, message), new Callback() {
                 @Override
          		 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(recordMetadata!=null){
								System.out.println("async-offset:"+recordMetadata.offset()+"->partition"+recordMetadata.partition());
                        }
                    }
                });
            }else{//同步发送  future/callable
                try {
        			RecordMetadata recordMetadata = producer.send(new ProducerRecord<Integer, String>(topic,message)).get();
                   System.out.println("sync-offset:"+recordMetadata.offset()+ "->partition"+recordMetadata.partition());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }
            num++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new KafkaProducerDemo("test0815",true).start();
    }
}

KafkaConsumerDemo.class

public class KafkaConsumerDemo extends Thread {
   private final KafkaConsumer kafkaConsumer;
   public KafkaConsumerDemo(String topic) {
      Properties properties = new Properties();
      properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"49.235.16.28:9092");		//kafka集群IP
      properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaConsumerDemo2");	//分组ID
      properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");			//间隔时间
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.IntegerDeserializer"); //key反序列化
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 "org.apache.kafka.common.serialization.StringDeserializer"); //value反序列
      properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
						//earliest 从头开始消费
      kafkaConsumer = new KafkaConsumer(properties);
      kafkaConsumer.subscribe(Collections.singletonList(topic));	 //订阅
   }

   @Override
   public void run() {
      while (true) {
         ConsumerRecords<Integer, String> consumerRecord = kafkaConsumer.poll(1000);
         for (ConsumerRecord record : consumerRecord) {
            System.out.println("message receive:" + record.value());
            kafkaConsumer.commitAsync();
         }
      }
   }

   public static void main(String[] args) {
      new KafkaConsumerDemo("test0815").start();
   }
}

配置分析

发送端配置

acks

对数据要求很高,用all(-1),其他用1。

	0 :写入的消息不需要等待副本确认。
	1 :写入的消息被leader副本记录则认为提交成功。
	all(-1) :写入的消息需要被复制到ISR的所有副本,才认为提交成功。性能低,最安全,但如果副本只有一个,副本宕机时有可能数据丢失

batch.size

以大小为单位,Producer把发送到同一个分区的消息封装进一个batch,在不考虑linger.ms的情况下,batch满了再统一发送。

	batch越小,吞吐量越低。越大,吞吐量越高。默认16KB。

linger.ms

以时间为单位,Producer将时间间隔内的所有请求进行一次聚合,再统一发送。

	默认0,表示不做停留,这样会导致大量的小batch被发送出去(导致batch.size不生效),给网络IO带来极大压力。
	假如设置linger.ms=5,表示producer的请求延迟5ms发送。

batch.size和linger.ms的作用是一样的,是kafka性能调优的关键参数。如果两个都配置了,只要满足其中一个,Producer就会发送请求。

compression.type (Producer压缩器)

	目前支持none(不压缩)、gzip、snappy、lz4(Lz4的效果最好)。

max.request.size (请求数据的最大字节数)

	默认1MB, 防止大的数据包影响吞吐量

buffer.memory (producer 缓冲区大小)

指定producer缓存消息的缓冲区大小,默认33554432字节(32M)。
Producer启动时在内存中创建一块缓冲区,用于存放消息,然后由专属线程sender负责从缓冲区拿消息,进行真正的发送。

	消息持续发送过程中,当缓冲区填满后,producer立即进入阻塞状态,直到缓冲区的内存释放出来。阻塞时间一旦超过max.block.ms设置的值,producer就会抛异常:TimeoutException。这种情况下就需要调高buffer.memory的值,增大缓冲区。

retries(失败后重试次数)

	max.in.flight.requests.per.connection设置如果大于1,重试可能会造成消息乱序。
	0.11.1.0版本已经支持“精确到一次语义”,重试不会造成消息重复发送。

消费端配置

Consumer.poll(1000)

Consumer拿到足够多的数据就会返回一个ConsumerRecords,但如果阻塞了 1000ms,哪怕仍没有拿到足够多的数据,也依旧返回。

	1000:最长阻塞时间。

bootstrap.servers

	设置broker地址:host1:port1;host2:port2…

heartbeat.interval.ms (心跳间隔)

Consumer与coordinator之间的心跳,是为了确认consumer存活、加入或者退出groupd。

	这个值必须小于session.timeout.ms,如果心跳间隔时间超过session.timeout.ms,coordinator会认为该consumer退出,并由group内consumer重新rebalance。通常心跳间隔时间(3S)小于session.timeout.ms的1/3.

session.timeout.ms

Consumer session 过期时间。默认10S。

	这个值得设置必须在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。

max.partition.fetch.bytes

fetch操作时,指定每个分区返回最大字节数,默认1M。

	这个值必须比broker能够接受的最大消息的配置(max.message.size)大,否则会引起消费者无法消费数据(出口>入口)。	
	max.partition.fetch.bytes设置不能过大,会导致consumer消费数据时间过长,没有及时再次poll而会话过期。

groupid

一个组内可以有多个消费者,并共用一个group id

	如果topic某个消息被组内某个consumer消费了,那么组内其他consumer不可再消费。
	各组之间消费互不影响。

在这里插入图片描述

enable.auto.commit (自动提交)

默认值:true

	Consumer消费后自动提交offset+1。只有提交后,该消息才不会被再次接收。
	可以配合auto.commit.interval.ms 控制自动提交的频率(默认5S),当然也可以consumer.commitSync()的方式实现手动提交

自动提交与手动提交

自动提交其实在很多场景都不适用,因为自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。

auto.offfset.reset (消费)

	latest:	从topic最新的数据开始消费(默认)
	earliest:从topic最早的消息开始消
	none:	如果offset不存在,则抛异常

max.poll.records

每次调用poll()返回的消息数,减少poll间隔

Topic(主题)

存储消息的逻辑概念,可认为是消息集合。

	每个topic可划分多个partition(分区),分区越多,吞吐量越大。

在这里插入图片描述

Partition(分区)

partition是一块保存具体数据的空间,本质是磁盘上存放数据的文件夹,

	所以partition不能跨Broker,也不能在同一个Broker上跨磁盘。

partition中的每个消息会被分配一个offset(偏移量),它是消息在此partition的唯一编号。

	offset只保证同一partition内消息是有序的。

Kafka支持动态添加partition,但不支持删减partition,

	因为如果将删减的partition上的数据转移到其他partition上,会破坏其他partition上消息的有序性。

在这里插入图片描述

	消息由key+value组成,key、value皆可为空。
	根据partition规则,broker将收到的消息存储到其中一个partition,类似于将数据做分片处理。

partition分布

单节点

如果topic(firstTopic)有3个partition,那么配置dir路径下(默认:/tmp/kafka-log )有3个目录,firstTopic-0、firstTopic-1、firstTopic-2。

集群

集群中有n个broker,一个topic中的多个partition如何分布在这些broker上?
将partition排序,第i个partition放到(i mod n)个broker上

在这里插入图片描述

消息如何写入partition?

消息由key、value组成,key、value皆可为空,那么消息存储在哪个partition中?

方式一:producer自定义分区 (Partitioner 接口)


properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition");	//partition类名全路径


public class MyPartition implements Partitioner {
  private Random random = new Random();
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic); 	// 获得分区列表
    int partitionNum = 0;
    if (key == null) {
      partitionNum = random.nextInt(partitionInfos.size());	 // key为空,随机分区
    } else {
      partitionNum = Math.abs((key.hashCode()) % partitionInfos.size());	//hash取模
    }
    System.out.println("key:" + key + ",value:" + value + "," + partitionNum);
    return partitionNum; 		// 指定发送的分区值
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}

方式二:默认分区算法 (Hash取模算法)

	Key不为空,默认采用hash取模算法。
	key为空,则在”metadata.max.age.ms”时间范围内,随机选一个partition,在默认情况下(10分钟内),数据只会发送到当前partition上。
	因为broker - partition的对应关系可能会发生变化,所以10分钟刷新一次。(metadata.class存储了topic/partition和broker的映射关系)

从Partition消费消息

Consumer指定Partition

TopicPartition topicPartition=new TopicPartition(topic,0);	  //指定0分区
kafkaConsumer.assign(Arrays.asList(topicPartition));  // 可接收多个指定

消息分配策略

range(默认)

在同一topic中,按partition和consumer的数量分配。

	缺陷:
		订阅多个topic时,分配不均。
roundRobin(轮询)

整合所有topic的partition,按字典排序,最后将partition轮询给各个消费者。

	缺陷:
		组内consumer订阅不同分区时,分配不均。
		组内一consumer宕机,会导致所有分区重新轮询分配,严重浪费资源。
stickyAssignor(粘性)
	优势:
		相比roundRobin,stickyAssignor更加平均。
		组内一consumer宕机,将其分区分配给其他consumer,其他consumer原有的分区保持不动,

消息分配策略的触发条件

  1. group新增/剔除consumer
  2. Topic新增Patition

Coordinator

一个broker节点,负责管理consumer group。

Coordinator如何定义?

组内第一个consumer启动后,consumer向kafka集群任意一borker发送一个GroupCoordinatorRequest请求,服务端会返回负载最小的broker节点id,并将此节点定义为coordinator。之后该组内的所有consumer都会和该coordinator协调通信。

如何确定consumer group的coidinator是哪个borker?

consumer group的位移信息写入哪个consumer_offsets_*,那么其分区leader所在的borker就是coordinator。

Rebalance(均衡)

将partition均分给每个consumer的过程就叫Rebalance。
本质上是一组协议,分配策略为的就是rebalance。

Rebalance的过程

Join

consumer向coordinator发送joinGroup请求,coordinator会从consumer中选择一个担任leader角色。并把组成员信息、订阅信息发送给leader。

Sync

leader consumer负责分配消费方案,即哪个consumer消费哪些partition,一旦分配完成,leader会将方案封装进syncGroup请求发送给coordinator,非leader也会发syncGroup请求,只不过内容为null。Coordinator会把收到的分配方案response给各个consumer,这样各个consumer就知道自己该消费哪些partition。

	partition分配方案是放在客户端进行的,这样有更好的灵活性。

在这里插入图片描述

offset 偏移量

offset存储在哪里?consumer_offsets

kafka默认提供了50个consumer_offsets_*的topic,用于存放consumer group 某一时刻提交的offset信息。
在这里插入图片描述

不同groupid用哪个consumer_offsets_ 呢?

计算公式: (“groupid”.hashCode())%50 ;

如果计算结果5,那么当前group的offset信息保存在consumer_offsets_5里面。

LogSegment (分段)

	log.segment.bytes=1073741824    //设置分段大小,默认1G

kafka以Segment为单位,将partition进一步细分。从而避免的单个文件数据量过大而导致的操作难问题。

Segment的命名从0000开始,后续文件的命名以上一个Segment文件中最后一条消息的offset值命名。

Segment是一个逻辑概念,对应着partition目录(如log/test-0)下的.index和.log文件。如果partition被分为多个Segment,那么此目录下也会有多个.index和.log文件。

Kafka0.10.1.0之后,对于每个Segment文件新增了.timeindex文件,基于时间戳操作消息。

.timeIndex

文件映射时间戳和对应offset:
在这里插入图片描述

.index

文件记录了offset和对应的物理位置:
在这里插入图片描述

.index 与 .log 映射关系

在这里插入图片描述

Log文件内容分析

在这里插入图片描述
keysize :key大小。
compresscodec :压缩编码
payload :消息具体内容

日志清除/压缩

日志的分段存储,方便了kafka进行日志清理。Kafka启动一个后台线程,定期检查是够存在可以删除的消息。

日志清理策略

1.根据消息保留时间

	配置	log.retention.hours=168 (默认7天)

2.根据topic存储大小

	配置	log.retention.bytes=1073741824(默认1G,不开启)

日志压缩策略

实际场景中,key对应的value值不断变化,并且消费者只关心最新的value,所以kafka会在后台启动线程,定期将相同key合并,只保留最新value。
在这里插入图片描述

零拷贝

消费者从kafka服务器获取消息时,服务器先从磁盘读取数据到内存,再将内存的数据通过socket发送给消费者。看似简单的操作实际上有很多步骤。

一次交互的步骤

在这里插入图片描述

▪ 操作系统将数据从磁盘读入到内核空间的页缓存。
▪ 应用程序将数据从内核空间读入到用户空间缓存中。
▪ 应用程序将数据写回到内核空间到 socket 缓存中。
▪ 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出。

整个过程4次上下文切换及4次数据复制,其中CPU复制了两次。

零拷贝的优势

将磁盘数据复制到页面缓存中,最后将页面缓存的数据发送到网络中。

发送给不同订阅者时,可以使用同一个页面的缓存,避免了大量复制操作。
如果有10个消费者,传统方式下,数据复制次数4*10=40次。但零拷贝下,数据复制次数1+10=11次。
1:从磁盘复制到页面缓存
10:10个消费者个读了一次页面缓存

副本机制(Replication)

对于单个partition而言,在集群中是单点的。一旦该partition不可用,那么partition中的消息就消费不了了,所以kafka通过副本机制备份。

副本角色

Leader副本

每个Partition有且只有一个副本可以作为leader。
负责所有Producer、Consumer的请求。
Producer提交消息后,复制消息到所有的同步副本。

Follower副本

每个Partition中,除了Leader以外的所有Replica均为Follower副本。
不处理任何来自客户端的请求,只通过Fetch Request拉取Leader replica的数据进行同步。

ISR副本

包含leader副本和所有与leader副本保持同步的follower副本。

OSR副本

由于同步落后而被剔除的副本列表。

AR副本

所有副本集合:ISR + OSR

如何判断follower副本是不是同步副本?

过去10S从M副本获取过消息,并在过去6S与ZK发送过心跳。

副本因子(replication-factor)

决定了副本的个数。
如果副本因子是3,那么包含Leader副本在内,所有副本个数是3。

副本分配策略

多个副本如何分配到不同的broker上?

Partition排序的时候,第i个partition分配到(i mod n)broker上,那么第i个partition的第j个副本分配到(i+j mod n)borker。

如何知道leader副本在哪个borker上?

在zk上查,get /brokers/topics/secondTopic/partitions/1/state,查询结果
在这里插入图片描述

	leader_epoch:0 表示partition 1 的leader副本在broker0 上。
	Isr:当前可用且消息量与leader差不多的partition的副本集,也就是说如果某副本最后一条消息的offset与leader副本最后一条消息的offset之差超过阀值(replica.lag.time.max.ms),那么该副本会被踢出isr。

在这里插入图片描述

	绿色是leader副本

副本数据同步

首先,写请求先写入leader副本,再同步到follower副本,那么follower副本的数据略少于leader副本是可以容忍的,只要不超过阀值。

	当然如果follower副本长时间没有同步数据,会被leader副本踢出,因为当Acks设置为all(-1)时,如果某个follower故障导致HW无法递增,那么消息就无法提交,也就不会有后续的数据写进来。

副本同步机制

Consumer消费数据时,只能消费到HW的位置。HW之后的数据对consumer来说是不可见的。

Acks = 1时,消息被leader副本记录后则提交成功,然后leader副本再将消息同步给follower副本(类似异步复制)。所以leader宕机后,HW~LEO之间的数据可能会丢。

Acks设置为all(-1)时,消息被ISR副本记录后则提交成功(类似同步复制)。所以all的数据安全性是最高的。

副本属性

LEO

日志末端位移(log end offset)
记录了该副本底层日志(.log)中下一条消息的offset。
如果LEO=10,那么该副本保存了10条数据,offset为[0,9]。

HW

水位值
新消息被ISR副本同步后,HW才会移到这条消息的位置。
HW的值小于等于LEO。

副本(replica)都宕机了怎么办?

1.等待ISR中任一副本活过来,选它作为leader
2.选择第一个活过来的副本作为leader(不一定是ISR中的)

数据传输的事务定义

at most once

最多一次,这个和JMS中”非持久化”消息类似.发送一次,无论成败,将不会重发。
消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后”未处理”的消息将不能被fetch到,这就是“atmost once”。

at least once

消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功。
消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是“atleast once”,原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态。

exactly once

消息只会发送一次。
kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的。

	通常情况下“at-least-once”是我们首选。(相比 at most once而言,重复接收数据总比丢失数据要好)。

Kafka通信协议

Producer、Broker和Consumer之间采用的是一套自行设计基于TCP层的协议,根据业务需求定制。

基本数据类型

定长数据类型

int8,int16,int32和int64,对应到Java中就是byte, short, int和long。

变长数据类型

bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(表示内容的长度)和N个字节的内容。其中,N为-1表示内容为null。bytes的长度由int32表示,string的长度由int16表示。

数组

数组由两部分组成,分别是一个由int32类型的数字表示的数组长度N和N个元素。

kafka如何保证数据不丢失?

如果master副本挂了,其他follower副本都是非同步副本,那么在开启unclean.leader.election=true的情况下,非同步副本被选举为master副本,那必然会丢数据。

Producer端的处理

设置参数ACKS

request,timeout.ms:如果网络异常收不到响应,则等待,这里有个配置等待时间 request.timeout.ms:发送消息等待时间。

metadata.fetch.time.out 从kafka 获取元数据的等待时间。

max.block.ms : 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间。由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止。用户提供的序列化序或分区程序中的阻塞将不计入此超时。

retries :重试次数 ,

retry.backoff.ms: 重试直接的等待时间, 默认是100 ms

batch.size: 多个消息发送给同一个分区的时候,生产者会把消息打成一个批,批大小设置过大占内存,过小发送频繁,并且生产者不是必须满批发送,有个等待时间。

linger.ms设置 等待多久批不满则发送。

Consumer端的处理:

auto.offset.reset:没有偏移量可以提交的时候,系统从哪里开始消费。 有两种设置 :earliest 和latest 。

group-id:一个topic被 同一个消费组的不同消费者消费 ,相当于是队列模式。被不同消费组消费相当于是 订阅模式。 一个partition在同一个时刻只有一个consumer instance在消费。

enable.auto.commit:自动提交 ,如果开启了自动提交,那么系统会自动进行提交offset。可能会引起,并未消费掉,就提交了offset.引起数据的丢失。

auto.commit.interval.ms:默认是5秒钟提交一次。

kafka高可用的原因

1.采用操作系统层面的页面缓存来缓存数据。

2.日志顺序写入,并采用零拷贝的方式提升IO性能。

3.Topic分成多个Partition,Partition分成多个logSegment。

4.发送端和消费端都可采用并行的方式生产消费消息。

本文地址:https://blog.csdn.net/weixin_41943050/article/details/107342211

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网