public abstract class BaseClientMgr extends Subject implementsIClientConnect {protected boolean isRunning; //当前是否正在连接
protected boolean isSending; //是否正在发送 线程是否被占用
private int mPort; //连接服务器的端口号
private int mCommunication; //通讯类型
private int heartTimeOutCount = 0; //记录心跳超时次数
protected int function = 1200; //关闭连接功能号
public static final int RESPONSE_SUCCESS = 0x401;public static final int RESPONSE_FAIL = 0x402;public static final int RESPONSE_TIMEOUT = 0x403;public static final int REQUEST_HEARTBEAT_TIMEOUT = 0x410; //心跳超时
public static final int NOT_LOGIN = 0x411; //用户未登录
private String mConnectKey = "BasicServicesMgr";private String mHost; //连接服务器的IP地址
protected ArrayList mEntityMsg = null; //待发送消息集合
protected Context mContext; //Context对象
protected CommunicationThreadManager mManager; //该通讯层管理器
protected ParseByteThread mParseByteThread = null; //数据解析线程
protected ExecutorService executor; //线程连接池
protected BaseClientMgr(String host, intport, String key) {
init(host, port, key);
}//初始化
private void init(String host, intport, String key) {this.mContext =MeiApp.mContext;
isRunning= false;
isSending= false;
mHost=host;
mPort=port;
mConnectKey=key;
mEntityMsg= new ArrayList();
executor= Executors.newFixedThreadPool(10);
mParseByteThread= new ParseByteThread(this);
executor.execute(mParseByteThread);
}protected Handler basicHandler = newHandler() {
@Overridepublic voidhandleMessage(Message msg) {super.handleMessage(msg);switch(msg.what) {caseClientConstants.REQUEST://发送请求 连接占用
if (mEntityMsg != null && mEntityMsg.size() > 0) {
isSending= true;//清除handler的消息
basicHandler.removeMessages(ClientConstants.REQUEST);
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);//请求类型 当为网络请求时判断网络状态 建立连接//检查连接是否可用
if(isRunning) {//直接发送消息
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_MESSAGE);
basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
}else{//建立连接
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
Message msgCreate=Message.obtain();
msgCreate.what=ClientConstants.REQUEST_CREATE_CONNECT;
msgCreate.arg1= 0;
basicHandler.sendMessage(msgCreate);
}
}break;caseClientConstants.REQUEST_CREATE_CONNECT://建立连接
Log.i("mbk", "建立连接!");
isConnect("netty");break;caseClientConstants.REQUEST_SEND_MESSAGE://发送消息
Log.i("mbk", "发送消息!");if(isRunning) {if (mEntityMsg.size() > 0) {
Log.i("mbk", "发送数据!");
sendData(mEntityMsg.get(0));
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);//设置请求超时
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_TIMEOUT, 3000);
}else{
Log.i("mbk", "数据发送完成!");
isSending= false;
}
}else{//重新建立连接
basicHandler.removeMessages(ClientConstants.REQUEST_CREATE_CONNECT);
basicHandler.sendEmptyMessage(ClientConstants.REQUEST_CREATE_CONNECT);
}break;caseClientConstants.REQUEST_SEND_HEARTBEAT:
Log.i("mbk", "发送心跳!");
mManager.sendHeart(function);
heartTimeOutCount++;
Log.i("lzy02", "heartTimeOutCount---------------" +heartTimeOutCount);if (heartTimeOutCount >= 3) {//大于等于3则认为与云棒无连接
callBack(null, null, "心跳超时!", REQUEST_HEARTBEAT_TIMEOUT);
}// //发送心跳
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT,3000);break;case ClientConstants.REQUEST_TIMEOUT://请求超时
Log.i("mbk", "请求超时!");
isRunning= false;
callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);break;
}
}
};public void sendHeartbeat(intfunction) {this.function =function;
}public voidsendData(IEntity entity) {
sendByte(ClientSocketUtils.sendDatas(mEntityMsg.get(0)));
}//建立连接
@Overridepublic voidisConnect(String netType) {
UdpEntity udpEntity= null;int type =CommunicationThreadManager.MBK_COMMUNICATION_NETTY;if (netType.equals("netty")) {//建立一个netty连接
type =CommunicationThreadManager.MBK_COMMUNICATION_NETTY;
mManager= new CommunicationThreadManager(mContext, null, mConnectKey, "192.168.31.241", mPort, type, mCommunicationCallBack);
Log.i("mbk", "发送地址---" + "192.168.31.241");
Log.i("mbk", "发送端口号---" +mPort);/** if (udpEntity != null) { Log.i("lzy02",
* "udpEntity---209----------udpEntity=="+udpEntity.getYunbangIp());
* mManager = new CommunicationThreadManager(mContext, null, mConnectKey,
* "192.168.31.241", mPort, type, mCommunicationCallBack);
* //Toast.makeText(mContext, "已通过Netty发送 ", Toast.LENGTH_SHORT).show();
* Log.i("mbk","netty发送云棒IP号---" + udpEntity.getYunbangIp()); } else {
* Log.i("lzy02", "udpEntity---211----------udpEntity == null");
* callBack(null, null, "无法连接netty!", RESPONSE_FAIL); }*/
//使用netty是时候 清理p2p
P2pClearUp();
}else{
}
Log.i("mbk", "初始化 连接服务器!" +netType);
}
@Overridepublic void sendByte(byte[] b) {try{if (mManager != null) {
mManager.sendDataToServer(newSendData(b));
}else{
isClose();
}
}catch(InterruptedException e) {
isClose();
}
}//服务端回调
private CommunicationCallBack mCommunicationCallBack = newCommunicationCallBack() {
@Overridepublic voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Log.i("mbk", "--------------------------请求异常--------------------------" +mCommunication);
isRunning= false;
callBack(null, null, "请求异常!", RESPONSE_FAIL);
}
@Overridepublic voidconnected(ChannelHandlerContext ctx) {
Log.i("mbk", "--------------------------连接成功--------------------------" +mCommunication);//mChx = ctx;
isRunning = true;
sendAgain();
}
@Overridepublic voidconnectFailure(Exception e) {
Log.i("mbk", "--------------------------连接服务器失败--------------------------" +mCommunication);
isRunning= false;
callBack(null, null, "连接服务器失败!", RESPONSE_FAIL);
}
@Overridepublic void channelRead(ChannelHandlerContext ctx, byte[] msg) {
Log.i("mbk", "--------------------------服务端返回--------------------------" +mCommunication);if (mParseByteThread != null) {
mParseByteThread.sendParseByte(msg);
}
}
@Overridepublic voidcommunicationOutTime() {
Log.i("mbk", "--------------------------连接超时--------------------------" +mCommunication);
isRunning= false;
callBack(null, null, "连接超时!", RESPONSE_TIMEOUT);
}
@Overridepublic voidquestTimeOut() {
Log.i("mbk", "--------------------------请求超时--------------------------" +mCommunication);
isRunning= false;
callBack(null, null, "请求超时!", RESPONSE_TIMEOUT);
}
};
@Overridepublic voidsendAgain() {//连接成功 发起请求
Log.i("mbk", "连接成功,数据重新发送!");//basicHandler.sendEmptyMessage(ClientConstants.REQUEST_SEND_MESSAGE);
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_MESSAGE, 500);
}//接收需要发送的实体
@Overridepublic voidsendEntity(IEntity entity) {if (mEntityMsg != null && entity != null) {
mEntityMsg.add(entity);if (!isSending) {//启动一个发送
Log.i("mbk", "发起请求!REQUEST_NET");
basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
}
}//if (mEntityMsg != null && mEntityMsg.size() == 2) {//mEntityMsg.remove(1);//}
}
@Overridepublic void callBack(PackageHeader header, byte[] data, String desc, inttype) {
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);switch(type) {caseRESPONSE_SUCCESS:
heartTimeOutCount= 0;
basicHandler.sendEmptyMessageDelayed(ClientConstants.REQUEST_SEND_HEARTBEAT,20000);switch(header.getFunction()) {case 9998:
Log.i("mbk", "服务端关闭!");
isClose();break;case 9999:
Log.i("mbk", "成功返回一个心跳!");break;case 999:
Log.i("mbk", "未知错误!");
callBack(null, null, "未知错误", RESPONSE_FAIL);break;default:
responseSuccess(header, data, desc, type);break;
}break;case REQUEST_HEARTBEAT_TIMEOUT://心跳超时3次认为与云棒无连接
/** Intent m2Intent = new Intent(MeiConfigs.NETWORK_PROMPT);
* m2Intent.putExtra("islogin", "3003");
* MeiApp.mContext.sendBroadcast(m2Intent);*/
break;caseRESPONSE_FAIL:
responseFail(header, data, desc, type);break;caseRESPONSE_TIMEOUT:
responseFail(header, data, desc, type);break;
}
}//请求成功
public void responseSuccess(PackageHeader header, byte[] data, String desc, inttype) {try{if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
IEntity entity= mEntityMsg.get(0);if (data != null && data.length > 0) {
entity.onDecode(new String(data, "utf-8"));//Log.i("mbk","云棒返回---" + "---" + new String(data, "utf-8"));//请求成功
Log.i("lzy02", "1--------------" +entity.getCode());
Log.i("mbk", "返回一条数据!");
Message msg=Message.obtain();
msg.obj=entity;
msg.arg1=header.getFunction();
msg.what=type;
entity.getHandler().sendMessage(msg);
}
}
}catch(Exception e) {
e.printStackTrace();
isClose();
}if (mEntityMsg != null && mEntityMsg.size() > 0) {
mEntityMsg.remove(0);
}
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
isSending= false;if (mEntityMsg.size() > 0) {
basicHandler.sendEmptyMessage(ClientConstants.REQUEST);
}
}//请求失败
public void responseFail(PackageHeader header, byte[] data, String desc, inttype) {
Log.i("mbk", "请求失败! " +desc);
Message msg=Message.obtain();
msg.obj=desc;
msg.arg1= 0;
msg.what=type;if (mEntityMsg.size() > 0 && mEntityMsg.get(0).getHandler() != null) {
mEntityMsg.get(0).getHandler().sendMessage(msg);
}
isClose();
}//请求本地缓存返回
@Overridepublic voidcallBack(IEntity entity, String desc) {
Log.i("mbk", "回一返个缓存数据! ");if ("cache".equals(desc)) {if (entity != null && entity.getHandler() != null) {
Message msg=Message.obtain();
msg.obj=entity;
msg.what=RESPONSE_SUCCESS;
entity.getHandler().sendMessage(msg);
}
}
}public voidP2pClearUp() {if (mManager != null) {
mManager.p2pCleanup();
}
}
@Overridepublic voidisClose() {
Log.i("mbk", "关闭连接!" +isRunning);if (mManager != null) {if(isRunning) {try{
mManager.sendDataToServer(newSendData(ClientSocketUtils.sendExit(function)));
}catch(InterruptedException e) {
}
}else{
mManager.closeTheadManager();
mManager= null;
}
}if (mParseByteThread != null)
mParseByteThread.closeThread();if (mEntityMsg != null) {
mEntityMsg.clear();
}
P2pClearUp();
basicHandler.removeMessages(ClientConstants.REQUEST_SEND_HEARTBEAT);
basicHandler.removeMessages(ClientConstants.REQUEST_TIMEOUT);
isRunning= false;
isSending= false;
}
@Overridepublic void sendMsgFail(String netType, byte[] msg) {
}
@Overridepublic voidconnectFail(String netType) {
}
@Overridepublic voidisClearMsg() {if (mEntityMsg != null) {
mEntityMsg.clear();
}
}
}