数据库迁移脚本
这次使用node.js作为工具去做
至于为啥用node.js??
可能是js在异步操作上非常高效? (但它仍然是单线程的,对于 CPU 密集型的操作可能不如其他语言。)
本质就是先查再插
例子:
{"name": "数据库迁移","version": "1.0.0","dependencies": {"log4js": "^6.9.1","mysql2": "^3.5.2"}
}
// 引入依赖
const mysql = require('mysql2/promise');
const log4js = require('log4js');// 配置日志,此处格式需要自定义
log4js.configure({appenders: {errorLog: { type: 'file', filename: 'error.log' }},categories: {default: { appenders: ['errorLog'], level: 'error' }}
})
const logger = log4js.getLogger();// 数据迁移配置
const migrationConfig = {batchSize: 1000, // 每批处理的数据量sourceDB: {host: 'db2.ilaw.com.cn',port: '3306',user: 'developer',password: 'cpsoft_873406',database: 'kinglex',connectionLimit: 10, // 设置连接数限制queueLimit: 100, // 设置队列限制acquireTimeout: 30000, // 设置获取连接的超时时间为30秒waitForConnections: true, // 等待连接可用acquireRetryCount: 3, // 获取连接的重试次数acquireRetryWait: 1000, // 获取连接的重试间隔时间为1秒supportBigNumbers: true,bigNumberStrings: true},targetDB: {host: 'db2.ilaw.com.cn',port: '3306',user: 'developer',password: 'cpsoft_873406',database: 'kinglex',connectionLimit: 20, // 设置连接数限制queueLimit: 200, // 设置队列限制acquireTimeout: 5000, // 设置获取连接的超时时间为5秒waitForConnections: true, // 等待连接可用acquireRetryCount: 5, // 获取连接的重试次数acquireRetryWait: 500, // 获取连接的重试间隔时间为0.5秒supportBigNumbers: true,bigNumberStrings: true}
};// 创建源数据库连接池
const sourcePool = mysql.createPool(migrationConfig.sourceDB);// 创建目标数据库连接池
const targetPool = mysql.createPool(migrationConfig.targetDB);async function cust_linkman() {let sourceConnection, targetConnection;try {sourceConnection = await getConnectionFromSourcePool();targetConnection = await getConnectionFromTargetPool();let offset = 0;while (true) {const [rows] = await sourceConnection.query(`SELECT * FROM cust_linkman LIMIT ${offset}, ${migrationConfig.batchSize}`);console.log(rows)if (rows.length === 0) {// 没有更多数据,数据迁移完成break;}const dataBase = []const dataPerson = []// 处理当前批次的数据for (const row of rows) {const typeO = row.type;let EntityType = null;if(typeO!=null && typeO==1){EntityType=501;}if(typeO!=null && typeO==2){EntityType=503;}if(typeO!=null && typeO==3){EntityType=506;}if(typeO!=null && typeO==4){EntityType=505;}if(typeO!=null && typeO==5){EntityType=504;}if(typeO!=null && typeO==6){EntityType=507;}const transformedUserBase = {id: row.id,groupId: null,type: 5, //5就是联系人name: row.name,nameIndex: row.firstName,nameGBK: null,simpleName: null,formerName:row.beforeName,codeType:row.cardType,code:row.IDCard,mobile1:row.mobile1,mobile2:row.mobile2,phone:null,country:row.country,province:row.province,city:row.city,district:row.district,businessArea:row.businessArea,countryCode:null,provinceCode:null,cityCode:null,districtCode:null,businessAreaCode:null,address:null,description: row.description,remark: row.memo,teamId: row.teamId,managerId: row.managerId,shareId:null,resource: row.isAuto,importanceEvaluate:null,caseSourceEmpId:null,creditedLevel:null,relationLevel:null,isImportance:null,importanceDesc:null,isinstancy:null,instancyDesc:null,status1 :null,email: row.email,renameStatus :null,isGetGSData :null,isGetGSGJData :null,relation :null,deptId :null,dept :null,relationCustomer :null,company:row.company,createUserId:row.createUserId,createTime:row.createDate,modifyTime:row.updateDate,isDel:row.isDel,belongTo:row.custId,mobile3:row.mobile3,entityType:EntityType}const transformedUserPerson = {custId: row.id,groupId: null,type: 3, //3就是联系人sex : row.sex,marriage: row.marriage,homePhone:row.homePhone,qq: row.qq,nation:row.nation,birthday: row.birthday,registeredAddress: row.address,bigDiploma: row.bigDiploma,origin: row.origin,face: row.face,religion: row.religion,workCompany: row.workCompany,workDept: row.workDept,workDimission:row.workDimission,workJob:row.workJob,workRight: row.workRight,workPhone:row.workPhone,workFax: row.workFax,workAddress: row.workAddress,cardDept: null,cardAddress: null,cardAddressZip: null,endUniversity: row.endUniversity,workZip: row.workZip,workWebsite: row.workWebsite,workLinePhone: row.workLinePhone,msn: row.msn,homeAddress: row.homeAddress,homeZip: row.homeZip,communicationAddress: row.communicationAddress,communicationZip: row.communicationZip,like: row.lkmLike,specialty: row.specialty,consortInfo: row.consortInfo,childInfo: row.childInfo,otherInfo: row.otherInfo,company:row.company,createUserId:row.createUserId,createTime: row.createDate,modifyTime:row.updateDate,isDel : row.isDel}dataBase.push(transformedUserBase)dataPerson.push(transformedUserPerson)}const valueb = dataBase.map(Object.values)console.log("valueb----------------------------")console.log(valueb)const valuep = dataPerson.map(Object.values)try {await targetConnection.query('INSERT IGNORE INTO `cust_base_info`' + 'VALUES ?',[valueb]);await targetConnection.query('INSERT IGNORE INTO `cust_person`' +'VALUES ?',[valuep]);} catch (err) {// 处理迁移过程中的错误// 可以选择继续处理下一条数据或者中断迁移logger.error(err);console.error('出错: ', err);}//console.log('cst_opponent_organization_company 迁移成功:' + data.map(item => item.id + ''))console.log('cst_opponent_organization_company 迁移成功:' + valueb.length + '条')offset += migrationConfig.batchSize;}console.log('cst_opponent_organization_company 迁移完成')} catch (err) {console.error('出错: ', err);throw err;} finally {// 释放连接回连接池if (sourceConnection) {sourceConnection.release();}if (targetConnection) {targetConnection.release();}}
}async function cst_opponent_organization_company() {let sourceConnection, targetConnection;try {sourceConnection = await getConnectionFromSourcePool();targetConnection = await getConnectionFromTargetPool();let offset = 0;while (true) {const [rows] = await sourceConnection.query(`SELECT *, cst_opponent.id as id, cst_opponent.address as address, cst_opponent.company as company, cst_opponent.createTime as createTime,cst_opponent.createUserId as createUserId, cst_opponent.modifyTime as modifyTime,cst_opponent.isDel as isDelFROM cst_opponentleft join (select * from cst_opponent_organization where isDel = 0) opponent_organization on opponent_organization.opponentId = cst_opponent.idwhere type = 1 LIMIT ${offset}, ${migrationConfig.batchSize}`);if (rows.length === 0) {// 没有更多数据,数据迁移完成break;}const data = []// 处理当前批次的数据for (const row of rows) {// 在这里执行具体的数据转换、处理和插入操作// 转换和映射数据const transformedUser = {custId: row.id,groupId: null,type: 2,scope: row.scope,property: row.property,fax: row.fax,website: row.website,officeArea: row.officeArea,turnover: row.turnover,employeeNumber: row.employeeNumber,creditCode: row.creditCode,bankroll: row.bankroll,code: row.code,establishDate: row.finishDate,chieftain: row.chieftain,businessScope: null,taxNo: row.taxNo,bizPhone: row.bizPhone,bizAddress: row.bizAddress,bank1: row.bankOne,bankAccountName1: row.bankAccountNameOne,bankAccountId1: row.bankAccountIdOne,bank2: row.bankTwo,bankAccountName2: row.bankAccountNameTwo,bankAccountId2: row.bankAccountIdTwo,bank3: row.bankThree,bankAccountName3: row.bankAccountNameThree,bankAccountId3: row.bankAccountIdThree,zip: row.zip,brand: row.brand,bizLicense: row.bizLicense,propertySubName: null,billCode: null,company :row.company,createUserId :row.createUserId,createTime :row.createTime,modifyTime :row.modifyTime,isDel :row.isDel};data.push(transformedUser)}const value = data.map(Object.values)try {await targetConnection.query('INSERT IGNORE INTO cust_company ' +'(custId, groupId, type, scope, property, ' +'fax, website, officeArea, turnover, employeeNumber, ' +'creditCode, bankroll, code, establishDate, chieftain, ' +'businessScope, taxNo, bizPhone, bizAddress, bank1, bankAccountName1, ' +'bankAccountId1, bank2, bankAccountName2, bankAccountId2, bank3, ' +'bankAccountName3, bankAccountId3, zip, brand, bizLicense, ' +'propertySubName, billCode, company ,createUserId ,createTime ,modifyTime ,isDel) ' +'VALUES ?',[value]);} catch (err) {// 处理迁移过程中的错误// 可以选择继续处理下一条数据或者中断迁移logger.error(err);console.error('出错: ', err);}//console.log('cst_opponent_organization_company 迁移成功:' + data.map(item => item.id + ''))console.log('cst_opponent_organization_company 迁移成功:' + data.length + '条')offset += migrationConfig.batchSize;}console.log('cst_opponent_organization_company 迁移完成')} catch (err) {console.error('出错: ', err);throw err;} finally {// 释放连接回连接池if (sourceConnection) {sourceConnection.release();}if (targetConnection) {targetConnection.release();}}
}