​kafka-connect同步问题参考

sink和source的脚本参考以下两个

 

source的脚本


set -x
if [ ! $1 ];then
   echo "Please specify a configuration file."
   exit
fi

source $1

echo "register Postgres connector"
cat <<EOF | curl --request POST --url "http://127.0.0.1:8084/connectors" --header 'Content-Type: application/json' --data @-
{
  "name": "${SOURCE_CONNECTOR_NAME}",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": 1,
    "database.hostname": "${POSTGRES_HOST}",
    "database.port": ${POSTGRES_PORT},
    "database.user": "${POSTGRES_USER}",
    "database.password": "${POSTGRES_PASSWORD}",
    "topic.prefix": "${TOPIC_PREFIX}",
    "database.dbname": "${DATABASE_DBNAME}",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "all_tables",
    "producer.override.max.request.size": "524288000",
    "decimal.handling.mode": "double",

    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "${TOPIC_PREFIX}.schema-changes",
    "incrementing.column.name": "id",   
    "timestamp.column.name": "updated_at",
    "max.retries": 3,             
    "retry.backoff.ms": 1000       
  }
}
EOF

sink的脚本

set -x


if [ ! $1 ];then

   echo "Please specify a configuration file."

   exit

fi



source $1


echo "register clickhouse sink connector"

cat <<EOF | curl --request POST --url "http://127.0.0.1:18084/connectors" --header 'Content-Type: application/json' --data @-

{

    "name": "${SINK_CONNECTOR_NAME}",

    "config": {

      "connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",

      "tasks.max": "1",

      "topics.regex": "${TOPICS_REGEX}",

      "clickhouse.server.url": "${CLICKHOUSE_HOST}",

      "clickhouse.server.user": "${CLICKHOUSE_USER}",

      "clickhouse.server.password": "${CLICKHOUSE_PASSWORD}",

      "clickhouse.server.database": "${CLICKHOUSE_DATABASE}",

      "clickhouse.server.port": ${CLICKHOUSE_PORT},



      "store.kafka.metadata": true,

      "topic.creation.default.partitions": 6,


      "store.raw.data": false,

      "store.raw.data.column": "raw_data",


      "metrics.enable": true,

      "metrics.port": 8084,

      "buffer.flush.time.ms": 500,

      "thread.pool.size": 2,

      "sink.connector.max.queue.size": "200",

      "fetch.max.wait.ms": 1000,

      "fetch.min.bytes": 52428800,

      "consumer.override.max.partition.fetch.bytes": "5242880",


      "enable.kafka.offset": true,


      "replacingmergetree.delete.column": "_sign",


      "auto.create.tables": true,

      "schema.evolution": false,


      "deduplication.policy": "off",

      "pk.mode": "record_value",   

      "pk.fields": "id",          

      "insert.mode": "upsert",

      "max.retries": 3,             

      "retry.backoff.ms": 1000      

    }

}

EOF


在使用 Kafka Connect 同步数据时,出现目标数据库数据重复的问题通常是因为以下几个原因,通常与 幂等性 或 消费偏移量 的处理有关。下面是一些常见的原因和解决方案:

1. Kafka Connect 消费者偏移量(Offsets)未正确管理

Kafka Connect 会使用 消费偏移量 来记录每个任务(Source Connector 或 Sink Connector)处理到的消息的位置。如果偏移量管理不当,可能导致 重复消费。

解决方法:

  • 确保目标数据库支持幂等性操作:如果目标数据库的连接器(例如,JDBC Sink Connector)不支持幂等性,你可能会遇到重复数据问题。你可以考虑在目标数据库上配置适当的主键,或者使用支持幂等性的操作(例如,使用 upsert 或 insert ... on duplicate key update 语句)。
  • 启用幂等性:对于某些 Kafka Connect 连接器(比如 JDBC Sink Connector),可以配置 insert.mode 为 upsert 或 insert,以便处理重复数据。例如,使用 upsert 模式时,Kafka Connect 会尝试更新现有记录而不是插入新的记录,从而避免重复数据。

在 JDBC Sink Connector 的配置文件中,你可以设置:

