生产者
消息发送
数据生产流程解析
- Producer 创建时,会创建一个Sender 线程并设置为守护线程。
- 生产消息时,内部是异步流程,生产的消息先经过拦截器-》序列化器-》分区器,然后将消息存在缓冲区,(该缓冲区也是在 Producer创建时创建)
- 批次发送的条件为:缓冲区数据大小达到batch.size 或者linger.ms 达到上限,那个先达到就算那个。
- 批次发送后,发往指定分区,然后罗盘到broker; 如果生产者配置了retrires 参数 大于0 并且失败原因允许重试,那么客户端内部会对该消息进行重试。
- 落盘到broker 成功,返回生产元数据给生产者。
- 元数据返回有两种方式,一种是通过阻塞直接返回,另一种是通过回调返回。
必要的参数设置
- bootstrap.servers : 生产者初始链接地址,防止节点宕机不可用,形式为host1:port1,host2:port2 不需要写全部kafka集群中broker 的地址。
- key.serializer : 实现接口org.apache.kafka.common.serialization.Serializer 的key 序列化类。
- value.serializer : 实现接口org.apache.kafka.common.serialization.Serializer 的value 序列化类。
- acks: 该选项控制着已发送信息的持久性 acks = 0:表示不等待 broker的任何消息确认,只需要将消息放到socket的缓冲区就认为消息已发送,不保证服务器是否收到消息,retries 设置也没作用,因为客户端不关系消息是否发送失败,客户端收到消息偏移量永远是-1 acks = 1:leader 将记录写到它本地日志,就响应客户端确认消息,不等待follower副本的确认,如果Leader确认了消息就宕机,那么可能会丢失消息,因为follower 副本可能还没来得及同步该消息。 acks = all: leader 等待所有同步副本确认该消息,保证了只要有一个同步副本存在,消息就不会丢失,这是最强的可用性保证,等价于acks=-1,默认为1 字符串,可选值[all,-1,0,1]
- compression.type: 生产者生成数据的压缩格式,默认是none(没有压缩),允许的值:gzip,none,snappy 和lz4,压缩是对整个消息批次来讲的,消息批的效率也影响压缩的比例,消息批越大,压缩效率越好,字符串类型的值,默认是none
- retries: 设置该属性为一个大于1 的值,将在消息发送失败的时候重新发送消息,允许重试但是不设置max.in.flight.requests.per.connenction 为1,存在消息乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了重试,第二个成功了,则第一个消息批在第二个消息批后,int 类型的值,默认0,可选值[0...2147483647]
序列化器
由于kafka的数据都是字节数组,在将消息发送到kafka 之前需要先将数据序列化为字节数组
其中close() 方法必须时幂等,可能需要多次调用
序列化方法:1. ByteArraySerializer 2.ByteBufferSerializer (nio)3.BytesSerializer 4.DoubleSerializer (移位操作)5. FloatSerializer(移位操作) 6、IntegerSerializer(移位操作)7、LongSerializer 8.ShortSerializer
自定义序列化器, avro,实现序列化接口
拦截器
作用一般是做日志,消息处理,主要是用于实现client端的定制化控制逻辑 对于producer 来说,拦截器interceptor 让用户在消息发送前以及producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息。另外,producer 允许用户指定多个拦截器按序作用于同一条消息从而形成一个拦截链条 实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括: onSend(ProducerRecord):运行在用户主线程中,Producer 确保在消息被序列化以计算分区前调用该方法,用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标的计算。 onAcknowledgement(RecordMetadata,Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常是在producer 回调逻辑触发之前,另外onAcknowledgement 运行在Producer的IO线程中,所以不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。 close:关闭Interceptor ,主要用于执行一些资源清理工作。 拦截器interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全,如果指定了多个interceptor ,则producer将按照指定顺序调用他们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。
自定义拦截器
实现ProducerInterceptor接口
在KafkaProducer的设置中设置自定义的拦截器
分区器
消息放到哪个分区做判断,指定分区号,或者轮询方式。 如果record 提供了分区号,则使用record 提供的分区号 如果record 没有提供分区号,则使用key的序列化的值的hash值对分区数量取模 如果record 没有提供分区号,也没有提供key,则使用轮询的方式分配分区号 首先在可用的分区中分配分区号 如果没有可用的分区,则在该主题所有分区中分配分区号。 如果要自定义分区器,则需要
- 首先开发Partitioner 接口的实现类
- 在KafkaProducer 中进行设置:configs.put("partitioner.class","xxx.xx.Xxx.class") 位于org.apache.kafka.clients.producer 中的分区器接口
生产者参数配置补充
通过kafka map 集合配置参数
- retry.backoff.ms 向一个指定的主题分区重发消息的时候,重试之间的等待时间,比如3次重试,每次重试之后等待该时间长度,再接着重试,在一些失败的场景,避免了密集循环的重新发送请求,long 型值,默认100。
- retries : retries 重试次数,当消息发送出现错误的时候,系统会重发消息,跟客户端收到错误时重发一样,如果设置了重试,还想保证消息的有序性,需要设置max_in_flight_requests_per_connection=1,否则在重试此失败消息的时候,其他消息可能发送成功了。
- request.timeout.ms :客户端等待请求响应的最大时长,如果服务端响应超时,则会重发请求,除非达到重试次数,该设置应该比replica.lag.time.ms(a broker configuration)要大,以免在服务器延迟时间内重发消息,int 类型值,默认30000
- interceptor.classes: 在生产者接收到该消息,向kafka集群传输之前,由序列化器处理之前,可以通过拦截器对消息进行处理,要求拦截器类必须实现org.apache.kafka.clients.producer.ProducerInterceptor接口,默认没有拦截器,"Map/
configs" 中通过List集合配置多个拦截器类名。 - batch.size : 当多个消息发送到同一个分区的时候,生产者尝试将多个记录作为一个批来处理,批处理提高了客户端和服务器的处理效率。该配置项以字节为单位控制默认批的大小,所有的批小于等于该值,发送给broker 请求将包含多个批次,每个分区一个,并包含可发送的数据,如果该值设置的比较小,会限制吞吐量,设置为0 会完全禁用批处理,如果设置很大,浪费内存,
- child.id : 生产者发送请求的时候传递给broker的id字符串,用于在broker的请求日志中追踪什么应用发送了什么消息,一般该ID是跟业务有关的字符串。
- send.buffer.bytes: TCP 发送数据的时候使用缓冲区SO_SNDBUF大小,如果设置为0,则使用操作系统默认的。
- buffer.memory: 生产者可以用来缓存等待发送到服务器的记录的总内存字节,如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞max.block.ms的时间,此后将引发异常,此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲,一些额外的内存将用于压缩以及维护运行中的请求long型数据默认33554432
- connections.max.idle.ms 当连接空闲时间达到这个值,就关闭连接,long型数据,默认540000
- linger.ms 生产者在发送请求传输间隔会对需要发送消息进行累加,然后最为一个批次发送,一般情况是消息发送的速度比消息累计速度慢,有时客户端需要减少请求次数,即使是在发送负载不大的情况下,该配置设置了一个延迟,生产者不会立即将消息发送到broker,而是等一段时间累计消息,然后将这段时间内的消息作为一个批次发送,该设置是批处理的另一个上线,一旦批消息打到了batch.size 指定的值,消息批会立即发送,如果达不到,可以设置该毫秒值,等待这么长时间后也会发送消息批,默认为0,如果设置linger.ms=5,则在一个请求发送之前先等待5ms
- max.block.ms 控制kafkaproducer.send() 和kafkaProducer.partitionsFor()阻塞的时长,当缓存满了或者元数据不可用的时候,这些方法阻塞,在用户提供的序列化器和分区器的阻塞时间不计入。默认60000
- max.request.size 单个请求的最大字节数,该设置会限制单个请求中消息批的消息个数,以免单个请求发送太多数据,服务器有自己的限制批大小的设置,与该配置可能不一样 ,int类型值,默认1048576
- partitioner.class 实现了接口org.apache.kafka.clients.producer.partitioner 的分区器实现类,默认值org.apache.kafka.clients.producer.internals.DefaultPartitioner
- receive.buffer.bytes: TCP 接收缓存(so_rcvbuf) 如果设置-1 ,使用默认在的值32768
- security.protocol : PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL
- max.in.flight.requests.per.conection: 单个连接未确认请求的最大数量,达到这个数量,客户端阻塞,该值》1 失败请求后可能出现乱序,默认5
- reconnect.backoff.max.ms, 对于每个连续的链接失败,每个主机的退避将成倍增加,直至达到最大值,在计算退避增量后,添加20%的随机抖动亿避免链接风暴,
- reconnect.backoff.ms 尝试链接指定主机的基础等待时间,避免了到该主机的密集重连,该退避时间应用于该客户端到broker的所有链接。
消费者
消费者,消费组
1. 消费者从订阅的主题消费消息,消费消息的偏移量保存在kafka的名字是 "__consumer_offsets" 的主题中。
2. 消费者还可以将自己的偏移量存储到zookeeper ,需要设置offset.storage = zookeeper.
3. 推荐使用kafka存储消费者的偏移量,因为zookeeper不适合高并发
多个从同一个主题消费的消费者可以加入到一个消费组中,消费组的消费者共享group_id,group_id 一般设置为应用的逻辑名称,比如多个订单处理程序组成一个消费者,可以设置group_id为”order_process",group_id通过消费者的配置指定group.id="xxxx" 消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。
一个拥有4个分区的主题,包含一个消费者的消费组,此时,消费组中的消费者消费主题中的所有分区,并且没有重复的可能,如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。
如果消费组有四个消费者,则每个消费者可以分配到一个分区
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会限制,不会接受任何消息。
向消费组添加消费者是横向扩展消费能力的主要方式。
必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者,但是不要让消费者的数量超过主分区的数量
每个分区可以被多个消费组消费
心跳机制
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。 由于broker宕机,主题x的分区3分区3 对应消费者 4,分区3宕机,此时分区3 没有Leader 副本,触发再平衡,消费者4没有对应的主题分区则消费者4闲置。 kafka 心跳机制 利用长链接,kafka Consumer 和Broker之间的健康检查,只有当broker Coordinator 正常时,consumer才会发送心跳
- session.timeout.ms MemberMetadata.sessionTimeoutMs
- max.poll.interval.ms MemberMetadata.reblanceTimeoutMs broker 端,sessionTimeoutMs 参数,broker 处理心跳的逻辑在GroupGoordinator 类中,如果心跳超期,broker coordinator 会把消费者从group 移除,并触发rebalance
主题和分区
- Topic Kafka 用于分类管理消息的逻辑单元,类似与MySql的数据库。
- Partition,是kafka下数据存储的基本单元,同一个topic的数据,会被分散存储到多个partition中,这些partition 可以在同一个机器上,也可以在多个机器中,
- 好处在于水平拓展,避免单个机器在磁盘空间和性能上限,同时可以通过复制来增加数据冗余性,提高容灾能力,当然为了做到均匀分布,通常partition 数量是broker server的整数倍。
- Consumer Group ,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段,保证一个消费组获取到特定主题的全部信息,在消费者内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只能被消费组中的一个消费者消费。
consumer 采用 pull 模式从broker 中读取数据,采用pull 模式,consumer 可自助控制消费消息的速率,可以自己控制消费方式(批量/逐条)还可以选择不同的提交方式从而实现不同的传输语义。 大部分用在大数据中,海量数据中,主题落到磁盘保存7天再删除 consumer.subscribe("tp_demo_01,tp_demo_02"),变更消费者数量会再平衡,推送保证低延迟比如小兔子做到us 级别适合业务
反序列化
kafka 的 broker中 所有消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交给用户程序消费处理。
消费者的反序列化器包括key和value的反序列化器。
key.deserializer
value.deserializer
StringDeserializer
IntegerDeserializer
需要实现org.apache.kafka.common.serialization.Deserializer
消费者从订阅的主题拉取消息 consumer.poll(3_000),在Fetcher 类中,对拉取到的消息首先进行反序列化处理
自定义反序列化器
自定义反序列化类,需要自己实现
位移提交
- Consumer需要向kafka 记录自己的位移数据,这个汇报过程称为位移提交(commiting offset)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer 端负责的,kafka 只负责保管。
- 位移提交分为自动提交和手动提交
- 位移提交分为同步提交和异步提交
自动提交
kafka Consumer 后台提交: 开启自动提交: enable.auto.commit=true 配置自动提交间隔: Consumer 端: auto.commit.interval.ms 默认5s 自动提交位移的顺序 配置enable.auto.commit = true kafka 会保证在开始调用poll方法时,提交上次poll返回的所有消息 因此自动提交不会出现消息丢失,但会重复消费 重复消息举例 Consumer 每 5s 提交offset 假设提交offset 后的3s发生了rebalance rebalance 之后的所有consumer 从上一次提交的offset处继续消费 因此rebalance 发生前3s的消息会被重复消费
- 使用kafkaConsumer#commitSync():会提交kafkaConsumer#poll()返回的最新offset
- 改方法为同步操作,等待直到offset 被成功提交才返回。
- commitSync 在处理完所有消息之后
- 手动同步提交可以控制offset提交的时机和频率
- 手动提交会 调用commitSync时,Consumer 处于阻塞状态,直到broker 返回结果 会影响TPS 可以选择拉长提交间隔,但有以下问题: 1. 会导致Consumer 的提交频率下降 2. Consumer 重启后,会有更多的消息被消费。
异步提交
- kafkaConsumer#commitAsync()
- commitAsync出现问题不会自动重试 拉取,处理,异步提交,catch 处理异常 finally 最后一次提交使用同步阻塞式提交
消费者位移管理
kafka中,消费者根据消息的位移顺序消费消息。
消费者的位移由消费者管理,可以存储在zookeeper 中,也可以存储于kafka主题,__consumer_offsets 中。
kafka提供了消费者API,让消费者可以管理自己的位移
- API: public void assgin(Collection
partitions) 给当前消费者手动分配一系列主题分区,手动分区不能增量分配,如果先前有分配分区,那么就覆盖,如果给出的主题分区是空的,则等价于调用unsubscribe 方法,手动分配主题分区的方法不使用消费组管理功能,当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡,手动分区分配assign 不能和自动分区分配subscribe一起使用,如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区分配中的消费偏移量进行异步提交 - API: public Set
assignment() 获取给当前消费者分配的分区集合,如果订阅是通过调用assign 方法直接分配主题分区,则返回相同的集合,如果使用了主题订阅,该方法返回当前分配给该消费者的主题分区集合,如果分区订阅还没开始进行分区分配,或者正在重新分配分区,则会返回none - API: public Map
/>listTopics() 获取对用户授权的所有主题分区元数据,该方法会对服务器发起远程调用。 - API: public List
partitionsFor(String topic) 获取指定主题的分区元数据,如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调用。 - API: public void assgin(Collection
partitions) 手动分配主题分区给当前消费者 - API: public Map
beginningOffsets(Collection partitions) 对于给定的主题分区,列出它们第一个消息的偏移量,如果指定的分区不存在,该方法可能会永远阻塞,该方法不改变分区的当前消费者偏移量 - API: public void seekToEnd(Collection
partitions) 将偏移量移动到每个给定分区的最后一个,该方法延迟执行,只用当调用过poll方法或position 方法之后才可以使用,如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后,如果设置了隔离级别isolation.level=read_committed,则会讲分区的消费偏移量移动到最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。 - API: public void seek(TopicParition partition,long offset) 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量,若该方法多次调用,则最后一次覆盖前面的,如果在消费中间随意使用,可能会丢失数据。
- API: public long position(TopicPartition partition)检查指定主题分区的消费偏移量
- API: public void seekToBeginning(Collection
partitions) 移动到起始偏移量,懒执行该方法,只有当调用poll或者position 方法后才执行,如果没有分区,那么将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。
再平衡
重平衡是一个协议,规定了如何让消费组下的所有消费者来分配topic 中的每一个分区,比如一个topic 有100个分区,一个消费组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。 重平衡的触发条件有三个: 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡。
- 消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
- 由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置
- 主题增加分区,需要主题分区和消费组进行再平衡
- 使用正则表达式订阅主题,则增加的主题匹配正则表达式的时候,也要重平衡
因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS 影响极大,如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多,数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态应该尽量避免重平衡发生。
避免重平衡 几种重平衡的方式: 增加分区,或者增加订阅的主题,亦或是增加消费者,更多的是主动控制。 如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况,kafka错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。 哪些情况会出现错误判断挂掉情况 在分布式系统重,通常是通过心跳来维持分布式系统的,kafka也不例外 在分布式系统重,由于网络问题不清楚没接收到心跳,不清楚是不是真正挂了,还是因为负载过重来不及发生心跳或事网络拥塞,所以一般会约定一个时间,超时即判定对方挂了,而在kafka消费者场景中session.timeout.ms参数就是规定这个超时时间是多少。 还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。 此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取,如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费组,也就是说,拉取,然后处理,这个处理时间不能超过max.poll.interval.ms这个参数的值,这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。 三个参数 session.timeout.ms 控制心跳超时时间 heartbeat.interval.ms 控制心跳发送频率 max.poll.interval.ms 控制poll的间隔。 相对较为合理的配置 session.timeout.ms :6s heartbeat.interval.ms : 2s max.poll.interval.ms: 推荐为消费者处理消息最长耗时再加1分钟
消费者拦截器
消费者在拉取了分区消息之后,要首先经过反序列化器对key 和value进行反序列化处理
处理完之后,如果消费端设置了拦截器,需要经过拦截器的处理之后,才能返回消费者应用程序进行处理
消费端定义消息拦截器,需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor