消息队列 Kafka
Kafka 数据管道是流计算系统中最常用的数据源(Source)和数据目的(Sink)。您可以把流数据导入到 Kafka 的某个 Topic 中,通过 Flink 算子进行处理后,输出到相同或不同 Kafka 的另一个 Topic 中。
Kafka 支持同一个 Topic 多分区读写,数据可以从多个分区读入,也可以写入到多个分区,以提供更高的吞吐量,减少数据倾斜和热点。
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
Kafka 支持作为数据源表(Source),也可以作为目的表(Sink)。
DDL 定义
在实际使用中请根据实际情况配置字段名和 WITH 参数。
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`partition` BIGINT METADATA VIRTUAL,
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
元数据字段
以下元数据可以作为表定义中的元数据字段进行访问。
读/写列定义元数据字段是否可读(R)
或可写(W)
或可读写(R/W)
。
只读列(R)必须声明为 VIRTUAL。
Key | 数据类型 | 说明 | 读/写 |
---|---|---|---|
topic |
STRING NOT NULL |
Kafka 消息所在的 Topic 名称。 |
R |
partition |
INT NOT NULL |
Kafka 消息所在的分区 ID。 |
R |
headers |
MAP<STRING, BYTES> NOT NULL |
Kafka 消息的消息头(header)。 |
R/W |
leader-epoch |
INT NULL |
Kafka 消息的 Leader epoch。 |
R |
offset |
BIGINT NOT NULL |
分区中 Kafka 消息的偏移量。 |
R |
timestamp |
TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL |
Kafka 消息的时间戳。 |
R/W |
timestamp-type |
STRING NOT NULL |
Kafka 消息的时间戳类型。
|
R |
Kafka 源表 WITH 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 | ||
---|---|---|---|---|---|---|
connector |
是 |
无 |
String |
固定值为 |
||
topic |
否 |
无 |
String |
Kafka Topic 名称。
|
||
topic-pattern |
否 |
无 |
String |
匹配读取 Topic 名称的正则表达式。
|
||
properties.bootstrap.servers |
是 |
无 |
String |
Kafka Broker 地址列表,以 |
||
properties.group.id |
是 |
无 |
String |
Kafka 消费组 ID。 |
||
properties.* |
否 |
无 |
String |
后缀名称必须是 Kafka 配置文档 中定义的配置项。Flink 会将 properties. 前缀移除,并将剩余的键和值传递给 Kafka 客户端。 |
||
format |
是 |
无 |
String |
在反序列化来自 Kafka 的消息 value 部分时使用的格式。
有关更多详细信息,请参考官方文档。 |
||
value.format |
是 |
无 |
String |
与 format 同样含义,只能配置其中一个。 |
||
key.format |
否 |
无 |
String |
在反序列化来自 Kafka 的消息 value 部分时使用的格式。
有关更多详细信息,请参考官方文档。
|
||
key.fields |
否 |
无 |
String |
消息键解析出来的数据存放的字段。 |
||
key.fields-prefix |
否 |
无 |
String |
为所有消息键指定自定义前缀,以避免与消息体格式字段重名,默认前缀为空。
|
||
value.fields-include |
否 |
ALL |
String |
控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:
|
||
scan.startup.mode |
否 |
group-offset |
String |
Kafka consumer 的启动模式。包括:lastest-offset、earliest-offset、specific-offset、group-offset、timestamp。详细信息请参考:启动模式。 |
||
scan.startup.specific-offsets |
否 |
无 |
Sring |
scan.startup.mode 选择 specific-offsets 时填写,指定各个分区 offset 的位置。 |
||
scan.startup.timestamp-millis |
否 |
无 |
Long |
scan.startup.mode 选择 timestamp 时填写,指定启动的时间戳,单位为毫秒。 |
||
scan.topic-partition-discovery.interval |
否 |
无 |
Duration |
Kafka consumer 定期发现动态创建的 Kafka topic 和分区的时间间隔。 |
Kafka 结果表 WITH 参数
参数 | 是否必填 | 默认值 | 数据类型 | 描述 | ||
---|---|---|---|---|---|---|
connector |
是 |
无 |
String |
固定值为 |
||
topic |
是 |
无 |
String |
结果写入的 Topic 名称。 |
||
properties.bootstrap.servers |
是 |
无 |
String |
Kafka Broker 地址列表,以(,)分隔,格式为 |
||
sink.partitioner |
否 |
Default |
String |
Flink 分区到 Kafka 分区的映射模式。有效值为:
|
||
sink.parallelism |
否 |
无 |
Integer |
Kafka sink 算子的并行度,默认情况下,框架使用上游算子相同的平行度。 |
||
properties.group.id |
否 |
无 |
String |
Kafka 消费组 ID。 |
||
sink.semantic |
否 |
at-least-once |
String |
Kafka的写入策略。
|
||
value.format |
是 |
无 |
String |
与 format 同样含义,只能配置其中一个。 |
||
key.fields-prefix |
否 |
无 |
String |
为所有消息 key 指定自定义前缀,以避免与消息 value 字段重名,默认前缀为空。
|
||
format |
是 |
无 |
String |
|||
key.format |
否 |
无 |
String |
序列化 Kafka 消息 value 部分时使用的格式。取值如下:
有关更多详细信息,请参考官方文档。
|
||
value.fields-include |
否 |
ALL |
String |
控制哪些字段应该出现在消息 value 解析出来的数据中。可取值:
|
||
key.fields |
否 |
无 |
String |
消息 key 的数据字段。 |
CSV 格式
详情请参见:CSV 格式。
CSV 格式 DDL 定义
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)
CSV 格式 WITH 参数
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
format |
是 |
无 |
String |
格式,固定值为 |
csv.field-delimiter |
否 |
, |
String |
指定 CSV 字段分隔符,默认是半角逗号。字段分隔符,必须是单个字符。您可以使用反斜杠指定特殊字符,例如表示制表符。您还可以使用 unicode,例如 |
csv.disable-quote-character |
否 |
false |
Boolean |
禁止字段包围引号。如果为 true,则 |
csv.quote-character |
否 |
" |
String |
字段包围引号。默认是双引号。 |
csv.allow-comments |
否 |
false |
Boolean |
忽略 # 开头的注释行(请务必将 csv.ignore-parse-errors 设为 true)。 |
csv.ignore-parse-errors |
否 |
false |
Boolean |
忽略处理错误。对于无法解析的字段,会输出为 null。 |
csv.array-element-delimiter |
否 |
; |
String |
数组元素的分隔符。 |
csv.escape-character |
否 |
无 |
String |
指定转义符,默认禁用转义。 |
csv.null-literal |
否 |
无 |
String |
将指定的字符串看作 null 值。 |
JSON 格式
详情请参见:JSON 格式。
JSON 格式 DDL 定义
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
JSON 格式 WITH 参数
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
format |
是 |
无 |
String |
格式,固定值为 |
json.fail-on-missing-field |
否 |
false |
Boolean |
|
json.ignore-parse-errors |
否 |
false |
Boolean |
|
json.timestamp-format.standard |
否 |
SQL |
String |
指定 JSON 时间戳
|
json.map-null-key.mode |
否 |
FAIL |
String |
序列化 Map 遇到 null key 时的处理模式:
|
json.map-null-key.literal |
否 |
null |
String |
|
Avro 格式
详情请参见:Avro 格式。
Avro 格式 DDL 定义
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = 'testGroup',
'format' = 'avro'
)
Avro 格式 WITH 参数
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
format |
是 |
无 |
String |
格式,固定值为 |
Confluent Avro 格式
Avro Schema Registry (avro-confluent
) 格式支持读取由 io.confluent.kafka.serializers.KafkaAvroSerializer
序列化后的消息
输出的消息响应的也可以被 io.confluent.kafka.serializers.KafkaAvroDeserializer
读取。详情请参见:Confluent Avro 格式。
Debezium 格式
Debezium 是一个 CDC(Changelog Data Capture)工具,可以将 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的实时 changelog 流式传输到 Kafka。Debezium 为 changelog 提供统一的 schema 格式,并支持使用 JSON 和 Apache Avro 序列化消息。详情请参见:Debezium 格式。
Canal 格式
Maxwell 格式
Maxwell 是一个 CDC(Changelog Data Capture)工具,可以将 MySQL 的实时 changelog 流式传输到 Kafka 和其他流式 connector。Maxwell 为 changelog 提供统一的 schema 格式,并支持使用 JSON 序列化消息。详情请参见:Maxwell 格式。
Raw 格式
Raw 格式允许将原始(基于字节的)值作为单列进行读写。详情请参见:Raw 格式。
Raw 格式 DDL 定义
CREATE TABLE nginx_log (
log STRING
) WITH (
'connector' = 'kafka',
'topic' = 'nginx_log',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = 'testGroup',
'format' = 'raw'
)
Raw 格式 WITH 参数
参数 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
format |
是 |
无 |
String |
格式,固定值为 |
raw.charset |
否 |
UTF-8 |
String |
编码字符集 |
raw.endianness |
否 |
big-endian |
String |
编码字节序。有效值为 |
代码示例
示例一
从 Kafka 中读取数据后插入 Kafka。
从名称为 source 的 Topic 中读取 Kafka 数据,再写入名称为 sink 的 Topic,数据使用 json 格式。
CREATE TEMPORARY TABLE Kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE Kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'value.format' = 'json'
);
INSERT INTO Kafka_sink SELECT id, name, age FROM Kafka_source;
示例二
key 和 value 包含相同名称的字段。
CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)
示例三
常规 kafka 与 upsert-kafka 做 join 查询,对实时交易数据与实时汇率数据做联合查询,获取实时交易额。
CREATE TEMPORARY TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECONDS,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'ALL'
);
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
t.total,
c.currency_code,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code;