jsonCopyEdit"insert.mode": "upsert", // 使用 upsert 来避免重复数据

2. Kafka Connect 的 “批量提交” 配置

Kafka Connect 在同步数据时,通常会以批量的方式将数据写入目标数据库。如果批量提交失败,或者 Kafka Connect 没有正确记录已经处理过的偏移量,那么它可能会重复处理相同的数据。

解决方法:

  • 调整 batch.size 和 flush.size 配置,确保批量提交的大小合适,并且 Kafka Connect 在每次成功写入目标数据库后,能够及时提交偏移量。
  • 检查 poll.interval.ms 配置,确保 Kafka Connect 正常轮询并提交偏移量。
3. 目标数据库中的主键或唯一约束缺失

如果目标数据库表中没有合适的主键或者唯一约束,Kafka Connect 会将每一条数据作为一条新的记录插入,而不会检测到数据是否已经存在。

解决方法:

  • 确保目标数据库表有主键或者唯一约束。Kafka Connect 会使用这些主键/约束来判断是否应该插入数据或更新现有记录。
  • 如果使用的是 JDBC Sink Connector,请确保 pk.mode 和 pk.fields 设置正确,指明哪个字段是主键,并配置 insert.mode 为 upsert。

"pk.mode": "record_value", // 主键字段配置 "pk.fields": "id", // 设置主键字段 "insert.mode": "upsert", // 使用 upsert 模式避免重复

4. Source Connector 数据重复

有时数据重复并不是目标数据库的问题,而是 Source Connector 本身在读取数据时重复读取了同一条数据。可能是因为 Kafka Connect 未能正确处理消费偏移量,或者 Source Connector 没有正确标记消息已经被处理。

解决方法:

  • 检查 Source Connector 的偏移量提交配置:确保 Source Connector 的偏移量提交配置(如 offset.storage.topic)正确,并且 Kafka Connect 能够在成功消费数据后正确提交偏移量。
  • 如果你使用的是 JDBC Source Connector,可以考虑启用 timestamp.column.name 或者 incrementing.column.name 来确保数据按时间顺序正确处理。

"incrementing.column.name": "id", // 使用增量列确保数据的唯一性 "timestamp.column.name": "updated_at" // 使用时间戳列确保数据按时间顺序处理

5. 处理时延或幂等性问题(source和sink都插入)

目标数据库可能由于并发或者网络延迟等问题,导致重复插入数据。特别是在高负载或大量数据同步的情况下,可能会出现数据写入延迟,或者提交偏移量时失败,导致重复处理。

解决方法:

  • 确保 Kafka Connect 配置幂等性支持:对于一些数据库(例如,使用 JDBC Sink Connector),可以通过使用 幂等性插入 或 upsert 来避免数据重复。
  • 配置 max.retries 和 retry.backoff.ms,在遇到暂时的写入失败时,Kafka Connect 会重试写入操作,避免因网络或数据库故障导致的数据丢失或重复。

"max.retries": 3, // 配置重试次数 "retry.backoff.ms": 1000 // 配置重试间隔

6. 使用幂等性机制(幂等插入)

一些数据库和 Kafka Connect 连接器支持 幂等性,即在插入数据时,系统会自动检查数据是否已存在,并避免重复插入。

对于 JDBC Sink Connector,你可以使用 upsert(更新或插入)操作,确保在数据库中遇到重复记录时进行更新而不是插入新的记录。具体可以设置:

"insert.mode": "upsert", // 使用 upsert 模式避免重复插入 "pk.mode": "record_value", // 指定主键字段 "pk.fields": "id", // 设置主键字段

7.启用偏移(sink处)

"enable.kafka.offset": true

"enable.kafka.offset": false 是 Kafka Connect 配置中的一个参数,主要用于控制是否启用 Kafka 消费者偏移量(offset)的存储方式。

Kafka Connect 消费者偏移量的作用

