opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

0cc2691b8bdc8d27dd8090a59a7ed182.png

在阅读下面的内容时,首先需要对binlog有一定的了解,可以 参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{

"eventId": 1,

"databaseName": "canal_test",

"tableName": "`company`",

"eventType": 2,

"timestamp": 1477033198000,

"timestampReceipt": 1477033248780,

"binlogName": "mysql-bin.000006",

"position": 353,

"nextPostion": 468,

"serverId": 2,

"before": null,

"after": null,

"isDdl": true,

"sql": "DROP TABLE `company` /* generated by server */"}

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{

"eventId": 0,

"databaseName": "canal_test",

"tableName": "person",

"eventType": 24,

"timestamp": 1477030734000,

"timestampReceipt": 1477032161988,

"binlogName": "mysql-bin.000006",

"position": 242,

"nextPostion": 326,

"serverId": 2,

"before": {

"id": "3",

"sex": "f",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"after": {

"id": "3",

"sex": "m",

"address": "shanghai",

"age": "23",

"name": "zzh3"},

"isDdl": false,

"sql": null}

相关的类文件如下:

CDCEvent.java

package or;

import java.util.Map;

import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.BinlogEventV4Header;

import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

public class CDCEvent {

private long eventId = 0;//事件唯一标识

private String databaseName = null;

private String tableName = null;

private int eventType = 0;//事件类型

private long timestamp = 0;//事件发生的时间戳[MySQL服务器的时间]

private long timestampReceipt = 0;//Open-replicator接收到的时间戳[CDC执行的时间戳]

private String binlogName = null;// binlog file name

private long position = 0;

private long nextPostion = 0;

private long serverId = 0;

private Map before = null;

private Map after = null;

private Boolean isDdl= null;

private String sql = null;

private static AtomicLong uuid = new AtomicLong(0);

public CDCEvent(){}

public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName){

this.init(are);

this.databaseName = databaseName;

this.tableName = tableName;

}

private void init(final BinlogEventV4 be){

this.eventId = uuid.getAndAdd(1);

BinlogEventV4Header header = be.getHeader();

this.timestamp = header.getTimestamp();

this.eventType = header.getEventType();

this.serverId = header.getServerId();

this.timestampReceipt = header.getTimestampOfReceipt();

this.position = header.getPosition();

this.nextPostion = header.getNextPosition();

this.binlogName = header.getBinlogFileName();

}

@Override

public String toString(){

StringBuilder builder = new StringBuilder();

builder.append("{ eventId:").append(eventId);

builder.append(",databaseName:").append(databaseName);

builder.append(",tableName:").append(tableName);

builder.append(",eventType:").append(eventType);

builder.append(",timestamp:").append(timestamp);

builder.append(",timestampReceipt:").append(timestampReceipt);

builder.append(",binlogName:").append(binlogName);

builder.append(",position:").append(position);

builder.append(",nextPostion:").append(nextPostion);

builder.append(",serverId:").append(serverId);

builder.append(",isDdl:").append(isDdl);

builder.append(",sql:").append(sql);

builder.append(",before:").append(before);

builder.append(",after:").append(after).append("}");

return builder.toString();

}

// 省略Getter和Setter方法

}

open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:

InstanceListener.java

package or;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.keeper.TableInfoKeeper;

import or.manager.CDCEventManager;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;

import com.google.code.or.binlog.BinlogEventV4;

import com.google.code.or.binlog.impl.event.DeleteRowsEvent;

import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;

import com.google.code.or.binlog.impl.event.QueryEvent;

import com.google.code.or.binlog.impl.event.TableMapEvent;

import com.google.code.or.binlog.impl.event.UpdateRowsEvent;

import com.google.code.or.binlog.impl.event.WriteRowsEvent;

import com.google.code.or.binlog.impl.event.XidEvent;

import com.google.code.or.common.glossary.Column;

import com.google.code.or.common.glossary.Pair;

import com.google.code.or.common.glossary.Row;

import com.google.code.or.common.util.MySQLConstants;

