步骤三:生产消息
操作步骤
-
通过 VNC 登录 Kafka 客户端节点。详情参考通过 VNC 登录 Kafka。
-
在客户端节点执行以下命令行,向 Topic 发送消息。
-
命令行
cd /opt/kafka/current/bin ./kafka-console-producer.sh --broker-list <连接地址> --topic <Topic 名称> -
参数说明
参数 说明 连接地址
可在 Kafka 集群详情页面,查看连接地址。
Topic 名称
目标 Topic 名称,需根据实际情况进行填写。
-
命令行示例
$ cd /opt/kafka/current/bin ./kafka-console-producer.sh --broker-list 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092 --topic test
-
-
输入需要发送的消息内容后,按 Enter 键发送消息,每一行的内容都将作为一条消息发送至 Kafka。
>hi >hello world >how are you
附录
如何增大生产者发送消息的大小?
Kafka 默认生产者发送的消息不能超过 1M,如需增大生产者发送消息的大小,用户需要在生产端和服务端修改相关参数。
-
生产端
-
若生产者的消息大于 1M、小于等于 32M,增大
max.request.size参数即可。 -
若生产者的消息大于 32M,需要同时增大
max.request.size和buffer.memory参数,且buffer.memory参数值必须大于max.request.size参数值。
-
-
客户端
针对消费者,如果生产者的消息较大,可以修改参数
fetch.message.max.bytes,增加fetch的最大字节数,加大每次fetch的消息大小。 -
服务端
生产端增大了
max.request.size参数值,用户需增大服务端端message.max.bytes参数,否则服务端无法接收这么大的消息。详情可参考修改集群配置参数。说明 message.max.bytes参数值必须大于生产端设置的max.request.size参数值。
Kafka 增减分区对生产者有什么影响?
-
减少分区
若此时生产者未发送消息,将不会对生产者造成影响。若此时生产者正在发送消息,可能会导致部分消息发送失败,需要在生产者回调方法里进行重试,重新发送。
假设 Topic 有 3 个分区
p0,p1,p2,3 个 Broker0,1,2。p0在broker0上,p1在broker1上,p2在broker2上。详细说明如下。-
生产者发送 Message
1~10的消息给 Broker。 -
生产者开始发送消息时,会获取元数据信息。如下所示。
消息 broker message1 0 message2 1 message3 2 message4 0 message5 1 message6 2 -
message1,2,3 能正常发送。
-
message4,5,6 消息组装好后,还没发送之前,将
broker2下线。 -
message4,5 能正常发送,message6 会超时,抛异常,发送更新元数据请求。(需要在生产者回调方法里进行重试,否则消息发送不出去)
-
可用分区变成
p0与p1,剩下的消息会发到p0,p1分区。消息 broker message7 0 message8 1 message9 0 -
message7,8,9 能正常发送。
-
在生产者回调方法里进行重试发送 message6,否则该消息发送不出去。
-
-
增加分区
增加分区将不会对生产者造成影响。
假设 Topic 目前有 2 个分区
p0与p1,3 个 Broker 0,1,2。p0在broker0上,p1在broker1上。-
现在生产者发送很多消息给 Broker。
-
此时增加分区的个数,增加 1 个,也就是分区个数变成 3,如果元数据没有更新,那么消息是不能发送到分区
p2的,只会发送到p0与p1。 -
元数据更新后,消息才可以发到
p2。元数据更新频率为 5 分钟。
-