Runtime Environment
Flink version: 1.13.3
Flink CDC version: 2.2.0
Doris version: PALO-0.15.1-rc09
doris-flink-connector version: 1.13.5-2.12-SNAPSHOT
Sync Approach
- Sql-Client development: with many database instances and tables, each single-instance, single-table job occupies at least 1 slot, which wastes resources, and having too many jobs makes unified management difficult.
- DataStream development: whole-database sync, but the downstream processing logic has to be written by hand; allows elastic resource usage. See the Flink Doris Connector for details.
We chose DataStream development. Based on the table schema, a changelog like the following is converted into RowData:
{
  "before": null,
  "after": {
    "id": 1,
    "name": "name1",
    "age": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1689245998000,
    "snapshot": "false",
    "db": "cc",
    "sequence": null,
    "table": "student",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2781,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1689246005928,
  "transaction": null
}
// Pick the payload by the Debezium "op" field: deletes carry the row image
// in "before", creates/updates carry it in "after".
String op = object.getString("op");
JSONObject data;
if ("d".equals(op)) {
    data = object.getJSONObject("before");
    rowData.setRowKind(RowKind.DELETE);
} else {
    data = object.getJSONObject("after");
    rowData.setRowKind(RowKind.INSERT);
}
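For context, here is a minimal sketch of what the full conversion could look like for the (id, name, age) example table above, assuming fastjson for parsing and Flink's GenericRowData; the class name and field layout are illustrative, not the original code:

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class ChangelogToRowData {
    // Convert one Debezium changelog record into a RowData for the example schema.
    public static GenericRowData convert(String changelogJson) {
        JSONObject object = JSONObject.parseObject(changelogJson);
        String op = object.getString("op");

        GenericRowData rowData = new GenericRowData(3);
        JSONObject data;
        if ("d".equals(op)) {
            data = object.getJSONObject("before");  // deletes carry the old row image
            rowData.setRowKind(RowKind.DELETE);
        } else {
            data = object.getJSONObject("after");   // creates/updates carry the new row image
            rowData.setRowKind(RowKind.INSERT);
        }

        rowData.setField(0, data.getIntValue("id"));
        rowData.setField(1, StringData.fromString(data.getString("name")));
        rowData.setField(2, StringData.fromString(data.getString("age")));  // "age" is a string in the sample
        return rowData;
    }
}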
Problem List
- Garbled Chinese characters in Doris data
  # Specify the JVM encoding in the job submission parameters
  -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
- A slave with the same server_uuid/server_id as this slave has connected to the master
  The server id used by this job conflicts with one used by another job or another sync tool; server ids must be globally unique. Set a range that matches the source parallelism, e.g. if the job's source runs with four parallel subtasks, configure 'server-id' = '5001-5004'.
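A minimal sketch of setting that range on the source builder (host, credentials, and table names below are placeholders, not from the original job):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// One distinct server id per parallel subtask: a source parallelism of 4 needs a range of 4 ids.
MySqlSource<String> source = MySqlSource.<String>builder()
        .hostname("mysql-host")        // placeholder
        .port(3306)
        .databaseList("cc")
        .tableList("cc.student")
        .username("user")              // placeholder
        .password("password")          // placeholder
        .serverId("5001-5004")         // globally unique across all jobs and tools
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();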
- Garbled values for MySQL bigint and decimal types
  Set the Debezium properties:
  properties.setProperty("bigint.unsigned.handling.mode", "long");
  properties.setProperty("decimal.handling.mode", "string");
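A sketch of wiring these into the source, assuming the same builder as in the server-id example (only the two properties come from the original fix):

import java.util.Properties;

Properties properties = new Properties();
properties.setProperty("bigint.unsigned.handling.mode", "long");  // unsigned BIGINT as long
properties.setProperty("decimal.handling.mode", "string");        // DECIMAL as string

MySqlSource<String> source = MySqlSource.<String>builder()
        // ... hostname, port, credentials, table list as in the previous sketch ...
        .debeziumProperties(properties)
        .build();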
- CDC has already entered the incremental phase and new tables need to be added (supported since 2.2.0)
  Enable the flag on the builder, then restart the job from a savepoint after adding the new tables to the table list:
  MySqlSource.<String>builder().scanNewlyAddedTableEnabled(true)
- Doris connection hits a permission problem; data cannot be inserted
  The user DorisSink connects as needs at least the show backends privilege, because the sink selects a BE at random.
- Importing data fails with "The connection property zeroDateTimeBehavior only accepts values of the form: exception, round or convertToNull. The value CONVERT_TO_NULL is not in this set"
  Modify the connector source: in PooledDataSourceFactory.java, change CONVERT_TO_NULL to convertToNull.
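For illustration, the corrected value as it would appear in a JDBC URL; the URL itself is a placeholder, and only the accepted values (exception, round, convertToNull) come from the error message:

// Rejected by MySQL Connector/J: ...?zeroDateTimeBehavior=CONVERT_TO_NULL
// Accepted:                      ...?zeroDateTimeBehavior=convertToNull
String jdbcUrl = "jdbc:mysql://mysql-host:3306/cc?zeroDateTimeBehavior=convertToNull";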
- Source Coordinator Thread already exists. There should never be more than one thread driving the actions of a Source Coordinator
  The root cause is java.lang.OutOfMemoryError: Java heap space on the JobManager. Even if resources are increased for the full snapshot phase, the problem reappears if JobManager memory is shrunk again after entering the incremental phase, so keep the JobManager heap large enough (e.g. via jobmanager.memory.process.size). See the related jira issue.
- Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}
  Increase the MySQL server-side timeouts:
  set global slave_net_timeout = 120;        -- default was 30 sec
  set global thread_pool_idle_timeout = 120;
- Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
  - Use a different transactionalId for each operator: always give every operator a name and uid that will not collide with any other job's (see reference; a sketch follows this list).
  - Check whether high load is causing transactions to time out.
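A hedged sketch of the first point, assuming a transactional Kafka sink on a DataStream (the sink variable and names are examples, not from the original job):

// A unique, stable name and uid per sink keeps the transactionalId derived
// from them from colliding with other jobs writing to the same broker.
stream.addSink(kafkaProducer)        // e.g. a FlinkKafkaProducer with EXACTLY_ONCE semantics
      .name("student-kafka-sink")
      .uid("student-kafka-sink-uid");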
- org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs …, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
  For common causes, see the Alibaba Cloud Flink CDC FAQ. Another cause is "Some of the GTIDs needed to replicate have been already purged"; see the related github issue, which is resolved by upgrading the Flink CDC version.