JRT实现缓存协议

上一篇介绍的借助ORM的增、删、改和DolerGet方法,ORM可以很精准的知道热点数据做内存缓存。那么就有一个问题存在,即部署了多个站点时候,如果用户在一个Web里修改数据了,那么其他Web的ORM是不知道这个变化的,其他Web还是缓存的老数据的话就会造成其他Web命中的缓存数据是老的,造成不可靠问题。

那么就需要一种机制来解决多Web缓存不一致问题,参照ECP实现,把Web分为主服务器和从服务器。主服务器启动TCP服务端,从服务器启动TCP客户端连接主服务器,从服务器和主服务器之间一直保留TCP长连接用来通知缓存变化数据。这样在一个服务器增、删、改数据后就能及时通知其他服务器更新缓存,这样所有服务器的缓存数据都是可信赖的。

首先提取发送数据的对象类型
在这里插入图片描述

然后实现ECP管理类ECPManager来管理启动服务端和客户端

package JRT.DAL.ORM.Global;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.ECPDto;/*** 企业缓存协议,基于TCP在多台服务器直接共享缓存的更新,从而确保每个Web下的缓存数据的可靠性,这里实现TCP服务端当做小机,其他服务当做客户端和小机连接。* 小机会分发增加数据(增加数据分发与否倒是不影响缓存可靠性,主要是修改和删除)、修改数据、删除数据的执行记录给每个客户端,客户端按收到的记录把数据推入缓存*/
public class ECPManager {/*** 要推入TCP的缓存队列*/public static ConcurrentLinkedDeque ECPQuen = new ConcurrentLinkedDeque();/*** 主处理进程*/private static Thread MainThread = null;/*** 发送数据的操作对象*/public static Socket Sender = null;/*** 写操作对象*/public static PrintWriter Writer = null;/*** 数据编码*/private static String Encode="GBK";/*** 缓存所有客户端*/private static ConcurrentHashMap<String,Socket> AllClient= new ConcurrentHashMap<>();/*** IP地址*/private static String IP="";/*** 端口*/private static  int Port=1991;/*** 是否启用了ECP*/private static boolean UseEcp=false;/*** 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可** @param obj* @throws Exception*/public static void InECPToQuen(ECPDto obj) throws Exception{ECPQuen.add(obj);}/*** push目前的ECP缓存数据到远程服务器,供GlobalManager定时器定时推送*/public static void TryPushEcp() throws Exception{try {//如果是客户端,先检查连接if (!IP.isEmpty()) {//重连if (Sender == null || Sender.isClosed()) {LogUtils.WriteSecurityLog("ECP尝试重连:" + IP + ":" + Port);StartEcpManager(IP, Port);Thread.sleep(1000);}if (Sender == null || Sender.isClosed()) {return;} else {LogUtils.WriteSecurityLog("ECP尝试重连成功:" + IP + ":" + Port);}}List<ECPDto> pushList=null;//从缓存队列里面弹出数据pushwhile (ECPQuen.size() > 0) {ECPDto obj = (ECPDto)ECPQuen.pop();if (obj != null) {//启用了ECP就push到服务端,否则就只更新自己缓存if(UseEcp==true){//初始化push列表if(pushList==null){pushList=new ArrayList<>();}pushList.add(obj);}else{//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(obj);}}}//直接列表一次推送,没有启用ECP的话这个列表一直是空if(pushList!=null){//客户端推送if (!IP.isEmpty()) {Writer.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));Writer.flush();}//服务端推送else {//给每个连接的客户端推送信息for (String ip : AllClient.keySet()) {Socket oneClient = AllClient.get(ip);//移除关闭的客户端if (oneClient.isClosed()) {AllClient.remove(ip);}PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);oneWriter.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));oneWriter.flush();}}}}catch (Exception ex){LogUtils.WriteExceptionLog("推送数据到ECP异常", ex);}}/*** 启动ECP管理* @param ip* @param port*/public static void StartEcpManager(String ip, int port){IP=ip;Port=port;//当客户端if (!ip.isEmpty()) {MainThread = new Thread(new Runnable() {@Overridepublic void run() {//得到输入流InputStream inputStream = null;//创建Socket对象并连接到服务端Socket socket = null;try {//创建Socket对象并连接到服务端socket = new Socket(ip, port);Sender = socket;Writer = new PrintWriter(new OutputStreamWriter(Sender.getOutputStream(), Encode), false);//得到输入流inputStream = socket.getInputStream();//IO读取byte[] buf = new byte[10240];int readlen = 0;//阻塞读取数据while ((readlen = inputStream.read(buf)) != -1) {try {String ecpJson = new String(buf, 0, readlen, Encode);//得到ECP实体List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);if(dtoList!=null&&dtoList.size()>0){for(ECPDto dto:dtoList) {//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(dto);}}}catch (Exception ee){LogUtils.WriteExceptionLog("ECP处理主服务器数据异常", ee);}}}catch (IOException ex) {LogUtils.WriteExceptionLog("ECP侦听TCP服务异常", ex);}finally {Sender=null;try {if (inputStream != null) {//关闭输入inputStream.close();}if (Writer != null) {Writer.flush();//关闭输出Writer.close();Writer=null;}if (socket != null) {// 关闭连接socket.close();}}catch (Exception ex) {LogUtils.WriteExceptionLog("释放TCP资源异常", ex);}}}});}//当服务端else {MainThread = new Thread(new Runnable() {@Overridepublic void run() {//得到输入流InputStream inputStream = null;//创建Socket对象并连接到服务端Socket socket = null;try {ServerSocket serverSocket = new ServerSocket(port);//增加一个无限循环while (true) {//等待客户端连接,阻塞Socket clientSocket = serverSocket.accept();//按IP存客户端连接String clientIP=clientSocket.getInetAddress().getHostAddress();AllClient.put(clientIP,clientSocket);//接收客户端消息Thread ClientThread = new Thread(new Runnable() {@Overridepublic void run() {try {//得到输出流Writer = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream(), Encode), false);//得到输入流InputStream inputStream = clientSocket.getInputStream();//IO读取byte[] buf = new byte[10240];int readlen = 0;//阻塞读取数据while ((readlen = inputStream.read(buf)) != -1) {String ecpJson = new String(buf, 0, readlen, Encode);//得到ECP实体List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);if(dtoList!=null&&dtoList.size()>0){for(ECPDto dto:dtoList) {//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(dto);}//给每个连接的客户端推送信息for (String ip : AllClient.keySet()) {Socket oneClient = AllClient.get(ip);//移除关闭的客户端if (oneClient.isClosed()) {AllClient.remove(ip);}PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);oneWriter.print(ecpJson);oneWriter.flush();}}}}catch (Exception ee){LogUtils.WriteExceptionLog("ECP处理客户端数据异常", ee);}}});ClientThread.start();}}catch (IOException ex) {LogUtils.WriteExceptionLog("侦听仪器TCP异常", ex);}finally {try {if (inputStream != null) {//关闭输入inputStream.close();}if (Writer != null) {Writer.flush();//关闭输出Writer.close();}if (socket != null) {// 关闭连接socket.close();}} catch (Exception ex) {LogUtils.WriteExceptionLog("释放TCP资源异常", ex);}}}});}//启动主进程MainThread.start();UseEcp=true;}/*** 返回ECP待处理队列的json数据供调试看是否符合预期* @return*/public static String ViewECPQuenDate() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(ECPQuen);}}

ECP管理类再对接上GlobalManager

package JRT.DAL.ORM.Global;import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;import JRT.Core.MultiPlatform.JRTConfigurtaion;
import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.OneGlobalNode;
import JRT.DAL.ORM.Global.ECPDto;/*** 实现内存模拟global的效果*/
public class GlobalManager {/*** 在内存里缓存热点数据*/private static ConcurrentHashMap<String, ConcurrentHashMap<String, OneGlobalNode>> AllHotData = new ConcurrentHashMap<>();/*** 要缓存数据的队列*/private static ConcurrentLinkedDeque TaskQuen = new ConcurrentLinkedDeque();/*** 管理缓存的定时器*/private static Timer ManageTimer = new Timer();/*** 缓存的最大对象数量*/public static Integer GlobalCacheNum = 100000;/*** 当前的缓存数量*/private static AtomicInteger CurCacheNum=new AtomicInteger(0);/*** 最后删除数据的时间*/private static Long LastDeleteTime=null;/*** 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可** @param obj* @throws Exception*/public static void InCache(ECPDto obj) throws Exception{TaskQuen.add(obj);}/*** 通过主键查询数据* @param model* @param id* @param <T>* @return* @throws Exception*/public static <T> T DolerGet(T model,Object id) throws Exception{//实体的名称String modelName = model.getClass().getName();if(AllHotData.containsKey(modelName)){//命中数据,克隆返回if(AllHotData.get(modelName).containsKey(id)){OneGlobalNode node=AllHotData.get(modelName).get(id);//更新时间node.Time=JRT.Core.Util.TimeParser.GetTimeInMillis();Object retObj=JRT.Core.Util.JsonUtil.CloneObject(node.Data);return (T)retObj;}}return null;}/*** 启动缓存数据管理的线程*/public static void StartGlobalManagerTask() throws Exception{//最大缓存数量String GlobalCacheNumConf = JRTConfigurtaion.Configuration("GlobalCacheNum");if (GlobalCacheNumConf != null && !GlobalCacheNumConf.isEmpty()) {GlobalCacheNum = JRT.Core.Util.Convert.ToInt32(GlobalCacheNumConf);}//定时任务TimerTask timerTask = new TimerTask() {@Overridepublic void run() {try {//缓存队列的数据并入缓存while (TaskQuen.size() > 0) {//处理要加入缓存的队列DealOneDataQuen();}//尝试推送ECP数据到主服务JRT.DAL.ORM.Global.ECPManager.TryPushEcp();//清理多余的缓存数据,这里需要讲究算法,要求在上百万的缓存数据里快速找到时间最久远的数据if(CurCacheNum.get()>GlobalCacheNum){//每轮清理时间处于上次清理时间和当前时间前百分之5的老数据long Diff=(JRT.Core.Util.TimeParser.GetTimeInMillis()-LastDeleteTime)/20;//留下数据的最大时间long LeftMaxTime=LastDeleteTime+Diff;//遍历所有的热点数据for (String model : AllHotData.keySet()) {ConcurrentHashMap<String, OneGlobalNode> oneTableHot=AllHotData.get(model);//记录要删除的数据List<String> delList=new ArrayList<>();for (String key : oneTableHot.keySet()) {OneGlobalNode one=oneTableHot.get(key);//需要删除的数据if(one.Time<LeftMaxTime){delList.add(key);}}//移除时间久的数据for(String del:delList){oneTableHot.remove(del);}}}//清理时间久远的缓存数据} catch (Exception ex) {LogUtils.WriteExceptionLog("处理Global缓存异常", ex);}}};//尝试启动ECP管理器String IP = JRTConfigurtaion.Configuration("ECPIP");String Port = JRTConfigurtaion.Configuration("ECPPort");if(Port!=null&&!Port.isEmpty()){if(IP==null){IP="";}//启动ECP管理器JRT.DAL.ORM.Global.ECPManager.StartEcpManager(IP,JRT.Core.Util.Convert.ToInt32(Port));}//启动缓存管理定时器ManageTimer.schedule(timerTask, 0, 500);}/*** 通过实体名称获得实体类型信息** @param modelName 实体名称* @return*/private static Class GetTypeByName(String modelName) throws Exception {return JRT.Core.Util.ReflectUtil.GetType(JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName") + ".Entity." + modelName, JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName"));}/*** 处理队列里的一条数据并入缓存*/private static void DealOneDataQuen() {try {Object obj = TaskQuen.pop();if (obj != null) {ECPDto dto=(ECPDto)obj;//添加或者更新缓存if(dto.Cmd.equals("A")||dto.Cmd.equals("U")) {Class type = GetTypeByName(dto.Model);Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);//实体的名称String modelName = dto.Model;//得到数据的主键String id = tableInfo.ID.Value.toString();if (!AllHotData.containsKey(modelName)) {ConcurrentHashMap<String, OneGlobalNode> map = new ConcurrentHashMap<>();AllHotData.put(modelName, map);}//更新数据if (AllHotData.get(modelName).containsKey(id)) {AllHotData.get(modelName).get(id).Data = entity;AllHotData.get(modelName).get(id).Time = JRT.Core.Util.TimeParser.GetTimeInMillis();}//加入到缓存else {OneGlobalNode node = new OneGlobalNode();node.Data = entity;node.Time = JRT.Core.Util.TimeParser.GetTimeInMillis();AllHotData.get(modelName).put(id, node);//缓存数量加1CurCacheNum.addAndGet(1);//记录时间if (LastDeleteTime == null) {LastDeleteTime = JRT.Core.Util.TimeParser.GetTimeInMillis();}}}//删除缓存else if(dto.Cmd.equals("D")){Class type = GetTypeByName(dto.Model);Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);//实体的名称String modelName = dto.Model;//得到数据的主键String id = tableInfo.ID.Value.toString();if (AllHotData.containsKey(modelName)&&AllHotData.get(modelName).containsKey(id)) {AllHotData.get(modelName).remove(id);}}//清空表缓存else if(dto.Cmd.equals("CLEAR")){//实体的名称String modelName = dto.Model;if (AllHotData.containsKey(modelName)) {AllHotData.get(modelName).clear();}}}}catch (Exception ex) {LogUtils.WriteExceptionLog("处理Global缓存添加异常", ex);}}/*** 返回Global的json数据供调试看是否符合预期* @return*/public static String ViewGlobalJson() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(AllHotData);}/*** 返回Global待处理队列的json数据供调试看是否符合预期* @return*/public static String ViewGlobalTaskQuenDate() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(TaskQuen);}}

增删改和DolerGet调整

/*** 保存对象,不抛异常,执行信息通过参数输出** @param entity   实体对象* @param outParam 输出执行成功或失败信息,执行成功时输出执行记录主键* @param <T>      实体类型约束* @return影响行数*/@Overridepublic <T> int Save(T entity, OutValue outValue, OutParam outParam) throws Exception {int row = 0;PreparedStatement stmt = null;boolean innerT = false;     //标识是否内部开启事务String sql = "";try {//根据实体对象获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取插入SQL语句sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetInsertSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, hash, false);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行插入SQL:" + sql + ";SQL参数:" + JRT.Core.Util.JsonUtil.Object2Json(hash.GetParam()));//获取ID列String idKey = tableInfo.ID.Key;//声明式SQL,并设置参数if (!idKey.isEmpty()) {stmt = Manager().Connection().prepareStatement(sql, new String[]{idKey});} else {stmt = Manager().Connection().prepareStatement(sql);}String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();ResultSet rowID = stmt.getGeneratedKeys();JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);//保存成功返回记录主键,返回影响记录数 1if (row == 1) {if (rowID.next() && (!idKey.isEmpty())) {outValue.Value = rowID.getInt(idKey);//设置RowID到实体JRT.Core.Util.ReflectUtil.SetObjValue(entity, tableInfo.ID.Key, rowID.getInt(idKey));//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "A");}} else {outParam.Code = OutStatus.ERROR;outParam.Message = "保存数据失败,执行保存返回:" + row;}return row;} catch (Exception ex) {outParam.Code = OutStatus.ERROR;//操作异常,判断如果开启事务,则回滚事务if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + "执行SQL:" + sql;}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 更新实体对象** @param entity        实体对象* @param param         更新条件,有条件就按条件更新,没有条件就按主键更新* @param outParam      输出执行成功或失败的信息* @param updateColName 更新属性名集合,无属性则更新实体的所有属性* @param joiner        连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and* @param operators     操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=* @param <T>           类型限定符* @return 影响行数*/@Overridepublic <T> int Update(T entity, HashParam param, OutParam outParam, List<String> updateColName, List<String> joiner, List<String> operators) throws Exception {PreparedStatement stmt = null;if (outParam == null) outParam = new OutParam();int row = 0;boolean innerT = false;     //标识是否内部开启事务try {//根据实体获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取更新的SQL语句String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetUpdateSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, updateColName, joiner, operators, hash);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行更新SQL:" + sql);//声明式SQL,并且设置参数stmt = Manager().Connection().prepareStatement(sql);String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);if (row == 1) {//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "U");}outParam.Code = OutStatus.SUCCESS;outParam.Message = "更新数据成功。";return row;} catch (Exception ex) {//操作异常,判断如果开启了事务,就回滚事务outParam.Code = OutStatus.ERROR;if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "更新数据失败!" + ex.getCause().getMessage();}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 根据条件删除记录** @param entity    实体对象* @param param     删除条件,有条件按条件删除,没有条件按主键删除* @param outParam  输出执行成功或失败的信息* @param joiner    多条件逻辑连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and* @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=* @param <T>       类型限定符* @return 影响行数*/@Overridepublic <T> int Remove(T entity, HashParam param, OutParam outParam, List<String> joiner, List<String> operators) throws Exception {PreparedStatement stmt = null;if (outParam == null) outParam = new OutParam();int row = 0;try {//根据实体对象获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取删除SQL语句String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetDeleteSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, joiner, operators, hash);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行删除SQL:" + sql);//声明式SQL,并设置参数stmt = Manager().Connection().prepareStatement(sql);String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();if (row == 1) {//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "D");}JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);outParam.Code = OutStatus.SUCCESS;outParam.Message = "删除数据成功。";return row;} catch (Exception ex) {//操作异常,判断如果开启了事务,就回滚事务outParam.Code = OutStatus.ERROR;if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "更新数据失败!" + ex.getCause().getMessage();}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 通过主键查询数据,带缓存的查询,用来解决关系库的复杂关系数据获取,顶替Cache的$g** @param model 实体* @param id    主键* @param <T>* @return* @throws Exception*/public <T> T DolerGet(T model, Object id) throws Exception {T ret = GlobalManager.DolerGet(model, id);//命中缓存直接返回if (ret != null) {return ret;}else {//调用数据库查询ret = GetById(model, id);//找到数据,推入缓存if(ret!=null) {ECPDto dto = new ECPDto();dto.Cmd = "A";dto.Model = ret.getClass().getSimpleName();dto.Data = JRT.Core.Util.JsonUtil.Object2Json(ret);//通知存入缓存GlobalManager.InCache(dto);}}return ret;}/*** 通过主键查询数据,主业务数据没找到会按历史切割数量找历史表** @param model* @param id* @param <T>* @return* @throws Exception*/public <T> T GetById(T model, Object id) throws Exception {JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);List<ParamDto> param = new ArrayList<>();ParamDto p = new ParamDto();p.Key = tableInfo.ID.Key;p.Value = id;param.add(p);//创建实体集合List<T> lstObj = FindAll(model, param, "", -1, -1, "", null, null);//结果为空,返回一个新建的对象if (lstObj.size() == 0) {//从历史表取数据String HisTableName = tableInfo.TableInfo.HisTableName();if (!HisTableName.isEmpty()) {int cutNum=0;//指定了切割列按切割列切割if(!tableInfo.TableInfo.CutHisColName().isEmpty()){cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());}else{cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());}//除以历史页大小算到数据该放入哪个历史表int hisNum = cutNum/HistoryPage;//分割所有历史实体String[] HisTableNameArr = HisTableName.split("^");//存放页小于所有历史表数据就做移动if (hisNum < HisTableNameArr.length) {String HisModelName = HisTableNameArr[hisNum];//得到历史表的实体Class cHis = GetTypeByName(HisModelName);//克隆得到历史表的对象Object hisData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);//创建实体集合List<T> lstHisObj = DolerFindAll(0,hisData, param, "", -1, -1, "", null, null);//结果为空,返回一个新建的对象if (lstHisObj.size() > 0) {return lstHisObj.get(0);}}}return null;}//否则返回第一个实体else {return lstObj.get(0);}}/*** 把数据安装维护的历史表大小移入历史表** @param model 实体数据* @param <T>   泛型* @return 是否成功* @throws Exception*/public <T> boolean MoveToHistory(T model) throws Exception {JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);String HisTableName = tableInfo.TableInfo.HisTableName();if (!HisTableName.isEmpty()) {//分割所有历史实体String[] HisTableNameArr = HisTableName.split("^");int cutNum=0;//指定了切割列按切割列切割if(!tableInfo.TableInfo.CutHisColName().isEmpty()){cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());}else{cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());}//除以历史页大小算到数据该放入哪个历史表int hisNum = cutNum/HistoryPage;//存放页小于所有历史表数据就做移动if (hisNum < HisTableNameArr.length) {String HisModelName = HisTableNameArr[hisNum];//得到历史表的实体Class cHis = GetTypeByName(HisModelName);//克隆得到历史表的对象Object newData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);OutParam out = new OutParam();//保存历史数据int saveRet = Save(newData, out);if (saveRet == 1) {saveRet = Remove(model, out);}if (saveRet == 1) {return true;}}}return false;}

连接管理类调整,根据是否带事务在操作执行成功后把数据推入ECP队列供Global管理器往主服务推送分发

package JRT.DAL.ORM.DBUtility;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;import JRT.DAL.ORM.DBUtility.C3P0Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import JRT.DAL.ORM.DBUtility.IDbFactory;
import JRT.DAL.ORM.Global.ECPDto;/*** 连接和事务管理*/
public class DBManager {/*** 驱动名称*/private String factoryName="";/*** 当前对象的驱动*/private IDbFactory factory=null;/*** 存数据库连接对象*/private Connection connection=null;/*** 要转入缓存的临时数据*/private List<ECPDto> ToEcpTmp=null;/*** 尝试把数据推入缓存队列* @param obj 对象* @param Oper 操作 A:添加  U:更新  D:删除* @throws Exception*/public void TryPushToCache(Object obj,String Oper) throws Exception{ECPDto dto=new ECPDto();dto.Cmd=Oper;dto.Model=obj.getClass().getSimpleName();dto.Data=JRT.Core.Util.JsonUtil.Object2Json(obj);//有事务就推缓存if(Hastransaction=true){if(ToEcpTmp==null){ToEcpTmp=new ArrayList<>();}ToEcpTmp.add(dto);}else{//没事务的直接推入缓存队列JRT.DAL.ORM.Global.ECPManager.InECPToQuen(dto);}}/*** 为每个数据库驱动存储工厂*/private static ConcurrentHashMap<String, IDbFactory> hsFact = new ConcurrentHashMap<>();/*** 为每个数据库驱动存储连接池*/private static ConcurrentHashMap<String, ComboPooledDataSource> hsPoll = new ConcurrentHashMap<>();/*** 得到驱动对象* @param factoryName* @return*/public IDbFactory GetIDbFactory(String factoryName){if(factory==null){factory=hsFact.get(factoryName);}return factory;}/*** 尝试初始化连接池* @param factoryName*/public static void TryInitConnPool(String factoryName) throws Exception{if(factoryName==""){factoryName="LisMianDbFactory";}if(!hsPoll.containsKey(factoryName)){IDbFactory factory=JRT.Core.Context.ObjectContainer.GetTypeObject(factoryName);hsPoll.put(factoryName,C3P0Util.GetConnPool(factory));if(!hsFact.containsKey(factoryName)){hsFact.put(factoryName,factory);}}}/*** 构造函数* @param factName 驱动配置名称* @throws Exception*/public DBManager(String factName) throws Exception{factoryName=factName;TryInitConnPool(factoryName);}/*** 存数据库连接对象*/public Connection Connection() throws Exception{if(connection==null){connection=hsPoll.get(factoryName).getConnection();}return connection;}/*** 标识是否开启事务*/public boolean Hastransaction = false;/*** 存储开启多次事务的保存点,每次调用BeginTransaction开启事务是自动创建保存点*/public LinkedList<Savepoint> Transpoints = new LinkedList<Savepoint>();/*** 获取开启的事务层级* @return*/public int GetTransactionLevel(){return this.Transpoints.size();}/*** 释放数据库连接* @return true成功释放,false释放失败*/public boolean Close() throws Exception{if(connection!=null){connection.setAutoCommit(true);connection.close();}connection=null;return true;}/*** 此方法开启事务* @return  true开启事务成功,false开始事务失败*/public boolean BeginTransaction() throws Exception{try{this.Connection().setAutoCommit(false);this.Hastransaction = true;Savepoint savepoint = this.Connection().setSavepoint();Transpoints.addLast(savepoint);return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("开启事务失败!" + sqle.getMessage(), sqle);}return false;}/*** 回滚上一层事务* @return true回滚事务成功,false回滚事务失败*/public boolean RollTransaction() throws Exception{//删除临时数据if(ToEcpTmp!=null) {ToEcpTmp.clear();ToEcpTmp=null;}//未开启事务时,算回滚事务成功if (!this.Hastransaction){return true;}try{if (this.Transpoints.size() == 0){this.Connection().rollback();this.Hastransaction = false;}else{Savepoint point = this.Transpoints.poll();this.Connection().rollback(point);}return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("事务回滚失败!" + sqle.getMessage(),sqle);throw sqle;}finally{if (!this.Hastransaction){Close();}}}/*** 回滚开启的全部事务* @return true回滚事务成功,false回滚事务失败*/public boolean RollTransactionAll() throws Exception{//删除临时数据if(ToEcpTmp!=null) {ToEcpTmp.clear();ToEcpTmp=null;}//未开启事务时,算回滚事务成功if (!this.Hastransaction){return true;}try{this.Connection().rollback();this.Hastransaction = false;return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("回滚所有事务层级失败!" + sqle.getMessage(),sqle);throw sqle;}finally{Close();}}/*** 提交事务* @return true提交事务成功,false提交事务失败*/public boolean CommitTransaction() throws Exception{try{//临时数据推入缓存if(ToEcpTmp!=null){for(ECPDto obj:ToEcpTmp){//没事务的直接推入缓存队列JRT.DAL.ORM.Global.ECPManager.InECPToQuen(obj);}}this.Connection().commit();this.Hastransaction = false;return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("提交事务失败!" + sqle.getMessage(),sqle);}finally{//提交事务,不论成功与否,释放数据库连接try{Close();}catch (Exception ex){}}return false;}}

增加的配置
在这里插入图片描述

这样就有一个理论上比较靠谱的缓存机制了,业务用SQL查到主列表数据后,调用DolerGet的获得各种周边相关数据来组装给前台返回,就不用每个写业务的人自己考虑写复杂的级联查询数据库受不了,自己缓存数据时候缓存是否可靠,自己不缓存数据时候调用太多数据库交互又慢的问题。DolerGet基本可以满足比较无脑的多维取数据组装的要求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/183538.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

webpack如何处理浏览器的样式兼容问题postcss

一、准备工作 css/index.css添加样式 .word {color: red;user-select: none; } 为了兼容不同的浏览器我们需要添加前缀比如&#xff1a; -webkit-user-select: none; 这个工作可以通过postcss的插件postcss-preset-env处理 二、安装依赖 pnpm i -D postcss postcss-loader…

接口测试用例编写和接口测试模板

一、简介 接口测试区别于传统意义上的系统测试&#xff0c;下面介绍接口测试用例和接口测试报告。 二、接口测试用例模板 功能测试用例最重要的两个因素是测试步骤和预期结果&#xff0c;接口测试属于功能测试&#xff0c;所以同理。接口测试的步骤中&#xff0c;最重要的是将…

MySQL中的JOIN与IN:性能对比与最佳实践

文章目录 JOIN与IN的基本介绍JOININ JOIN与IN性能对比使用JOIN的查询使用IN的查询 何时使用JOIN何时使用IN性能优化的其他考虑因素总结 &#x1f389;MySQL中的JOIN与IN&#xff1a;性能对比与最佳实践 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈…

<蓝桥杯软件赛>零基础备赛20周--第8周第1讲--十大排序

报名明年4月蓝桥杯软件赛的同学们&#xff0c;如果你是大一零基础&#xff0c;目前懵懂中&#xff0c;不知该怎么办&#xff0c;可以看看本博客系列&#xff1a;备赛20周合集 20周的完整安排请点击&#xff1a;20周计划 每周发1个博客&#xff0c;共20周&#xff08;读者可以按…

身份验证和电子邮件的网络安全即将迎来地震

任何拥有 Gmail 或 Yahoo 电子邮件帐户的人都清楚&#xff0c;如果不是明确的欺诈企图&#xff0c;他们的收件箱中可能充满了未经请求的邮件。 这些服务的用户很可能多次想知道他们的提供商是否可以采取措施至少减少垃圾邮件的数量以及随之而来的诈骗风险。 好消息是&#xf…

多模态大模型总结1(2021和2022年)

常用损失函数 ITC &#xff08;image-text contrasctive loss&#xff09; CLIP中采用的对比损失&#xff0c;最大化配对文本对的余弦相似度&#xff0c;最小化非配对文本对的余弦相似度&#xff0c;采用交叉熵损失实现 MLM &#xff08;masked language modeling&#xff0…

重生奇迹mu召唤师攻略

一、技能系统 1、技能大类&#xff1a;召唤师主要有火焰职业技能和水元素技能。 2、火焰职业技能&#xff1a;主要是使用火焰元素进行攻击&#xff0c;可以攻击单个目标&#xff0c;也可以同时攻击多个目标。 3、水元素技能&#xff1a;主要是对多个敌方单位使用水元素&…

ZFPlayer 播放视频的时候的视图层级

未播放的时候 首先看正常展示的时候&#xff0c;还没又开始播放 这个时候我们打开图层看一下&#xff0c;发现视频时长和播放按钮都是放在 视频封面图上的 播放的时候 我们看到的播放视频的画面 我们发现&#xff0c;我们之前在未播放状态看到的视图&#xff0c;仍然还在…

Linux入门

什么是Linux&#xff1f; Linux是一种免费、开源的操作系统内核 最初由芬兰计算机科学家 李纳斯托瓦兹 (Linus Torvalds)在1991年创建 Linux内核最初是为个人电脑设计的&#xff0c;如今已普及到服务器、超级计算机、移动设备等各种硬件平台 由于Linux是自由软件&#xff08;自…

logcat日志的使用——Qt For Android

前言 最近一直用qt开发安卓app&#xff0c;一直无法用真机调试&#xff0c;可能是缺什么东西。但是如果通过Qt Creator在真机上运行&#xff0c;可以在电脑控制台看打印&#xff08;安卓本身的日志、qDebug之类的打印&#xff09;&#xff0c;所以我是通过打印猜测问题所在&am…

在零信任架构下的API安全与滥用防护(上)

引言 在当今数字化的浪潮中&#xff0c;应用程序编程接口&#xff08;API&#xff09;的战略重要性愈发凸显。API不仅仅是现代软件和互联网服务之间沟通的桥梁&#xff0c;更是企业价值创造的核心。随着API的快速发展和广泛应用&#xff0c;安全问题随之而来&#xff0c;其中A…

连锁零售企业如何提高异地组网的稳定性?

随着数字化时代的到来&#xff0c;连锁零售企业面临着日益复杂和多样化的网络挑战。连锁零售企业是在不同地理位置拥有分支机构和零售店&#xff0c;可能同城或异地&#xff0c;需要确保各个地点之间的网络连接稳定和可靠。但由于不同地区的网络基础设施差异、网络延迟和带宽限…

【洛谷算法题】P5716-月份天数【入门2分支结构】

&#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5716-月份天数【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格式&a…

云时空社会化商业 ERP 系统 gpy 文件上传漏洞复现

0x01 产品简介 时空云社会化商业ERP&#xff08;简称时空云ERP&#xff09; &#xff0c;该产品采用JAVA语言和Oracle数据库&#xff0c; 融合用友软件的先进管理理念&#xff0c;汇集各医药企业特色管理需求&#xff0c;通过规范各个流通环节从而提高企业竞争力、降低人员成本…

Day45力扣打卡

打卡记录 无矛盾的最佳球队&#xff08;线性DP&#xff09; class Solution:def bestTeamScore(self, scores: List[int], ages: List[int]) -> int:n len(scores) nums sorted(zip(scores, ages))f sorted(scores)for i in range(n):for j in range(0, i):if nu…

无人机覆盖路径规划综述

摘要&#xff1a;覆盖路径规划包括找到覆盖某个目标区域的每个点的路线。近年来&#xff0c;无人机已被应用于涉及地形覆盖的多个应用领域&#xff0c;如监视、智能农业、摄影测量、灾害管理、民事安全和野火跟踪等。本文旨在探索和分析文献中与覆盖路径规划问题中使用的不同方…

好用的chatgpt工具用过这个比较快

chatgpthttps://www.askchat.ai?r237422 chatGPT能做什么 1. 对话和聊天&#xff1a;我可以与您进行对话和聊天&#xff0c;回答您的问题、提供信息和建议。 2. 问题回答&#xff1a;无论是关于事实、历史、科学、文化、地理还是其他领域的问题&#xff0c;我都可以尽力回答…

关于前端学习的思考-内边距、边框和外边距

从最简单的盒子开始思考 先把实际应用摆出来&#xff1a; margin&#xff1a;居中&#xff0c;控制边距。 padding&#xff1a;控制边距。 border&#xff1a;制作三角形。 盒子分为内容盒子&#xff0c;内边距盒子&#xff0c;边框和外边距。 如果想让块级元素居中&#…

【排序】希尔排序(C语言实现)

文章目录 前言1. 希尔排序的思想2. 希尔排序的一些小优化 前言 本章将详细介绍希尔排序的思想及实现&#xff0c;由于希尔排序是在插入排序的思想上进行升华&#xff0c;所以如果不知道插入排序或者不熟悉的可以先看看这篇文章&#xff1a;《简单排序》中的直接插入排序。 1. 希…

《golang设计模式》第三部分·行为型模式-09-策略模式(Strategy)

文章目录 1. 概述1.1 作用1.1 角色1.2 类图 2. 代码示例2.1 设计2.2 代码2.3 类图 1. 概述 1.1 作用 策略&#xff08;Strategy&#xff09;是用于封装一组算法中单个算法的对象&#xff0c;这些策略可以相互替换&#xff0c;使得单个算法的变化不影响使用它的客户端。 1.1 …