基于zbus的MySQL透明代理(100行)

项目地址 https://git.oschina.net/rushmore/zbus

我们上次讲到zbus网络通讯的核心API:

Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡

IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等

Session -- 代表网络链接,可以读写消息

实际的应用,我们几乎只需要做IoAdaptor的个性化实现就能完成高效的网络通讯服务,今天我们将举例说明如何个性化这个IoAdaptor。

我们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差异。

我们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在我们本地机器上跑起来我们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。

image
image

完成大概不到100 行的代码, Cool?Let’s roll!

首先,我们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实非常简单,可以描述为,将来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何做到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。

基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,我们要处理的几个事件应该包括:1)从上游客户端发起的链接请求--代理服务器的Accept事件,2)代理服务器连接目标服务器的Connect事件,3)上下游的数据事件onMessage。

zbus.NET的IoAdaptor提供的个性化事件如下

image

基本包括一个链接(客户端或者服务端)的生命周期,与消息的编解码。

我们的代理IoAdaptor就是逐一个性化处理。

第一步,编解码: 透明代理对消息内容不做理解,所以不需要编解码。

// 透传不需要编解码,简单返回ByteBuffer数据public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透传不需要编解码,简单返回ByteBuffer数据public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}

第二步,代理服务接入:

@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}

这里的逻辑思路是,代理服务器每接受到一个请求--通过onSessionAccepted表达,我们将同时创建一个到目标服务器的链接,今天的例子是目标MySQL服务器,注意上面的处理中把创建目标服务器Session过程与真正链接到目标服务分开(Dispatcher也提供合并二者的工具方法),是为了能在没有发生链接之前绑定上好上下游关系,通过Session的chain变量来表达,也就是当前Session的关联Session,关联好之后启动感兴趣Connect事件,逻辑处理完毕。

第三步,链接成功事件(第二步中需要链接到目标服务器)

@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}

这里的一个核心是当上下游都处于链接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为什么在这里做的原因是一定要等上下游都正常态后才启动双方消息处理,不然会出现字节丢失。

第四步,处理上下游数据事件

@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }

是不是非常简单,类似pipeline,从一端的数据写到另外一端。

原则上面4步结束,整个透明代理就完成了,但是为了处理链接异常清理,我们增加了Session清理处理,如下

@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}

工作就是解决上下游链接清理链接。

至此为止我们的IoAdaptor个性化就完成了,是不是非常简单,现在我们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。

public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start();}

骚年,包括渣渣import和少许注释加起来折腾了不到100行,该跑一跑了,还是那句话,不是HelloWorld,你可以规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.

完整代码可运行代码如下,也可直接到zbus示例代码库中找到

https://git.oschina.net/rushmore/zbus/blob/master/src/test/java/org/zbus/net/TcpProxyAdaptor.java?dir=0&filepath=src%2Ftest%2Fjava%2Forg%2Fzbus%2Fnet%2FTcpProxyAdaptor.java&oid=08abff381d93519485e1c0ee2c35f1d4f8d1814c&sha=a29272ed99a8f21ec19a14b403ebee53a385e9a4

package org.zbus.net;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import org.zbus.net.core.Dispatcher;
import org.zbus.net.core.IoAdaptor;
import org.zbus.net.core.IoBuffer;
import org.zbus.net.core.Session;  
public class TcpProxyAdaptor extends IoAdaptor {private String targetAddress;public TcpProxyAdaptor(String targetAddress) {this.targetAddress = targetAddress;}// 透传不需要编解码,简单返回ByteBuffer数据public IoBuffer encode(Object msg) {if (msg instanceof IoBuffer) {IoBuffer buff = (IoBuffer) msg;return buff;} else {throw new RuntimeException("Message Not Support");}}// 透传不需要编解码,简单返回ByteBuffer数据public Object decode(IoBuffer buff) {if (buff.remaining() > 0) {byte[] data = new byte[buff.remaining()];buff.readBytes(data);return IoBuffer.wrap(data);} else {return null;}}@Overrideprotected void onSessionAccepted(Session sess) throws IOException {Session target = null;Dispatcher dispatcher = sess.getDispatcher();try {target = dispatcher.createClientSession(targetAddress, this);} catch (Exception e) {sess.asyncClose();return;}sess.chain = target;target.chain = sess;dispatcher.registerSession(SelectionKey.OP_CONNECT, target);}@Overridepublic void onSessionConnected(Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){ sess.asyncClose();return; }   if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ);chain.register(SelectionKey.OP_READ);}}@Overrideprotected void onMessage(Object msg, Session sess) throws IOException {  Session chain = sess.chain;if(chain == null){sess.asyncClose(); return;} chain.write(msg); }@Overridepublic void onSessionToDestroy(Session sess) throws IOException {   try {sess.close();} catch (IOException e) { //ignore} if (sess.chain == null) return; try {    sess.chain.close();    sess.chain.chain = null;sess.chain = null;} catch (IOException e) { }}@SuppressWarnings("resource")public static void main(String[] args) throws Exception {   Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306);server.setServerName("TcpProxyServer");server.start();}
}