public class InstanceListener implements BinlogEventListener{

private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

@Override

public void onEvents(BinlogEventV4 be) {

if(be == null){

logger.error("binlog event is null");

return;

}

int eventType = be.getHeader().getEventType();

switch(eventType){

case MySQLConstants.FORMAT_DESCRIPTION_EVENT:

{

logger.trace("FORMAT_DESCRIPTION_EVENT");

break;

}

case MySQLConstants.TABLE_MAP_EVENT://每次ROW_EVENT前都伴随一个TABLE_MAP_EVENT事件,保存一些表信息,如tableId, tableName, databaseName, 而ROW_EVENT只有tableId

{

TableMapEvent tme = (TableMapEvent)be;

TableInfoKeeper.saveTableIdMap(tme);

logger.trace("TABLE_MAP_EVENT:tableId:{}",tme.getTableId());

break;

}

case MySQLConstants.DELETE_ROWS_EVENT:

{

DeleteRowsEvent dre = (DeleteRowsEvent) be;

long tableId = dre.getTableId();

logger.trace("DELETE_ROW_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = dre.getRows();

for(Row row:rows){

List before = row.getColumns();

Map beforeMap = getMap(before,databaseName,tableName);

if(beforeMap !=null && beforeMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(dre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.UPDATE_ROWS_EVENT:

{

UpdateRowsEvent upe = (UpdateRowsEvent)be;

long tableId = upe.getTableId();

logger.info("UPDATE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List> rows = upe.getRows();

for(Pair p:rows){

List colsBefore = p.getBefore().getColumns();

List colsAfter = p.getAfter().getColumns();

Map beforeMap = getMap(colsBefore,databaseName,tableName);

Map afterMap = getMap(colsAfter,databaseName,tableName);

if(beforeMap!=null && afterMap!=null && beforeMap.size()>0 && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(upe,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setBefore(beforeMap);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.WRITE_ROWS_EVENT:

{

WriteRowsEvent wre = (WriteRowsEvent)be;

long tableId = wre.getTableId();

logger.trace("WRITE_ROWS_EVENT:tableId:{}",tableId);

TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

List rows = wre.getRows();

for(Row row: rows){

List after = row.getColumns();

Map afterMap = getMap(after,databaseName,tableName);

if(afterMap!=null && afterMap.size()>0){

CDCEvent cdcEvent = new CDCEvent(wre,databaseName,tableName);

cdcEvent.setIsDdl(false);

cdcEvent.setSql(null);

cdcEvent.setAfter(afterMap);

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

}

}

break;

}

case MySQLConstants.QUERY_EVENT:

{

QueryEvent qe = (QueryEvent)be;

TableInfo tableInfo = createTableInfo(qe);

if(tableInfo == null)

break;

String databaseName = tableInfo.getDatabaseName();

String tableName = tableInfo.getTableName();

logger.trace("QUERY_EVENT:databaseName:{},tableName:{}",databaseName,tableName);

CDCEvent cdcEvent = new CDCEvent(qe,databaseName,tableName);

cdcEvent.setIsDdl(true);

cdcEvent.setSql(qe.getSql().toString());

CDCEventManager.queue.addLast(cdcEvent);

logger.info("cdcEvent:{}",cdcEvent);

break;

}

case MySQLConstants.XID_EVENT:{

XidEvent xe = (XidEvent)be;

logger.trace("XID_EVENT: xid:{}",xe.getXid());

break;

}

default:

{

logger.trace("DEFAULT:{}",eventType);

break;

}

}

}

/**

* ROW_EVENT中是没有Column信息的,需要通过MysqlConnection(下面会讲到)的方式读取列名信息,

* 然后跟取回的List进行映射。

*

* @param cols

* @param databaseName

* @param tableName

* @return

*/

private Map getMap(List cols, String databaseName, String tableName){

Map map = new HashMap<>();

if(cols == null || cols.size()==0){

return null;

}

String fullName = databaseName+"."+tableName;

List columnInfoList = TableInfoKeeper.getColumns(fullName);

if(columnInfoList == null)

return null;

if(columnInfoList.size() != cols.size()){

TableInfoKeeper.refreshColumnsMap();

if(columnInfoList.size() != cols.size())

{

logger.warn("columnInfoList.size is not equal to cols.");

return null;

}

}

for(int i=0;i

if(cols.get(i).getValue()==null)

map.put(columnInfoList.get(i).getName(),"");

else

map.put(columnInfoList.get(i).getName(), cols.get(i).toString());

}

return map;

}

/**

* 从sql中提取Table信息,因为QUERY_EVENT是对应DATABASE这一级别的,不像ROW_EVENT是对应TABLE这一级别的,

* 所以需要通过从sql中提取TABLE信息,封装到TableInfo对象中

*

* @param qe

* @return

*/

private TableInfo createTableInfo(QueryEvent qe){

String sql = qe.getSql().toString().toLowerCase();

TableInfo ti = new TableInfo();

String databaseName = qe.getDatabaseName().toString();

String tableName = null;

if(checkFlag(sql,"table")){

tableName = getTableName(sql,"table");

} else if(checkFlag(sql,"truncate")){

tableName = getTableName(sql,"truncate");

} else{

logger.warn("can not find table name from sql:{}",sql);

return null;

}

ti.setDatabaseName(databaseName);

ti.setTableName(tableName);

ti.setFullName(databaseName+"."+tableName);

return ti;

}

private boolean checkFlag(String sql, String flag){

String[] ss = sql.split(" ");

for(String s:ss){

if(s.equals(flag)){

return true;

}

}

return false;

}

private String getTableName(String sql, String flag){

String[] ss = sql.split("\\.");

String tName = null;

if (ss.length > 1) {

String[] strs = ss[1].split(" ");

tName = strs[0];

} else {

String[] strs = sql.split(" ");

boolean start = false;

for (String s : strs) {

if (s.indexOf(flag) >= 0) {

start = true;

continue;

}

if (start && !s.isEmpty()) {

tName = s;

break;

}

}

}

tName.replaceAll("`", "").replaceAll(";", "");

//del "("[create table person(....]

int index = tName.indexOf('(');

if(index>0){

tName = tName.substring(0, index);

}

return tName;

}

}

上面所涉及到的TableInfo .java如下:

package or.model;

public class TableInfo {

private String databaseName;

private String tableName;

private String fullName;

// 省略Getter和Setter

@Override

public boolean equals(Object o){

if(this == o)

return true;

if(o == null || this.getClass()!=o.getClass())

return false;

TableInfo tableInfo = (TableInfo)o;

if(!this.databaseName.equals(tableInfo.getDatabaseName()))

return false;

if(!this.tableName.equals(tableInfo.getTableName()))

return false;

if(!this.fullName.equals(tableInfo.getFullName()))

return false;

return true;

}

@Override

public int hashCode(){

int result = this.tableName.hashCode();

result = 31*result+this.databaseName.hashCode();

result = 31*result+this.fullName.hashCode();

return result;

}

}

接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper .java

package or.keeper;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;

import or.model.ColumnInfo;

import or.model.TableInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

public class TableInfoKeeper {

private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

private static Map tabledIdMap = new ConcurrentHashMap<>();

private static Map> columnsMap = new ConcurrentHashMap<>();

static{

columnsMap = MysqlConnection.getColumns();

}

public static void saveTableIdMap(TableMapEvent tme){

long tableId = tme.getTableId();

tabledIdMap.remove(tableId);

TableInfo table = new TableInfo();

table.setDatabaseName(tme.getDatabaseName().toString());

table.setTableName(tme.getTableName().toString());

table.setFullName(tme.getDatabaseName()+"."+tme.getTableName());

tabledIdMap.put(tableId, table);

}

public static synchronized void refreshColumnsMap(){

Map> map = MysqlConnection.getColumns();

if(map.size()>0){

// logger.warn("refresh and clear cols.");

columnsMap = map;

// logger.warn("refresh and switch cols:{}",map);

}

else

{

logger.error("refresh columnsMap error.");

}

}

public static TableInfo getTableInfo(long tableId){

return tabledIdMap.get(tableId);

}

public static List getColumns(String fullName){

return columnsMap.get(fullName);

}

}

正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;

import java.sql.Connection;

import java.sql.DatabaseMetaData;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import or.model.BinlogInfo;

import or.model.BinlogMasterStatus;

import or.model.ColumnInfo;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MysqlConnection {

private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

private static Connection conn;

private static String host;

private static int port;

private static String user;

private static String password;

public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg){

try {

if(conn == null || conn.isClosed()){

Class.forName("com.mysql.jdbc.Driver");

host = hostArg;

port = portArg;

user = userArg;

password = passwordArg;

conn = DriverManager.getConnection("jdbc:mysql://"+host+":"+port+"/",user,password);

logger.info("connected to mysql:{} : {}",user,password);

}

} catch (ClassNotFoundException e) {

logger.error(e.getMessage(),e);

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

public static Connection getConnection(){

try {

if(conn == null || conn.isClosed()){

setConnection(host,port,user,password);

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return conn;

}

/**

* 获取Column信息

*

*@return

*/

public static Map> getColumns(){

Map> cols = new HashMap<>();

Connection conn = getConnection();

try {

DatabaseMetaData metaData = conn.getMetaData();

ResultSet r = metaData.getCatalogs();

String tableType[] = {"TABLE"};

while(r.next()){

String databaseName = r.getString("TABLE_CAT");

ResultSet result = metaData.getTables(databaseName, null, null, tableType);

while(result.next()){

String tableName = result.getString("TABLE_NAME");

// System.out.println(result.getInt("TABLE_ID"));

String key = databaseName +"."+tableName;

ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null);

cols.put(key, new ArrayList());

while(colSet.next()){

ColumnInfo columnInfo = new ColumnInfo(colSet.getString("COLUMN_NAME"),colSet.getString("TYPE_NAME"));

cols.get(key).add(columnInfo);

}

}

}

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

return cols;

}

/**

* 参考

* mysql> show binary logs

* +------------------+-----------+

* | Log_name | File_size |

* +------------------+-----------+

* | mysql-bin.000001 | 126 |

* | mysql-bin.000002 | 126 |

* | mysql-bin.000003 | 6819 |

* | mysql-bin.000004 | 1868 |

* +------------------+-----------+

*/

public static List getBinlogInfo(){

List binlogList = new ArrayList<>();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show binary logs");

while(resultSet.next()){

BinlogInfo binlogInfo = new BinlogInfo(resultSet.getString("Log_name"),resultSet.getLong("File_size"));

binlogList.add(binlogInfo);

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogList;

}

/**

* 参考:

* mysql> show master status;

* +------------------+----------+--------------+------------------+

* | File | Position | Binlog_Do_DB | Binlog_Ignore_DB |

* +------------------+----------+--------------+------------------+

* | mysql-bin.000004 | 1868 | | |

* +------------------+----------+--------------+------------------+

*@return

*/

public static BinlogMasterStatus getBinlogMasterStatus(){

BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show master status");

while(resultSet.next()){

binlogMasterStatus.setBinlogName(resultSet.getString("File"));

binlogMasterStatus.setPosition(resultSet.getLong("Position"));

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return binlogMasterStatus;

}

/**

* 获取open-replicator所连接的mysql服务器的serverid信息

*@return

*/

public static int getServerId(){

int serverId=6789;

Connection conn = null;

Statement statement = null;

ResultSet resultSet = null;

try {

conn = getConnection();

statement = conn.createStatement();

resultSet = statement.executeQuery("show variables like 'server_id'");

while(resultSet.next()){

serverId = resultSet.getInt("Value");

}

} catch (Exception e) {

logger.error(e.getMessage(),e);

} finally{

try {

if(resultSet != null)

resultSet.close();

if(statement != null)

statement.close();

if(conn != null)

conn.close();

} catch (SQLException e) {

logger.error(e.getMessage(),e);

}

}

return serverId;

}

}

上面代码设计的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;

public class BinlogInfo {

private String binlogName;

private Long fileSize;

// 省略Getter和Setter

}

package or.model;

public class BinlogMasterStatus {

private String binlogName;

private long position;

// 省略Getter和Setter

}

package or.model;

public class ColumnInfo {

private String name;

private String type;

// 省略Getter和Setter

}

最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;

import java.util.concurrent.ConcurrentLinkedDeque;

import or.CDCEvent;

public class CDCEventManager {

public static final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>();

}

所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import or.CDCEvent;

import or.InstanceListener;

import or.MysqlConnection;

import or.OpenReplicatorPlus;

import or.manager.CDCEventManager;

import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;

import com.google.gson.Gson;

import com.google.gson.GsonBuilder;

import com.google.gson.JsonElement;

import com.google.gson.JsonParser;

public class OpenReplicatorTest {

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);

private static final String host = "10.198.197.60";

private static final int port = 3306;

private static final String user = "****";

private static final String password = "****";

public static void main(String[] args){

OpenReplicator or = new OpenReplicator ();

or.setUser(user);

or.setPassword(password);

or.setHost(host);

or.setPort(port);

MysqlConnection.setConnection(host, port, user, password);

// or.setServerId(MysqlConnection.getServerId());

//配置里的serverId是open-replicator(作为一个slave)的id,不是master的serverId

BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();

or.setBinlogFileName(bms.getBinlogName());

// or.setBinlogFileName("mysql-bin.000004");

or.setBinlogPosition(4);

or.setBinlogEventListener(new InstanceListener());

try {

or.start();

} catch (Exception e) {

logger.error(e.getMessage(),e);

}

Thread thread = new Thread(new PrintCDCEvent());

thread.start();

}

public static class PrintCDCEvent implements Runnable{

@Override

public void run() {

while(true){

if(CDCEventManager.queue.isEmpty() == false)

{

CDCEvent ce = CDCEventManager.queue.pollFirst();

Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

String prettyStr1 = gson.toJson(ce);

System.out.println(prettyStr1);

}

else{

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

}

时间运行旧了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog

java.io.EOFException: null

at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]

at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]

at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]

16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from 10.198.197.60:3306

初步解决方案(extends OpenReplicator然后添加重试机制):

package or;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.google.code.or.OpenReplicator;

public class OpenReplicatorPlus extends OpenReplicator{

private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);

private volatile boolean autoRestart = true;

@Override

public void stopQuietly(long timeout, TimeUnit unit){

super.stopQuietly(timeout, unit);

if(autoRestart){

try {

TimeUnit.SECONDS.sleep(10);

logger.error("Restart OpenReplicator");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/298383.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

雷军:有人说我写的代码像诗一样优雅~

全世界只有3.14 % 的人关注了爆炸吧知识整合整理&#xff1a;程序员的那些事&#xff08;id&#xff1a;iProgrammer&#xff09;雷军的代码像诗一样优雅↓↓↓有些网友在评论中质疑&#xff0c;说雷军代码不会是「屎」一样优雅吧。说这话的网友&#xff0c;也许是开玩笑的&…

国外网站评出对程序员最具影响的书籍清单

国外知名网站 stackoverflow 上有一个问题调查&#xff1a; 哪本书是对程序员最有影响、每个程序员都该阅读的书&#xff1f;这个调查已历时两年&#xff0c;目前为止吸引了153,432 人访问&#xff0c;读者共推荐出了 478 本书(还在增加)&#xff0c;其中最火的一本书《Code Co…

python大于小于_在Python中大于/小于Pandas DataFrames / Series之间的比较

如何在DataFrame和Series之间进行比较&#xff1f;我想掩盖DataFrame / Series中比其他DataFrame / Series中的元素更大/更小的元素. 例如,以下内容不会替换大于均值的元素 与nans虽然我期待它&#xff1a; >>> x pd.DataFrame(data{a: [1, 2], b: [3, 4]}) >>…

NodeJS学习笔记

通过js创建个简单的web服务器 var httprequire(http); http.createServer(function(req,res){ res.writeHead(200,{Content-Type:text/html}); res.end("server is up!"); }).listen(8000); console.log(listened on 8000); 推荐学习:Node入门 转载于:https://www.cn…

mysql分析日志_MYSQL 索引(三)--- SQL日志分析

慢查询日志Mysql 的慢查询日志是 Myql 提供的一种日志记录&#xff0c;用来记录在 Myql 中响应时间查过阈值的语句&#xff0c;具体指运行时间超过 long_query_time 值的 SQL&#xff0c;则会被记录在日志中。long_query_time 默认为 10&#xff0c;单位为秒。默认情况下&#…

【转】SMIL基础教程(1)

最近公司项目需要使用到smil相关知识&#xff0c;因而专门学习了一下。在网上找到了几篇基础教程&#xff0c;转载以方便查看。一、 简介随着流技术的成熟和广泛的应用&#xff0c;其优点我们有了深深的体会。但是&#xff0c;其不足之处也逐渐体现出来。问题的出现&#xff0c…

C#多线程开发-任务并行库

你好&#xff0c;我是阿辉。正文共2090字&#xff0c;预计阅读时间&#xff1a;6min。之前学习了线程池&#xff0c;知道了它有很多好处。使用线程池可以使我们在减少并行度花销时节省操作系统资源。可认为线程池是一个抽象层&#xff0c;其向程序员隐藏了使用线程的细节&#…

.cue 文件格式

cue文件格式&#xff08;基础版&#xff09; PERFORMER "陈小春" TITLE "抱一抱" FILE "陈小春.-.[抱一抱].专辑.(ape).ape" WAVE TRACK 01 AUDIO TITLE "抱一抱" INDEX 01 00:00:00 TRACK 02 AUDIO TITLE "我爱的人…

python切片原理_深度解析Python切片

详解Python 切片语法 Python的切片是特别常用的功能&#xff0c;主要用于对列表的元素取值。使用切片也会让你的代码显得特别Pythonic。 切片的主要声明如下&#xff0c;假设现在有一个list&#xff0c;命名为alist&#xff1a; alist [0,1,2,3,4] 切片语法的基本形式为&#…

为什么数学不好,和语文有关系?

▲ 点击查看苏步青教授在担任复旦大学校长时曾经说过:“如果允许复旦大学单独招生考试&#xff0c;我的意见是第一堂课就考语文&#xff0c;考后就批卷子。不合格的&#xff0c;以下的功课就不要考了。语文你都不行&#xff0c;别的是学不通的。”苏步青作为享誉世界的数学家&a…

Docker 博客

Docker 常用命令&#xff1a;首先推荐&#xff1a;http://blog.tankywoo.com/docker/2014/05/08/docker-4-summary.html Docker 网络桥接&#xff1a;http://blog.tankywoo.com/2014/12/22/docker-bridge-network.html docker 大牛的博客&#xff1a;http://blog.csdn.net/smal…

python string length_如何使用python获取字符串长度?哪些方法?

掌握多种python技巧&#xff0c;对于我们更好的灵活应用python是非常重要的&#xff0c;比如接下来给大家介绍的获取字节长度&#xff0c;那大家脑海里就该有印象了&#xff0c;有几种方法呢&#xff1f;一起来看下吧~1、使用len()函数这是最直接的方法。 在这里&#xff0c;我…

二进制、八进制、十进制、十六进制之间转换

一、 十进制与二进制之间的转换 &#xff08;1&#xff09; 十进制转换为二进制&#xff0c;分为整数部分和小数部分 ① 整数部分 方法&#xff1a;除2取余法&#xff0c;即每次将整数部分除以2&#xff0c;余数为该位权上的数&#xff0c;而商继续除以2&#xff0c;余数又为上…

【招聘(北京成都)】北森 招聘.NET 架构师工程师

.net后端架构师 25k-38k14薪工作职责:1.根据业务框架要求&#xff0c;提供技术实现方案&#xff1b;2.负责技术架构选型、并主导功能模块设计、数据结构设计、对外接口设计&#xff1b;3.负责核心技术攻关、系统调优&#xff0c;使系统架构、代码结构不断演进优化&#xff1b;4…

android 网络开发

反复研究了 Android Market&#xff0c;总结一下&#xff0c;之前发在新浪微博上&#xff0c;但不够详细&#xff0c;主要是提高用户体验。 1.网络异常处理&#xff0c;重试机制。 上wifi常常网络断开&#xff0c;那就看运用程序是否健壮。可以用Android 提供的 frameworks/bas…

python turtle 绘图_谈一下Pycharm中关联系统Python解释器的方法

大家知道&#xff0c;PyCharm是一款著名的Python IDE开发工具&#xff0c;是拥有一整套可以帮助用户在使用Python语言开发时提高其效率的工具&#xff0c;具备基本的调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制等等。该IDE分为社区免费版(…

python中线程和进程_python中线程和进程的简单了解

一、操作系统、应用程序1.硬件&#xff1a;硬盘、cpu、主板、显卡........2.装系统(本身也是一个软件)&#xff1a;系统就是一个由程序员写出来的软件&#xff0c;该软件用于控制计算机得硬盘&#xff0c;让他们之间进行互相配合。3.安装软件&#xff1a;各种应用程序二、并发和…

一份干货满满的PPT,答辩加分手到擒来!

全世界只有3.14 % 的人关注了爆炸吧知识对很多童鞋来说&#xff0c;PPT可以说是使用频率很高的办公软件了。毕业答辩需要PPT&#xff0c;项目总结需要PPT&#xff0c;演讲也都需要PPT……可你是否因为PPT陷入这样的——脑壳疼状态&#xff01;花费大量时间&#xff0c;结果PPT效…

《Java程序员全攻略:从小工到专家》连载八:加入什么样的公司

加入什么样的公司 “怎么样&#xff0c;蔡佳娃&#xff1f;听了这么多介绍&#xff0c;心里有点谱了吧&#xff1f;” “嗯&#xff0c;听师兄你这么一说&#xff0c;我想了想&#xff0c;还是优先要追求一下欧美的IT公司。追不到也没关系&#xff0c;至少知道自己不行了&#…

理解浮动元素——怎么使用它们、它们有什么问题以及怎么解决这些问题。

需要浮动的元素可使用CSS中float属性来定义元素的浮动位置&#xff0c;left&#xff1a;往左浮动&#xff0c;right&#xff1a;往右浮动浮动元素引起的问题&#xff1a;&#xff08;1&#xff09;父元素的高度无法被撑开&#xff0c;影响与父元素同级的元素&#xff08;2&…