为什么80%的码农都做不了架构师?>>>
##序 tomcat提供了JdbcInterceptor可以用来监控jdbc的执行情况,默认提供了好几个现成的interceptor可以用,SlowQueryReport以及SlowQueryReportJmx就是其中的两个。
##JdbcInterceptor的基本原理
/*** Abstract class that is to be extended for implementations of interceptors.* Everytime an operation is called on the {@link java.sql.Connection} object the* {@link #invoke(Object, Method, Object[])} method on the interceptor will be called.* Interceptors are useful to change or improve behavior of the connection pool.<br>* Interceptors can receive a set of properties. Each sub class is responsible for parsing the properties during runtime when they* are needed or simply override the {@link #setProperties(Map)} method.* Properties arrive in a key-value pair of Strings as they were received through the configuration.* This method is called once per cached connection object when the object is first configured.** @version 1.0*/
public abstract class JdbcInterceptor implements InvocationHandler {/*** {@link java.sql.Connection#close()} method name*/public static final String CLOSE_VAL = "close";/*** {@link Object#toString()} method name*/public static final String TOSTRING_VAL = "toString";/*** {@link java.sql.Connection#isClosed()} method name*/public static final String ISCLOSED_VAL = "isClosed";/*** {@link javax.sql.PooledConnection#getConnection()} method name*/public static final String GETCONNECTION_VAL = "getConnection";/*** {@link java.sql.Wrapper#unwrap(Class)} method name*/public static final String UNWRAP_VAL = "unwrap";/*** {@link java.sql.Wrapper#isWrapperFor(Class)} method name*/public static final String ISWRAPPERFOR_VAL = "isWrapperFor";/*** {@link java.sql.Connection#isValid(int)} method name*/public static final String ISVALID_VAL = "isValid";/*** {@link java.lang.Object#equals(Object)}*/public static final String EQUALS_VAL = "equals";/*** {@link java.lang.Object#hashCode()}*/public static final String HASHCODE_VAL = "hashCode";/*** Properties for this interceptor.*/protected Map<String,InterceptorProperty> properties = null;/*** The next interceptor in the chain*/private volatile JdbcInterceptor next = null;/*** Property that decides how we do string comparison, default is to use* {@link String#equals(Object)}. If set to <code>false</code> then the* equality operator (==) is used.*/private boolean useEquals = true;/*** Public constructor for instantiation through reflection*/public JdbcInterceptor() {// NOOP}/*** Gets invoked each time an operation on {@link java.sql.Connection} is invoked.* {@inheritDoc}*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (getNext()!=null) return getNext().invoke(proxy,method,args);else throw new NullPointerException();}/*** Returns the next interceptor in the chain* @return the next interceptor in the chain*/public JdbcInterceptor getNext() {return next;}/*** configures the next interceptor in the chain* @param next The next chain item*/public void setNext(JdbcInterceptor next) {this.next = next;}/*** Performs a string comparison, using references unless the useEquals property is set to true.* @param name1 The first name* @param name2 The second name* @return true if name1 is equal to name2 based on {@link #useEquals}*/public boolean compare(String name1, String name2) {if (isUseEquals()) {return name1.equals(name2);} else {return name1==name2;}}/*** Compares a method name (String) to a method (Method)* {@link #compare(String,String)}* Uses reference comparison unless the useEquals property is set to true* @param methodName The method name* @param method The method* @return <code>true</code> if the name matches*/public boolean compare(String methodName, Method method) {return compare(methodName, method.getName());}/*** Gets called each time the connection is borrowed from the pool* This means that if an interceptor holds a reference to the connection* the interceptor can be reused for another connection.* <br>* This method may be called with null as both arguments when we are closing down the connection.* @param parent - the connection pool owning the connection* @param con - the pooled connection*/public abstract void reset(ConnectionPool parent, PooledConnection con);/*** Called when {@link java.sql.Connection#close()} is called on the underlying connection.* This is to notify the interceptors, that the physical connection has been released.* Implementation of this method should be thought through with care, as no actions should trigger an exception.* @param parent - the connection pool that this connection belongs to* @param con - the pooled connection that holds this connection* @param finalizing - if this connection is finalizing. True means that the pooled connection will not reconnect the underlying connection*/public void disconnected(ConnectionPool parent, PooledConnection con, boolean finalizing) {}/*** Returns the properties configured for this interceptor* @return the configured properties for this interceptor*/public Map<String,InterceptorProperty> getProperties() {return properties;}/*** Called during the creation of an interceptor* The properties can be set during the configuration of an interceptor* Override this method to perform type casts between string values and object properties* @param properties The properties*/public void setProperties(Map<String,InterceptorProperty> properties) {this.properties = properties;final String useEquals = "useEquals";InterceptorProperty p = properties.get(useEquals);if (p!=null) {setUseEquals(Boolean.parseBoolean(p.getValue()));}}/*** @return true if the compare method uses the Object.equals(Object) method* false if comparison is done on a reference level*/public boolean isUseEquals() {return useEquals;}/*** Set to true if string comparisons (for the {@link #compare(String, Method)} and {@link #compare(String, String)} methods) should use the Object.equals(Object) method* The default is false* @param useEquals <code>true</code> if equals will be used for comparisons*/public void setUseEquals(boolean useEquals) {this.useEquals = useEquals;}/*** This method is invoked by a connection pool when the pool is closed.* Interceptor classes can override this method if they keep static* variables or other tracking means around.* <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b>* @param pool - the pool that is being closed.*/public void poolClosed(ConnectionPool pool) {// NOOP}/*** This method is invoked by a connection pool when the pool is first started up, usually when the first connection is requested.* Interceptor classes can override this method if they keep static* variables or other tracking means around.* <b>This method is only invoked on a single instance of the interceptor, and not on every instance created.</b>* @param pool - the pool that is being closed.*/public void poolStarted(ConnectionPool pool) {// NOOP}}
可以看到它实现了InvocationHandler这个接口,也就是使用的是java内置的动态代理技术,主要是因为jdbc本身就是面向接口编程的,因而用java内置的动态代理是水到渠成的。
##ConnectionPool tomcat-jdbc-8.5.11-sources.jar!/org/apache/tomcat/jdbc/pool/ConnectionPool.java
/*** configures a pooled connection as a proxy.* This Proxy implements {@link java.sql.Connection} and {@link javax.sql.PooledConnection} interfaces.* All calls on {@link java.sql.Connection} methods will be propagated down to the actual JDBC connection except for the* {@link java.sql.Connection#close()} method.* @param con a {@link PooledConnection} to wrap in a Proxy* @return a {@link java.sql.Connection} object wrapping a pooled connection.* @throws SQLException if an interceptor can't be configured, if the proxy can't be instantiated*/protected Connection setupConnection(PooledConnection con) throws SQLException {//fetch previously cached interceptor proxy - one per connectionJdbcInterceptor handler = con.getHandler();if (handler==null) {//build the proxy handlerhandler = new ProxyConnection(this,con,getPoolProperties().isUseEquals());//set up the interceptor chainPoolProperties.InterceptorDefinition[] proxies = getPoolProperties().getJdbcInterceptorsAsArray();for (int i=proxies.length-1; i>=0; i--) {try {//create a new instanceJdbcInterceptor interceptor = proxies[i].getInterceptorClass().newInstance();//configure propertiesinterceptor.setProperties(proxies[i].getProperties());//setup the chaininterceptor.setNext(handler);//call resetinterceptor.reset(this, con);//configure the last one to be held by the connectionhandler = interceptor;}catch(Exception x) {SQLException sx = new SQLException("Unable to instantiate interceptor chain.");sx.initCause(x);throw sx;}}//cache handler for the next iterationcon.setHandler(handler);} else {JdbcInterceptor next = handler;//we have a cached handler, reset itwhile (next!=null) {next.reset(this, con);next = next.getNext();}}try {getProxyConstructor(con.getXAConnection() != null);//create the proxy//TODO possible optimization, keep track if this connection was returned properly, and don't generate a new facadeConnection connection = null;if (getPoolProperties().getUseDisposableConnectionFacade() ) {connection = (Connection)proxyClassConstructor.newInstance(new Object[] { new DisposableConnectionFacade(handler) });} else {connection = (Connection)proxyClassConstructor.newInstance(new Object[] {handler});}//return the connectionreturn connection;}catch (Exception x) {SQLException s = new SQLException();s.initCause(x);throw s;}}
这里判断有没有interceptor,有的话,则创建ProxyConnection,然后构造interceptor的链 ##ProxyConnection
public class ProxyConnection extends JdbcInterceptor {protected PooledConnection connection = null;protected ConnectionPool pool = null;public PooledConnection getConnection() {return connection;}public void setConnection(PooledConnection connection) {this.connection = connection;}public ConnectionPool getPool() {return pool;}public void setPool(ConnectionPool pool) {this.pool = pool;}protected ProxyConnection(ConnectionPool parent, PooledConnection con,boolean useEquals) {pool = parent;connection = con;setUseEquals(useEquals);}@Overridepublic void reset(ConnectionPool parent, PooledConnection con) {this.pool = parent;this.connection = con;}public boolean isWrapperFor(Class<?> iface) {if (iface == XAConnection.class && connection.getXAConnection()!=null) {return true;} else {return (iface.isInstance(connection.getConnection()));}}public Object unwrap(Class<?> iface) throws SQLException {if (iface == PooledConnection.class) {return connection;}else if (iface == XAConnection.class) {return connection.getXAConnection();} else if (isWrapperFor(iface)) {return connection.getConnection();} else {throw new SQLException("Not a wrapper of "+iface.getName());}}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (compare(ISCLOSED_VAL,method)) {return Boolean.valueOf(isClosed());}if (compare(CLOSE_VAL,method)) {if (connection==null) return null; //noop for already closed.PooledConnection poolc = this.connection;this.connection = null;pool.returnConnection(poolc);return null;} else if (compare(TOSTRING_VAL,method)) {return this.toString();} else if (compare(GETCONNECTION_VAL,method) && connection!=null) {return connection.getConnection();} else if (method.getDeclaringClass().equals(XAConnection.class)) {try {return method.invoke(connection.getXAConnection(),args);}catch (Throwable t) {if (t instanceof InvocationTargetException) {throw t.getCause() != null ? t.getCause() : t;} else {throw t;}}}if (isClosed()) throw new SQLException("Connection has already been closed.");if (compare(UNWRAP_VAL,method)) {return unwrap((Class<?>)args[0]);} else if (compare(ISWRAPPERFOR_VAL,method)) {return Boolean.valueOf(this.isWrapperFor((Class<?>)args[0]));}try {PooledConnection poolc = connection;if (poolc!=null) {return method.invoke(poolc.getConnection(),args);} else {throw new SQLException("Connection has already been closed.");}}catch (Throwable t) {if (t instanceof InvocationTargetException) {throw t.getCause() != null ? t.getCause() : t;} else {throw t;}}}public boolean isClosed() {return connection==null || connection.isDiscarded();}public PooledConnection getDelegateConnection() {return connection;}public ConnectionPool getParentPool() {return pool;}@Overridepublic String toString() {return "ProxyConnection["+(connection!=null?connection.toString():"null")+"]";}}
ProxyConnection本身就是JdbcInterceptor,包装了PooledConnection
##AbstractCreateStatementInterceptor 这个是JdbcInterceptor的一个比较重要的扩展,SlowQueryReport就是基于这个扩展的。这个定义了一些抽象方法供子类实现。
/*** Abstraction interceptor. This component intercepts all calls to create some type of SQL statement.* By extending this class, one can intercept queries and update statements by overriding the {@link #createStatement(Object, Method, Object[], Object, long)}* method.* @version 1.0*/
public abstract class AbstractCreateStatementInterceptor extends JdbcInterceptor {protected static final String CREATE_STATEMENT = "createStatement";protected static final int CREATE_STATEMENT_IDX = 0;protected static final String PREPARE_STATEMENT = "prepareStatement";protected static final int PREPARE_STATEMENT_IDX = 1;protected static final String PREPARE_CALL = "prepareCall";protected static final int PREPARE_CALL_IDX = 2;protected static final String[] STATEMENT_TYPES = {CREATE_STATEMENT, PREPARE_STATEMENT, PREPARE_CALL};protected static final int STATEMENT_TYPE_COUNT = STATEMENT_TYPES.length;protected static final String EXECUTE = "execute";protected static final String EXECUTE_QUERY = "executeQuery";protected static final String EXECUTE_UPDATE = "executeUpdate";protected static final String EXECUTE_BATCH = "executeBatch";protected static final String[] EXECUTE_TYPES = {EXECUTE, EXECUTE_QUERY, EXECUTE_UPDATE, EXECUTE_BATCH};public AbstractCreateStatementInterceptor() {super();}/*** {@inheritDoc}*/@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {if (compare(CLOSE_VAL,method)) {closeInvoked();return super.invoke(proxy, method, args);} else {boolean process = false;process = isStatement(method, process);if (process) {long start = System.currentTimeMillis();Object statement = super.invoke(proxy,method,args);long delta = System.currentTimeMillis() - start;return createStatement(proxy,method,args,statement, delta);} else {return super.invoke(proxy,method,args);}}}/*** This method will be invoked after a successful statement creation. This method can choose to return a wrapper* around the statement or return the statement itself.* If this method returns a wrapper then it should return a wrapper object that implements one of the following interfaces.* {@link java.sql.Statement}, {@link java.sql.PreparedStatement} or {@link java.sql.CallableStatement}* @param proxy the actual proxy object* @param method the method that was called. It will be one of the methods defined in {@link #STATEMENT_TYPES}* @param args the arguments to the method* @param statement the statement that the underlying connection created* @param time Elapsed time* @return a {@link java.sql.Statement} object*/public abstract Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time);/*** Method invoked when the operation {@link java.sql.Connection#close()} is invoked.*/public abstract void closeInvoked();/*** Returns true if the method that is being invoked matches one of the statement types.** @param method the method being invoked on the proxy* @param process boolean result used for recursion* @return returns true if the method name matched*/protected boolean isStatement(Method method, boolean process){return process(STATEMENT_TYPES, method, process);}/*** Returns true if the method that is being invoked matches one of the execute types.** @param method the method being invoked on the proxy* @param process boolean result used for recursion* @return returns true if the method name matched*/protected boolean isExecute(Method method, boolean process){return process(EXECUTE_TYPES, method, process);}/** Returns true if the method that is being invoked matches one of the method names passed in* @param names list of method names that we want to intercept* @param method the method being invoked on the proxy* @param process boolean result used for recursion* @return returns true if the method name matched*/protected boolean process(String[] names, Method method, boolean process) {final String name = method.getName();for (int i=0; (!process) && i<names.length; i++) {process = compare(names[i],name);}return process;}/*** no-op for this interceptor. no state is stored.*/@Overridepublic void reset(ConnectionPool parent, PooledConnection con) {// NOOP}
}
##AbstractQueryReport 主要实现了createStatement方法:
/*** Creates a statement interceptor to monitor query response times*/@Overridepublic Object createStatement(Object proxy, Method method, Object[] args, Object statement, long time) {try {Object result = null;String name = method.getName();String sql = null;Constructor<?> constructor = null;if (compare(CREATE_STATEMENT,name)) {//createStatementconstructor = getConstructor(CREATE_STATEMENT_IDX,Statement.class);}else if (compare(PREPARE_STATEMENT,name)) {//prepareStatementsql = (String)args[0];constructor = getConstructor(PREPARE_STATEMENT_IDX,PreparedStatement.class);if (sql!=null) {prepareStatement(sql, time);}}else if (compare(PREPARE_CALL,name)) {//prepareCallsql = (String)args[0];constructor = getConstructor(PREPARE_CALL_IDX,CallableStatement.class);prepareCall(sql,time);}else {//do nothing, might be a future unsupported method//so we better bail out and let the system continuereturn statement;}result = constructor.newInstance(new Object[] { new StatementProxy(statement,sql) });return result;}catch (Exception x) {log.warn("Unable to create statement proxy for slow query report.",x);}return statement;}
这里同样适用了jdk的动态代理,包装了statement
/*** Class to measure query execute time**/protected class StatementProxy implements InvocationHandler {protected boolean closed = false;protected Object delegate;protected final String query;public StatementProxy(Object parent, String query) {this.delegate = parent;this.query = query;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//get the name of the method for comparisonfinal String name = method.getName();//was close invoked?boolean close = compare(JdbcInterceptor.CLOSE_VAL,name);//allow close to be called multiple timesif (close && closed) return null;//are we calling isClosed?if (compare(JdbcInterceptor.ISCLOSED_VAL,name)) return Boolean.valueOf(closed);//if we are calling anything else, bail outif (closed) throw new SQLException("Statement closed.");boolean process = false;//check to see if we are about to execute a queryprocess = isExecute( method, process);//if we are executing, get the current timelong start = (process)?System.currentTimeMillis():0;Object result = null;try {//execute the queryresult = method.invoke(delegate,args);}catch (Throwable t) {reportFailedQuery(query,args,name,start,t);if (t instanceof InvocationTargetException&& t.getCause() != null) {throw t.getCause();} else {throw t;}}//measure the timelong delta = (process)?(System.currentTimeMillis()-start):Long.MIN_VALUE;//see if we meet the requirements to measureif (delta>threshold) {try {//report the slow queryreportSlowQuery(query, args, name, start, delta);}catch (Exception t) {if (log.isWarnEnabled()) log.warn("Unable to process slow query",t);}} else if (process) {reportQuery(query, args, name, start, delta);}//perform close cleanupif (close) {closed=true;delegate = null;}return result;}}
这里记录了sql的执行耗时,然后跟阈值对比,判断是否记录到slow query