MongoDB “升级项目” 大型连续剧(3)

MongoDB “升级项目” 大型连续剧(2)-- 到底谁是"der"

MongoDB “升级项目” 大型连续剧(1)-- 可“生”可不升

接上集,上集比对了一些MongoDB 4.2和6.0之间的差别,找准开发和架构以及运营MongoDB的利益点,但这里必须给出一些具体操作的文档。这里我画一个思维导图,将整体迁移过程的需要注意的地方来画一下。

迁移中需要考虑的问题

迁移中需要考虑的问题

具体事情具体分析,这里我们先从知晓的问题开始入手,先产生操作注意事项的文档,先将可能产生的问题说清楚。

一 迁移中的需求收集

根据我们公司一贯的风格,迁移一定要能回滚,且必须有plan B,没有Plan B和回滚的方案都认为是不安全的,如果实在没有,也的想出其他的解决方案,或者这个事情就直接不做。

那么如果升级MongoDB要可进退,必须要进行双向复制,而双向MongoDB的不同版本的复制,这个之前我们通过mongoshake是可以做到的,在上次的迁移中我们学习到一些经验。

1 通过mongoshake进行迁移,需要更大的oplogs 的设置,因为有一些突发的数据写入很可能导致mongoshake还没有吧数据消化就被清理的情况发生。

2 通过mongoshake迁移不能进行任何的DDL操作,尤其建立索引的操作,只要进行就会导致连接错误,主要的原因还是这两边建立索引的命令是不一致的,所有通过mongoshake传递过去也无法在高于4.4以上的数据库上建立索引。

这次我们不打算采用MongoShake的方案,我们本次要通过DTS的方案来进行MongoDB的双向复制。这主要基于阿里云对MongoDB的oplogs日志格式中添加了一个字段,通过这个来标识数据从何而来,通过判断数据的来源而避免同一个数据在环回到源去。

这里针对通过DTS来进行双向数据复制的事情需要注意以下问题。

MongoDB 在阿里云中升级的工作中通过DTS来进行双向同步是一个方案,针对MOGNODB 4.0 到更高的版本。

升级的方案中需要有如下的注意事项

1 目的数据库的磁盘空间应比原有的数据库的数据空间大约10% ,但此为理论值,具体情况看原有的数据库本身的数据库磁盘的大小以及业务的繁忙程度,扩展程度在 5% - 15%上下浮动。

2 在复制中基于阿里云的MOGNODB独有技术可以实现双向复制,需要打开MongoDB的 replication.oplogGlobalIdEnabled 参数

3 在进行操作中需要对数据库中所有的collections进行校验,确保有主键,同时去掉数据库中的 ttl 索引,针对索引进行全部的扫描和记录,并且在数据复制后进行整体的符合,在切换后再建立相关的TTL索引

4 在双向复制中只能通过 oplogs 进行相关获取数据库的变更,而不能通过change streams。

5 在操作中双向复制中,不能建立索引,不能进行约束,数据类型的变更,如原有的KEY VALUES 为 字符,后续要在复制期间改成数值类型 ,这些都是禁止的

6 需要有足够的空间存储OPLOGS

7 同步中,不支持admin 和 local 的库的数据同步 ,如有在此下的数据库无法进行同步

8 初期进行数据同步全量时可能会需要对数据库的配置进行升级等 降低初期产生的问题如系统OOM 挂机 down机等

9 对复制的数据库中,不要对数据库使用 capped的方式

10  目标端MongoDB的count数量需要使用db.$table_name.aggregate([{ $count:"myCount"}])语法查询

11  增量同步支持  create  collection .  create index  drop collection   drop index  rename colleciton (但这里注意是同版本的额,不同版本的不支持)

12 同步中只支持1000个集合 1000张table 超过则无法通过 DTS来进行工作

以上为官方文档的说明,下面是一个脚本,因为在复制中,我们需要判断两方的数据库的索引和表数据库等数量是一致的,那么必然不能通过“手”来进行,我们通过node.js的代码来对这两方的数据库进行程序的自动报告的方式来检查。

Mongo6 的node.js代码

代码语言:javascript代码运行次数:0运行复制
const { MongoClient } = require('mongodb');
const fs = require('fs');
const path = require('path');

const uri = 'mongodb的地址';
const outputFile = path.join(__dirname, 'mongodb_analysis_report.txt');

async functionanalyzeMongoDB() {
    const client = new MongoClient(uri);

    try {
        await client.connect();
        console.log('已连接到 MongoDB');

        const databases = await client.db().admin().listDatabases();
        const dbNames = databases.databases.map(db => db.name);
        
        const reports = [];
        const users = [];

        for (const dbName of dbNames) {
            const db = client.db(dbName);
            const collections = await db.listCollections().toArray();

            let totalCollections = 0;
            let totalIndexes = 0;
            let ttlIndexStatements = [];

            for (const col of collections) {
                const collection = db.collection(col.name);
                const indexes = await collection.indexes();

                totalCollections++;
                totalIndexes += indexes.length;

                for (const index of indexes) {
                    if (index.options && index.options.expireAfterSeconds !== undefined) {
                        const field = Object.keys(index.key)[0];
                        const ttlIndexStatement = `db.${col.name}.createIndex({ "${field}": 1 }, { expireAfterSeconds: ${index.options.expireAfterSeconds} })`;
                        const dropTtlIndexStatement = `db.${col.name}.dropIndex("${index.name}")`;

                        ttlIndexStatements.push({
                            collection: col.name,
                            field,
                            expireAfterSeconds: index.options.expireAfterSeconds,
                            createStatement: ttlIndexStatement,
                            dropStatement: dropTtlIndexStatement
                        });
                    }
                }
            }

            if (dbName === 'admin') {
                const userInfo = await dbmand({ usersInfo: 1 });
                if (userInfo && Array.isArray(userInfo.users)) {
                    users.push(...userInfo.users);
                }
            }

            reports.push({ dbName, totalCollections, totalIndexes, ttlIndexStatements });
        }

        const reportContent = generateReport(dbNames, reports, users);        
        fs.writeFileSync(outputFile, reportContent, 'utf8');
        console.log(`分析报告已生成,保存至: ${outputFile}`);

    } catch (err) {
        console.error('发生错误:', err);
    } finally {
        await client.close();
        console.log('已断开与 MongoDB 的连接');
    }
}

