sink和source的脚本参考以下两个
source的脚本
set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi
source $1
echo "register Postgres connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:8084/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SOURCE_CONNECTOR_NAME}",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "${POSTGRES_HOST}",
"database.port": ${POSTGRES_PORT},
"database.user": "${POSTGRES_USER}",
"database.password": "${POSTGRES_PASSWORD}",
"topic.prefix": "${TOPIC_PREFIX}",
"database.dbname": "${DATABASE_DBNAME}",
"plugin.name": "pgoutput",
"publication.autocreate.mode": "all_tables",
"producer.override.max.request.size": "524288000",
"decimal.handling.mode": "double",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "${TOPIC_PREFIX}.schema-changes",
"incrementing.column.name": "id",
"timestamp.column.name": "updated_at",
"max.retries": 3,
"retry.backoff.ms": 1000
}
}
EOF
sink的脚本
set -x
if [ ! $1 ];then
echo "Please specify a configuration file."
exit
fi
source $1
echo "register clickhouse sink connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:18084/connectors" --header 'Content-Type: application/json' --data @-
{
"name": "${SINK_CONNECTOR_NAME}",
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "1",
"topics.regex": "${TOPICS_REGEX}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",
"clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},
"store.kafka.metadata": true,
"topic.creation.default.partitions": 6,
"store.raw.data": false,
"store.raw.data.column": "raw_data",
"metrics.enable": true,
"metrics.port": 8084,
"buffer.flush.time.ms": 500,
"thread.pool.size": 2,
"sink.connector.max.queue.size": "200",
"fetch.max.wait.ms": 1000,
"fetch.min.bytes": 52428800,
"consumer.override.max.partition.fetch.bytes": "5242880",
"enable.kafka.offset": true,
"replacingmergetree.delete.column": "_sign",
"auto.create.tables": true,
"schema.evolution": false,
"deduplication.policy": "off",
"pk.mode": "record_value",
"pk.fields": "id",
"insert.mode": "upsert",
"max.retries": 3,
"retry.backoff.ms": 1000
}
}
EOF
在使用 Kafka Connect 同步数据时,出现目标数据库数据重复的问题通常是因为以下几个原因,通常与 幂等性 或 消费偏移量 的处理有关。下面是一些常见的原因和解决方案:
1. Kafka Connect 消费者偏移量(Offsets)未正确管理
Kafka Connect 会使用 消费偏移量 来记录每个任务(Source Connector 或 Sink Connector)处理到的消息的位置。如果偏移量管理不当,可能导致 重复消费。
解决方法:
- 确保目标数据库支持幂等性操作:如果目标数据库的连接器(例如,JDBC Sink Connector)不支持幂等性,你可能会遇到重复数据问题。你可以考虑在目标数据库上配置适当的主键,或者使用支持幂等性的操作(例如,使用 upsert 或 insert ... on duplicate key update 语句)。
- 启用幂等性:对于某些 Kafka Connect 连接器(比如 JDBC Sink Connector),可以配置 insert.mode 为 upsert 或 insert,以便处理重复数据。例如,使用 upsert 模式时,Kafka Connect 会尝试更新现有记录而不是插入新的记录,从而避免重复数据。
在 JDBC Sink Connector 的配置文件中,你可以设置:
jsonCopyEdit"insert.mode": "upsert", // 使用 upsert 来避免重复数据
2. Kafka Connect 的 “批量提交” 配置
Kafka Connect 在同步数据时,通常会以批量的方式将数据写入目标数据库。如果批量提交失败,或者 Kafka Connect 没有正确记录已经处理过的偏移量,那么它可能会重复处理相同的数据。
解决方法:
- 调整 batch.size 和 flush.size 配置,确保批量提交的大小合适,并且 Kafka Connect 在每次成功写入目标数据库后,能够及时提交偏移量。
- 检查 poll.interval.ms 配置,确保 Kafka Connect 正常轮询并提交偏移量。
3. 目标数据库中的主键或唯一约束缺失
如果目标数据库表中没有合适的主键或者唯一约束,Kafka Connect 会将每一条数据作为一条新的记录插入,而不会检测到数据是否已经存在。
解决方法:
- 确保目标数据库表有主键或者唯一约束。Kafka Connect 会使用这些主键/约束来判断是否应该插入数据或更新现有记录。
- 如果使用的是 JDBC Sink Connector,请确保 pk.mode 和 pk.fields 设置正确,指明哪个字段是主键,并配置 insert.mode 为 upsert。
"pk.mode": "record_value", // 主键字段配置 "pk.fields": "id", // 设置主键字段 "insert.mode": "upsert", // 使用 upsert 模式避免重复
4. Source Connector 数据重复
有时数据重复并不是目标数据库的问题,而是 Source Connector 本身在读取数据时重复读取了同一条数据。可能是因为 Kafka Connect 未能正确处理消费偏移量,或者 Source Connector 没有正确标记消息已经被处理。
解决方法:
- 检查 Source Connector 的偏移量提交配置:确保 Source Connector 的偏移量提交配置(如 offset.storage.topic)正确,并且 Kafka Connect 能够在成功消费数据后正确提交偏移量。
- 如果你使用的是 JDBC Source Connector,可以考虑启用 timestamp.column.name 或者 incrementing.column.name 来确保数据按时间顺序正确处理。
"incrementing.column.name": "id", // 使用增量列确保数据的唯一性 "timestamp.column.name": "updated_at" // 使用时间戳列确保数据按时间顺序处理
5. 处理时延或幂等性问题(source和sink都插入)
目标数据库可能由于并发或者网络延迟等问题,导致重复插入数据。特别是在高负载或大量数据同步的情况下,可能会出现数据写入延迟,或者提交偏移量时失败,导致重复处理。
解决方法:
- 确保 Kafka Connect 配置幂等性支持:对于一些数据库(例如,使用 JDBC Sink Connector),可以通过使用 幂等性插入 或 upsert 来避免数据重复。
- 配置 max.retries 和 retry.backoff.ms,在遇到暂时的写入失败时,Kafka Connect 会重试写入操作,避免因网络或数据库故障导致的数据丢失或重复。
"max.retries": 3, // 配置重试次数 "retry.backoff.ms": 1000 // 配置重试间隔
6. 使用幂等性机制(幂等插入)
一些数据库和 Kafka Connect 连接器支持 幂等性,即在插入数据时,系统会自动检查数据是否已存在,并避免重复插入。
对于 JDBC Sink Connector,你可以使用 upsert(更新或插入)操作,确保在数据库中遇到重复记录时进行更新而不是插入新的记录。具体可以设置:
"insert.mode": "upsert", // 使用 upsert 模式避免重复插入 "pk.mode": "record_value", // 指定主键字段 "pk.fields": "id", // 设置主键字段
7.启用偏移(sink处)
"enable.kafka.offset": true
"enable.kafka.offset": false 是 Kafka Connect 配置中的一个参数,主要用于控制是否启用 Kafka 消费者偏移量(offset)的存储方式。
Kafka Connect 消费者偏移量的作用
在 Kafka Connect 中,偏移量(offset)用于跟踪每个连接器(如 Source Connector)处理的 Kafka 消息的位置。每个消息在 Kafka 中都有一个唯一的偏移量,Kafka Connect 会记录下它最后处理的消息的偏移量,这样如果连接器重新启动或者发生故障,它可以从上次停止的地方继续消费数据,而不需要重新处理已经消费过的消息。
通常,Kafka Connect 会将这些偏移量存储在一个 Kafka 集群中的专用主题中,默认情况下是 __consumer_offsets,以保证每个连接器的消费进度持久化。
"enable.kafka.offset": false 的作用
当设置 "enable.kafka.offset": false 时,Kafka Connect 不会 使用 Kafka 自身的机制来存储偏移量,而是禁用偏移量存储功能。
这通常用于以下场景:
- 禁用偏移量存储
:如果你希望避免 Kafka Connect 自动管理和存储偏移量,可能因为一些特定的需求或者你希望通过其他方式(如自定义的数据库)来管理偏移量,你可以禁用这个功能。
- 临时性或者实验性用途
:在一些开发和测试场景下,禁用偏移量存储可以减少管理和资源的消耗,特别是当你只是临时运行一个任务并且不关心消费进度的时候。
使用场景
- 单次数据处理
:如果你仅仅需要从 Kafka 中读取一次数据,然后将其导入目标系统(比如数据库),并不关心偏移量的持久化和管理,可以使用 false 禁用偏移量存储。
- 自定义偏移量管理
:如果你希望将偏移量存储到不同的地方(比如数据库),而不是 Kafka 自带的 __consumer_offsets 主题,可以通过设置该参数为 false 来避免 Kafka Connect 自己的偏移量管理机制。
8."replacingmergetree.delete.column": "_sign",作用(sink处)
"replacingmergetree.delete.column": "_sign"
"replacingmergetree.delete.column": "_sign" 是 ClickHouse 中的一个配置项,用于 ReplacingMergeTree 引擎的表结构中,指定用于软删除(逻辑删除)的标记字段。
ReplacingMergeTree 引擎概述
在 ClickHouse 中,ReplacingMergeTree 是一种特殊的表引擎,它用于在合并过程中删除重复的行,通常适用于具有某种版本控制或更新操作的场景。每当相同主键的不同版本记录插入时,ClickHouse 会通过比较记录中的某些字段,保留最新的版本(如果设置了相关字段)。
_sign 字段与软删除(逻辑删除)
_sign 字段用于标记记录的删除状态。在 ReplacingMergeTree 引擎中,_sign 字段的值为:
- 1 表示该记录是最新的有效记录(即未删除)。
- -1 表示该记录已经被逻辑删除(已被标记为删除,通常是通过更新操作完成的)。
"replacingmergetree.delete.column": "_sign" 配置的作用
- 功能
:"replacingmergetree.delete.column": "_sign" 配置告诉 ClickHouse 在执行 ReplacingMergeTree 引擎的合并操作时,将 _sign 列视为 删除标记,即如果该字段值为 -1,则表示该行已被删除。
- 背景
:在 ReplacingMergeTree 表中,如果某条记录的 _sign 字段为 -1,ClickHouse 会在数据合并时,将其视为“已删除”记录,并在后续的合并操作中排除这些记录。因此,这个字段实际上提供了一种 软删除(逻辑删除)的机制,而不会真正删除该行数据(在物理存储中仍然存在,但逻辑上已经被删除)。
总结
当 Kafka Connect 同步数据时,重复数据问题可能是由以下几个因素引起的:
- 消费偏移量未正确提交,导致重复消费。
- 目标数据库中缺少唯一约束,导致重复插入。
- 配置错误导致 Source Connector 重复读取数据。
- 目标数据库的写入操作没有使用幂等性或 upsert。
你可以通过正确配置 偏移量管理、主键/唯一约束、幂等性写入 和 增量处理 等措施,来避免数据重复的问题。
如果问题依旧无法解决,可以进一步分析 Kafka Connect 的日志,查看是哪个环节导致了数据重复处理。
kafka-connect同步问题参考