本文最后更新于 2024-05-16,文章内容可能已经过时。

1.介绍:

Apache Kafka是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。用Scala和Java编写。运行在JVM上。

Apache Kafka起源于LinkedIn,后来于2011年成为开源Apache项目,然后于2012年成为First-class Apache项目。

特性:

  • 持久化:磁盘,有冗余备份机制(replication)保证高可用.

  • 分布式:系统,易于向外扩展。所有的producer、broker和consumer均为分布式存在。可直接在线扩展集群机器.

2.架构:

image-20240416222907188

2.1 生产者和消费者(producer和consumer):

消息的发送者叫 Producer,消息的使用者和接受者是 Consumer,生产者将数据保存到 Kafka 集群中,消费者从中获取消息进行业务的处理。

2.2 代理 Broker:

Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。

2.3 Topic:

一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。

topic经过分片策略被分到不同的分区,而每个分区又使用主从复制策略保证消息的可靠性。

2.4 分区:partition:数据分片

每个 topic 都可以分成多个 partition。这些partition分布在集群的不同broker上。不同的分区中存储的数据不同。

每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。

kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

另外,每个分区对应一个文件,可以分别存储到不同的机器上,以实现分布式的集群存储,每个 partition 可以有一定的副本,备份到多台机器上,以提高可用性。

2.5 副本:replicated:主从复制:

每个分区内部有多个副本,每个分区有一个leader,多个follower。

kafka 还可以配置 partitions 需要备份的个数(replicas),每个partition 将会被备份到多台机器上,以提高可用性,备份的数量可以通过配置文件指定。

由 leader 负责所有对该分区的读写,其他 server 作为 follower 只需要简单的与 leader 同步,保持跟进即可。如果原来的 leader 失效,会重新选举由其他的 follower 来成为新的 leader。

Kafka 使用 ZK 在 Broker 中选出一个 Controller,用于 Partition 分配和 Leader 选举。

另外,这里我们可以看到,实际上作为 leader 的 server 承担了该分区所有的读写请求,因此其压力是比较大的,从整体考虑,有多少个 partition 就意味着会有多少个leader,kafka 会将 leader 分散到不同的 broker 上,确保整体的负载均衡。

2.6 ZooKeeper:

ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后生产者和消费者采取决定并开始与某些其他代理协调他们的任务。

Kafka 在 Zookeeper 中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。

由于所有关键信息存储在 Zookeeper 中,并且它通常在其整体上复制此数据。

有人可能会说我在使用Kafka的时候就没有安装ZK,那是因为Kafka内置了一个ZK,一般我们不使用它。

4.工作流程:

整体工作流程如下:

  • 生产者定期向主题发送消息。

  • Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka 将在第一分区中存储一个消息,在第二分区中存储第二消息。

  • 消费者订阅特定主题。

  • 一旦消费者订阅主题,Kafka 将向消费者提供主题的当前偏移,并且还将偏移保存在 Zookeeper 系统中。

  • 消费者将定期请求 Kafka (如100 Ms)新消息。

  • 一旦 Kafka 收到来自生产者的消息,它将这些消息转发给消费者。

  • 消费者将收到消息并进行处理。

  • 一旦消息被处理,消费者将向 Kafka 代理发送确认。

  • 一旦 Kafka 收到确认,它将偏移更改为新值,并在 Zookeeper 中更新它。 由于偏移在 Zookeeper 中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。

  • 以上流程将重复,直到消费者停止请求。

  • 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

4.1 发送数据:

4.1.1 指定参数:

对于生产者要写入的一条记录,可以指定四个参数:分别是 topicpartitionkeyvalue,其中 topic 和 value(要写入的数据)是必须要指定的,而 key 和 partition 是可选的。对于一条记录,先对其进行序列化,然后按照 Topic 和 Partition,放进对应的发送队列中。

如果 Partition 没填:

  1. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。

  2. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

4.1.2 发送数据:异步:

producer 将会和Topic下所有 partition leader 保持 socket 连接,消息由 producer 直接通过 socket 发送到 broker。其中 partition leader 的位置( host : port )注册在 zookeeper 中,producer 作为 zookeeper client,已经注册了 watch 用来监听 partition leader 的变更事件,因此,可以准确的知道谁是当前的 leader。

producer 端采用异步发送:将多条消息暂且在客户端 buffer 起来,并将他们批量的发送到 broker,小数据 IO 太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。

4.1.3 幂等性:

发送消息如果配置了重试机制,比如由于网络波动,生产者未得到broker收到了消息的响应,就会触发重试机制,3秒后再次发送此消息。broker之前已经收到过这个消息,但生产者由于触发了重试机制,就导致了消息的重复发送。那么broker就会在磁盘缓存多条同样的消息,消费端从broker拉取消息时,就会造成重复消费。

在0.11版本的Kafka之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多余多个下游应用的情况,则分别进行全局去重,对性能有很大影响。

0.11版本的kafka,引入了一项重大特性:幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据

启用幂等性,即在Producer的参数中设置enable.idempotence=true即可,Kafka的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。

但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。

4.1.4 指定分区发送:

1、没有设置key我们的消息就会被轮询的发送到不同的分区。

2、设置了key : 使用kafka自带的分区器,会根据key计算出来一个hash值,这个hash值会对应某一个分区。如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区

生产者在生产数据的时候,可以为每条消息人为地指定key,这样消息被发送到broker时,会根据分区规则选择消息将被存储到哪一个分区中。如果分区规则设置合理,那么所有的消息将会被均匀/线性的分布到不同的分区中,这样就实现了负载均衡和水平扩展。

但是有些比较特殊的时候,我们就需要自定义分区。

可以通过 实现 org.apache.kafka.clients.producer.Partitioner 接口,这个实现类可以根据自己的业务规则进行自定义制定分区,如根据hash算法指定分区的分布规则。

4.1.4 发送重试:

为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。

4.2 写入数据:

4.2.1 leader写入:

消息由producer发送到leader broker后,由leader将消息写入本地文件,然后对应分区的follower会主动向leader pull同步消息数据。

4.2.2 ISR:同步副本集

如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,这种完全同步的方式太过于耗费时间。

解决方案:

leader中维护了一个动态的ISR(in-sync replica set),即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定。

4.2.3 ack返回:

在接收到消息后,leader想producer发送返回成功的信息,称为ack。

对于不同的数据,可以有不同的ack策略。比如对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等到ISR中所有的follower全部接受成功。

Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡选择不同的配置。

  • 0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据。

  • 1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)

  • -1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack。

流程图:

image-20240416223345551

4.2 存储数据:

producer采用push模式将数据发布到broker,每条消息追加append 写(顺序写,性能高) 到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的。

image-20240417140500465

Kafka文件存储也是通过本地落盘的方式存储的,主要是通过相应的log与index等文件保存具体的消息文件。

4.2.1 文件结构:

image-20240416223652478

由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。

Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

image-20240417140706600

4.2.2 索引优化:

kafka为每个分段后的数据文件建立了索引文件,.index文件:

  • 索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引

  • 索引包含两个部分(均为4字节的数字),分别为相对应的offset和position

4.2.2.1 稀疏索引:

Kafka中采用了稀疏索引的方式读取索引,kafka每当写入了4k大小的日志(.log),就往index里写入一个记录索引。

4.2.3 存储策略:

无论消息是否被消费,kafka都会保存所有的消息。对旧数据的删除策略:

  • 基于时间,默认配置是168小时(7天)。

  • 基于大小,默认配置是1073741824。

需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能。

4.3 消费数据:

4.3.1 消费者组:

对于消费者,不是以单独的形式存在的,每一个消费者属于一个 consumer group,一个 group 包含多个 consumer。

订阅 Topic 是以一个消费组来订阅的,发送到 Topic 的消息,只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。

如果所有的 Consumer 都具有相同的 group,那么就像是一个点对点的消息系统;如果每个 consumer 都具有不同的 group,那么消息会广播给所有的消费者。