function generateReport(dbNames, reports, users) {
    return `
MongoDB 分析报告
=================

数据库数量: ${dbNames.length}

${reports.map((report) => `
数据库: ${report.dbName}
总集合数量: ${report.totalCollections}
总索引数量: ${report.totalIndexes}

TTL 索引信息:
------------
${report.ttlIndexStatements.map((info) => `
集合: ${info.collection}
字段: ${info.field}
过期时间: ${info.expireAfterSeconds} 秒
创建语句: ${info.createStatement}
删除语句: ${info.dropStatement}`).join('\n') || '无 TTL 索引'}.         
`).join('\n')}

用户信息:
--------

${users.map(user => `
用户名: ${user.user}@${user.db}
角色: ${user.roles.map(role => role.role).join(', ') || '无角色'}` ).join('\n')}
`;
}

analyzeMongoDB();

mongo4.x 的 node.js的代码

代码语言:javascript代码运行次数:0运行复制
const { MongoClient } = require('mongodb');
const fs = require('fs');
const path = require('path');

const uri = '';
const outputFile = path.join(__dirname, 'mongodb_analysis_report.txt');

async functionanalyzeMongoDB() {
    const client = new MongoClient(uri);

    try {
        await client.connect();
        console.log('已连接到 MongoDB');

        const databases = await client.db().admin().listDatabases();
        const dbNames = databases.databases.map(db => db.name);
        
        const reports = [];
        const users = [];

        for (const dbName of dbNames) {
            const db = client.db(dbName);
            const collections = await db.listCollections().toArray();

            let totalCollections = 0;
            let totalIndexes = 0;
            let ttlIndexStatements = [];

            for (const col of collections) {
                const collection = db.collection(col.name);
                const indexes = await collection.indexes();

                totalCollections++;
                totalIndexes += indexes.length;

                for (const index of indexes) {
                    if (index.options && index.options.expireAfterSeconds !== undefined) {
                        const field = Object.keys(index.key)[0];
                        const ttlIndexStatement = `db.${col.name}.createIndex({ "${field}": 1 }, { expireAfterSeconds: ${index.options.expireAfterSeconds} })`;
                        const dropTtlIndexStatement = `db.${col.name}.dropIndex("${index.name}")`;

                        ttlIndexStatements.push({
                            collection: col.name,
                            field,
                            expireAfterSeconds: index.options.expireAfterSeconds,
                            createStatement: ttlIndexStatement,
                            dropStatement: dropTtlIndexStatement
                        });
                    }
                }
            }

            if (dbName === 'admin') {
                const userInfo = await dbmand({ usersInfo: 1 });
                if (userInfo && Array.isArray(userInfo.users)) {
                    users.push(...userInfo.users);
                }
            }

            reports.push({ dbName, totalCollections, totalIndexes, ttlIndexStatements });
        }

        const reportContent = generateReport(dbNames, reports, users);        
        fs.writeFileSync(outputFile, reportContent, 'utf8');
        console.log(`分析报告已生成,保存至: ${outputFile}`);

    } catch (err) {
        console.error('发生错误:', err);
    } finally {
        await client.close();
        console.log('已断开与 MongoDB 的连接');
    }
}

function generateReport(dbNames, reports, users) {
    return `
MongoDB 分析报告
=================

数据库数量: ${dbNames.length}

${reports.map((report) => `
数据库: ${report.dbName}
总集合数量: ${report.totalCollections}
总索引数量: ${report.totalIndexes}

TTL 索引信息:
------------
${report.ttlIndexStatements.map((info) => `
集合: ${info.collection}
字段: ${info.field}
过期时间: ${info.expireAfterSeconds} 秒
创建语句: ${info.createStatement}
删除语句: ${info.dropStatement}`).join('\n') || '无 TTL 索引'}.         
`).join('\n')}

用户信息:
--------

${users.map(user => `
用户名: ${user.user}@${user.db}
角色: ${user.roles.map(role => role.role).join(', ') || '无角色'}` ).join('\n')}
`;
}

analyzeMongoDB();

image

执行后,会产生对应的报告,其中包含多少数据库,每个库里面的有多少表,每个库里面有总的索引是多少。

因为在DTS双向同步的情况下,他们是不会建立用户的,也就是不会同步用户,这里就需要我们在写脚本来把用户建立的语句导出,然后在目的库,填写好密码后,在建立,这个工作的脚本我们下期来说。

整体前期的一些铺垫的工作还在进行,程序端还在测试,同时我们后期还要列出 mongodb 4.x 和 6.X之间的一些语句的差别等,方便开发了解。下期我们继续......

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。原始发表:2025-04-28,如有侵权请联系 cloudcommunity@tencent 删除数据库mongodb数据索引同步