kafa 消息发送和消费


消息生产者

生产者有两种,1,kafka-producer,2.producer-record,其中kafkaproducer 是用来发送消息的类,producerrecord 是用来封装kafka的消息。 kafkaproducer 的创建需要指定的参数和含义:

  1. bootstrap.servers 用来配置生产者如何与broker建立连接,该参数设置的是初始化参数,如果生产者需要连接的是kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker 之后,在通过该连接发现集群中的其他节点
  2. key.serializer 要发送消息的key数据的序列化类,设置的时候可以写类名,也可以使用该类的class对象
  3. value.serializer 要发送消息的value数据的序列化,设置的时候可以写类名,也可以使用该类的class 对象。
  4. acks 默认值:all, acks=0: 生产者不等待broker 对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情况不能保证broker是否真的收到了消息,retries配置也不会生效,发送的消息返回的消息偏移量永远是-1. acks=1 表示消息只需要写道主分区即可,然后就响应客户端,而不等待副本分区的确认,在该情形下,如果主分区收到消息确认之后就宕机,而副本区还没得及同步该消息,则消息丢失 acks=all 首领分区会等待所有的ISR 副本分区确认记录,该处理保证了只要有一个ISR副本分区存活,消息就不会丢失,这是kafka最强的可靠性保证等效于acks = -1
  5. retries 重试次数,当消息发出失败需要重试,从缓冲区重试,系统会重新发送消息,跟客户端收到错误时重发一样,如果设置了重试,还想保证消息的有序性,需要设置,Max_in_flight_requests_per_connection =1 否则在重试此失败消息的时候,其他消息就可能发送成功了。

其他参数也可以从org.apache.kafka.clients.producer.producerConfig 中找到,消费者生产消息后,需要broker端确认,可以同步确认,也可以异步确认,同步确认效率低,异步确认效率高,但是要设置回调对象。