opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

0cc2691b8bdc8d27dd8090a59a7ed182.png

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{

"eventId": 1,

"databaseName": "canal_test",

"tableName": "`company`",

"eventType": 2,

"timestamp": 1477033198000,

"timestampReceipt": 1477033248780,

"binlogName": "mysql-bin.000006",

"position": 353,

"nextPostion": 468,

"serverId": 2,

"before": null,

"after": null,

"isDdl": true,

"sql": "DROP TABLE `company` /* generated by server */"}

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{

"eventId": 0,

"databaseName": "canal_test",

"tableName": "person",

"eventType": 24,

"timestamp": 1477030734000,

"timestampReceipt": 1477032161988,

"binlogName": "mysql-bin.000006",

"position": 242,

"nextPostion": 326,

"serverId": 2,

"before": {

"id": "3",

"sex": "f",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"after": {

"id": "3",

"sex": "m",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"isDdl": false,

"sql": null}

相关的类文件如下:

CDCEvent.java

package or;

import java.util.Map;

import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.BinlogEventV4Header;

import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {

private long eventId = 0;//事件唯一标识

private String databaseName = null;

private String tableName = null;

private int eventType = 0;//事件类型

private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]

private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]

private String binlogName = null;// binlog file name

private long position = 0;

private long nextPostion = 0;

private long serverId = 0;

private Map before = null;

private Map after = null;

private Boolean isDdl= null;

private String sql = null;

private static AtomicLong uuid = new AtomicLong(0);

public CDCEvent(){}

public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){

this.init(are);

this.databaseName = databaseName;

this.tableName = tableName;

}

private void init(final BinlogEventV4 be){

this.eventId = uuid.getAndAdd(1);

BinlogEventV4Header header = be.getHeader();

this.timestamp = header.getTimestamp();

this.eventType = header.getEventType();

this.serverId = header.getServerId();

this.timestampReceipt = header.getTimestampOfReceipt();

this.position = header.getPosition();

this.nextPostion = header.getNextPosition();

this.binlogName = header.getBinlogFileName();

}

@Override

public String toString(){

StringBuilder builder = new StringBuilder();

builder.append("{ eventId:").append(eventId);

builder.append(",databaseName:").append(databaseName);

builder.append(",tableName:").append(tableName);

builder.append(",eventType:").append(eventType);

builder.append(",timestamp:").append(timestamp);

builder.append(",timestampReceipt:").append(timestampReceipt);

builder.append(",binlogName:").append(binlogName);

builder.append(",position:").append(position);

builder.append(",nextPostion:").append(nextPostion);

builder.append(",serverId:").append(serverId);

builder.append(",isDdl:").append(isDdl);

builder.append(",sql:").append(sql);

builder.append(",before:").append(before);

builder.append(",after:").append(after).append("}");

return builder.toString();

}

// 省略Getter和Setter方法

}

open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:

InstanceListener.java

package or;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.keeper.TableInfoKeeper;

import or.manager.CDCEventManager;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.impl.event.DeleteRowsEvent;

import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;

import com.google.code.or.binlog.impl.event.QueryEvent;

import com.google.code.or.binlog.impl.event.TableMapEvent;

import com.google.code.or.binlog.impl.event.UpdateRowsEvent;

import com.google.code.or.binlog.impl.event.WriteRowsEvent;

import com.google.code.or.binlog.impl.event.XidEvent;

import com.google.code.or.common.glossary.Column;

import com.google.code.or.common.glossary.Pair;

import com.google.code.or.common.glossary.Row;

import com.google.code.or.common.util.MySQLConstants;

