修改maxwell代码,跳过DDL报错

issue如下:

注意: 我这里的修改在我的这个场景下是可以的,如果是更严格的场景可能需要慎重考虑。

DDL如下:

代码语言:txt复制
alter table `db1`.`_tb1_new` change `name` `name` VARCHAR(32) not null comment 'name';

使用pt-osc的时候,发现maxwell出现报错并退出,报错内容如下:

代码语言:txt复制
2014 [INFO] AppInfoParser: Kafka version: 2.7.0 
2015 [INFO] AppInfoParser: Kafka commitId: 448719dc99a19793
2015 [INFO] AppInfoParser: Kafka startTimeMs: 1743140296555
2069 [INFO] Maxwell: Maxwell v1.40.5 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[binlog.001626:999417140], lastHeartbeat=0]
2344 [INFO] MysqlSavedSchema: Restoring schema id 4598 (last modified at Position[BinlogPosition[binlog.001626:999405799], lastHeartbeat=0])
2566 [INFO] Metadata: [Producer clientId=producer-1] Cluster ID: TsiZuIVUQkmzZeL4vdbINA
2743 [INFO] MysqlSavedSchema: Restoring schema id 4 (last modified at Position[BinlogPosition[binlog.000564:932304891], lastHeartbeat=0])
3332 [INFO] MysqlSavedSchema: beginning to play deltas...
3814 [INFO] MysqlSavedSchema: played 1219 deltas in 476ms 
3849 [INFO] BinlogConnectorReplicator: Setting initial binlog pos to: binlog.001626:999417140
3871 [INFO] BinaryLogClient: Connected to 172.19.31.242:3306 at binlog.001626/999417140 (sid:13579, cid:289144103)
3871 [INFO] BinlogConnectorReplicator: Binlog connected.
4206 [ERROR] MysqlParserListener: (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (db_name (name (id `db1`))) . (name_all_tokens (id `_tb1_new`)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT 'name')
4207 [ERROR] SchemaChange: Error parsing SQL: 'ALTER TABLE `db1`.`_tb1_new` CHANGE COLUMN `name` `name` VARCHAR(32) CHARACTER SET UTF8MB4 NOT NULL DEFAULT _UTF8MB4'' COMMENT 'name''
4208 [ERROR] AbstractSchemaStore: Error on bin log position Position[BinlogPosition[binlog.001626:999417249], lastHeartbeat=0]
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
    at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101)
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115)
    at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49)
    at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407)                                                                                                                                   
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235)
    at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
    at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302)
    at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227)
    at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337)
4228 [INFO] BinlogConnectorReplicator: Binlog disconnected.
4229 [INFO] TaskManager: Stopping 4 tasks 
4230 [ERROR] TaskManager: cause: 
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
    at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93) ~[maxwell-1.40.5.jar:1.40.5]
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17) ~[antlr4-runtime-4.8-1.jar:4.8-1]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[antlr4-runtime-4.8-1.jar:4.8-1]
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) ~[maxwell-1.40.5.jar:1.40.5]
4242 [INFO] TaskManager: Stopping: com.zendesk.maxwell.schema.PositionStoreThread@54866ff0
4242 [INFO] TaskManager: Stopping: com.zendesk.maxwell.producer.MaxwellKafkaProducerWorker@11a68543
4243 [INFO] KafkaProducer: [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
4249 [INFO] Metrics: Metrics scheduler closed
4249 [INFO] Metrics: Closing reporter org.apache.kafkamon.metrics.JmxReporter
4249 [INFO] Metrics: Metrics reporters closed
4250 [INFO] AppInfoParser: App info kafka.producer for producer-1 unregistered
4250 [INFO] TaskManager: Stopping: com.zendesk.maxwell.bootstrap.BootstrapController@2fa96e5b
4250 [INFO] TaskManager: Stopping: com.zendesk.maxwell.replication.BinlogConnectorReplicator@2bc879f6
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
    at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17)
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28)
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101)
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115)
    at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49)
    at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407)
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738)
at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235)
    at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34)
    at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302)
    at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227)
    at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337)