在 Kafka Connect 中,偏移量(offset)用于跟踪每个连接器(如 Source Connector)处理的 Kafka 消息的位置。每个消息在 Kafka 中都有一个唯一的偏移量,Kafka Connect 会记录下它最后处理的消息的偏移量,这样如果连接器重新启动或者发生故障,它可以从上次停止的地方继续消费数据,而不需要重新处理已经消费过的消息。

通常,Kafka Connect 会将这些偏移量存储在一个 Kafka 集群中的专用主题中,默认情况下是 __consumer_offsets,以保证每个连接器的消费进度持久化。

"enable.kafka.offset": false 的作用

当设置 "enable.kafka.offset": false 时,Kafka Connect 不会 使用 Kafka 自身的机制来存储偏移量,而是禁用偏移量存储功能。

这通常用于以下场景:

  1. 禁用偏移量存储

:如果你希望避免 Kafka Connect 自动管理和存储偏移量,可能因为一些特定的需求或者你希望通过其他方式(如自定义的数据库)来管理偏移量,你可以禁用这个功能。

  1. 临时性或者实验性用途

:在一些开发和测试场景下,禁用偏移量存储可以减少管理和资源的消耗,特别是当你只是临时运行一个任务并且不关心消费进度的时候。

使用场景

  • 单次数据处理

:如果你仅仅需要从 Kafka 中读取一次数据,然后将其导入目标系统(比如数据库),并不关心偏移量的持久化和管理,可以使用 false 禁用偏移量存储。

  • 自定义偏移量管理

:如果你希望将偏移量存储到不同的地方(比如数据库),而不是 Kafka 自带的 __consumer_offsets 主题,可以通过设置该参数为 false 来避免 Kafka Connect 自己的偏移量管理机制。

8."replacingmergetree.delete.column": "_sign",作用(sink处)

"replacingmergetree.delete.column": "_sign"

"replacingmergetree.delete.column": "_sign" 是 ClickHouse 中的一个配置项,用于 ReplacingMergeTree 引擎的表结构中,指定用于软删除(逻辑删除)的标记字段。

ReplacingMergeTree 引擎概述

在 ClickHouse 中,ReplacingMergeTree 是一种特殊的表引擎,它用于在合并过程中删除重复的行,通常适用于具有某种版本控制或更新操作的场景。每当相同主键的不同版本记录插入时,ClickHouse 会通过比较记录中的某些字段,保留最新的版本(如果设置了相关字段)。

_sign 字段与软删除(逻辑删除)

_sign 字段用于标记记录的删除状态。在 ReplacingMergeTree 引擎中,_sign 字段的值为:

  • 1 表示该记录是最新的有效记录(即未删除)。
  • -1 表示该记录已经被逻辑删除(已被标记为删除,通常是通过更新操作完成的)。

"replacingmergetree.delete.column": "_sign" 配置的作用

  • 功能

:"replacingmergetree.delete.column": "_sign" 配置告诉 ClickHouse 在执行 ReplacingMergeTree 引擎的合并操作时,将 _sign 列视为 删除标记,即如果该字段值为 -1,则表示该行已被删除。

  • 背景

:在 ReplacingMergeTree 表中,如果某条记录的 _sign 字段为 -1,ClickHouse 会在数据合并时,将其视为“已删除”记录,并在后续的合并操作中排除这些记录。因此,这个字段实际上提供了一种 软删除(逻辑删除)的机制,而不会真正删除该行数据(在物理存储中仍然存在,但逻辑上已经被删除)。

总结

当 Kafka Connect 同步数据时,重复数据问题可能是由以下几个因素引起的:

  1. 消费偏移量未正确提交,导致重复消费。
  2. 目标数据库中缺少唯一约束,导致重复插入。
  3. 配置错误导致 Source Connector 重复读取数据。
  4. 目标数据库的写入操作没有使用幂等性或 upsert。

你可以通过正确配置 偏移量管理、主键/唯一约束、幂等性写入 和 增量处理 等措施,来避免数据重复的问题。

如果问题依旧无法解决,可以进一步分析 Kafka Connect 的日志,查看是哪个环节导致了数据重复处理。



AI/BI
​kafka-connect同步问题参考
刘付国瑞 2025年2月20日
分析这篇文章

存档
登录 留下评论
启动superset