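Migrating MongoDB with DataX
- Project page: GitHub - alibaba/DataX (DataX is the open-source version of Alibaba Cloud DataWorks Data Integration).
- To migrate MongoDB, the reader plugin is mongodbreader and the writer plugin is mongodbwriter.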
Source code changes
- In the current version, mongodbreader does not handle columns of binary type when migrating MongoDB. The relevant source is in
src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java:
    if (tempCol == null) {
        //continue; a bare continue here would misalign the record's columns at the destination
        record.addColumn(new StringColumn(null));
    } else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    } else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else {
        if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if (Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                        MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList) tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            // Any other type, including Binary, falls through to toString()
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
- Change it to the following, which adds a branch for Binary before the final else:
    // Needs org.bson.types.Binary and com.alibaba.datax.common.element.BytesColumn
    // imported at the top of MongoDBReader.java, if they are not already.
    if (tempCol == null) {
        //continue; a bare continue here would misalign the record's columns at the destination
        record.addColumn(new StringColumn(null));
    } else if (tempCol instanceof Double) {
        //TODO deal with Double.isNaN()
        record.addColumn(new DoubleColumn((Double) tempCol));
    } else if (tempCol instanceof Boolean) {
        record.addColumn(new BoolColumn((Boolean) tempCol));
    } else if (tempCol instanceof Date) {
        record.addColumn(new DateColumn((Date) tempCol));
    } else if (tempCol instanceof Integer) {
        record.addColumn(new LongColumn((Integer) tempCol));
    } else if (tempCol instanceof Long) {
        record.addColumn(new LongColumn((Long) tempCol));
    } else if (tempCol instanceof Binary) {
        // Handle MongoDB Binary data
        Binary binaryData = (Binary) tempCol;
        byte[] binaryBytes = binaryData.getData();
        // Add the byte array to the record as a DataX bytes column
        record.addColumn(new BytesColumn(binaryBytes));
    } else {
        if (KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) {
            String splitter = column.getString(KeyConstant.COLUMN_SPLITTER);
            if (Strings.isNullOrEmpty(splitter)) {
                throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
                        MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
            } else {
                ArrayList array = (ArrayList) tempCol;
                String tempArrayStr = Joiner.on(splitter).join(array);
                record.addColumn(new StringColumn(tempArrayStr));
            }
        } else {
            record.addColumn(new StringColumn(tempCol.toString()));
        }
    }
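- After modifying the source, the project must be repackaged. Since only mongodbreader was changed, when packaging you can consider taking the root ...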
Migration script
- Write the job script:
1.json
    {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mongodbreader",
                        "parameter": {
                            "address": ["ip1:27017"],
                            "collectionName": "data",
                            "column": [
                                {"name": "_id", "type": "long"},
                                {"name": "fileContent", "type": "bytes"}
                            ],
                            "dbName": "monitor",
                            "userName": "root",
                            "userPassword": "123456",
                            "query": {"_id": {"$lt": 21}}
                        }
                    },
                    "writer": {
                        "name": "mongodbwriter",
                        "parameter": {
                            "address": ["ip2:27017"],
                            "collectionName": "data",
                            "column": [
                                {"name": "_id", "type": "long"},
                                {"name": "fileContent", "type": "bytes"}
                            ],
                            "writeMode": {
                                "isReplace": "true",
                                "replaceKey": "_id"
                            },
                            "dbName": "test",
                            "userName": "root",
                            "userPassword": "123456"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "2"
                }
            }
        }
    }
The query node under reader is the filter condition; the demo above selects the records whose _id is less than 21.
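A hand-edited job file is easy to break with a missing or trailing comma, so one option is to generate it from a script and let the JSON library handle the syntax. The following is a minimal Python sketch, not part of DataX: the helper names `build_job` and `render_job` and their parameters are hypothetical, and the field values simply mirror the 1.json demo above.

```python
import json


def build_job(src_addr, dst_addr, max_id):
    """Build a job dict mirroring the 1.json demo (hypothetical helper)."""
    # The same column list is used on both ends so the fields stay aligned.
    columns = [
        {"name": "_id", "type": "long"},
        {"name": "fileContent", "type": "bytes"},
    ]
    reader = {
        "name": "mongodbreader",
        "parameter": {
            "address": [src_addr],
            "collectionName": "data",
            "column": list(columns),
            "dbName": "monitor",
            "userName": "root",
            "userPassword": "123456",
            # Filter condition: only records whose _id is less than max_id.
            "query": {"_id": {"$lt": max_id}},
        },
    }
    writer = {
        "name": "mongodbwriter",
        "parameter": {
            "address": [dst_addr],
            "collectionName": "data",
            "column": list(columns),
            "writeMode": {"isReplace": "true", "replaceKey": "_id"},
            "dbName": "test",
            "userName": "root",
            "userPassword": "123456",
        },
    }
    return {
        "job": {
            "content": [{"reader": reader, "writer": writer}],
            "setting": {"speed": {"channel": "2"}},
        }
    }


def render_job(job):
    """Serialize the job and parse it back as a syntax sanity check."""
    text = json.dumps(job, indent=4)
    json.loads(text)  # raises ValueError if the text is not valid JSON
    return text


if __name__ == "__main__":
    with open("1.json", "w", encoding="utf-8") as f:
        f.write(render_job(build_job("ip1:27017", "ip2:27017", 21)))
```

Changing the `max_id` argument is then enough to migrate a different range of records without touching the JSON by hand.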
- Run the command:
python datax.py G:\Code\1.json
datax.py is located under the target directory produced by packaging, at the relative path target\datax\datax\bin.
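If the job needs to be launched from another script rather than typed by hand, the same command can be assembled with Python's standard `subprocess` module. This is only a sketch under stated assumptions: the helper names `build_command` and `run_job` are hypothetical, the paths are the ones from this post, and the `python` on the PATH is assumed to be a version that the packaged datax.py supports.

```python
import subprocess
from pathlib import Path


def build_command(datax_bin, job_file, python="python"):
    """Return the argument list equivalent to `python datax.py <job_file>`."""
    datax_py = Path(datax_bin) / "datax.py"
    return [python, str(datax_py), str(job_file)]


def run_job(datax_bin, job_file):
    """Run the job and wait for it to finish (hypothetical helper)."""
    cmd = build_command(datax_bin, job_file)
    # check=True raises CalledProcessError if the process exits with a non-zero status.
    return subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Paths taken from this post; adjust them to the local build and job file.
    run_job(r"target\datax\datax\bin", r"G:\Code\1.json")
```

Passing the arguments as a list avoids shell quoting problems when the job path contains spaces.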