4251 [ERROR] Maxwell: Maxwell saw an exception and is exiting...
com.zendesk.maxwell.schema.ddl.MaxwellSQLSyntaxError: ''
    at com.zendesk.maxwell.schema.ddl.MysqlParserListener.visitErrorNode(MysqlParserListener.java:93) ~[maxwell-1.40.5.jar:1.40.5]
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:17) ~[antlr4-runtime-4.8-1.jar:4.8-1]
    at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:28) ~[antlr4-runtime-4.8-1.jar:4.8-1]
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parseSQL(SchemaChange.java:101) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.ddl.SchemaChange.parse(SchemaChange.java:115) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:49) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.40.5.jar:1.40.5]
    at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) [maxwell-1.40.5.jar:1.40.5]
5658 [INFO] TaskManager: Stopped all tasks

临时绕过的方法:

代码语言:txt复制
git clone下载代码

cd ~/maxwell-master/src/main/java/com/zendesk/maxwell/schema/ddl

vim  SchemaChange.java  , 将原先的throw (e)注释掉,改为continue让程序继续往下跑

        while ( true ) { 
            try {
                return parseSQL(currentDB, sql);
            } catch ( ReparseSQLException e ) { 
                sql = e.getSQL();
                LOGGER.debug("rewrote SQL to {}", sql);
                // re-enter loop
            } catch ( ParseCancellationException e ) { 
                if (LOGGER.isDebugEnabled()) {
                    // we are debug logging the toString message, slf4j will log the stacktrace of a throwable
                    String msg = e.toString();
                    LOGGER.debug("Parse cancelled: {}", msg);
                }   
                return null;
            } catch ( MaxwellSQLSyntaxError e) {
                LOGGER.error("Error parsing SQL: '{}'", sql);
                // 这里做了修改, 跳过报错,继续下一次循环尝试重新解析  
                // throw (e);                                                                                                                                                       
                continue;
            }   
        } 
        

切换到高版本的jdk(jdk17)

然后执行编译,如果有test报错,可以编译的时候跳过
mvn clean package -Dmaven.test.skip=true

将生成的jar包拷贝出来
cd ~/maxwell-master/target/maxwell-1.43.2/maxwell-1.43.2
cp lib/maxwell-1.43.2.jar /usr/local/maxwell-1.43.2/lib 


再次启动maxwell,执行之前的报错的DDL,可以看到日志里面会有如下的Skipping的提示,但是maxwell不会崩溃掉:
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN  MysqlParserListener - Skipping error node: ''
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN  MysqlParserListener - Skipping error node: COMMENT
2025-03-31 18:36:51 ERROR MysqlParserListener - (parse (statement (alter_table (alter_table_preamble ALTER TABLE (table_name (name (id tt)))) (alter_specifications (alter_specification (change_column CHANGE COLUMN (full_column_name (name (id `name`))) (column_definition (name (id `name`)) (data_type (string_type VARCHAR (length ( 32 )) (column_options (charset_def (character_set CHARACTER SET (charset_name UTF8MB4)))) (column_options (nullability NOT NULL)) (column_options (default_value DEFAULT (literal_with_weirdo_multistring (byte_literal _UTF8MB4)))))))))))) '' COMMENT '姓名')
2025-03-31 18:36:51 WARN  MysqlParserListener - Skipping error node: '姓名'

下面是一个例子,使用修改后的maxwell去消费binlog,不再出现报错,具体操作如下:

代码语言:txt复制
-- 创建一个空的表用于演示
use test;
CREATE TABLE `t2` (
  `id` int NOT NULL AUTO_INCREMENT,
  `age` int DEFAULT NULL,
  `name` varchar(100) DEFAULT NULL,
  `aaaaa` char(32)  DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;



-- 使用pt-osc,将aaaa列改为varchar类型
pt-online-schema-change \
-udts -h 192.168.3.14 --password='dts' \
--alter="CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx' " \
D=test,t=t2 \
--no-check-replication-filters --alter-foreign-keys-method=rebuild_constraints --recursion-method=none \
--print --charset=utf8 --execute



-- 查看ddl的日志
$ ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic binlog_test_ddl --from-beginning
控制台输出的日志如下:
{"type":"table-create","database":"test","table":"_t2_new","def":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"CREATE TABLE `test`.`_t2_new` (\n  `id` int NOT NULL AUTO_INCREMENT,\n  `age` int DEFAULT NULL,\n  `name` varchar(100) DEFAULT NULL,\n  `aaaaa` char(32) DEFAULT NULL,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci","position":"binlog.000006:1641878"}
{"type":"table-alter","database":"test","table":"_t2_new","old":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx'","position":"binlog.000006:1643722"}
{"type":"table-alter","database":"test","table":"t2","old":{"database":"test","charset":"utf8mb4","table":"t2","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"_t2_old","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"char","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`","position":"binlog.000006:1647578"}
{"type":"table-alter","database":"test","table":"_t2_new","old":{"database":"test","charset":"utf8mb4","table":"_t2_new","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"def":{"database":"test","charset":"utf8mb4","table":"t2","primary-key":["id"],"columns":[{"type":"int","name":"id","signed":true},{"type":"int","name":"age","signed":true},{"type":"varchar","name":"name","charset":"utf8mb4"},{"type":"varchar","name":"aaaaa","charset":"utf8mb4"}]},"ts":1743428311000,"sql":"RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`","position":"binlog.000006:1647578"}
{"type":"table-drop","database":"test","table":"_t2_old","ts":1743428311000,"sql":"DROP TABLE IF EXISTS `_t2_old` /* generated by server */","position":"binlog.000006:1651027"}


简化后如下:
CREATE TABLE `test`.`_t2_new` (
 `id` int NOT NULL AUTO_INCREMENT,
 `age` int DEFAULT NULL,
  `name` varchar(100) DEFAULT NULL,
  `aaaaa` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名',
 `aaaaaa` char(10) DEFAULT NULL,
  `aaaaaaaa` char(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET UTF8MB4 NOT NULL COMMENT 'xxxxxx';

-- 注意这里输出了rename操作2次,实际上在maxwell里面记录是2次(2次表元def的改变)
RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`;
RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`;

DROP TABLE IF EXISTS `_t2_old` /* generated by server */;


如果转为更便于查看的样式,则大致如下:
[
    {
        "type": "table-create",
        "database": "test",
        "table": "_t2_new",
        "def": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "_t2_new",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "char",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "ts": 1743428311000,
        "sql": "CREATE TABLE `test`.`_t2_new` (\n  `id` int NOT NULL AUTO_INCREMENT,\n  `age` int DEFAULT NULL,\n  `name` varchar(100) DEFAULT NULL,\n  `aaaaa` char(32) DEFAULT NULL,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
        "position": "binlog.000006:1641878"
    },
    {
        "type": "table-alter",
        "database": "test",
        "table": "_t2_new",
        "old": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "_t2_new",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "char",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "def": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "_t2_new",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "varchar",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "ts": 1743428311000,
        "sql": "ALTER TABLE `test`.`_t2_new` CHANGE column aaaaa aaaaa VARCHAR(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'xxxxxx'",
        "position": "binlog.000006:1643722"
    },
    {
        "type": "table-alter",
        "database": "test",
        "table": "t2",
        "old": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "t2",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "char",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "def": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "_t2_old",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "char",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "ts": 1743428311000,
        "sql": "RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`",
        "position": "binlog.000006:1647578"
    },
    {
        "type": "table-alter",
        "database": "test",
        "table": "_t2_new",
        "old": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "_t2_new",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "varchar",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "def": {
            "database": "test",
            "charset": "utf8mb4",
            "table": "t2",
            "primary-key": [
                "id"
            ],
            "columns": [
                {
                    "type": "int",
                    "name": "id",
                    "signed": true
                },
                {
                    "type": "int",
                    "name": "age",
                    "signed": true
                },
                {
                    "type": "varchar",
                    "name": "name",
                    "charset": "utf8mb4"
                },
                {
                    "type": "varchar",
                    "name": "aaaaa",
                    "charset": "utf8mb4"
                }
            ]
        },
        "ts": 1743428311000,
        "sql": "RENAME TABLE `test`.`t2` TO `test`.`_t2_old`, `test`.`_t2_new` TO `test`.`t2`",
        "position": "binlog.000006:1647578"
    },
    {
        "type": "table-drop",
        "database": "test",
        "table": "_t2_old",
        "ts": 1743428311000,
        "sql": "DROP TABLE IF EXISTS `_t2_old` /* generated by server */",
        "position": "binlog.000006:1651027"
    }
]

其他的代码修改:

代码语言:txt复制
报错场景:

2025-04-01 09:49:40 ERROR Maxwell - Maxwell saw an exception and is exiting...
com.zendesk.maxwell.schema.ddl.InvalidSchemaError: Couldn't find table 'tb6' in database test
	at com.zendesk.maxwell.schema.Database.findTableOrThrow(Database.java:55) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.schema.ddl.TableCreate.resolveLikeTable(TableCreate.java:54) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.schema.ddl.TableCreate.resolve(TableCreate.java:40) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.schema.ddl.TableCreate.resolve(TableCreate.java:12) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.schema.AbstractSchemaStore.resolveSQL(AbstractSchemaStore.java:58) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.schema.MysqlSchemaStore.processSQL(MysqlSchemaStore.java:102) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:385) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.replication.BinlogConnectorReplicator.processQueryEvent(BinlogConnectorReplicator.java:407) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.replication.BinlogConnectorReplicator.getRow(BinlogConnectorReplicator.java:738) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.replication.BinlogConnectorReplicator.work(BinlogConnectorReplicator.java:235) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.util.RunLoopProcess.runLoop(RunLoopProcess.java:34) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.Maxwell.startInner(Maxwell.java:302) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.Maxwell.start(Maxwell.java:227) ~[maxwell-1.43.2.jar:1.43.2]
	at com.zendesk.maxwell.Maxwell.main(Maxwell.java:337) [maxwell-1.43.2.jar:1.43.2]

报错原因:

代码语言:txt复制
tb6表没有在 maxwell的元数据里面(例如maxwell崩溃了,然后人为改大了binlog_position里面的值,但是tables表里面数据是有缺失的),当后续有binlog涉及到tb6这个表,就会导致maxwell崩溃退出。

临时修复方法

代码语言:txt复制
    vim ~/maxwell-master/src/main/java/com/zendesk/maxwell/schema/Database.java +55  改动如下(将抛出异常改为控制台输出)

        // 找不到表都不崩溃,继续执行
        public Table findTableOrThrow(String table) throws InvalidSchemaError {
            Table t = findTable(table);
            if ( t == null )
                // throw new InvalidSchemaError("Couldn't find table '" + table + "'" + " in database " + this.name);
                System.out.println("WARN, InvalidSchemaError, Couldn't find table '" + table + "'" + " in maxwell metadata database " + this.name);
                return null;
            return t;
        }   


    vim ~/maxwell-master/src/main/java/com/zendesk/maxwell/replication/TableCache.java  改动如下(将抛出异常改为控制台输出)

        public void processEvent(Schema schema, Filter filter, Boolean ignoreMissingSchema, Long tableId, String dbName, String tblName) {
            if ( !tableMapCache.containsKey(tableId)) {
                if ( filter.isTableBlacklisted(dbName, tblName) ) { 
                    return;
                }   

                Database db = schema.findDatabase(dbName);
                if ( db == null ) { 
                    if ( !ignoreMissingSchema || filter.includes(dbName, tblName) )
                        throw new RuntimeException("Couldn't find database " + dbName);

                } else {
                    Table tbl = db.findTable(tblName);

                    if (tbl == null) {
                        // 找不到表都不崩溃,继续执行
                        if ( !ignoreMissingSchema || filter.includes(dbName, tblName) )
                            // throw new RuntimeException("Couldn't find table " + tblName + " in database " + dbName);
                            System.out.println("RuntimeException, Couldn't find table " + tblName + " in database " + dbName);
                    } else {
                        tableMapCache.put(tableId, tbl);
                    }
                }
            }                                                                                                                                                                           

        }