文章转载自 开源中国社区[https://www.oschina.net]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/395514.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux添加jetdirect协议,Padavan 路由器固件 不能驱动 hp1005、hp1020之类打印机 foo2zjs ZjStream协议的linux打印机驱动程序...

单击链接,或剪切并粘贴下面的整个命令行以下载驱动程序。现在解压缩它:Unpack:$ tar zxf foo2zjs.tar.gz$ cd foo2zjs现在编译并安装它。 INSTALL文件包含更详细的说明; 请现在阅读。Compile:$ makeGet extra files from the web, such as .ICM profiles…

返回指定月份的周列表 包含 周序号、开始日期、结束日期(不包含周末)

/*** 返回当前年月的周列表 包含 周序号、开始日期、结束日期(不包含周末)* param year 年* param month 月* returns {Array} */function getYearMonthWeekList(year,month) {var weekList[];var time year "/" month "/01";//取当前月的第…

tez-site.xml_数字支付系统的未来-Google Tez和音频快速响应

tez-site.xmlby Vaidic Joshi通过Vaidic Joshi 数字支付系统的未来-Google Tez和音频快速响应 (The future of digital payment systems — Google Tez and Audio Quick Response) Google recently marked its entry into the Indian digital payments market by introducing …

Window上安装kafka

kafka在windows上的安装、运行 - 进阶者ryan-su - CSDN博客https://blog.csdn.net/u010283894/article/details/77106159 在Windows环境中安装并使用kafka - 心灵空谷幽兰 - 博客园https://www.cnblogs.com/xinlingyoulan/p/6054361.html?utm_sourceitdadao&utm_mediumref…

数集合有多少个TOJ(2469)

题目链接:http://acm.tju.edu.cn/toj/showp2469.html 感觉这个题目有点问题,算了不管他了,反正A了。 这里要注意的是求这个集合有多少种,那么就是要剔除重复数后,再数一下有多少个。 难一点的算法我也不会,…

linux path环境变量起什么作用,shell基础(5)PATH环境变量的作用和使用方法

释放双眼,带上耳机,听听看~!关于PATH的作用PATH说简单点就是一个字符串变量,当输入命令的时候LINUX会去查找PATH里面记录的路径。比如在根目录/下可以输入命令ls,在/usr目录下也可以输入ls,但其实ls这个命令根本不在这个两个目录下…

天气城市编码对应地区编码_如何在您的城市中建立强大的编码社区-我是如何做到的...

天气城市编码对应地区编码by Billy Le比利勒(Billy Le) 如何在您的城市中建立强大的编码社区-我是如何做到的 (How you can build a strong coding community in your city — and how I did it) Communities are important. They are the bedrock that glues together shared…

python3 自动打包部署war包

2019独角兽企业重金招聘Python工程师标准>>> 1 调用maven 命令打包 mvn -B -f D:/workspace/ksdcourse clean package 2 调用tomcat 部署war包 ; 需要添加 CATALINA_HOME的环境变量 代码如下: #!/usr/bin/python3# -*- coding: utf-8 -*-impo…

python 虚拟环境创建

创建虚拟环境:  sudo apt-get install virtualenv 新建虚拟环境文件夹 venv virtualenv venv 进入虚拟环境 source venv/bin/activate 安装套件列表模块: 用来记录项目中所使用到的各种模块,便于项目部署时统一安装所需模块 pip freeze > requir…

powershell开源新闻及简介

作者:PowerShll传教士 问:微软的PowerShell脚本语言已经开源了 ? 答:绝对真的!已经! 问:源码在哪? 答:微软.net源码网站。 http://referencesource.microsoft.com/ 问&…

linux nginx重新编译安装,Linux系统Nginx编译安装教程

1、下载nginx1.2.4#注:下载地址:http://nginx.org/download/nginx-1.2.4.tar.gzwget -c http://nginx.org/download/nginx-1.2.4.tar.gz2、安装#注:默认安装到/usr/local/nginxtar -zxvf nginx-1.2.4.tar.gzcd nginx-1.2.4./configure如果出现…

htt://3g.hn_根据我对“询问HN:谁在招聘?”的分析,开发人员技能发展趋势

htt://3g.hnby Ryan Williams瑞安威廉姆斯(Ryan Williams) 根据我对“询问HN:谁在招聘?”的分析,开发人员技能发展趋势 (Trending Developer Skills, Based on my Analysis of “Ask HN: Who’s Hiring?”) For people learning to code an…

day1作业二:多级菜单操作

作业二:多级菜单 (1)三级菜单 (2)可以次选择进入各子菜单 (3)所需新知识点:列表、字典 要求:输入back返回上一层,输入quit退出整个程序 思路: &am…

JDK源码分析(5)之 HashMap 相关

HashMap作为我们最常用的数据类型,当然有必要了解一下他内部是实现细节。相比于 JDK7 在JDK8 中引入了红黑树以及hash计算等方面的优化,使得 JDK8 中的HashMap效率要高于以往的所有版本,本文会详细介绍相关的优化,但是主要还是写 …

linux usb init,复制Linux liveUSB导致init.d脚本出错 - 不可能..?

请发表您的想法或想出的任何想法。我很想知道别人在想什么。整体问题当我安装一个简单的Java应用程序(我写的)通过/etc/init.d/在启动(在后台)运行时,它适用于我明确安装它的liveUSB。当我制作该棒的副本时,它永远不会成功启动。在引导liveUSB副本时&…

最小费用最大流模版

#include <iostream> #include <cstring> #include <cstdio> #include <queue> #include <algorithm>using namespace std;const int MAXN10100; const int MAXM40010; const int INF0x3f3f3f3f;struct Edge      //cost代表单位流量流过该…

fpga中的slack_是否想减少部署过程的恐怖程度? 在Slack中构建ChatOps。

fpga中的slackby Rick Mak麦瑞克(Rick Mak) 是否想减少部署过程的恐怖程度&#xff1f; 在Slack中构建ChatOps。 (Want to make the deployment process less scary? Build ChatOps in Slack.) In a company that makes mobile and web products, developers shouldn’t be t…

位运算-查找数组中唯一成对的数

基础实例一&#xff1a;使用位运算判断数的奇偶性 实例代码&#xff1a; public class Test {public static void main(String[] args) {System.out.println(isOdd(49));System.out.println(isOdd(50));}// 与运算public static boolean isOdd(int i){return (i & 1) ! 0;…

Docker实践:Cannot connect to the Docker daemon.

Docker实践&#xff1a;Cannot connect to the Docker daemon.查看docker daemon是否在运行 [rootlocalhost openec]# ps aux | grep dockerroot 3030 0.0 0.0 112656 984 pts/0 S 16:20 0:00 grep --colorauto docker启动docker[rootlocalhost openec]# ser…

linux虚拟终端时间短,使用Screen创建虚拟终端避免Linux远程断线

维护Linux的ssh工具在使用中&#xff0c;一旦遇到网络中断&#xff0c;则当前的shell就会自动关闭当前的工作进度就会丢失&#xff0c;这对于远程升级等比较耗费时间的工作是非常不利的对于远程调适代码也是很不可靠不安全的为此&#xff0c;可以使用screen这个工具来解决这个问…