Kafka
# 简介
Kafka 是一个分布式流媒体平台。
官网:https://kafka.apache.org/ (opens new window)
中文官网:http://kafka.apachecn.org/ (opens new window)
(1)流媒体平台有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 以容错的持久方式存储记录流。
- 记录发生时处理流。
(2)Kafka通常用于两大类应用:
- 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
- 构建转换或响应数据流的实时流应用程序

(3)kafka名词解释
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
# 技术对比
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
特性场景:
追求高可用性:电商系统中支付服务等业务场景。
追求单机吞吐量:日志采集或大数据等场景。
消息延时:机房温控等工业业务场景。
消息可靠性:银行支付系统等业务场景。
总结:
- 中小型公司首选RabbitMQ:管理界面简单,高并发
- 大型公司可以选择RocketMQ:更高并发,可对rocketmq进行定制化开发
- 日志采集功能:首选kafka,专为大数据准备
# kafka基本架构与概念

在kafka概述里介绍了概念包括:topic、producer、consumer、broker,这些是最基本的一些概念,想要更深入理解kafka还要知道它的一些其他概念定义:
Message 消息:Kafka 中的数据单元被称为消息message,也被称为记录,可以把它看作数据库表中某一行的记录。topic 主题:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。Partition 分区(高并发):主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性。topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个文件进行存储。一个partition中的数据是有序的,多个partition之间的数据是没有顺序的。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。Broker:一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。Broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。Replica 副本(分片)(高可用):Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica);所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表(保持同步的副本列表)中删除,重新创建一个Follower。Zookeeper:kafka对与zookeeper是强依赖的,是以zookeeper作为基础的,即使不做集群,也需要zk的支持。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行重平衡,但是Kafka2.8版本之后也可以不用配置ZK了,Kafka也在慢慢的向脱ZK化进行。Consumer Group消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。Consumer Offset 偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。Rebalance 重平衡:消费者同组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
# Kafka 生产者
# 生产者消息发送原理
完整流程
动画演示
- 生产者以
ProducerRecord为载体发送消息 - 以分区为单位,生产者会将同一分区的
ProducerRecord打包成RecordBatch,存放在Request Queue中。 - 当
RecordBatch数据达到批次大小batch.size或者等待时间linger.ms就会提交批次数据到Kafka Broker 中。 - 然后Kafka Broker的leader节点同步数据到集群中的follower节点后,返回ack确认给生产者。
同步发送:指的是 16k 或 linger.ms 一批次一批次的读取外部数据发送至kafka集群
异步发送:指的是不断的读取外部数据进入RecordAccumulator,内部再按批次进行发送
# 分区器
分区的好处:
- 便于合理使用存储资源,每个
partition在一个broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多态broker上。合理控制分区的任务(配置权重),可以实现负载均衡的效果。 - 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
分区分配策略:
- 指明
partition的情况下,直接指明的值作为partition值; - 没有指定
partition值但有key的情况下,将key的hash值与主题的分区数取模得到partition值; - 既没有
partition值又没有key的情况下,kafka采用 sticky partition(粘性分区器),会随机选择一个分区,并尽可能一直使用该分区,该分区的batch已满或者已完成,kafka再随机一个与上一次不同的分区进行使用;
# 生产者提高吞吐量
聚焦参数配置,通过修改批次配置从而达到提升吞吐量的效果:
batch.size:批次大小,默认16K,建议修改为32Klinger.ms:等待时间,默认0ms,建议修改为5-100ms
这两个参数不建议修改过大,过大会造成消息延迟,影响实时性。
compression.type:压缩 snappyRecordAccumulator:缓冲区大小,默认32M,当主题分区非常多(1w),可以修改成64M
# 生产者提高可靠性
- acks确认机制:指的是producer的消息发送确认机制
acks= 0:可靠性差,效率高
生产者在成功写入消息之前不会等待任何来自服务器的响应,也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。acks= 1:默认值,可靠性中等,效率中等(场景:传输普通日志,允许丢个别数据)
只要集群 leader 节点收到消息,生产者就会收到一个来自服务器的成功响应,如果消息无法到达 leader ,生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。acks= all(-1):可靠性高,效率低(场景:传输和钱相关的数据,对可靠性要求高的场景)
只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过他的延迟比acks=1时更高。
※ 注意:acks = all 仍然有数据丢失风险,因为底层是维护了一个ISR(可通信的leader + follower集合),例如(leader:0,isr:0,1,2),如果follower长时间没有向leader发送通信请求,则视为掉线,会踢出ISR,该时间是由 replica.lag.time.max.ms 参数来决定,默认为30s,同步也是也是向ISR集合中的节点同步,所以当ISR里最小应答副本数量(min.insync.replicas默认为 1)设置为1时,与acks = 1的效果是一样的,仍有丢数据的风险(leader:0,isr:0)
※ 数据完全可靠的配置:acks = all + follower >= 2 + min.insync.replicas >= 2
即:ack级别设置为all + 分区副本数大于等于 2 + ISR最小应答副本数大于等于 2
- retries重试机制:
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
# 生产者保证幂等性

生产者重复消息问题:生产者正常发送消息到分区,kafka保存消息进入磁盘,响应ack给生产者是否保存成功,但如果ack响应出现延迟或异常,生产者会重试,再次发送消息,这时就会出现重复消息。
幂等性就是指不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once):幂等性 + 至少一次(ack级别设置为all + 分区副本数>= 2 + ISR最小应答副本数>= 2)
重复数据的判断标准:具有相同<PID,Partition,SeqNumber>或小于分区最大SeqNumber的消息,Broker 只会持久化一条。其中:
PID:Kafka 每次重启都会分配一个新的,相当于生产者ID;Partition:表示分区号;Sequence Number:序列化号,单调自增的数字;
所以幂等性只能保证的是在单分区单会话内不重复。
开启幂等性:enable.idempotence = true:默认为true,false关闭。
※ 如果想要保证生产者重启后PID不变,就需要开启事务,使用事务ID来记录PID,以达到重启后PID不变的效果。
事务基本的实现思路就是通过配置的事务ID,将生产者ID进行绑定,然后存储在Kafka专门管理事务的内部主题__transaction_state中,而内部主题的操作是由事务协调器(TransactionCoodinator)对象完成的,这个协调器对象有点类似于咱们数据发送时的那个副本Leader。
其实这种设计是很巧妙的,因为kafka将事务ID和生产者ID看成了消息数据,然后将数据发送到一个内部主题中。这样,使用事务处理的流程和咱们自己发送数据的流程是很像。
补充说明:
一般很少会在保证生产消息幂等性为目的开启事务,事务本身会影响消息延迟以及性能,更多幂等性的保证是在业务上做处理,例如订单ID之类的。
# 生产者保证有序性
介于kafka的分区特性,消息是自然无序的,kafka只能保证单分区特定条件下有序。
- kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection = 1(不需要开启幂等性) - kafka在1.x版本之后保证数据单分区有序,条件如下:
- 未开启幂等性
max.in.flight.requests.per.connection = 1 - 开启幂等性
max.in.flight.requests.per.connection <= 5
原因:因为kafka在1.x版本以后,启用幂等性会缓存producer发来的最近5个request的元数据,所以无论如何,都可以保证最近5个request的数据都是有序的。
- 未开启幂等性

