Because the project handles a large volume of data and fairly heavy traffic, the database layer is designed around read-write splitting to meet the performance requirements.
A quick overview of the idea behind read-write splitting:
- Configure and load the database information, i.e. load the data sources.
- Extend AbstractRoutingDataSource and override determineCurrentLookupKey(); this method decides which data source is used, which is what actually switches databases. This is the key step.
- Use an AOP aspect to intercept read and write operations and set the read/write flag for the data source.
Enough talk, on to the code.
- Configure and load the database information
(1) Put the database settings in the yml file; here it is MySQL with 1 master and 2 read replicas:

```yaml
datasource:
  # number of read replicas
  readSize: 2
  # use the Druid data source
  type: com.alibaba.druid.pool.DruidDataSource
  # master
  write:
    url: jdbc:mysql://localhost:3306/lingguan_xiaowang?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: username
    password: password
    driver-class-name: com.mysql.jdbc.Driver
    filters: stat
    maxActive: 20
    initialSize: 1
    maxWait: 60000
    minIdle: 1
    timeBetweenEvictionRunsMillis: 60000
    minEvictableIdleTimeMillis: 300000
    validationQueryTimeout: 900000
    validationQuery: SELECT SYSDATE() from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    maxOpenPreparedStatements: 20
  # replica 1
  read1:
    url: jdbc:mysql://localhost:3306/slave1?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: username
    password: password
    driver-class-name: com.mysql.jdbc.Driver
    filters: stat
    maxActive: 20
    initialSize: 1
    maxWait: 60000
    minIdle: 1
    timeBetweenEvictionRunsMillis: 60000
    minEvictableIdleTimeMillis: 300000
    validationQueryTimeout: 900000
    validationQuery: SELECT SYSDATE() from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    maxOpenPreparedStatements: 20
  # replica 2
  read2:
    url: jdbc:mysql://localhost:3306/slave2?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: username
    password: password
    driver-class-name: com.mysql.jdbc.Driver
    filters: stat
    maxActive: 20
    initialSize: 1
    maxWait: 60000
    minIdle: 1
    timeBetweenEvictionRunsMillis: 60000
    minEvictableIdleTimeMillis: 300000
    validationQueryTimeout: 900000
    validationQuery: SELECT SYSDATE() from dual
    testWhileIdle: true
    testOnBorrow: false
    testOnReturn: false
    poolPreparedStatements: true
    maxOpenPreparedStatements: 20
```
(2) Build the dynamic routing DataSource from the yml configuration:
```java
import com.gome.store.util.mybatis.DataSourceType;
import com.gome.store.util.mybatis.MyAbstractRoutingDataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description Parses the database configuration and builds the data sources
 * @Author wangjie<https://my.oschina.net/xiaowangqiongyou>
 * @Date 2017/7/31
 */
@Configuration
@EnableTransactionManagement
public class DataSourceConfiguration {

    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${datasource.type}")
    private Class<? extends DataSource> dataSourceType;

    @Autowired
    private Environment env;

    @Value("${datasource.readSize}")
    private String dataSourceSize;

    @Bean(name = "writeDataSource")
    @Primary
    @ConfigurationProperties(prefix = "datasource.write")
    public DataSource writeDataSource() {
        logger.info("-------------------- writeDataSource init ---------------------");
        return DataSourceBuilder.create().type(dataSourceType).build();
    }

    /**
     * Configure one bean per read replica.
     */
    @Bean(name = "readDataSource1")
    @ConfigurationProperties(prefix = "datasource.read1")
    public DataSource readDataSourceOne() {
        logger.info("-------------------- readDataSourceOne init ---------------------");
        return DataSourceBuilder.create().type(dataSourceType).build();
    }

    @Bean(name = "readDataSource2")
    @ConfigurationProperties(prefix = "datasource.read2")
    public DataSource readDataSourceTwo() {
        logger.info("-------------------- readDataSourceTwo init ---------------------");
        return DataSourceBuilder.create().type(dataSourceType).build();
    }

    @Bean
    public MyAbstractRoutingDataSource dataSource(
            @Qualifier("writeDataSource") DataSource writeDataSource,
            @Qualifier("readDataSource1") DataSource readDataSourceOne,
            @Qualifier("readDataSource2") DataSource readDataSourceTwo) {
        int size = Integer.parseInt(dataSourceSize);
        Map<Object, Object> targetDataSources = new HashMap<>();
        // write
        targetDataSources.put(DataSourceType.write.getType(), writeDataSource);
        // reads
        targetDataSources.put(DataSourceType.read.getType() + 0, readDataSourceOne);
        targetDataSources.put(DataSourceType.read.getType() + 1, readDataSourceTwo);

        MyAbstractRoutingDataSource dataSource = new MyAbstractRoutingDataSource(size);
        dataSource.setTargetDataSources(targetDataSources);     // inherited from AbstractRoutingDataSource
        dataSource.setDefaultTargetDataSource(writeDataSource); // the default data source is the master (write) data source
        return dataSource;
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(MyAbstractRoutingDataSource ds) throws Exception {
        SqlSessionFactoryBean fb = new SqlSessionFactoryBean();
        fb.setDataSource(ds); // the routing data source is required here, otherwise an error is thrown
        // The next two lines are only needed when mapper *.xml files are used;
        // if the persistence layer is annotation-only, they can be omitted.
        fb.setTypeAliasesPackage(env.getProperty("mybatis.typeAliasesPackage")); // base package for type aliases
        fb.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources(env.getProperty("mybatis.mapperLocations")));
        return fb.getObject();
    }

    /**
     * Configure the transaction manager.
     */
    @Bean
    public DataSourceTransactionManager transactionManager(MyAbstractRoutingDataSource dataSource) throws Exception {
        return new DataSourceTransactionManager(dataSource);
    }
}
```
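Two things worth noting about the configuration class above: if the two mapper-related setter lines are kept, the keys mybatis.typeAliasesPackage and mybatis.mapperLocations must also be defined in application.yml (they are not part of the datasource block shown earlier). The code targets Spring Boot 1.x; on Spring Boot 2.x the DataSourceBuilder import changes to org.springframework.boot.jdbc.DataSourceBuilder.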
Notes:
· @Primary: when several beans implement the same interface, this marks the one injected by default, instead of letting @Autowired fail with an ambiguity error (commonly used with multiple data sources). See the short sketch after these notes.
· @Qualifier: injects a bean by name; used when an interface has several implementations (here there are multiple DataSource beans, so they must be injected by name).
· The SqlSessionFactory and the transaction manager are both built on top of the dynamic data source (the transaction manager can be dropped if transactions are not needed).
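A minimal sketch of how the two annotations behave at injection points; this consumer class is hypothetical and not part of the original project, it only illustrates which bean Spring picks in each case:

```java
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

@Component
public class DataSourceConsumer {

    // No qualifier: several DataSource beans exist, so Spring injects the @Primary one (writeDataSource).
    @Autowired
    private DataSource defaultDataSource;

    // Explicit qualifier: Spring injects the bean registered under the name "readDataSource1".
    @Autowired
    @Qualifier("readDataSource1")
    private DataSource firstReadDataSource;
}
```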
- Write the helper classes that control the data source type
(1) The data source type enum:

```java
/**
 * @Description Data source types
 * @Author wangjie<https://my.oschina.net/xiaowangqiongyou>
 * @Date 2017/7/31
 */
public enum DataSourceType {

    read("read", "read replica"),
    write("write", "master");

    private String type;
    private String name;

    DataSourceType(String type, String name) {
        this.type = type;
        this.name = name;
    }

    public String getType() {
        return type;
    }

    public String getName() {
        return name;
    }
}
```
(2) The data source flag holder:

```java
/**
 * @Description Thread-local flag recording whether the current operation is a read or a write
 * @Author wangjie<https://my.oschina.net/xiaowangqiongyou>
 * @Date 2017/7/31
 */
public class DataSourceContextHolder {

    private static final ThreadLocal<String> local = new ThreadLocal<>();

    public static ThreadLocal<String> getLocal() {
        return local;
    }

    /**
     * Reads may go to any of several replicas.
     */
    public static void read() {
        local.set(DataSourceType.read.getType());
    }

    /**
     * Writes always go to the single master.
     */
    public static void write() {
        local.set(DataSourceType.write.getType());
    }

    public static String getJdbcType() {
        return local.get();
    }

    /**
     * Clear the data source type for the current thread.
     */
    public static void clearDataSourceType() {
        local.remove();
    }
}
```
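A small, hedged sketch of how the holder is meant to be driven. The aspect further below sets the flag automatically, so this demo class (not part of the original code) only shows the per-thread call pattern:

```java
import com.gome.store.util.mybatis.DataSourceContextHolder;

public class ContextHolderDemo {
    public static void main(String[] args) {
        System.out.println(DataSourceContextHolder.getJdbcType()); // null -> the router falls back to the write key

        DataSourceContextHolder.read();                             // mark the current thread for reads
        System.out.println(DataSourceContextHolder.getJdbcType()); // "read"

        DataSourceContextHolder.write();                            // mark the current thread for writes
        System.out.println(DataSourceContextHolder.getJdbcType()); // "write"

        DataSourceContextHolder.clearDataSourceType();              // remove the flag to avoid leaks on pooled threads
        System.out.println(DataSourceContextHolder.getJdbcType()); // null again
    }
}
```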
- Extend AbstractRoutingDataSource
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.StringUtils;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Description Switches between multiple data sources
 * @Author wangjie<https://my.oschina.net/xiaowangqiongyou>
 * @Date 2017/7/31
 */
public class MyAbstractRoutingDataSource extends AbstractRoutingDataSource {

    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    private final int dataSourceNumber;

    private AtomicInteger count = new AtomicInteger(0);

    public MyAbstractRoutingDataSource(int dataSourceNumber) {
        this.dataSourceNumber = dataSourceNumber;
    }

    @Override
    protected Object determineCurrentLookupKey() {
        Object resultObject = null;
        String typeKey = DataSourceContextHolder.getJdbcType();
        // Transactions are only opened against the master; an empty typeKey means the master data source is used.
        if (StringUtils.isEmpty(typeKey)) {
            resultObject = DataSourceType.write.getType();
        } else {
            // Simple round-robin load balancing across the read replicas
            int number = count.getAndAdd(1);
            int lookupKey = number % dataSourceNumber;
            resultObject = DataSourceType.read.getType() + lookupKey;
        }
        logger.info("determineCurrentLookupKey:" + resultObject);
        return resultObject;
    }
}
```
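A minimal sketch that exercises the routing logic above, assuming JUnit 5 is on the classpath and that the test sits in the same com.gome.store.util.mybatis package as the classes above (determineCurrentLookupKey() is protected); this test class is not part of the original post:

```java
package com.gome.store.util.mybatis;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class RoutingKeyTest {

    @Test
    public void fallsBackToWriteAndRoundRobinsReads() {
        MyAbstractRoutingDataSource ds = new MyAbstractRoutingDataSource(2);

        // No flag on the current thread -> the write key is returned.
        DataSourceContextHolder.clearDataSourceType();
        assertEquals("write", ds.determineCurrentLookupKey());

        // With the read flag set, the keys alternate read0, read1, read0, ...
        DataSourceContextHolder.read();
        assertEquals("read0", ds.determineCurrentLookupKey());
        assertEquals("read1", ds.determineCurrentLookupKey());
        assertEquals("read0", ds.determineCurrentLookupKey());

        // Clean up the ThreadLocal so other code on this thread is unaffected.
        DataSourceContextHolder.clearDataSourceType();
    }
}
```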
- The AOP aspect
```java
import com.gome.store.util.mybatis.DataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

/**
 * @Description AOP interception that switches the data source.
 * Note: if the service-layer method is annotated with @Transactional, determineCurrentLookupKey() of
 * MyAbstractRoutingDataSource runs for the global transaction binding before this aspect intercepts
 * the DAO call, so DataSourceContextHolder.getJdbcType() is still empty at that point.
 * @Author wangjie<https://my.oschina.net/xiaowangqiongyou>
 * @Date 2017/7/31
 */
@Aspect
@Component
public class DataSourceAop {

    private static Logger logger = LoggerFactory.getLogger(DataSourceAop.class);

    // @Before("execution(* com.ggj.encrypt.modules.*.dao..*.find*(..)) or execution(* com.ggj.encrypt.modules.*.dao..*.get*(..)) or execution(* com.ggj.encrypt.modules.*.dao..*.select*(..))")
    @Before("execution(* com.gome.store.dao..*.query*(..)) or execution(* com.gome.store.dao..*.get*(..)) or execution(* com.gome.store.dao..*.select*(..))")
    public void setReadDataSourceType() {
        DataSourceContextHolder.read();
        logger.info("dataSource switched to: read");
    }

    // Alternative approach: decide by the readOnly attribute of @Transactional
    // @Around("@annotation(org.springframework.transaction.annotation.Transactional)")
    // public void setWriteDataSourceType(ProceedingJoinPoint joinPoint) throws Throwable {
    //     Transactional datasource = ((MethodSignature) joinPoint.getSignature()).getMethod().getAnnotation(Transactional.class);
    //     if (datasource.readOnly()) {
    //         DataSourceContextHolder.read();
    //         logger.info("dataSource switched to: read");
    //     } else {
    //         DataSourceContextHolder.write();
    //         logger.info("dataSource switched to: write");
    //     }
    //     joinPoint.proceed();
    // }
}
```
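One gap worth pointing out: the holder defines clearDataSourceType(), but the aspect above never calls it, so the read flag set on a pooled worker thread can survive into the next call handled by that thread. A hedged sketch of a companion aspect (class and package names are hypothetical) that resets the flag after the same DAO methods might look like this:

```java
package com.gome.store.aop; // hypothetical package

import com.gome.store.util.mybatis.DataSourceContextHolder;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class DataSourceCleanupAop {

    // Runs after the same DAO methods matched above and removes the ThreadLocal entry,
    // so the next operation on this (pooled) thread falls back to the default write data source.
    @After("execution(* com.gome.store.dao..*.query*(..))"
            + " or execution(* com.gome.store.dao..*.get*(..))"
            + " or execution(* com.gome.store.dao..*.select*(..))")
    public void clearDataSourceType() {
        DataSourceContextHolder.clearDataSourceType();
    }
}
```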
- Testing
```java
@Override
public User queryUsersByUsername(String userName) {
    // query*/get*/select* DAO calls are intercepted by DataSourceAop and routed to a read replica
    return userDao.queryByName(userName);
}

@Transactional
@Override
public void updateUserInfoById(User user) {
    // No read flag is set for update* methods, so both statements run against the master inside one transaction
    userDao.updateBySelective(user);
    userDao.updateById(user.getId());
}
```
Closing thoughts
This is only a basic implementation of database read-write splitting; many details are left out, for example handling a database node going down, smarter load balancing (only simple round-robin is used here), performance statistics, SQL black/white lists, and so on.