Quickly Setting Up a Kafka Cluster with Docker

Versions

  • JDK 14
  • Zookeeper
  • Kafka

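Installing Zookeeper and Kafka

Kafka depends on Zookeeper, so Zookeeper has to be available before Kafka is installed. Prepare the docker-compose.yaml file below and replace the host address 192.168.1.100 with the address of the host in your own environment.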


version: "3"
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    volumes:
      - ./data/zookeeper/data:/data
      - ./data/zookeeper/datalog:/datalog
      - ./data/zookeeper/logs:/logs
    restart: always
  # Three brokers on the same host; each one gets its own broker id, port and data directory.
  kafka_node_0:
    depends_on:
      - zookeeper
    container_name: kafka-node-0
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      # Address that clients use to reach this broker
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9092
      # Address the broker binds to inside the container
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - "9092:9092"
    volumes:
      - ./data/kafka/node_0:/kafka
    restart: unless-stopped
  kafka_node_1:
    depends_on:
      - kafka_node_0
    container_name: kafka-node-1
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - "9093:9093"
    volumes:
      - ./data/kafka/node_1:/kafka
    restart: unless-stopped
  kafka_node_2:
    depends_on:
      - kafka_node_1
    container_name: kafka-node-2
    image: wurstmeister/kafka
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.1.100:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.100:9094
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
    ports:
      - "9094:9094"
    volumes:
      - ./data/kafka/node_2:/kafka
    restart: unless-stopped

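Run docker-compose up -d to execute the file and build the cluster. After a short wait, the four containers (zookeeper, kafka-node-0, kafka-node-1 and kafka-node-2) should all be up and running.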
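
One rough way to confirm that the containers are listening is to try a TCP connection to each published port. The sketch below is only an illustration: the class name PortCheck and the 2-second timeout are assumptions, and an open port shows that something is listening there, not that the broker has finished registering with Zookeeper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the same host address that was used in docker-compose.yaml
        String host = args.length > 0 ? args[0] : "192.168.1.100";
        // Zookeeper plus the three Kafka brokers
        int[] ports = {2181, 9092, 9093, 9094};
        for (int port : ports) {
            String state = isOpen(host, port, 2000) ? "open" : "closed";
            System.out.println(host + ":" + port + " -> " + state);
        }
    }
}
```

If any port is reported as closed, docker-compose logs for the matching service is usually the next place to look.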

Integrating SpringBoot with the Kafka Cluster

Create a brand-new SpringBoot project and add the following dependencies to its build.gradle file.


dependencies {
    // …
    // …
    implementation 'org.springframework.kafka:spring-kafka:2.5.2.RELEASE'
    implementation 'com.alibaba:fastjson:1.2.71'
}

  1. Configure the Kafka parameters in application.properties.


spring.kafka.bootstrap-servers=192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
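
The bootstrap-servers value is a comma-separated list of host:port pairs, one per broker. A typo in that list tends to surface only as a connection error at runtime, so a small check like the following sketch can help when adapting the addresses. The class name BootstrapServers and its parse method are hypothetical helpers written for this illustration; they are not part of Spring or the Kafka client.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class BootstrapServers {
    /**
     * Splits a value such as "host1:9092,host2:9093" into unresolved socket addresses.
     * Throws IllegalArgumentException if an entry is not of the form host:port.
     */
    public static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException (a subclass of IllegalArgumentException) if the port is not numeric
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; only the syntax is checked here
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    public static void main(String[] args) {
        String value = "192.168.1.100:9092,192.168.1.100:9093,192.168.1.100:9094";
        for (InetSocketAddress broker : parse(value)) {
            System.out.println("broker host=" + broker.getHostString() + " port=" + broker.getPort());
        }
    }
}
```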

  2. Create the message body class.


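import java.util.Date;
import lombok.Data;

// @Data comes from Lombok (assumed to be on the classpath); it generates the
// getters and setters that Sender calls.
@Data
public class Message {
    private Long id;
    private String message;
    private Date sendAt;
}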

  3. Create the message sender.


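import com.alibaba.fastjson.JSON;
import java.util.Date;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// @Slf4j (Lombok, assumed) provides the log field; @Component lets Spring inject this class.
@Slf4j
@Component
public class Sender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMessage(UUID.randomUUID().toString());
        message.setSendAt(new Date());
        log.info("message = {}", JSON.toJSONString(message));
        // Publish the JSON string to the "test" topic
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}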

  4. Create the message receiver.


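import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Receiver {
    @KafkaListener(topics = {"test"}, groupId = "test")
    public void listen(ConsumerRecord<?, ?> record) {
        // A record value may be null, so wrap it before use
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            log.info("receiver record = {}", record);
            log.info("receiver message = {}", message.get());
        }
    }
}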

  5. Test the message queue.


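import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class QueueController {
    @Autowired
    private Sender sender;

    // Each POST to /test sends three messages
    @PostMapping("/test")
    public void testQueue() {
        sender.send();
        sender.send();
        sender.send();
    }
}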
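
The /test endpoint only reacts to POST requests, so opening it in a browser will not trigger it. One option is a small client built on the JDK's java.net.http.HttpClient (available since Java 11), sketched below. The class name TriggerTest is hypothetical, and the URL assumes the application runs locally on Spring Boot's default port 8080.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerTest {
    /** Builds an empty-body POST request for the given URL. */
    public static HttpRequest buildRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** Sends the request and returns the HTTP status code. */
    public static int post(String url) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(url), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: local application on Spring Boot's default port
        String url = args.length > 0 ? args[0] : "http://localhost:8080/test";
        System.out.println("HTTP status: " + post(url));
    }
}
```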

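If the application log shows each message being sent and then received, the integration is working.

At this point we have set up a Kafka pseudo-cluster and integrated it with SpringBoot.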