# Kafka 消费者
# 消费者组
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

# 消费者组初始化
# coordinator 的选举
coordinator 的作用:辅助实现消费者组的初始化和分区的分配。
coordinator 节点选择 = groupid 的 hashcode 值 % 50( __consumer_offsets的分区数量)
例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的 coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
# coordinator 工作原理

# 消费者组工作流程

# 相关参数
| 参数 | 描述 |
|---|---|
| bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
| key.deserializer 和 value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
| group.id | 标记消费者所属的消费者组。 |
| enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
| auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
| auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理? earliest:自动重置偏 移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
| offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区。 |
| heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3。 |
| session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡。 |
# 消费者组分区分配策略
一个 consumer group 中有多个 consumer 组成,一个 topic 有多个 partition 组成,kafka 会自动分配组内 consumer 的负责分区。
Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用多个分区分配策略。
# Range 主题分区策略
Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
假如现在有 7 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。
例如,7/3 = 2 余 1 ,除不尽,那么 消费者 C0 便会多消费 1 个分区。 8/3=2余2,除不尽,那么C0和C1分别多消费一个。
通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多消费 1 个分区。
※ 注意:如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有 N 多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。容易产生数据倾斜!
# RoundRobin 轮询分区策略
RoundRobin 针对集群中所有Topic而言。RoundRobin 轮询分区策略,是把所有的 partition 和所有的
consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
# Sticky 粘性分区策略
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的随机放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
# offset 保存位置
Kafka0.9版本之前,consumer默认将 offset 保存在Zookeeper中。
0.9版本后,consumer默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
消费 offset:
因为本质__consumer_offsets是一个主题,所以就可以消费,但是默认kafka是不允许消费的,需要在配置文件 config/consumer.properties 中添加配置 exclude.internal.topics=false开放,默认是 true,意为不能消费系统主题。
# 自动提交 offset
自动提交offset的相关参数:
enable.auto.commit:是否开启自动提交offset功能,默认是trueauto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

# 手动提交 offset
手动提交offset的方法有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。
两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交) :发送完提交offset请求后,就开始消费下一批数据了。

# 消费者提高吞吐量
措施有两个:
- 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数 = 分区数。(两者缺一不可);
- 如果是下游的数据处理不及时:提高每批次拉取的数量。 批次拉取数据过少(拉取数据/处理时间 < 生产速度), 使处理的数据小于生产的数据,也会造成数据积压。
相关参数
| 参数名称 | 描述 |
|---|---|
| fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes(topic config)影响。 |
| max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条 |
# Kafka Broker
# Broker 工作流程
# ZK存储信息
/kafka/brokers/ids:记录有哪些kafka服务器,示例 [0, 1, 2]/kafka/brokers/topics/{主题}/partition/{分区}/state:每一个主题每一个分区下,谁是Leader,有哪些服务可用,即记录ISR信息,示例{"leader":1, "isr":[1, 0, 2]}/kafka/controller:辅助选举Leader,示例{"brokerid": 0}
# 工作原理

- broker 启动后在 ZK 中注册
- 争抢 Controller 注册
- 第一个注册的 Controller 监听
/brokers/idsbrokers 节点变化 - Controller 进行 Leader 选举
- Controller 将节点信息上传至 ZK
- 其他 Controller 从 ZK 中同步相关信息
- 假设 Broker 中 Leader 挂了
- Controller 监听到变化
- Controller 获取 ISR
- 重新选举新的 Leader
- 更新 Leader 和 ISR
# 重要参数配置
| 参数名称 | 描述 |
|---|---|
| replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通 信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s。 |
| auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。 |
| leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡。 |
| leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时 间。 |
| log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。 |
| log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引。 |
| log.retention.hours | Kafka 中数据保存的时间,默认 7 天。 |
| log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
| log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
| log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
| log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
| og.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策 略。 |
| num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占 总核数的 50%。 |
| num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
| num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。 |
| log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最 大值,9223372036854775807。一般不建议修改, 交给系统自己管理。 |
| log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建 议修改,交给系统自己管理。 |
# Broker 节点的服役与退役
# 服役新节点
生产中我们可能会遇到一个情况,对broker集群进行扩容,这时就可以通过服役新节点来添加。
执行负载均衡操作
假设当前存在[0, 1, 2]三个节点,要加入新节点3;
- 创建一个复杂均衡的主题
vim topics-to-move.json
{
"topics": [
{"topic": "first"}
],
"version": 1
}
2
3
4
5
6
7
- 生成一个负载均衡计划
bin/kafka-reassign-partitions.sh \
--bootstrap-server hadoop102:9092 \
--topics-to-move-json-file topics-to-move.json \
--broker-list "0,1,2,3" \
--generate
2
3
4
5
这里
--broker-list "0,1,2,3"就是指定负载均衡的broker集合,例如3是我们要新加的节点
然后我们就能得到一个系统生成的执行计划
# 当前负载均衡配置
Current partition replica assignment
{
"version":1,
"partitions":[
{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},
{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},
{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}
]
}
# 服役新节点后的负载均衡配置
Proposed partition reassignment configuration
{
"version":1,
"partitions":[
{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},
{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- **创建副本存储系统生成的计划 (所有副本存储在 broker0、broker1、broker2、broker3 中) **
vim increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},
{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}
]
}
2
3
4
5
6
7
8
9
- **执行副本存储计划 **
bin/kafka-reassign-partitions.sh \
--bootstrap-server hadoop102:9092 \
--reassignment-json-file increase-replication-factor.json \
--execute
2
3
4
- 验证副本存储计划
bin/kafka-reassign-partitions.sh \
--bootstrap-server hadoop102:9092 \
--reassignment-json-file increase-replication-factor.json \
--verify
2
3
4
输出
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
2
3
4
5
6
7
# 退役旧节点
退役旧节点就是服役新节点的反向操作,修改第二步时--broker-list参数的配置,例如当前节点有[0,1,2,3]四个节点,将 id 为 3 节点删掉,则是修改为--broker-list "0,1,2"即可,然后与服役新节点一样执行后续步骤。
# Kafka 副本
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会 增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
- AR = ISR + OSR
- ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送 通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由
replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。 - OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
# Leader 选举
选举规则:从 AR 中排序靠前且在 ISR 中存活的节点;
# Follower 故障处理细节

- LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
- HW(High Watermark):所有副本中最小的LEO 。
流程
- Follower 发生故障后会被临时踢出 ISR
- 这个期间Leader和其他 Follower 继续接收数据
- 待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始 向Leader进行同步。
- 等该Follower的LEO大于等于该Partition的HW,即 Follower追上Leader之后,就可以重新加入ISR了。
# Leader 故障处理细节

- LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset + 1。
- HW(High Watermark):所有副本中最小的LEO 。
流程:
- Leader发生故障之后,会从ISR中选出一个新的Leader
- 为保证多个副本之间的数据一致性,其余的Follower会先 将各自的log文件高于HW的部分截掉,然后从新的Leader同步 数据。
※ 注意: 这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。
# 手动调整分区
例如我们有四台机器[0,1,2,3],默认创建分区时,kafka会负载均衡,尽量的让副本平均的分散到各台机器上,但如果生产上可能有几台机器的配置比较差,不希望让他们进行存储,则可以通过执行存储计划手动调整副本的分配。
- 创建副本存储计划
vim increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}
]
}
2
3
4
5
6
7
8
9
- 执行存储计划
bin/kafka-reassign-partitions.sh \
--bootstrap-server hadoop102:9092 \
--reassignment-json-file increase-replication-factor.json \
--execute
2
3
4
# 手动调整副本数
主题分区的副本数无法通过命令行的方式修改,只能通过执行存储计划:
- 创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
vim increase-replication-factor.json
{
"version":1,
"partitions":[
{"topic":"four","partition":0,"replicas":[0,1,2]},
{"topic":"four","partition":1,"replicas":[0,1,2]},
{"topic":"four","partition":2,"replicas":[0,1,2]}
]
}
2
3
4
5
6
7
8
- 执行计划
bin/kafka-reassign-partitions.sh \
--bootstrap-server hadoop102:9092 \
--reassignment-json-file increase-replication-factor.json \
--execute
2
3
4
# Leader Partition 重平衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的 broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。 例如下面这个情况:
三个broker四个分区,分区 leader都集中在0和1节点上
三个相关参数:
auto.leader.rebalance.enable:默认是true。 自动 Leader Partition 重平衡leader.imbalance.per.broker.percentage: 默认是10%。broker 与 broker 之间允许的不平衡的 leader 的比率。如果每个 broker 都超过了这个值,控制器会触发leader的平衡。leader.imbalance.check.interval.seconds:默认值300秒。检查leader负载是否平衡的间隔时间。
怎么判定为不平衡呢?以下面这个场景为例:
针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是Leader节点, 所以不平衡数加1,AR副本总数是4 所以broker0节点不平衡率为1/4>10%,需要再平衡。
broker2和broker3节点和broker0不平衡率一样,需要再平衡。
broker1的不平衡数为0,不需要再平衡。
因为并不是所有节点都达到了不平衡率,所以不会触发controller的rebalance。
※ 注意:并不建议开启这个重平衡,因为rebalance会造成集群阻塞,因为需要重新选举分配leader节点,如果一定要开启的话,建议将leader.imbalance.per.broker.percentage的值设置高些,减少rebalance的频率。
# Kafka 文件存储机制

相关参数
| 参数名 | 参数作用 | 类型 | 默认值 | 推荐值 |
|---|---|---|---|---|
| min.insync.replicas | 最小同步副本数量 | 推荐 | 1 | 2 |
| log.segment.bytes | 文件段字节数据大小限制 | 可选 | 1G = 102410241024 byte | |
| log.roll.hours | 文件段强制滚动时间阈值 | 可选 | 7天 = 24 * 7 * 60 * 60 * 1000L ms | |
| log.flush.interval.messages | 满足刷写日志文件的数据条数 | 可选 | Long.MaxValue | 不推荐 |
| log.flush.interval.ms | 满足刷写日志文件的时间周期 | 可选 | Long.MaxValue | 不推荐 |
| log.index.interval.bytes | 刷写索引文件的字节数 | 可选 | 4 * 1024(4kb) | |
| replica.lag.time.max.ms | 副本延迟同步时间 | 可选 | 30s |
- 存储结构
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数 据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制, 将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该 文件夹的命名规则为:topic名称+分区序号,例如:first-0。 
例如以下主题
存储结构:
- {log.dirs}/topic1-{分区}/
- 00000000000000000000.index
- 00000000000000000000.log
- 00000000000000000000.timeindex
- ...
说明:index和log文件以当前 segment的第一条消息的offset命名,一个segment可以容纳1G的数据。
- log文件和index文件的关系
log文件:log文件用于存储大量的数据,即producer生产的数据。这些数据会被不断追加到log文件的末端,且每条数据都有一个唯一的offset(偏移量)。消费者组中的每个消费者会实时记录自己消费到的offset,以便在出错恢复时从上次的位置继续消费。
index文件:index文件存储大量的索引信息。索引文件中的元数据指向对应数据文件中message的物理偏移地址,这有助于快速定位到特定消息的位置,提高数据检索的效率。
index为稀疏索引,大约每往log文件中写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes 默认4kb。
Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大, 因此能将offset的值控制在固定大小 。
- 日志文件内部数据结构
.index索引文件由offset、position组成:
输入指令
./kafka-run-class.sh kafka.tools.DumpLogSegments \
--files ../log/topic1-0/00000000000000000000.index
2
输出
.log文件由如下部分组成:
输入命令
./kafka-run-class.sh kafka.tools.DumpLogSegments --files ../log/topic1-0/00000000000000000000.log
输出
.log文件中的每行数据代表的都是一个RecordBatch,也就一批消息。
我们看单批消息
baseOffset: 0 # 当前批次第一条消息的偏移量
lastOffset: 0 # 当前批次最后一条消息的偏移量
count: 1 # 消息总数
baseSequence: 0 # 当前批次所在分区的起始序列号
lastSequence: 0 # 当前批次所在分区的最后一个序列号
producerId: 0 # 生产者的唯一标识符
producerEpoch: 0 # 生产者的当前 epoch(用于实现幂等性和事务性)
partitionLeaderEpoch: 0 # 分区的领导者 epoch(用于管理分区的领导者选举)
isTransactional: true # 标识消息是否属于事务性消息
isControl: false # 标识消息是否是控制消息
deleteHorizonMs: OptionalLong.empty # 消息的删除时间戳(如果已经被删除)
position: 0 # 当前批次在日志文件中的位置
CreateTime: 1711001542510 # 当前批次的创建时间戳
size: 79 # 消息的大小(以字节为单位)
magic: 2 # 消息的魔数,用于指示消息的格式版本
compresscodec: none # 消息的压缩编解码器
crc: 537727051 # 消息的循环冗余校验(CRC)校验值,用于验证消息的完整性
isvalid: true # 标识消息是否有效
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
※ 注意:通过kafka-run-class.sh kafka.tools.DumpLogSegments脚本输出的只是批数据的简要信息,方便查看,并不会将批次内的消息打印出来,Record内容需要调用kafka对应的api才能获取,但产生的消息的确是存放在.log文件中的。
- kafka 是如何利用索引文件快速定位的?
例如现在有如下文件
00000000000000000000.index
00000000000000000000.log
00000000000000160420.index
00000000000000160420.log
00000000000000207460.index
00000000000000207460.log
2
3
4
5
6
现在需要快速定位offset = 160427的数据