public class InstanceListener implements BinlogEventListener{

private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

@Override

public void onEvents(BinlogEventV4 be) {

if(be == null){

logger.error("binlog event is null");

return;

}

int eventType = be.getHeader().getEventType();

switch(eventType){

case MySQLConstants.FORMAT_DESCRIPTION_EVENT:

{

logger.trace("FORMAT_DESCRIPTION_EVENT");

break;

}

case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId

{

TableMapEvent tme = (TableMapEvent)be;

TableInfoKeeper.saveTableIdMap(tme);

logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());

break;

}

case MySQLConstants.DELETE_ROWS_EVENT:

{

DeleteRowsEvent dre = (DeleteRowsEvent) be;

long tableId = dre.getTableId();

logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = dre.getRows();

for(Row row:rows){

List before = row.getColumns();

Map beforeMap = getMap(before,databaseName,tableName);

if(beforeMap !=null && beforeMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.UPDATE_ROWS_EVENT:

{

UpdateRowsEvent upe = (UpdateRowsEvent)be;

long tableId = upe.getTableId();

logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List> rows = upe.getRows();

for(Pair p:rows){

List colsBefore = p.getBefore().getColumns();

List colsAfter = p.getAfter().getColumns();

Map beforeMap = getMap(colsBefore,databaseName,tableName);

Map afterMap = getMap(colsAfter,databaseName,tableName);

if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.WRITE_ROWS_EVENT:

{

WriteRowsEvent wre = (WriteRowsEvent)be;

long tableId = wre.getTableId();

logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = wre.getRows();

for(Row row: rows){

List after = row.getColumns();

Map afterMap = getMap(after,databaseName,tableName);

if(afterMap!=null && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.QUERY_EVENT:

{

QueryEvent qe = (QueryEvent)be;

TableInfo tableInfo = createTableInfo(qe);

if(tableInfo == null)

break;

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);

cdcEvent.setIsDdl(true);

cdcEvent.setSql(qe.getSql().toString());

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

break;

}

case MySQLConstants.XID_EVENT:{

XidEvent xe = (XidEvent)be;

logger.trace("XID_EVENT: xid:{}",xe.getXid());

break;

}

default:

{

logger.trace("DEFAULT:{}",eventType);

break;

}

}

}

/**

* ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,

* 然后跟取回的List进行映射。

*

* @param cols

* @param databaseName

* @param tableName

* @return

*/

private Map getMap(List cols, String databaseName, String tableName){

Map map = new HashMap<>();

if(cols == null || cols.size()==0){

return null;

}

String fullName = databaseName+"."+tableName;

List columnInfoList = TableInfoKeeper.getColumns(fullName);

if(columnInfoList == null)

return null;

if(columnInfoList.size() != cols.size()){

TableInfoKeeper.refreshColumnsMap();

if(columnInfoList.size() != cols.size())

{

logger.warn("columnInfoList.size is not equal to cols.");

return null;

}

}

for(int i=0;i

if(cols.get(i).getValue()==null)

map.put(columnInfoList.get(i).getName(),"");

else

map.put(columnInfoList.get(i).getName(), cols.get(i).toString());

}

return map;

}

/**

* 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,

* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中

*

* @param qe

* @return

*/

private TableInfo createTableInfo(QueryEvent qe){

String sql = qe.getSql().toString().toLowerCase();

TableInfo ti = new TableInfo();

String databaseName = qe.getDatabaseName().toString();

String tableName = null;

if(checkFlag(sql,"table")){

tableName = getTableName(sql,"table");

} else if(checkFlag(sql,"truncate")){

tableName = getTableName(sql,"truncate");

} else{

logger.warn("can not find table name from sql:{}",sql);

return null;

}

ti.setDatabaseName(databaseName);

ti.setTableName(tableName);

ti.setFullName(databaseName+"."+tableName);

return ti;

}

private boolean checkFlag(String sql, String flag){

String[] ss = sql.split(" ");

for(String s:ss){

if(s.equals(flag)){

return true;

}

}

return false;

}

private String getTableName(String sql, String flag){

String[] ss = sql.split("\\.");

String tName = null;

if (ss.length > 1) {

String[] strs = ss[1].split(" ");

tName = strs[0];

} else {

String[] strs = sql.split(" ");

boolean start = false;

for (String s : strs) {

if (s.indexOf(flag) >= 0) {

start = true;

continue;

}

if (start && !s.isEmpty()) {

tName = s;

break;

}

}

}

tName.replaceAll("`", "").replaceAll(";", "");

//del "("[create table person(....]

int index = tName.indexOf('(');

if(index>0){

tName = tName.substring(0, index);

}

return tName;

}

}

上面所涉及到的TableInfo .java如下:

package or.model;

public class TableInfo {

private String databaseName;

private String tableName;

private String fullName;

// 省略Getter和Setter

@Override

public boolean equals(Object o){

if(this == o)

return true;

if(o == null || this.getClass()!=o.getClass())

return false;

TableInfo tableInfo = (TableInfo)o;

if(!this.databaseName.equals(tableInfo.getDatabaseName()))

return false;

if(!this.tableName.equals(tableInfo.getTableName()))

return false;

if(!this.fullName.equals(tableInfo.getFullName()))

return false;

return true;

}

@Override

public int hashCode(){

int result = this.tableName.hashCode();

result = 31*result+this.databaseName.hashCode();

result = 31*result+this.fullName.hashCode();

return result;

}

}

接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

private static Map tabledIdMap = new ConcurrentHashMap<>();

private static Map> columnsMap = new ConcurrentHashMap<>();

static{

columnsMap = MysqlConnection.getColumns();

}

public static void saveTableIdMap(TableMapEvent tme){

long tableId = tme.getTableId();

tabledIdMap.remove(tableId);

TableInfo table = new TableInfo();

table.setDatabaseName(tme.getDatabaseName().toString());

table.setTableName(tme.getTableName().toString());

table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());

tabledIdMap.put(tableId, table);

}

public static synchronized void refreshColumnsMap(){

Map> map = MysqlConnection.getColumns();

if(map.size()>0){

// logger.warn("refresh and clear cols.");

columnsMap = map;

// logger.warn("refresh and switch cols:{}",map);

}

else

{

logger.error("refresh columnsMap error.");

}

}

public static TableInfo getTableInfo(long tableId){

return tabledIdMap.get(tableId);

}

public static List getColumns(String fullName){

return columnsMap.get(fullName);

}

}

正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;

import java.sql.Connection;

import java.sql.DatabaseMetaData;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.model.BinlogInfo;

import or.model.BinlogMasterStatus;

import or.model.ColumnInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MysqlConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private static Connection conn;

private static String host;

private static int port;

private static String user;

private static String password;

public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){

try {

if(conn == null || conn.isClosed()){

Class.forName("com.mysql.jdbc.Driver");

host = hostArg;

port = portArg;

user = userArg;

password = passwordArg;

conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);

logger.info("connected to mysql:{} : {}",user,password);

}

} catch (ClassNotFoundException e) {

logger.error(e.getMessage(),e);

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

public static Connection getConnection(){

try {

if(conn == null || conn.isClosed()){

setConnection(host,port,user,password);

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return conn;

}

/**

* 获取Column信息

*

*@return

*/

public static Map> getColumns(){

Map> cols = new HashMap<>();

Connection conn = getConnection();

try {

DatabaseMetaData metaData = conn.getMetaData();

ResultSet r = metaData.getCatalogs();

String tableType[] = {"TABLE"};

while(r.next()){

String databaseName = r.getString("TABLE_CAT");

ResultSet result = metaData.getTables(databaseName, null, null, tableType);

while(result.next()){

String tableName = result.getString("TABLE_NAME");

// System.out.println(result.getInt("TABLE_ID"));

String key = databaseName +"."+tableName;

ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);

cols.put(key, new ArrayList());

while(colSet.next()){

ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));

cols.get(key).add(columnInfo);

}

}

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return cols;

}

/**

* 参考

* mysql> show binary logs

* +------------------+-----------+

* | Log_name | File_size |

* +------------------+-----------+

* | mysql-bin.000001 | 126 |

* | mysql-bin.000002 | 126 |

* | mysql-bin.000003 | 6819 |

* | mysql-bin.000004 | 1868 |

* +------------------+-----------+

*/

public static List getBinlogInfo(){

List binlogList = new ArrayList<>();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show binary logs");

while(resultSet.next()){

BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));

binlogList.add(binlogInfo);

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogList;

}

/**

* 参考:

* mysql> show master status;

* +------------------+----------+--------------+------------------+

* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |

* +------------------+----------+--------------+------------------+

* | mysql-bin.000004 | 1868 | | |

* +------------------+----------+--------------+------------------+

*@return

*/

public static BinlogMasterStatus getBinlogMasterStatus(){

BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show master status");

while(resultSet.next()){

binlogMasterStatus.setBinlogName(resultSet.getString("File"));

binlogMasterStatus.setPosition(resultSet.getLong("Position"));

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogMasterStatus;

}

/**

* 获取open-replicator所连接的mysql服务器的serverid信息

*@return

*/

public static int getServerId(){

int serverId=6789;

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show variables like 'server_id'");

while(resultSet.next()){

serverId = resultSet.getInt("Value");

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return serverId;

}

}

上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;

public class BinlogInfo {

private String binlogName;

private Long fileSize;

// 省略Getter和Setter

}

package or.model;

public class BinlogMasterStatus {

private String binlogName;

private long position;

// 省略Getter和Setter

}

package or.model;

public class ColumnInfo {

private String name;

private String type;

// 省略Getter和Setter

}

最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;

import java.util.concurrent.ConcurrentLinkedDeque;

import or.CDCEvent;

public class CDCEventManager {

public static final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>();

}

所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import or.CDCEvent;

import or.InstanceListener;

import or.MysqlConnection;

import or.OpenReplicatorPlus;

import or.manager.CDCEventManager;

import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.google.gson.JsonElement;

import com.google.gson.JsonParser;

public class OpenReplicatorTest {

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);

private static final String host = "10.198.197.60";

private static final int port = 3306;

private static final String user = "****";

private static final String password = "****";

public static void main(String[] args){

OpenReplicator or = new OpenReplicator ();

or.setUser(user);

or.setPassword(password);

or.setHost(host);

or.setPort(port);

MysqlConnection.setConnection(host, port, user, password);

// or.setServerId(MysqlConnection.getServerId());

//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId

BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();

or.setBinlogFileName(bms.getBinlogName());

// or.setBinlogFileName("mysql-bin.000004");

or.setBinlogPosition(4);

or.setBinlogEventListener(new InstanceListener());

try {

or.start();

} catch (Exception e) {

logger.error(e.getMessage(),e);

}

Thread thread = new Thread(new PrintCDCEvent());

thread.start();

}

public static class PrintCDCEvent implements Runnable{

@Override

public void run() {

while(true){

if(CDCEventManager.queue.isEmpty() == false)

{

CDCEvent ce = CDCEventManager.queue.pollFirst();

Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

String prettyStr1 = gson.toJson(ce);

System.out.println(prettyStr1);

}

else{

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

}

时间运行旧了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog

java.io.EOFException: null

at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]

16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from 10.198.197.60:3306

初步解决方案(extends OpenReplicator然后添加重试机制):

package or;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);

private volatile boolean autoRestart = true;

@Override

public void stopQuietly(long timeout, TimeUnit unit){

super.stopQuietly(timeout, unit);

if(autoRestart){

try {

TimeUnit.SECONDS.sleep(10);

logger.error("Restart OpenReplicator");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/298383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

雷军:有人说我写的代码像诗一样优雅~

全世界只有3.14 % 的人关注了爆炸吧知识整合整理&#xff1a;程序员的那些事&#xff08;id&#xff1a;iProgrammer&#xff09;雷军的代码像诗一样优雅↓↓↓有些网友在评论中质疑&#xff0c;说雷军代码不会是「屎」一样优雅吧。说这话的网友&#xff0c;也许是开玩笑的&…

mysql分析日志_MYSQL 索引(三)--- SQL日志分析

慢查询日志Mysql 的慢查询日志是 Myql 提供的一种日志记录&#xff0c;用来记录在 Myql 中响应时间查过阈值的语句&#xff0c;具体指运行时间超过 long_query_time 值的 SQL&#xff0c;则会被记录在日志中。long_query_time 默认为 10&#xff0c;单位为秒。默认情况下&#…

C#多线程开发-任务并行库

你好&#xff0c;我是阿辉。正文共2090字&#xff0c;预计阅读时间&#xff1a;6min。之前学习了线程池&#xff0c;知道了它有很多好处。使用线程池可以使我们在减少并行度花销时节省操作系统资源。可认为线程池是一个抽象层&#xff0c;其向程序员隐藏了使用线程的细节&#…

为什么数学不好,和语文有关系?

▲ 点击查看苏步青教授在担任复旦大学校长时曾经说过:“如果允许复旦大学单独招生考试&#xff0c;我的意见是第一堂课就考语文&#xff0c;考后就批卷子。不合格的&#xff0c;以下的功课就不要考了。语文你都不行&#xff0c;别的是学不通的。”苏步青作为享誉世界的数学家&a…

python string length_如何使用python获取字符串长度?哪些方法?

掌握多种python技巧&#xff0c;对于我们更好的灵活应用python是非常重要的&#xff0c;比如接下来给大家介绍的获取字节长度&#xff0c;那大家脑海里就该有印象了&#xff0c;有几种方法呢&#xff1f;一起来看下吧~1、使用len()函数这是最直接的方法。 在这里&#xff0c;我…

python turtle 绘图_谈一下Pycharm中关联系统Python解释器的方法

大家知道&#xff0c;PyCharm是一款著名的Python IDE开发工具&#xff0c;是拥有一整套可以帮助用户在使用Python语言开发时提高其效率的工具&#xff0c;具备基本的调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制等等。该IDE分为社区免费版(…

python中线程和进程_python中线程和进程的简单了解

一、操作系统、应用程序1.硬件&#xff1a;硬盘、cpu、主板、显卡........2.装系统(本身也是一个软件)&#xff1a;系统就是一个由程序员写出来的软件&#xff0c;该软件用于控制计算机得硬盘&#xff0c;让他们之间进行互相配合。3.安装软件&#xff1a;各种应用程序二、并发和…

一份干货满满的PPT,答辩加分手到擒来!

全世界只有3.14 % 的人关注了爆炸吧知识对很多童鞋来说&#xff0c;PPT可以说是使用频率很高的办公软件了。毕业答辩需要PPT&#xff0c;项目总结需要PPT&#xff0c;演讲也都需要PPT……可你是否因为PPT陷入这样的——脑壳疼状态&#xff01;花费大量时间&#xff0c;结果PPT效…

Blazor+Dapr+K8s微服务之基于WSL安装K8s集群并部署微服务

前面文章已经演示过&#xff0c;将我们的示例微服务程序DaprTest1部署到k8s上并运行。当时用的k8s是Docker for desktop 自带的k8s&#xff0c;只要在Docker for desktop中启用就可以了。但是我发现&#xff0c;启用了k8s后&#xff0c;Docker for desktop会消耗大量的系统资源…

记一次 .NET 某新能源汽车锂电池检测程序 UI挂死分析

一&#xff1a;背景 1. 讲故事这世间事说来也奇怪&#xff0c;近两个月有三位朋友找到我&#xff0c;让我帮忙分析下他的程序hangon现象&#xff0c;这三个dump分别涉及&#xff1a;医疗&#xff0c;新能源&#xff0c;POS系统。截图如下&#xff1a;那这篇为什么要拿其中的 新…

这个女生躲在衣柜等男友回家,结果竟是......

1 不要什么都怪爸爸我妈明明也是这样帮我脱的▼2 这就是现实版大女主反杀女二的故事啊▼3 我猜到了开头却没有猜到结尾▼4 &#xff1f;&#xff1f;&#xff1f;一定是我被饿出了幻觉▼5 小小的孩子在极短的时间内经历了人生的大戏和大悲▼6 这个令人羡慕的发量啊&#…

mysql savepoint作用_savepoint原理

保存点在MySQL中, 保存点SAVEPOINT属于事务控制处理部分。利用SAVEPOINT可以回滚指定部分事务&#xff0c;从而使事务处理更加灵活和精细。SAVEPOINT相关的SQL语句如下SAVEPOINT identifier设置SAVEPOINT。如果重复设置同名savepoint,新的会覆盖老的.RELEASE SAVEPOINT identif…

路由器笔记 CCNA

配置路由器应用性<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />1配置端口1&#xff09; 配置局域网端口&#xff1a;进入&#xff08;config-if&#xff09;#模式 配置ip地址和掩码 &#xff08;config-if&#xff09;#ip a…

图片裁剪和异步上传插件--一步到位(记录)

图片上传裁剪这功能随处可见&#xff0c;有的自己写&#xff0c;不过太耗费时间了&#xff0c;插件的话感觉好多&#xff0c;前段时间就想挑一款好的插件&#xff0c;以后就用那款&#xff0c;可是挑了几款插件用上去&#xff0c;效果很好&#xff0c;问题就出在合并了&#xf…

git checkout 单个文件_git 如何回退单个文件

1.进入到文件所在文件目录&#xff0c;或者能找到文件的路径查看文件的修改记录git log fileName结果&#xff1a;如果文件修改记录太多&#xff0c;则使用git log -number fileName结果&#xff1a;2.回退到指定版本git reset ** fileName1.git reset -mixed&#xff1a;此为默…

未来十年最吃香专业大盘点,有你的吗?

全世界只有3.14 % 的人关注了爆炸吧知识这个世界上理工科到底有多吃香&#xff1f;如果你是理工男/女&#xff0c;先恭喜一下你&#xff0c;不知道喜从何来&#xff1f;不妨先来看这几组数据&#xff1a;高薪职业平均薪资一览表薪资最高的专业前20名这并不只是中国的情况&#…

mysql挂载到iscsi_corosync+pacemaker+iscsi磁盘实现mysql高可用

实验用主机&#xff1a;提供iscsi磁盘:172.16.103.1,提供iscsi磁盘高可用主机:172.16.103.2&#xff0c;172.16.103.3实验拓扑&#xff1a;实验步骤&#xff1a;一、配置172.16.103.1&#xff0c;输出iscsi磁盘&#xff0c;创建的磁盘分区为/dev/sda3&#xff0c;在实际的使用环…

一种在未来互联网中的面向用户的云操作系统体系

本文描述了一个欲将整个网络当做计算机使用&#xff0c;可以极大增强云计算应用能力和提高用户体验的一个类操作系统体系结构。望有识之士能做下去。 价值1&#xff09;用户的痛苦在哪里&#xff1f;对于终端用户&#xff0c;他们的痛苦在于使用Web应用不方便&#xff0c;可以使…

提高「搜商」,挣大钱

大家好&#xff0c;我是Z哥。在之前的一篇讲述数据分析的文章《这个时代最重要的技能之一》中提到了这周要和大家聊聊「搜商」的事情。搜商这个词诞生于互联网时代&#xff0c;体现的是一个人利用搜索引擎查找自己所需信息的能力。我觉得在当下这个时代&#xff0c;搜商的重要性…

idea 配置jdk版本_JDK 11 安装过程(同时已安装了JDK 8)以及Intellij IDEA 配置

电脑上已经安装过 JDK 8 版本(C:Javajdk1.8.0_111)。安装好 JDK 11 版本(C:Program FilesJavajdk-11.0.2)之后&#xff0c;目录如下&#xff0c;分别有&#xff1a;安装过程&#xff1a;1. 开始安装 JDK 11&#xff1b;2. 完成安装JDK 11&#xff1b;配置过程&#xff1a;1. 将…