功能概述

Kafka 通过 Topic 进行生产消息和消费消息,生产者往 Topic 中写消息,消费者从 Topic 中读消息。

操作步骤

通过 Kafka Manager 创建 Topic

用户可以通过 Kafka Manager 创建 Topic,详情可参考使用 Kafka Manager 管理 Topic

通过 Kafbat UI 创建 Topic

  1. 在本地浏览器里输入 Kafbat UI 地址 http://client_ip:port

    • client_ip 为 客户端节点的 IP 地址。

      • 内网连接:IP 地址为客户端节点内网 IP。

      • 公网连接:IP 地址为客户端节点绑定的公网 IP。

    • port 为 Kafbat UI 的访问端口,可通过集群配置参数 kafka-manager.port 进行设置,默认为 9000

  2. 如果集群配置参数 kafka-manager.basicAuthentication.enabled 设置为 true,则表示需要登录,请使用配置的帐号登录。该参数默认为 false,即不需要登录。集群参数配置可参考修改配置参数

  3. 在 Kafbat UI 页面,点击左侧 Topics,进入 Topic 列表页面。

  4. 在 Topic 列表页面,点击右上角的 + Add Topic,弹出 Topic 创建对话框。

    kafbat_ui_add_topic
  5. 用户根据页面信息,填写相关参数后,点击Create Topic,完成 Topic 创建操作。

    Topic Name 为必填项,其余参数若不填写,系统会使用集群级别默认参数。

使用 Kafka 命令行工具创建 Topic

  1. 为了方便用户通过本地访问 Kafka 集群,用户可以为客户端节点开启公网访问,或通过 VPN 的方式打通网络,以确保本地服务器可以访问 Kafka 集群网络。

    详情可分别参考 绑定公网 IP VPN

  2. 在本地服务器终端,执行以下命令行,创建 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num>
    
    # 已开启 SASL
    ./kafka-topics.sh --create --topic <topic_name> --bootstrap-server <Kafka 连接地址> --partitions <partition_num> --replication-factor <replication_num> --command-config /ssl/kafka.config

    命令行参数说明

    参数 说明

    topic_name

    Topic 名称。

    Kafka 连接地址

    可在 Kafka 集群详情页面,获取连接地址。

    partition_num

    Partition 数。每个 Topic 被拆分成多个 Partition,每个 Partition 由一系列有序的消息组成。

    replication_num

    副本数量。

  3. 在本地服务器终端,执行以下命令行,查看刚创建的 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址>
    
    # 已开启 SASL
    ./kafka-topics.sh --list --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config
  4. 在本地服务器终端,执行以下命令行,平衡 Topic 分区 Leader。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址>
    
    # 已开启 SASL
    ./kafka-preferred-replica-election.sh --bootstrap-server <Kafka 连接地址> --command-config /ssl/kafka.config
  5. 在本地服务器终端,执行以下命令行,更改 Topic 配置参数。用户也可以在创建 Topic 的时候指定,格式为 --config a=b --config x=y

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value>
    
    # 已开启 SASL
    ./kafka-configs.sh --bootstrap-server <Kafka 连接地址> --entity-type topics --entity-name <topic_name> --alter --add-config <para_name>=<para_value> --command-config /ssl/kafka.config
  6. 在本地服务器终端,执行以下命令行,修改 Topic 分区。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num>
    
    # 已开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --alter --topic <topic_name> --partitions <partition_num> --command-config /ssl/kafka.config
  7. 在本地服务器终端,执行以下命令行,删除 Topic。

    cd /opt/kafka/current/bin
    
    # 未开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name>
    
    # 已开启 SASL
    ./kafka-topics.sh --bootstrap-server <Kafka 连接地址> --delete --topic <topic_name> --command-config /ssl/kafka.config