- 查找segment文件
00000000000000000000.index 为最开始的文件,
00000000000000160420.index 为第二个文件,它的起始位置偏移量 offset: 160420+1
00000000000000207460.index 为第三个文件,它的起始位置偏移量 offset: 207460+1
所以数据在00000000000000160420.index文件中。 - 根据索引文件中的偏移查找到数据物理偏移地址
在根据索引文件中的 [4, 476] 定位到 00000000000000160420.log 文件中 476 的位置(物理偏移量),接着向下顺序遍历,找到偏移量 offset = 160427 的数据。
# 文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
| 参数 | 描述 |
|---|---|
| log.retention.hours | 最低优先级小时,默认 7 天 |
| log.retention.minutes | 最高优先级分钟 |
| log.retention.ms | 最高优先级毫秒 |
| log.retention.check.interval.ms | 负责设置检查周期,默认 5 分钟 |
文件的清理策略有两种:delete和compact
配置项:log.cleanup.policy = delete
- delete :日志删除
- 基于时间:默认打开;以 segment 中所有记录中的最大时间戳作为该文件时间戳。
- 基于大小:默认关闭;超过设置的所有日志总大小,删除最早的 segment。
log.retention.bytes,默认等于-1,表示无穷大。
- compact:日志压缩;对于相同key的不同value值,只保留最后一个版本。

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大
的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息
集里就保存了所有用户最新的资料。
# 高效读写数据
主要涉及的技术有下面几个:
- kafka本身是分布式集群,可以采用分区技术,并行度高
- 读取数据采用稀疏索引,可以快速定位要消费的数据
- 顺序写磁盘
- 页缓存+零拷贝
# Kafka 事务
事务的功能:保证一组操作的原子性,分为两种,生产者事务和消费者事务。
- 生产者事务:生产者开启事务后向多个主题多个分区发送消息,这些消息要么全都成功要么全都失败,如果其中有个消息发送失败,则不会有一条消息到达kafka。
- 消费者事务:消费者开启事务后可以保证消息读取和消费的原子性,消费者可以将消费消息的偏移量Offset保存在事务中,并在处理完消息成功提交后,偏移量会被提交到消费者组中,以更新消费偏移量,如果出现失败,则不提交偏移量。
事务的原理:二阶段提交
- 第一个阶段提交事务协调器会告诉生产者事务已经提交了,所以也称之预提交操作,事务协调器会修改事务为预提交状态

- 第二个阶段提交事务协调器会向分区Leader节点中发送数据标记,通知Broker事务已经提交,然后事务协调器会修改事务为完成提交状态

特殊情况下,事务已经提交成功,但还是读取不到数据,那是因为当前提交成功只是一阶段提交成功,事务协调器会继续向各个Partition发送marker信息,此操作会无限重试,直至成功。
但是不同的Broker可能无法全部同时接收到marker信息,此时有的Broker上的数据还是无法访问,这也是正常的,因为kafka的事务不能保证强一致性,只能保证最终数据的一致性,无法保证中间的数据是一致的。不过对于常规的场景这里已经够用了,事务协调器会不遗余力的重试,直至成功。
# Kafka-Kraft 模式

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进行 Kafka 集群管理。
这样做的好处有以下几个:
- Kafka 不再依赖外部框架,而是能够独立运行;
- controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升;
- 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制;
- controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强
controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策。
相关配置:节点的server.properties文件
#kafka 的角色(controller 相当于主机、broker 节点相当于从机,主机类似 zk 功能)
process.roles=broker, controller
#节点 ID
node.id=2
#controller 服务协议别名
controller.listener.names=CONTROLLER
#全 Controller 列表
controller.quorum.voters=2@{IP}:9093,3@{IP}:9093,4@{IP}:9093
#不同服务器绑定的端口
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#broker 服务协议别名
inter.broker.listener.name=PLAINTEXT
#broker 对外暴露的地址
advertised.Listeners=PLAINTEXT://{IP}:9092
#协议别名到安全协议的映射
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLA
INTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#kafka 数据存储目录
log.dirs=/opt/module/kafka2/data
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 安装和启动
# Docker 安装
因为Kafka强制依赖了zookeeper,所以要先安装zookeeper,再安装Kafka
1)安装zookeeper
下载zookeeper镜像(无需下载)
docker pull zookeeper
启动zookeeper容器
docker run -d --name zookeeper -p 2181:2181 -t zookeeper
2)安装Kafka
下载Kafka镜像
docker pull wurstmeister/kafka
启动Kafka容器
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.66.133 --env KAFKA_ADVERTISED_PORT=9092 --volume /etc/localtime:/etc/localtime wurstmeister/kafka:latest
# 压缩包安装(推荐)
# 下载kafka安装包
官网:https://kafka.apache.org/downloads (opens new window)
# 解压kafka
下载后解压压缩包
tar -xvf kafka_2.13-3.2.0.tgz
# 查看zookeeper版本
进入libs目录查看zookeeper版本
# 下载zookeeper安装包
注意:需要提前安装好jdk8以上版本
zookeeper下载:https://zookeeper.apache.org/releases.html (opens new window)

# zookeeper安装
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C zookeeper-3.6.3
重命名文件夹
mv apache-zookeeper-3.6.3-bin/ zookeeper-3.6.3
# 配置zookeeper环境变量
echo 'export ZOOKEEPER_HOME=/usr/local/kafka/zookeeper-3.6.3' >> /etc/profile
echo 'export PATH=$PATH:$ZOOKEEPER_HOME/bin' >> /etc/profile
source /etc/profile
2
3
# 修改conf文件夹配置
cd zookeeper-3.6.3/conf
mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
2
3
server.1=0.0.0.0:2888:3888,1是我们data/myid,2888是服务端通信端口,3888服务端之间选举端口
dataDir=/usr/local/kafka/zookeeper-3.6.3/data
dataLogDir=/usr/local/kafka/zookeeper-3.6.3/log
server.1=0.0.0.0:2888:3888
server.2=tencent02:2888:3888
server.3=tencent03:2888:3888
2
3
4
5
6
7
# 创建data 、log文件夹
cd /usr/local/kafka/zookeeper-3.6.3
mkdir data && mkdir log
cd data
2
3
创建唯一标识文件(如果是第二第三台服务这里数字改为 2 .. 3 以此类推)
echo 1 > myid
# 启动zookeeper
如果是要启动集群则重复5~8,注意区分文件夹
/usr/local/kafka/zookeeper-3.6.3/bin/zkServer.sh start

