前面我们已经学习了解过了数据库常用的分库分表方案,本节以水平分表为例来实战下.
需求背景
最近项目中的几张表数据行超过了1000万行,所以需要对这些表进行水平分表,提高数据查询的性能。可选的方案有sharding-jdbc中间件还有就是Mybatis拦截器。由于使用的是Pg数据库,并且Pg数据库支持很多函数,以及复杂的sql查询语句,使用sharding-jdbc可能会有意想不到的坑,所以决定采用Mybatis拦截器的方式。
分表思路
这里我需要分表的表名为process_log,这里不是根据正常的id字段去分表(因为这张表连id字段都没有…),而是选择这张表的唯一字段form_data_code来作为分表字段,将form_data_code字段值进行hashCode()然后进行取模。目前这张表的数据足足5000多万,考虑之后还会增加,需要将表数据控制在百万级内,所以决定分表20张。
// 1.准备20张表
结构同表process_log的20张表,process_log_0 ->process_log_19
// 2.分表健formDataCode,使用java hash算法
Math.abs(formDataCode.hashCode() % 20);
// 3.迁移数据
Mybaits拦截器实现
实现自定义注解
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SegmentTable {/*** 表名*/String[] tableName();/*** 算法策略*/Class[] strategyClazz();
}
// 需要分表的table的Mapper接口
@SegmentTable(tableName = {"process_log"}, strategyClazz ={ProcessLogStrategy.class})
public interface ProcessLogMapper {// todo}
实现算法策略
// 策略模式
public interface ShardTableStrategy {/*** 分表算法** @param statementHandler* @return*/String shardAlgorithm(StatementHandler statementHandler);
}
public class ShardTableContext {private ShardTableStrategy tableStrategy;public ShardTableContext(ShardTableStrategy tableStrategy) {this.tableStrategy = tableStrategy;}public String doShardAlgorithm(StatementHandler statementHandler){return tableStrategy.shardAlgorithm(statementHandler);}
}
process_log策略实现
/*** process_log 分表算法**/
public class ProcessLogStrategy implements ShardTableStrategy {private static final Logger logger = LoggerFactory.getLogger(ProcessLogStrategy.class);/*** 原始表名*/private final static String PROCESS_LOG_ORIGIN_TABLE_NAME = "process_log";private final static String TABLE_LINE = "_";/*** 分表20张*/public final static Integer PROCESS_LOG_TABLE_NUM = 20;/*** 特殊处理字段*/private final static String PROCESS_LOG_TABLE_CONFIRM_INDEX = "subTableConfirmIndex";/*** 分表字段*/private final static String PROCESS_LOG_TABLE_SUB_FIELD = "formDataCode";@Overridepublic String shardAlgorithm(StatementHandler statementHandler) throws RuntimeException {AppEnv appEnv = ApplicationContextUtil.getApplicationContext().getBean(AppEnv.class);if(!appEnv.isShardingTableProcessLog()){return PROCESS_LOG_ORIGIN_TABLE_NAME;}BoundSql boundSql = statementHandler.getBoundSql();Object parameterObject = boundSql.getParameterObject();// 参数值Map param2ValeMap = JSONObject.parseObject(JSON.toJSONString(parameterObject), Map.class);logger.info("ProcessLogStrategy test!!! param2ValeMap={}", JSON.toJSONString(param2ValeMap));// 特殊处理foreach循环语句Object confirmIndexValue = param2ValeMap.get(PROCESS_LOG_TABLE_CONFIRM_INDEX);if (confirmIndexValue != null) {logger.info("handle success, sql exist param confirmIndexValue={}", confirmIndexValue);return PROCESS_LOG_ORIGIN_TABLE_NAME + TABLE_LINE + confirmIndexValue;}Object subFieldValue = param2ValeMap.get(PROCESS_LOG_TABLE_SUB_FIELD);if (MapUtils.isEmpty(param2ValeMap) || subFieldValue == null) {throw new MybatisInterceptorException("process_log is subTable so must have subFiledValue!");}return PROCESS_LOG_ORIGIN_TABLE_NAME + TABLE_LINE + Math.abs(subFieldValue.hashCode() % PROCESS_LOG_TABLE_NUM);}}
拦截器intercept()执行逻辑
mybatis拦截器必须实现Interceptor接口
public interface Interceptor { // 拦截器执行的逻辑方法Object intercept(Invocation invocation) throws Throwable; // 用来封装目标对象。可以返回目标对象本身也可以根据实际需要,创建一个代理对象Object plugin(Object target);// 在Mybatis进行配置插件的时候可以配置自定义相关属性void setProperties(Properties properties);
}
@Overridepublic Object intercept(Invocation invocation) throws Throwable {StatementHandler statementHandler = (StatementHandler) invocation.getTarget();// 全局操作读对象MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY, SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());// @SegmentTable -- 只拦截有注解的MapperSegmentTable segmentTable = getSegmentTable(metaObject);if (segmentTable == null) {return invocation.proceed();}// 1.对value进行算法 -> 确定表名Class strategyClazz = segmentTable.strategyClazz();ShardTableStrategy strategy = (ShardTableStrategy) strategyClazz.newInstance();String index = new ShardTableContext(strategy).doShardAlgorithm(statementHandler);logger.info("ShardTableInterceptor segmentTable={},index={}", JSON.toJSONString(segmentTable), index);// 2.替换表名// 获取原始sqlString tableName = segmentTable.tableName();String sql = (String) metaObject.getValue(BOUND_SQL_NAME);metaObject.setValue(BOUND_SQL_NAME, sql.replaceFirst(tableName, tableName + index));return invocation.proceed();}
总结遇到的问题
1.Mapper文件问题
老系统比较混乱,存在多表关联查询,有些Mapper.xml对象对应的sql不是唯一表。这里需要注意,因为注解是针对整个Mapper文件的,只要是上面的sql都会拦截。但是需要分表的sql都必须要有分表字段。需要避免不分表的sql走策略算法。 最好将需要分表的表拆成单表,逻辑在代码里处理。
2.分页插件pagehelper导致自定义插件无效
在sqlSessionFactory对象中放入拦截器对象,如果系统中有使用Mybatis对分页插件,要注意与自定义拦截器对顺序,拦截器底层采用责任链对方式,通常都会返回invocation.proceed()传递,但是分页插件没有返回。所以需要调整注册顺序.
3.针对sql中对foreach循环
sql查询中用到了分表key的集合,这种情况,在应用层提前使用hash算法,找到所在的表。
4.迁移数据
数据库的hash算法和Java的hash算法是不一致的,所以要确定两边的算法一直,大多数数据库是提供自定义算法的,本次是Pg数据.
// pg自定义java的hashCode算法
DROP FUNCTION IF EXISTS hash_code(text);
CREATE FUNCTION hash_code(text) RETURNS integerLANGUAGE plpgsql
AS
$$
DECLAREi integer := 0;DECLAREh bigint := 0;
BEGINFOR i IN 1..length($1)LOOPh = (h * 31 + ascii(substring($1, i, 1))) & 4294967295;END LOOP;RETURN cast(cast(h AS bit(32)) AS int4);
END;
$$;
总结
以上便是水平分表的一次实践,就是提供一种分表的思路吧,加深下分库分表的理解.