4.3.2 消费方式:pull:

在 kafka 中,采用了 pull 方式,即 consumer 在和 broker 建立连接之后,主动去 pull(或者说 fetch )消息。

选择pull的原因:

  • push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

  • pull模式可简化broker的设计,更轻量级。

  • Consumer可自主控制消费消息的速率(offest),同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

  • 且由于kafka broker会持久化数据到磁盘,broker没有内存压力。因此,consumer非常适合采用pull的方式消费数据。

partition 中的消息只有一个 consumer 在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见 kafka broker 端是相当轻量级的。当消息被 consumer 接收之后,需要保存 Offset 记录消费到哪,以前保存在 ZK 中,由于 ZK 的写性能不好,以前的解决方法都是 Consumer 每隔一分钟上报一次,在 0.10 版本后,Kafka 把这个 Offset 的保存,从 ZK 中剥离,保存在一个名叫 consumeroffsets topic 的 Topic 中,由此可见,consumer 客户端也很轻量级。

4.3.3 消费分区:

每个消费者组中的每个消费者会消费不同分区的数据。这样所有的消费者加起来就消费了所有分区的数据,实现了一个消费者组消费一个topic的目的。

4.3.3 Offset:提交读取位置:

由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。

consumer记录消费到的位置offset,消费完后会将该offset提交到kafka中,kafka只保存已提交的offset

offset表示这条消息在他的partition中的偏移量,唯一标识一条消息

消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。

消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。

推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。

关于offest的相关配置:

  • earliest: 如果这个topic有历史消息,现在新启动了一个消费者组,消费者会从头开始消费历史信息.

  • latest:如果这个topic有历史消息,现在新启动了一个消费者组,消费者会从连接上broker后接受的第一个消息开始消费,不会消费历史信息.

  • none:对于同一个消费者组,若没有提交过offset,会抛异常.

  • enable.auto.commit:该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量.

4.3.4 消息丢失:

消费端丢消息最主要体现在消费端offset的自动提交,如果开启了自动提交,万一消费到数据还没处理完,此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了,因为offset已经提交完毕,下次会从新offset处开始消费新消息。解决办法是采用消费端的手动提交

4.3.5 重复消费:

如果消费端拉取了一部分数据,消费完毕后,准备执行手动提交(或自动提交)时,消费者挂掉了,此时offset还未提交呢,那么当服务重启时,还是会拉取相同的一批数据重复处理,造成消息重复消费.

解决:无论是生产者还是消费者的重复消息,一般都会在消费端卡死,做幂等性处理。幂等性可以用redis的setnx分布式锁+业务逻辑去重来实现。

5.常见问题:

5.1 故障恢复:

首先了解两个概念:

LEO(Log End Offset):每个副本最后的一个offset

HW(High Watermark):高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。

5.1.1 follower故障和leader故障:

follower故障:follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。

leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

image-20240416225355745

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。

5.2 Rebalance:分区再均衡:

Rebalance 是一个操作.用来重新规划消费者组中,每个消费者应该消费哪个分区的问题。

5.2.1 触发条件:

  1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)

  2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance

  3. 订阅主题的分区数发生变更(broker宕机/分区增加)

5.2.2 Coordinator:

每个consumer group 都会选择一个broker作为自己的coordinator,他是负责监控整个消费组里的各个分区的心跳,以及判断是否宕机,管理位移offset,和开启rebalance的coordinator存储了partition和消费者的对应关系,还有订阅的topic列表。

如果采用kafka集群存储消息的话,offset是提交到特定的主题,存储到coordinator broker里。

5.2.2.1 选择:

首先对group id 进行hash,接着对__consumer_offsets的分区数量进行取模,默认分区数量是50。

5.2.3 实现:

Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳。

而rebalance是通过心跳线程完成的。rebalance的前提是coordinator已经确定了。

rebalance分为2步:Join和Sync。

5.2.3.1 Join:

加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

image-20240417144442793

5.2.3.2 Sync:

这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

image-20240417144545102

5.2.4 避免:

重平衡过程中,消费者无法从 kafka 消费消息,这对 kafkaTPS 影响极大,而如果 kafka 集群内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间 kafka 基本处于不可用状态。所以在线上环境中,应该尽量避免重平衡发生。

要说完全避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障。

而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。

  1. 一般会约定一个时间,超时即判定对方挂了。而在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。

  2. 还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。

  3. 此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。

三个参数:

  • session.timout.ms控制心跳超时时间。

  • heartbeat.interval.ms控制心跳发送频率。

  • max.poll.interval.ms控制poll的间隔。

5.2.5 平衡冲突:

当触发Rebalance时 由于kafka正在分配所有权 1. 会导致消费者不能消费,2. 重复消费的问题,

消费者还没来得及提交偏移量时 分区所有权遭到了重新分配 那么这时候就会导致一个消息被多个消费者重复消费。

那么 解决方案就是在消费者订阅时, 添加一个再均衡监听器, 也就是当kafka在做Rebalance 操作前后 均会调用再均衡监听器 那么这时候 我们可以在kafka Rebalance之前提交我们消费者最后处理的消息来解决这个问题。

这里有三种rebalance的策略:range、round-robin、sticky。

5.3 顺序消息:

kafka想要保证消息顺序,是需要牺牲一定性能的,方法就是一个消费者,消费一个分区,可以保证消费的顺序性。但也仅限于消费端消费消息的有序性,无法保证生产者发送消息有序。

比如:如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。发送端消息发送已经乱序,到了消费端消费时,自然无法保证顺序!

如果一定要保证生产-消费全链路消息有序,发送端需要同步发送,ack回调不能设置为0。且只能有一个分区,一个消费者进行消费,但这样明显有悖于kafka的高性能理论.

5.4 消息积压:

线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。

解决方案:此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。

image-20240417102352310

由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。

解决方案:此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。这个死信队列,kafka并没有提供,需要整合第三方插件。

5.5 与rocketmq的区别:

  • 重平衡:kafka由于重平衡机制可能相比于rcmq不够可靠.

  • 功能:kafka不支持定时消息和事务消息。

  • 性能:由于producer每次是缓存批量发消息,所以比rcmq吞吐量要好一些。

6.部署:docker

6.1.创建一个网络:

不然不同容器间网络隔离无法相互访问

可以通过容器间共享网络network的方式,也可以通过link

6.2.安装zookeeper:

Kafka依赖zookeeper所以先安装zookeeper
-p:设置映射端口(默认2181)
-d:后台启动
docker run -d --name zookeeper-server \
    --network app-tier \
    -p 2181:2181 \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest

查看zookeeper容器日志(可省略)

docker logs -f zookeeper

6.3.安装kafka:

安装并运行Kafka,
–name:容器名称
-p:设置映射端口(默认9092 )
-d:后台启动
ALLOW_PLAINTEXT_LISTENER任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper
KAFKA_CFG_ADVERTISED_LISTENERS当前主机IP或地址(重点:如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误)
docker run -d --name kafka-server0 \
    -p 9092:9092 \
    -e KAFKA_BROKER_ID=0 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://43.139.143.108:9092 \
    bitnami/kafka:latest
docker run -d --name kafka-server1 \
    -p 9093:9092 \
    -e KAFKA_BROKER_ID=1 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://182.92.164.82:9092 \
    bitnami/kafka:latest
docker run -d --name kafka-server2 \
    -p 9094:9095 \
    -e KAFKA_BROKER_ID=2 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
    -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://182.92.164.82:9095\
    bitnami/kafka:latest

Ps:集群部署,第一台服务器部署了一个kafka broker0 和一个zkp还有一个gui,第二个部署了2个broker 1,2

docker logs -f kafka-server

6.4.安装GUI:

docker run --name kafka-manager -d  \
    --network app-tier \
        -p 9000:9000  \
        -e ZK_HOSTS="43.139.143.108:2181" \
        sheepkiller/kafka-manager

我们使用/kafka-manager,可以用他创建topic,broker。