查看模式
standalone:单体
如果要停止zookeeper则调用
zkServer.sh stop
# 搭建kafka
前面我们已经解压过了,所以这里先修改一下解压的包名
mv kafka_2.13-3.2.0 kafka_3.2.0
# 进入kakfa,创建log文件夹
cd kafka_3.2.0
mkdir log
2
# 修改kafka配置
vi config/server.properties
broker.id 不同服务器 分别为1 、2 、3
broker.id=1
listeners=PLAINTEXT://:9092
# 这里内网则用内网ip,外网则用外网ip,尽量不要用localhost,不然外部api无法调用
advertised.listeners=PLAINTEXT://101.35.245.191:9092
log.dirs=/usr/local/kafka/kafka_3.2.0/log
# topic 在当前broker上的分片个数,与broker保持一致
num.partitions=3
# 设置zookeeper集群地址与端口如下(单体则只需要一个):
zookeeper.connect=tencent01:2181,tencent02:2181,tencent03:2181
2
3
4
5
6
7
8
9
10
11
12
13
# 启动kafka
/usr/local/kafka/kafka_3.2.0/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_3.2.0/config/server.properties
启动成功
停止kafka
/usr/local/kafka/kafka_3.2.0/bin/kafka-server-stop.sh
异常情况:
- 如果报出
Feature ZK node at path: /feature does not exist (kafka.server.FinalizedFeatureChangeListener)
原因:可能是之前修改配置的缓存导致
解决方案:删除server.properties文件中log.dirs配置的文件
# Kafka CMD
| 参数 | 描述 |
|---|---|
| --bootstrap-server<String:server toconnect to> | 连接的 Kafka Broker 主机名称和端口号 |
| --topic<String:topic> | 操作的 topic 名称 |
| --create | 创建主题 |
| --delete | 删除主题 |
| --alter | 修改主题 |
| --list | 查看所有主题 |
| --describe | 查看主题详细描述 |
| --partitions <Integer: # if oartitions> | 设置分区数 |
| --replication-factor<Integer:replication factor> | 设置分区副本 |
| --config<String:name=value> | 更新系统默认的配置 |
--bootstrap-server<String:server toconnect to>- <server toconnect to> 参数用来配置连接的broker集群,用逗号分隔
- 示例:
--bootstrap-server server01:9092, server02:9092
# Topic 相关
指令集:kafka-topics.sh
查看所有主题
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
2
3
创建主题
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic topic1 \
--partitions 1 \
--replication-factor 1
2
3
4
5
6
- 创建 名为topic1 分区为1 分区副本为1 的主题
查看主题详情
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic topic1
--describe
2
3
4
修改主题
./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--alter \
--topic topic1 \
--partitions 2
2
3
4
5
※ 注意:修改分区只能增加不能减少,并且副本数无法通过命令行的形式修改
# Producer 相关
指令集:./kafka-console-producer.sh
向指定主题发送消息
./kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic topic1
2
3
退出指令输入:
Ctrl + Z
# Comsumer 相关
指令集:./kafka-console-producer.sh
订阅主题消息
./kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic topic1
2
3
--from-beginning:非增量读取,读取包括历史消息
# Kafka-clients
# 导入依赖
<properties>
<kafka.client.version>3.2.0</kafka.client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
# 生产者
# 准备连接参数
private static KafkaProducer<String, String> getKafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.128:9092"); //kafka服务地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.RETRIES_CONFIG, 3);//设置生产者失败后重试次数
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());// 自定义分区器
//创建生产者对象
return new KafkaProducer<>(props);
}
2
3
4
5
6
7
8
9
10
11
# 选择发送消息方式
(1)发送消息的工作原理
- 从集群获取分区的leader
- producer把消息发送给指定的topic
- producer发送消息key给kafka
- Kafka到指定的topic内,使用key进行hash运算,得到哈希值,用哈希值%分区总数=分区下标
- Kafka根据分区下标找到该分区下的主分区
(2)三种消息发送类型
# 异步发送(用的最多)
把消息发送给服务器,并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,使用这种方式有时候会丢失一些信息
KafkaProducer<String, String> producer = getKafkaProducer();
//发送消息
try {
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
2
3
4
5
6
7
应用场景:如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式
# 同步发送(用的很少)
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
KafkaProducer<String, String> producer = getKafkaProducer();
//发送消息
try {
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset());//获取偏移量
}catch (Exception e){
e.printStackTrace();
}
2
3
4
5
6
7
8
如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量
应用场景: 如果业务要求消息尽可能不丢失且必须是按顺序发送的,那么可以使用同步的方式 ( 只能在一个partation上 )
# 异步发送+回调
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码
KafkaProducer<String, String> producer = getKafkaProducer();
//发送消息
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
e.printStackTrace();
}
System.out.println(recordMetadata.offset());
}
});
}catch (Exception e){
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
应用场景: 如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息
# 消费者
# 准备连接参数
private static Properties getProperties() {
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.128:9092");
// key value的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
return properties;
}
2
3
4
5
6
7
8
9
10
11
12
# 消费主题
public static void main(String[] args) {
Properties properties = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 订阅主题
List<String> topics = Collections.singletonList("topic1");
consumer.subscribe(topics);
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 指定分区
public static void main(String[] args) {
Properties properties = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费的分区
List<TopicPartition> topics = Collections.singletonList(new TopicPartition("topic1", 0));
consumer.assign(topics);
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 手动提交 offset
异步提交:commitAsync()
public static void main(String[] args) {
Properties properties = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费的分区
List<TopicPartition> topics = Collections.singletonList(new TopicPartition("topic1", 0));
consumer.assign(topics);
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
// 异步提交
consumer.commitAsync();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
同步提交:commitSync()
public static void main(String[] args) {
Properties properties = getProperties();
// 关闭offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费的分区
List<TopicPartition> topics = Collections.singletonList(new TopicPartition("topic1", 0));
consumer.assign(topics);
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
// 同步提交
consumer.commitSync();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 指定 offset 消费
public static void main(String[] args) {
Properties properties = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费的分区
List<String> topics = Collections.singletonList("topic1");
consumer.subscribe(topics);
// 获取分区信息
Set<TopicPartition> assignment = consumer.assignment();
// 保证分区方案已经产生
while (assignment.isEmpty()) {
// 尝试拉取消息
consumer.poll(Duration.ofSeconds(1));
// 更新分区信息
assignment = consumer.assignment();
}
// 遍历分区信息指定消费的起始offset
for (TopicPartition topicPartition : assignment) {
// 从100开始消费
consumer.seek(topicPartition, 100);
}
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 指定时间消费
public static void main(String[] args) {
Properties properties = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费的分区
List<String> topics = Collections.singletonList("topic1");
consumer.subscribe(topics);
// 获取分区信息
Set<TopicPartition> assignment = consumer.assignment();
// 保证分区方案已经产生
while (assignment.isEmpty()) {
// 尝试拉取消息
consumer.poll(Duration.ofSeconds(1));
// 更新分区信息
assignment = consumer.assignment();
}
// 指定分区对应的时间
HashMap<TopicPartition, Long> partitionTimeMap = new HashMap<>();
// 设置分区offset时间为 一天前
for (TopicPartition topicPartition : assignment) {
partitionTimeMap.put(topicPartition, LocalDateTime.now().minusDays(1).atZone(ZoneId.systemDefault()).toEpochSecond());
}
// 获取分区的offset对应时间map
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = consumer.offsetsForTimes(partitionTimeMap);
// 遍历分区信息指定消费的起始offset
for (TopicPartition topicPartition : assignment) {
// 根据分区找到对应的offset
OffsetAndTimestamp offsetAndTimestamp = offsetAndTimestampMap.get(topicPartition);
// 从100开始消费
consumer.seek(topicPartition, offsetAndTimestamp.offset());
}
while (true) {
// 1s 拉一次消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
//遍历拉取的消息
Object key = record.key();
Object value = record.value();
System.out.println(key+"---"+value);
}
// 同步提交
consumer.commitSync();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 扩展
# 自定义分区器
通过自定义分区器实现一些特殊的需求处理,首先创建一个类实现Partitioner接口
public class MyPartitioner implements Partitioner {
/**
* 指定分区
* @param s 主题
* @param o key
* @param bytes key的字节数组
* @param o1 value
* @param bytes1 value的字节数组
* @param cluster 集群
* @return 分区
*/
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// 获取消息
String msg = o1.toString();
// 判断消息,并指定分区
if (msg.contains("haha")) {
return 0; // 0分区
} else {
return 1; // 1分区
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
然后在使用时指定分区器
private static Properties getProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.128:9092"); //kafka服务地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.RETRIES_CONFIG, 3);//设置生产者失败后重试次数
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());// 自定义分区器
return props;
}
2
3
4
5
6
7
8
9
10
# 吞吐量配置
private static Properties getProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.128:9092"); //kafka服务地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.RETRIES_CONFIG, 3);//设置生产者失败后重试次数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 设置批量发送的大小16k
props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // 设置批量发送的间隔时间100ms
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 设置批量发送的内存大小32M
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 设置压缩方式
return props;
}
2
3
4
5
6
7
8
9
10
11
12
13
# 可靠性配置
private static Properties getProperties() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.66.128:9092"); //kafka服务地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //key序列化器
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 设置ack级别
props.put(ProducerConfig.RETRIES_CONFIG, 3);// 设置生产者失败后重试次数
return props;
}
2
3
4
5
6
7
8
9
10
# 事务
生产者事务
public static void main(String[] args) {
// 准备连接参数
Properties kafkaProperties = getKafkaProperties();
// 设置事务id(必要)
kafkaProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_001");
// 获取生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(kafkaProperties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
//发送消息
try {
kafkaProducer.send(new ProducerRecord<>("topic1", "haha kafka!"), (recordMetadata, e) -> {
if (e == null) {
System.out.println("主题: " + recordMetadata.topic() + ",分区: " + recordMetadata.partition() + ",偏移量: " + recordMetadata.offset());
}
});
} catch (Exception e) {
// 回滚事务
kafkaProducer.abortTransaction();
throw new RuntimeException(e);
} finally {
//释放连接
kafkaProducer.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Kafka Stream
1)概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了Kafka外,无任何外部依赖
- 充分利用Kafka分区机制实现水平扩展和顺序性保证
- 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义
- 提供记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
2)Kafka Streams的关键概念
(1)Stream处理拓扑
- 流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
- 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
- 流处理器是处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。
(2)在拓扑中有两个特别的处理器:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
- Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。

# 使用步骤
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
</dependencies>
2
3
4
5
6
配置yml
kafka:
hosts: 192.168.66.133:9092
group: leadnews-article-group
2
3
导入Kafka配置类
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Configuration
@EnableKafkaStreams //启用kafkastream
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
//最大消息的大小
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
//kafka所在服务器地址
private String hosts;
//kafka所在分组名称 给消费者使用 就是applicationName
private String group;
public String getHosts() {
return hosts;
}
public void setHosts(String hosts) {
this.hosts = hosts;
}
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
/**
* 重新定义默认的KafkaStreams配置属性,包括:
* 1、服务器地址
* 2、应用ID
* 3、流消息的副本数等配置
* @return
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 消息副本数量
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
return new KafkaStreamsConfiguration(props);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* 流数据的监听消费者实现的接口类,系统自动会通过
* KafkaStreamListenerFactory类扫描项目中实现该接口的类
* 并注册为流数据的消费端
* <p>
* 其中泛型可是KStream或KTable
*
* @param <T>
*/
public interface KafkaStreamListener<T> {
// 流式处理的时候需要监听的主题是什么 INPUTTOPIC
String listenerTopic();
//流式处理完成之后继续发送到的主题是什么 OUTTOPIC
String sendTopic();
// 流式业务的对象处理逻辑
T getService(T stream);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.apache.kafka.streams.StreamsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程
*/
@Component
public class KafkaStreamListenerFactory implements InitializingBean {
Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);
@Autowired
DefaultListableBeanFactory defaultListableBeanFactory;//IOC容器本身
/**
* 初始化完成后自动调用
*/
@Override
public void afterPropertiesSet() {
Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
for (String key : map.keySet()) {
KafkaStreamListener k = map.get(key);
KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
//将对象交给spring容器管理 <bean id = "beanName"
defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());
logger.info("add kafka stream auto listener [{}]",beanName);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.util.Assert;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
/**
* KafkaStream自动处理包装类
*/
public class KafkaStreamProcessor {
// 流构建器
StreamsBuilder streamsBuilder;
private String type;
KafkaStreamListener listener;
public KafkaStreamProcessor(StreamsBuilder streamsBuilder, KafkaStreamListener kafkaStreamListener) {
this.streamsBuilder = streamsBuilder;
this.listener = kafkaStreamListener;
this.parseType();
Assert.notNull(this.type, "Kafka Stream 监听器只支持kstream、ktable,当前类型是" + this.type);
}
/**
* 通过泛型类型自动注册对应类型的流处理器对象
* 支持KStream、KTable
*
* @return
*/
public Object doAction() {
if ("kstream".equals(this.type)) {
KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
stream = (KStream) listener.getService(stream);
stream.to(listener.sendTopic());
return stream;
} else {
KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
table = (KTable) listener.getService(table);
table.toStream().to(listener.sendTopic());
return table;
}
}
/**
* 解析传入listener类的泛型类
*/
private void parseType() {
Type[] types = listener.getClass().getGenericInterfaces();
if (types != null) {
for (int i = 0; i < types.length; i++) {
if (types[i] instanceof ParameterizedType) {
ParameterizedType t = (ParameterizedType) types[i];
String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
if (name.contains("org.apache.kafka.streams.kstream.kstream") || name.contains("org.apache.kafka.streams.kstream.ktable")) {
this.type = name.substring(0, name.indexOf('<')).replace("org.apache.kafka.streams.kstream.", "").trim();
break;
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
实现KafkaListener接口,编写流处理逻辑
import com.heima.article.config.KafkaStreamListener;
import com.heima.common.constants.MQConstants;
import com.heima.model.article.dtos.ArticleVisitStreamMsg;
import com.heima.model.article.dtos.UpdateArticleMsg;
import com.heima.utils.common.JsonUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.checkerframework.checker.units.qual.A;
import org.checkerframework.checker.units.qual.K;
import org.springframework.stereotype.Component;
import javax.swing.table.AbstractTableModel;
import java.time.Duration;
import java.util.Arrays;
/**
* 热点文章行为数据实时更新流处理程序
*/
@Component
public class HotArticleStreamHandler implements KafkaStreamListener<KStream<String,String>> {
@Override
public String listenerTopic() {
return MQConstants.HOT_ARTICLE_INPUT_TOPIC;
}
@Override
public String sendTopic() {
return MQConstants.HOT_ARTICLE_OUTPUT_TOPIC;
}
@Override
public KStream<String, String> getService(KStream<String, String> stream) {
/**
* 原始消息格式:
* key value
* null {"articleId":1,"type":"LIKES"}
* null {"articleId":1,"type":"LIKES"}
* null {"articleId":2,"type":"LIKES"}
* null {"articleId":1,"type":"COMMENT"}
*
* key value
* null ["1:LIKES"]
* null ["1:LIKES"]
* null ["2:LIKES"]
* null ["2:COMMENT"]
*
* key value
* ["1:LIKES"] ["1:LIKES"]
* ["1:LIKES"] ["1:LIKES"]
* ["2:LIKES"] ["2:LIKES"]
* ["2:COMMENT"] ["2:COMMENT"]
*
*
* 分组统计:文章ID+类型进行分组
* key value
* 1:LIKES 100
* 2:LIKES 50
* 1:COMMENT 60
*
* 模板消息格式:
* key value
* null {"articleId":1,"like":100}
* null {"articleId":1,"comment":200}
* null {"articleId":2,"like":50}
*/
KTable<Windowed<Object>, Long> kTable = stream.flatMapValues(new ValueMapper<String, Iterable<?>>() {
@Override
public Iterable<?> apply(String value) { // 传入消息格式:{"articleId":1,"type":"LIKES"}
// 转换为对象
UpdateArticleMsg updateArticleMsg = JsonUtils.toBean(value, UpdateArticleMsg.class);
String msgValue = updateArticleMsg.getArticleId() + ":" + updateArticleMsg.getType().name();
// 拼接格式为1:LIKES
return Arrays.asList(msgValue);
}
}).map(new KeyValueMapper<String, Object, KeyValue<?, ?>>() {
@Override
public KeyValue<?, ?> apply(String key, Object value) {
// 将值作为key进行统计
return new KeyValue<>(value, value);
}
// 分组统计key
}).groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("total"));
// 收集统计结果做一个格式转换
KStream kStream = kTable.toStream().map(new KeyValueMapper<Windowed<Object>, Long, KeyValue<?, ?>>() {
@Override
public KeyValue<?, ?> apply(Windowed<Object> windowed, Long value) {
String key = (String)windowed.key(); // 1:LIKES
String[] array = key.split(":");
ArticleVisitStreamMsg streamMsg = new ArticleVisitStreamMsg();
streamMsg.setArticleId(Long.valueOf(array[0]));
String type = array[1];
switch (UpdateArticleMsg.UpdateArticleType.valueOf(type)){
case VIEWS:
streamMsg.setView(value);
break;
case COLLECTION:
streamMsg.setCollect(value);
break;
case COMMENT:
streamMsg.setComment(value);
break;
case LIKES:
streamMsg.setLike(value);
}
return new KeyValue<>("",JsonUtils.toString(streamMsg));
}
});
return kStream;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
生产者
//行为触发后发送消息给KafkaStream统计
UpdateArticleMsg msg = new UpdateArticleMsg();
msg.setArticleId(dto.getArticleId());
msg.setType(UpdateArticleMsg.UpdateArticleType.LIKES);
2
3
4
消费者
/**
* 监听热点文章行为数据实时更新
*/
@Component
@Slf4j
public class HotArticleHandlerListener {
@Autowired
private HotArticleService hotArticleService;
@KafkaListener(topics = MQConstants.HOT_ARTICLE_OUTPUT_TOPIC)
public void handlerHotArticle(String value){
log.info("开始热点文章行为数据实时更新...");
if(StringUtils.isNotEmpty(value)){
ArticleVisitStreamMsg streamMsg = JsonUtils.toBean(value, ArticleVisitStreamMsg.class);
hotArticleService.updateHotArticle(streamMsg);
log.info("更新热点文章行为数据实时完成...");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Kafka 集成
# Springboot 集成 Kafka
导入依赖
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2
3
4
5
配置yml
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.66.133:9092
producer:
# 重试次数
retries: 10
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test-hello-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
* 普通消费发现
*/
@GetMapping("/hello1")
public String hello1(){
kafkaTemplate.send("boot","1001","hello springboot kafka!");
return "发送成功";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class HelloListener {
@KafkaListener(topics = "boot")
public void handleHello1(ConsumerRecord<String,String> record){
String key = record.key();
String value = record.value();
System.out.println(key+"---"+value);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# Kafka 调优
# 硬件配置
# 操作系统
Kafka的网络客户端底层使用Java NIO的Selector方式,而Selector在Linux的实现是epoll,在Windows上实现机制为select。因此Kafka部署在Linux会有更高效的I/O性能。
数据在磁盘和网络之间进行传输时候,在Linux上可以享受到零拷贝机制带来的快捷和便利高效,而Windows在一定程度上会使用零拷贝操作。所以建议Kafka部署在Linux操作系统上。
# 磁盘选择
Kafka 存储方式为顺序读写,机械硬盘的最大劣势在于随机读写慢。所以使用机械硬盘并不会造成性能低下。所以磁盘选用普通机械硬盘即可,Kafka自身已经有冗余机制,而且通过分区的设计,实现了负载均衡的功能。不做磁盘组raid阵列也是可以的。
磁盘空间需要多少,需要根据具体场景进行简单估算:
设计场景:日志数据每天向kafka发送1亿条数据,每条数据有两个副本防止数据丢失,数据保存两周,每条消息平均大小为1KB。
- 每天1亿条1KB消息,保存两份,则每天总大小为:(100000000 x 1KB x 2)/1024/1024≈200GB
- kafka除了消息数据还有其他类型的数据,故增加10%的冗余空间,则需要220GB
- 两周时间则为 220GBx14≈3TB
如果启用压缩,压缩比约在 0.75 左右,则总存储空间规划为3TBx0.75=2.25TB;
# 网络带宽
带宽情况最容易成为 kafka 的瓶颈。如果网络为万兆带宽,基本不会出现网络瓶颈,如果数据量特别大,按照下文中的设计场景进行计算。如果网络为百兆或者千兆带宽,在处理较大数据量场景下会出现网络瓶颈,可按照下面的传统经验公式进行计算处理,也可按照下述场景按照自己生产实际情况进行设计。
经验公式:服务器台数 = 2 × (生产者峰值生产速率 × 副本数 ÷ 100) + 1
设计场景:如果机房为千兆带宽,我们需要在一小时内处理1TB的数据,需要多少台kafka 服务器?
- 由于带宽为千兆网,1000Mbps=1Gbps,则每秒钟每个服务器能收到的数据量为 1Gb=1000Mb
- 假设 Kafka 占用整个服务器网络的70%(其他 30%为别的服务预留),则Kafka可以使用到700Mb 的带宽,但是如果从常规角度考虑,我们不能总让Kafka顶满带宽峰值,所以需要预留出2/3甚至3/4的资源,也就是说,Kafka单台服务器使用带宽实际应为 700Mb/3=240Mb
- 1 小时需要处理1TB数据,1TB=102410248Mb=8000000Mb,则一秒钟处理数据量为:8000000Mb/3600s=2330Mb 数据。
- 需要的服务器台数为:2330Mb/240Mb≈10 台。
- 考虑到消息的副本数如果为 2,则需要20台服务器,副本如果为3,则需要30台服务器。
# 内存配置
Kafka运行过程中设计到的内存主要为JVM的堆内存和操作系统的页缓存,每个Broker节点的堆内存建议10-15G内存,而数据文件(默认为1G)的25%在内存就可以了。综合上述,Kafka在大数据场景下能够流畅稳定运行至少需要11G,建议安装Kafka的服务器节点的内存至少大于等于16G。
# CPU选择
观察所有的Kafka与线程相关的配置,一共有以下几个:
| 参数名称 | 备注 | 默认值 |
|---|---|---|
| num.network.threads | 服务器用于接收来自网络的请求并向网络发送响应的线程数 | 3 |
| num.io.threads | 服务器用于处理请求的线程数,其可能包括磁盘I/O | 8 |
| num.replica.fetchers | 副本拉取线程数,调大该值可以增加副本节点拉取的并行度 | 1 |
| num.recovery.threads.per.data.dir | 每个数据目录在启动时用于日志恢复和在关闭时刷新的的线程数 | 1 |
| log.cleaner.threads | 用于日志清理的后台线程数 | 1 |
| background.threads | 用于各种后台处理任务的线程数 | 10 |
在生产环境中,建议CPU核数最少为16核,建议32核以上,方可保证大数据环境中的Kafka集群正常处理与运行。
# 集群容错
# 副本分配策略
Kafka采用分区机制对数据进行管理和存储,每个Topic可以有多个分区,每个分区可以有多个副本。应根据业务需求合理配置副本,一般建议设置至少2个副本以保证高可用性。
# 故障转移方案
当Kafka集群中的某个Broker节点发生故障时,其负责的分区副本将会被重新分配到其他存活的Broker节点上,并且会自动选择一个备份分区作为新的主分区来处理消息的读写请求。
# 数据备份与恢复
Kafka采用基于日志文件的存储方式,每个Broker节点上都有副本数据的本地备份。在数据备份方面,可以通过配置Kafka的数据保留策略和数据分区调整策略来保证数据的持久性和安全性;在数据恢复方面,可以通过查找备份数据并进行相应的分区副本替换来恢复数据。
# 参数配置优化
| 参数名 | 默认参数值 | 位置 | 优化场景 | 备注 |
|---|---|---|---|---|
| num.network.threads | 3 | 服务端 | 低延迟 | |
| num.io.threads | 8 | 服务端 | 低延迟 | |
| socket.send.buffer.bytes | 102400(100K) | 服务端 | 高吞吐 | |
| socket.receive.buffer.bytes | 65536(64K) | 服务端 | 高吞吐场景 | |
| max.in.flight.requests.per.connection | 5 | 生产端 | 幂等 | |
| buffer.memory | 33554432(32M) | 生产端 | 高吞吐 | |
| batch.size | 16384(16K) | 生产端 | 提高性能 | |
| linger.ms | 0 | 生产端 | 提高性能 | |
| fetch.min.bytes | 1 | 消费端 | 提高性能 | 网络交互次数 |
| max.poll.records | 500 | 消费端 | 批量处理 | 控制批量获取消息数量 |
| fetch.max.bytes | 57671680 (55M) | 消费端 | 批量处理 | 控制批量获取消息字节大小 |
# 数据压缩和批量发送
通过压缩和批量发送可以优化Kafka的性能表现。Kafka支持多种数据压缩算法,包括Gzip、Snappy、LZ4和zstd。在不同场景下,需要选择合适的压缩算法,以确保性能最优。
下面的表格为网络上不同压缩算法的测试数据,仅作参考
| 压缩算法 | 压缩比率 | 压缩效率 | 解压缩效率 |
|---|---|---|---|
| snappy | 2.073 | 580m/s | 2020m/s |
| lz4 | 2.101 | 800m/s | 4220m/s |
| zstd | 2.884 | 520m/s | 1600m/s |
从表格数据可以直观看出,zstd有着最高得压缩比,而LZ4算法,在吞吐量上表现得非常高效。
对于Kafka而言:
- 在吞吐量上比较:lz4 > snappy>zstd>gzip。
- 而在压缩比上:zstd>lz4>gzip>snappy。
Kafka支持两种批处理方式:异步批处理和同步批处理。在不同场景下,需要选择合适的批处理方式,进行性能优化。同时需要合理设置批处理参数,如batch.size、linger.ms等。