Patching Maxwell to skip DDL parse errors
The issue is as follows:
Note: this change works in my scenario. In a stricter environment it should be considered carefully.
The DDL is:
alter table `db1`.`_tb1_new` change `name` `name` VARCHAR(32) not null comment 'name';
While running pt-osc, Maxwell hit an error and exited. The log was:
2014 [INFO] AppInfoParser: Kafka version: 2.7.0
2015 [INFO] AppInfoParser: Kafka commitId: 448719dc99a19793
2015 [INFO] AppInfoParser: Kafka startTimeMs: 1743140296555
2069 [INFO] Maxwell: Maxwell v1.40.5 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[binlog.001626:999417140], lastHeartbeat=0]
2344 [INFO] MysqlSavedSchema: Restoring schema id 4598 (last modified at Position[BinlogPosition[binlog.001626:999405799], lastHeartbeat=0])
2566 [INFO] Metadata: [Producer clientId=producer-1] Cluster ID: TsiZuIVUQkmzZeL4vdbINA
2743 [INFO] MysqlSavedSchema: Restoring schema id 4 (last modified at Position[BinlogPosition[binlog.000564:932304891], lastHeartbeat=0])
3332 [INFO] MysqlSavedSchema: beginning to play deltas...
3814 [INFO] MysqlSavedSchema: played 1219 deltas in 476ms
3849 [INFO] BinlogConnectorReplicator: Setting initial binlog pos to: binlog.001626:999417140
3871 [INFO] BinaryLogClient: Connected to 172.19.31.242:3306 at binlog.001626/999417140 (sid:13579, cid:289144103)
3871 [INFO] BinlogConnectorReplicator: Binlog connected.
4206 [ERROR] MysqlParserListener: (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (db_name (name (id `db1`))) . (name_all_tokens (id `_tb1_new`)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT 'name')
4207 [ERROR] SchemaChange: Error parsing SQL: 'ALTER TABLE `db1`.`_tb1_new` CHANGE COLUMN `name` `name` VARCHAR(32) CHARACTER SET UTF8MB4 NOT NULL DEFAULT _UTF8MB4'' COMMENT 'name''
4208 [ERROR] AbstractSchemaStore: Error on bin log position Position[BinlogPosition[binlog.001626:999417249], lastHeartbeat=0]
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93)
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17)
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101)
at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115)
at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49)
at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337)
4228 [INFO] BinlogConnectorReplicator: Binlog disconnected.
4229 [INFO] TaskManager: Stopping 4 tasks
4230 [ERROR] TaskManager: cause:
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93) ~[maxwell-1.40.5.jar:1.40.5]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17) ~[antlr4-runtime-4.8-1.jar:4.8-1]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[antlr4-runtime-4.8-1.jar:4.8-1]
at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) ~[maxwell-1.40.5.jar:1.40.5]
4242 [INFO] TaskManager: Stopping: com.zendesk.maxwell.schema.PositionStoreThread@54866ff0
4242 [INFO] TaskManager: Stopping: com.zendesk.maxwell.producer.MaxwellKafkaProducerWorker@11a68543
4243 [INFO] KafkaProducer: [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
4249 [INFO] Metrics: Metrics scheduler closed
4249 [INFO] Metrics: Closing reporter org.apache.kafka.common.metrics.JmxReporter
4249 [INFO] Metrics: Metrics reporters closed
4250 [INFO] AppInfoParser: App info kafka.producer for producer-1 unregistered
4250 [INFO] TaskManager: Stopping: com.zendesk.maxwell.bootstrap.BootstrapController@2fa96e5b
4250 [INFO] TaskManager: Stopping: com.zendesk.maxwell.replication.BinlogConnectorReplicator@2bc879f6
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93)
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17)
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101)
at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115)
at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49)
at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235)
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302)
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227)
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337)
4251 [ERROR] Maxwell: Maxwell saw an exception and is exiting...
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93) ~[maxwell-1.40.5.jar:1.40.5]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17) ~[antlr4-runtime-4.8-1.jar:4.8-1]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[antlr4-runtime-4.8-1.jar:4.8-1]
at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.40.5.jar:1.40.5]
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) [maxwell-1.40.5.jar:1.40.5]
5658 [INFO] TaskManager: Stopped all tasks
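In the log above, the parse tree ends at `(byte_literal _UTF8MB4)` and the first error node is the `''` that follows. So the statement that trips the parser has a `DEFAULT` made of a character-set introducer followed by an empty string literal. Below is a minimal sketch of a pre-check for that one shape. `IntroducerDefaultCheck` and its method are hypothetical names, not part of Maxwell, and the regex only covers the pattern seen in this log.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Maxwell: flags DDL whose DEFAULT value is a
// character-set introducer followed directly by an empty string literal.
final class IntroducerDefaultCheck {
    // DEFAULT, whitespace, an introducer such as _UTF8MB4, then '' (empty literal)
    private static final Pattern EMPTY_DEFAULT_WITH_INTRODUCER =
        Pattern.compile("\\bDEFAULT\\s+_[A-Za-z0-9]+\\s*''", Pattern.CASE_INSENSITIVE);

    static boolean hasIntroducerEmptyDefault(String ddl) {
        return EMPTY_DEFAULT_WITH_INTRODUCER.matcher(ddl).find();
    }
}
```

Run against the statement from the `SchemaChange` error line, the check returns true. It returns false for the original `alter table` without a `DEFAULT` clause.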
Temporary workaround:
Download the source with git clone
cd ~/maxwell-master/src/main/java/com/zendesk/maxwell/schema/ddl
vim SchemaChange.java, comment out the original throw (e) and replace it with continue so the program keeps running
while ( true ) {
    try {
        return parseSQL(currentDB, sql);
    } catch ( ReparseSQLException e ) {
        sql = e.getSQL();
        LOGGER.debug("rewrote SQL to {}", sql);
        // re-enter loop
    } catch ( ParseCancellationException e ) {
        if (LOGGER.isDebugEnabled()) {
            // we are debug logging the toString message, slf4j will log the stacktrace of a throwable
            String msg = e.toString();
            LOGGER.debug("Parse cancelled: {}", msg);
        }
        return null;
    } catch ( MaxwellSQLSyntaxError e) {
        LOGGER.error("Error parsing SQL: '{}'", sql);
        // Modified here: skip the error and go to the next loop iteration, which tries to parse again.
        // Note: sql is unchanged at this point, so the retry parses the same statement.
        // throw (e);
        continue;
    }
}
Switch to a newer JDK (JDK 17)
Then build. If any tests fail, they can be skipped during the build
mvn clean package -Dmaven.test.skip=true
Copy out the generated jar
cd ~/maxwell-master/target/maxwell-1.43.2/maxwell-1.43.2
cp lib/maxwell-1.43.2.jar /usr/local/maxwell-1.43.2/lib
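One thing to watch in the patched catch block: `continue` returns to the top of `while ( true )` with `sql` unchanged, so if `parseSQL` throws the same `MaxwellSQLSyntaxError` again the loop has no exit. The `ParseCancellationException` branch in the same method leaves the loop with `return null`. A sketch of the same "skip and move on" idea that always exits is below. The types here are hypothetical stand-ins rather than Maxwell classes, and whether Maxwell's callers accept an empty result is an assumption to verify.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Maxwell classes.
final class SkipOnSyntaxError {
    static final class SyntaxError extends RuntimeException {
        SyntaxError(String msg) { super(msg); }
    }

    interface Parser {
        List<String> parse(String sql);
    }

    // Parses once; on a syntax error, logs and returns an empty list so the caller moves on.
    static List<String> parseOrSkip(Parser parser, String sql) {
        try {
            return parser.parse(sql);
        } catch (SyntaxError e) {
            System.err.println("Skipping unparseable DDL: " + sql);
            return Collections.emptyList();
        }
    }
}
```

The design choice is that the failing statement is attempted exactly once: an unparseable DDL costs one log line instead of a retry.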
Start Maxwell again and run the DDL that failed before. The log now shows the Skipping messages below, but Maxwell no longer crashes:
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN MysqlParserListener - Skipping error node: ''
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN MysqlParserListener - Skipping error node: COMMENT
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN MysqlParserListener - Skipping error node: '姓名'
Here is an example. Consuming the binlog with the patched Maxwell no longer produces the error. The steps are:
-- Create an empty table for the demo
use test;
CREATE TABLE `t2` (
`id` int NOT NULL AUTO_INCREMENT,
`age` int DEFAULT NULL,
`name` varchar(100) DEFAULT NULL,
`aaaaa` char(32) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
-- Use pt-osc to change column aaaaa to VARCHAR
pt-online-schema-change \
-udts -h 192.168.3.14 --password='dts' \
--alter="CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx' " \
D=test,t=t2 \
--no-check-replication-filters --alter-foreign-keys-method=rebuild_constraints --recursion-method=none \
--print --charset=utf8 --execute
-- View the DDL log
$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic binlog_test_ddl --from-beginning
The console output is:
{"type":"table-create","database":"test","table":"_t2_new","def":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"CREATE TABLE `test`.`_t2_new` (\n `id` int NOT NULL AUTO_INCREMENT,\n `age` int DEFAULT NULL,\n `name` varchar(100) DEFAULT NULL,\n `aaaaa` char(32) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci","position":"binlog.000006:1641878"}
{"type":"table-alter","database":"test","table":"_t2_new","old":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx'","position":"binlog.000006:1643722"}
{"type":"table-alter","database":"test","table":"t2","old":{"database":"test","charset":"utf8mb4","table":"t2","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"_t2_old","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`","position":"binlog.000006:1647578"}
{"type":"table-alter","database":"test","table":"_t2_new","old":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"t2","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`","position":"binlog.000006:1647578"}
{"type":"table-drop","database":"test","table":"_t2_old","ts":1743428311000,"sql":"DROP TABLE IF EXISTS `_t2_old` /* generated by server */","position":"binlog.000006:1651027"}
Simplified, it looks like this:
CREATE TABLE `test`.`_t2_new` (
`id` int NOT NULL AUTO_INCREMENT,
`age` int DEFAULT NULL,
`name` varchar(100) DEFAULT NULL,
`aaaaa` char(32) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET UTF8MB4 NOT NULL COMMENT 'xxxxxx';
-- Note: the RENAME statement is printed twice because Maxwell records it twice (once for each table whose definition changed)
RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`;
RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`;
DROP TABLE IF EXISTS `_t2_old` /* generated by server */;
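Both RENAME records in the console output carry the same `position` value (`binlog.000006:1647578`). A consumer that wants each DDL statement only once could therefore collapse records by position. The sketch below assumes the consumer has already extracted `position` and `sql` from each JSON message. `DdlDedup` and `DdlEvent` are hypothetical names for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consumer-side helper; the record shape is an assumption.
final class DdlDedup {
    static final class DdlEvent {
        final String position;
        final String sql;
        DdlEvent(String position, String sql) {
            this.position = position;
            this.sql = sql;
        }
    }

    // Keeps the first SQL seen for each binlog position, preserving arrival order.
    static List<String> distinctSqlByPosition(List<DdlEvent> events) {
        Map<String, String> byPosition = new LinkedHashMap<>();
        for (DdlEvent e : events) {
            byPosition.putIfAbsent(e.position, e.sql);
        }
        return new ArrayList<>(byPosition.values());
    }
}
```

A `LinkedHashMap` is used so the statements come back in the order they arrived, which matters when replaying DDL.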
Formatted for easier reading, it looks roughly like this:
[
{
"type": "table-create",
"database": "test",
"table": "_t2_new",
"def": {
"database": "test",
"charset": "utf8mb4",
"table": "_t2_new",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "char",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"ts": 1743428311000,
"sql": "CREATE TABLE `test`.`_t2_new` (\n `id` int NOT NULL AUTO_INCREMENT,\n `age` int DEFAULT NULL,\n `name` varchar(100) DEFAULT NULL,\n `aaaaa` char(32) DEFAULT NULL,\n PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"position": "binlog.000006:1641878"
},
{
"type": "table-alter",
"database": "test",
"table": "_t2_new",
"old": {
"database": "test",
"charset": "utf8mb4",
"table": "_t2_new",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "char",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"def": {
"database": "test",
"charset": "utf8mb4",
"table": "_t2_new",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "varchar",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"ts": 1743428311000,
"sql": "ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx'",
"position": "binlog.000006:1643722"
},
{
"type": "table-alter",
"database": "test",
"table": "t2",
"old": {
"database": "test",
"charset": "utf8mb4",
"table": "t2",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "char",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"def": {
"database": "test",
"charset": "utf8mb4",
"table": "_t2_old",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "char",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"ts": 1743428311000,
"sql": "RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`",
"position": "binlog.000006:1647578"
},
{
"type": "table-alter",
"database": "test",
"table": "_t2_new",
"old": {
"database": "test",
"charset": "utf8mb4",
"table": "_t2_new",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "varchar",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"def": {
"database": "test",
"charset": "utf8mb4",
"table": "t2",
"primary-key": [
"id"
],
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "int",
"name": "age",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8mb4"
},
{
"type": "varchar",
"name": "aaaaa",
"charset": "utf8mb4"
}
]
},
"ts": 1743428311000,
"sql": "RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`",
"position": "binlog.000006:1647578"
},
{
"type": "table-drop",
"database": "test",
"table": "_t2_old",
"ts": 1743428311000,
"sql": "DROP TABLE IF EXISTS `_t2_old` /* generated by server */",
"position": "binlog.000006:1651027"
}
]
Other code changes:
Error scenario:
2025-04-01 09:49:40 ERROR Maxwell - Maxwell saw an exception and is exiting...
com.zendesk.maxwell.schema.ddl.InvalidSchemaError: Couldn't find table 'tb6' in database test
at com.zendesk.maxwell.schema.Database.findTableOrThrow(Database.java:55) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.schema.ddl.TableCreate.resolveLikeTable(TableCreate.java:54) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.schema.ddl.TableCreate.resolve(TableCreate.java:40) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.schema.ddl.TableCreate.resolve(TableCreate.java:12) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:58) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.43.2.jar:1.43.2]
at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) [maxwell-1.43.2.jar:1.43.2]
Cause of the error:
The table tb6 is not in Maxwell's metadata (for example, Maxwell crashed and someone manually advanced the value in binlog_position, but the tables table is missing rows). When a later binlog event involves tb6, Maxwell crashes and exits.
Temporary fix:
vim ~/maxwell-master/src/main/java/com/zendesk/maxwell/schema/Database.java +55, changed as follows (the thrown exception is replaced with console output)
// Do not crash when the table is missing; keep running
public Table findTableOrThrow(String table) throws InvalidSchemaError {
    Table t = findTable(table);
    if ( t == null ) {
        // throw new InvalidSchemaError("Couldn't find table '" + table + "'" + " in database " + this.name);
        System.out.println("WARN, InvalidSchemaError, Couldn't find table '" + table + "'" + " in maxwell metadata database " + this.name);
        return null;
    }
    return t;
}
vim ~/maxwell-master/src/main/java/com/zendesk/maxwell/replication/TableCache.java, changed as follows (the thrown exception is replaced with console output)
public void processEvent(Schema schema, Filter filter, Boolean ignoreMissingSchema, Long tableId, String dbName, String tblName) {
    if ( !tableMapCache.containsKey(tableId)) {
        if ( filter.isTableBlacklisted(dbName, tblName) ) {
            return;
        }
        Database db = schema.findDatabase(dbName);
        if ( db == null ) {
            if ( !ignoreMissingSchema || filter.includes(dbName, tblName) )
                throw new RuntimeException("Couldn't find database " + dbName);
        } else {
            Table tbl = db.findTable(tblName);
            if (tbl == null) {
                // Do not crash when the table is missing; keep running
                if ( !ignoreMissingSchema || filter.includes(dbName, tblName) ) {
                    // throw new RuntimeException("Couldn't find table " + tblName + " in database " + dbName);
                    System.out.println("RuntimeException, Couldn't find table " + tblName + " in database " + dbName);
                }
            } else {
                tableMapCache.put(tableId, tbl);
            }
        }
    }
}
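Once a lookup can report "not found" without throwing, every caller has to handle the missing case itself. One way to make that explicit is to return an `Optional`. The sketch below uses a hypothetical `TableLookup` class with string definitions as a stand-in. It is not Maxwell's `Database` or `Table`, and it only illustrates the warn-and-continue pattern.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a schema database; not Maxwell's Database class.
final class TableLookup {
    private final String name;
    private final Map<String, String> tables = new HashMap<>();

    TableLookup(String name) {
        this.name = name;
    }

    void addTable(String table, String definition) {
        tables.put(table, definition);
    }

    // Returns the definition if known; otherwise warns and returns empty instead of throwing.
    Optional<String> findTableOrWarn(String table) {
        String def = tables.get(table);
        if (def == null) {
            System.err.println("WARN: couldn't find table '" + table + "' in database " + name);
        }
        return Optional.ofNullable(def);
    }
}
```

A caller can then write `db.findTableOrWarn("tb6").ifPresent(...)` and skip the event when the table is unknown, rather than dereferencing a null.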