工具:Android studio
软件方法及协议:socket、protobuf
实现原理:
通过本地建立一个socket,绑定服务器IP和port,然后connect,再开启另外线程定时心跳(注意这里的心跳不是自定义发送数据,而是采用socket本身的心跳功能sendUrgentData,否则有坑),心跳失败则自动重连,另一方面,启动循环从缓存获取socket数据。
大致框架
推送实现流程图
实现代码:
public class QpushClient implements Runnable {
protected static volatile QpushClient mInstance;//volatile可保证可见性和有序性
protected Handler mHandler;
protected InetSocketAddress mAddress;
String mIMEI;
protected String TAG = "QpushClient";
//socket连接的超时时间
private final int CONNECT_TIME_OUT = 5 * 1000;
//巡检周期
private final int CHECK_PERIOD = 2 * 1000;
//连接尝试间隔时间
private final int CONNECT_PERIOD = 30 * 1000;
private final int HEARTBEART_PERIOD = 30 * 1000;
//若连接失败或响应失败,则尝试次数为9,若仍无效,则不再尝试
private final int CONNECT_TRY_TIMES = 9;
private final int SEND_MSG_TYPE_HEARTBEAT = 1; //心跳包
private final int SEND_MSG_TYPE_SOCKET_LOGIN = 2; //发送socket登录包
//连接尝试次数
private int mConnectCount;
Socket mClientSocket;
String mHost;
int mPort;
//设置是否去读取数据
boolean isStartRecieveMsg = false;
//开启心跳检测
boolean isKeepHeartBeat = false;
BufferedReader mReader;
ScheduledExecutorService executor;//定位定时器
HeartBeatTask mHeartBeatTask;
private QpushClient(Handler handler) {
mHandler = handler;
}
public static QpushClient getInstance(Handler handler) {
if (mInstance == null) {
synchronized(QpushClient.class){ //线程安全,所以加锁
if(mInstance == null){
mInstance = new QpushClient(handler);
}}
}
return mInstance;
}
public void init(String host, int port,String imei) {
mHost = host;
mPort = port;
mIMEI = imei;
new Thread(this).start();
isStartRecieveMsg = true;
isKeepHeartBeat = true;
}
@Override
public void run() {
mAddress = new InetSocketAddress(getIP(mHost), mPort);
//尝试连接,若未连接,则设置尝试次数
while (mConnectCount < CONNECT_TRY_TIMES) {
connect();
if (!mClientSocket.isConnected()) {
mConnectCount++;
sleep(CONNECT_PERIOD);
} else {
mConnectCount = 0;//连接上,则恢复置0
break;
}
}
if (mClientSocket.isConnected()) {
keepHeartBeat();
recvProtobufMsg();
// recvStringMsg();
}
}
private void connect() {
try {
if (mClientSocket == null) {
mClientSocket = new Socket();
}
mClientSocket.connect(mAddress, CONNECT_TIME_OUT);
Driver.HeartBeat heartbeat = Driver.HeartBeat.newBuilder().setImei(mIMEI).build();
Driver.ClientMessage socketLogin = Driver.ClientMessage.newBuilder().setType(1).setHeartBeat(heartbeat).build();
sendMsg(socketLogin,SEND_MSG_TYPE_SOCKET_LOGIN);
} catch (IOException e) {
e.printStackTrace();
Log.e(TAG, "连接失败 mClientSocket.connect fail ,ip=" + mAddress.getHostName() + ";port=" + mAddress.getPort() + ";detail:" + e.getMessage());
}
}
/**
* 心跳维护
*/
private void keepHeartBeat() {
//设置心跳频率,启动心跳
if (isKeepHeartBeat) {
if (mHeartBeatTask == null) {
mHeartBeatTask = new HeartBeatTask();
}
try {
if (executor != null) {
executor.shutdownNow();
executor = null;
}
executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(
mHeartBeatTask,
1000, //initDelay
HEARTBEART_PERIOD, //period
TimeUnit.MILLISECONDS);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
/**
* @param message
* @param type 1=login;2=心跳;
*/
public void sendMsg(String message, int type) {
PrintWriter writer;
try {
writer = new PrintWriter(new OutputStreamWriter(mClientSocket.getOutputStream(), "UTF-8"), true);
writer.println(message);
Log.e(TAG, "sendMsg Socket.isClosed()=" + mClientSocket.isClosed() + ";connect=" + mClientSocket.isConnected());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
switch (type) {
case SEND_MSG_TYPE_HEARTBEAT:
mHandler.obtainMessage(QpushService.PUSH_TYPE_PROTO_DATA, "发送心跳异常").sendToTarget();
break;
}
}
}
/**
* @param message
* @param type 1=login;2=心跳;
*/
public void sendMsg(Driver.ClientMessage message, int type) {
try {
message.writeTo(mClientSocket.getOutputStream());
Log.e(TAG, "sendMsg success");
} catch (IOException e) {
// TODO Auto-generated catch block
if (type == SEND_MSG_TYPE_HEARTBEAT) {
//心跳失败
Log.e(TAG, "心跳失败");
if (mClientSocket.isClosed()) {
connect();
}
} else {
Log.e(TAG, "发送数据失败");
}
e.printStackTrace();
}
}
/**
* 不断的检测是否有服务器推送的数据过来
*/
public void recvStringMsg() {
while (mClientSocket != null && mClientSocket.isConnected() && !mClientSocket.isClosed()) {
try {
mReader = new BufferedReader(new InputStreamReader(mClientSocket.getInputStream(), "UTF-8"));
String data = mReader.readLine();
Log.e(TAG, "recvStringMsg data=" + data);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception ex) {
ex.printStackTrace();
}
sleep(2000);
}
sleep(CHECK_PERIOD);
}
/**
* 不断的检测是否有服务器推送的数据过来
*/
public void recvProtobufMsg() {
while (isStartRecieveMsg) {
try {
byte[] resultByte = recvByteMsg(mClientSocket.getInputStream());
if (resultByte != null) {
Driver.ClientMessage retMsg = Driver.ClientMessage.parseFrom(resultByte);
mHandler.obtainMessage(QpushService.PUSH_TYPE_PROTO_DATA, retMsg).sendToTarget();
} else {
Log.e(TAG, "resultByte is null");
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception ex) {
ex.printStackTrace();
}
sleep(5 * 1000);
}
}
/**
* 接收server的信息
*
* @return
*/
public byte[] recvByteMsg(InputStream inpustream) {
try {
byte len[] = new byte[1024];
int count = inpustream.read(len);
byte[] temp = new byte[count];
for (int i = 0; i < count; i++) {
temp[i] = len[i];
}
return temp;
} catch (Exception localException) {
localException.printStackTrace();
}
return null;
}
class HeartBeatTask implements Runnable {
@Override
public void run() {
//执行发送心跳
try {
mClientSocket.sendUrgentData(65);
} catch (IOException e) {
e.printStackTrace();
try {
Log.e(TAG, "socket心跳异常,尝试断开,重连");
mClientSocket.close();
mClientSocket = null;
//然后尝试重连
connect();
} catch (IOException e1) {
e1.printStackTrace();
}
}
Log.e(TAG, "发送心跳,Socket.isClosed()=" + mClientSocket.isClosed() + ";connect" + mClientSocket.isConnected());
}
}
/**
* 通过域名获取IP
*
* @param domain
* @return
*/
public String getIP(String domain) {
String IPAddress = "";
InetAddress ReturnStr1 = null;
try {
ReturnStr1 = java.net.InetAddress.getByName(domain);
IPAddress = ReturnStr1.getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
Log.e(TAG, "获取IP失败" + e.getMessage());
}
return IPAddress;
// return "192.168.3.121";
}
private void sleep(long sleepTime) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 销毁socket
*/
public void onDestory() {
if (mClientSocket != null) {
try {
mClientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
mClientSocket = null;
}
}
/*
* Ready for use.
*/
public void close() {
try {
if (mClientSocket != null && !mClientSocket.isClosed())
mClientSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
QpushService类:主要负责启动socket客户端及管理消息
public class QpushService extends Service {
//数据推送,不显示到通知栏
final static int PUSH_TYPE_DATA = 1;
final static int PUSH_TYPE_PROTO_DATA = 2;
Handler mHandler;
String TAG = "QpushService";
QpushClient mClient;
@Override
public void onCreate() {
Log.e(TAG, "onCreate");
initHandler();
super.onCreate();
}
/**
* 初始化handler,用于接收推送的消息
*/
private void initHandler() {
mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
switch (msg.what) {
case PUSH_TYPE_DATA:
Log.e(TAG, "PUSH_TYPE_DATA");
String data = (String) msg.obj;
ToastUtil.showShort(QpushService.this.getApplicationContext(), data);
case PUSH_TYPE_PROTO_DATA:
Driver.ClientMessage clientMessage = (Driver.ClientMessage) msg.obj;
Log.e(TAG, "PUSH_TYPE_PROTO_DATA");
Intent intentCancelRighr = new Intent();
intentCancelRighr.setAction(PushReceiverAction.PUSH_ACTION);
intentCancelRighr.putExtra("data", clientMessage);
getApplicationContext().sendBroadcast(intentCancelRighr);
break;
}
}
};
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
Log.e(TAG, "onBind");
return null;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
mClient = QpushClient.getInstance(mHandler);
mClient.init(HttpUrls.SOCKET_HOST, HttpUrls.SOCKET_PORT, AppUtils.getPesudoUniqueID());
return super.onStartCommand(intent, flags, startId);
}
@Override
public void onDestroy() {
Log.e(TAG, "onDestroy");
super.onDestroy();
mClient.onDestory();
}
}
PushReceiver类:接收service消息的广播,运行的主进程,防止接收不到数据
public class PushReceiver extends BroadcastReceiver {
Context mContext;
String TAG = "PushReceiver";
@Override
public void onReceive(Context context, Intent intent) {
mContext = context;
if (intent.getAction().equals(PushReceiverAction.PUSH_ACTION)) {
//推送的通知消息
Driver.ClientMessage clientMessage = (Driver.ClientMessage) intent.getSerializableExtra("data");
if (clientMessage != null) {
switch (clientMessage.getType()) {
case 1: //socket登录,不做处理
Log.e(TAG, "socket登录消息,sid=" + clientMessage.getHeartBeat().getSid());
MyApplication.sid = clientMessage.getHeartBeat().getSid();
// showNotification("socket","登录成功",1);
break;
case 2://通知告知被迫取消领取红包资格
Log.e(TAG, "取消抢红包资格,title=" + clientMessage.getLogOut().getTitle() + ";message=" + clientMessage.getLogOut().getContent());
ToastUtil.showShort(mContext, "被迫取消抢红包资格");
Intent intentCancelRighr = new Intent();
intentCancelRighr.setAction(PushReceiverAction.CANCEL_REDPACKET_RIGHT);
intentCancelRighr.putExtra("data", clientMessage);
context.sendBroadcast(intentCancelRighr);
MyApplication.mLoginStatus = 1;
showNotification(clientMessage.getLogOut().getTitle(), clientMessage.getLogOut().getContent(), 1);
break;
case 3: //领取红包/未领到红包消息
Intent intentGo = new Intent();
intentGo.setAction(clientMessage.getRedPacket().getResult() == 1 ? PushReceiverAction.GET_REDPACKET_ACTION : PushReceiverAction.DONT_GET_REDPACKET_ACTION);
intentGo.putExtra("data", clientMessage);
context.sendBroadcast(intentGo);
Log.e(TAG, "红包消息,content=" + clientMessage.getRedPacket().getContent());
showNotification(clientMessage.getRedPacket().getTitle(), clientMessage.getRedPacket().getContent(), 2);
break;
}
} else {
Log.e(TAG, "clientMessage is null");
}
}
}
/**
* 在状态栏显示通知
*/
@SuppressWarnings("deprecation")
private void showNotification(String title, String content, int type) {
// 创建一个NotificationManager的引用
NotificationManager notificationManager = (NotificationManager) mContext.getSystemService(android.content.Context
.NOTIFICATION_SERVICE);
NotificationCompat.Builder builder = new NotificationCompat.Builder(mContext);
builder.setContentTitle(title)
.setContentText(content)
.setTicker(title)
// .setContentIntent(getDefalutIntent(Notification.FLAG_AUTO_CANCEL))//点击通知栏设置意图
.setWhen(System.currentTimeMillis())
.setPriority(Notification.PRIORITY_HIGH)
// .setAutoCancel(true)
.setOngoing(false)//ture,设置他为一个正在进行的通知。他们通常是用来表示一个后台任务,用户积极参与(如播放音乐)或以某种方式正在等待,因此占用设备(如一个文件下载,同步操作,主动网络连接)
.setSmallIcon(R.drawable.logo);
Notification notification = builder.build();
Log.e(TAG, "isSoundOn:" + SharePreferUtils.isSoundOn(mContext));
if (SharePreferUtils.isSoundOn(mContext) && type == 2) { //type==红包消息
notification.sound = Uri.parse("android.resource://" + mContext.getPackageName() + "/" + R.raw.diaoluo_da);
}
Intent notificationIntent;
if (type == 2) {
//进入我的资产
notificationIntent = HomeActivity.toWitchFragmentOfHome(mContext, Constants.TOWITCHOFHOME_MYASSETS);
} else {
//进入主页流量球
notificationIntent = HomeActivity.toWitchFragmentOfHome(mContext, Constants.TOWITCHOFHOME_BALL);
}
Log.e(TAG, "type=" + type);
// 点击该通知后要跳转的Activity
PendingIntent contentItent = PendingIntent.getActivity(mContext, 0, notificationIntent, 0);
notification.contentIntent = contentItent;
notification.flags = Notification.FLAG_AUTO_CANCEL;
// 把Notification传递给NotificationManager
notificationManager.notify(type, notification);
}
}
Driver类:该类是通过proto编译出来的,代码量比较大,Driver.proto文件贴下来,Driver.java文件我附上下载链接,编译教程见:protobuf在Android推送中的使用方法(比json、xml都要好的数据结构)
Driver.proto文件:
syntax = "proto3";
//编译教程:http://www.jianshu.com/p/8036003cb849
message ClientMessage
{
int32 type = 1; //类型1=socket登录;2=取消抢红包资格;3=领取红包消息
HeartBeat heartBeat = 2; //socket登录
LogOut logOut = 3; //取消抢红包资格消息
RedPacket redPacket = 4; //领取&未抢到红包消息
}
message HeartBeat
{
string sid = 1; //服务ID
string imei = 2; //手机唯一编号imei
}
message LogOut
{
string sid = 1; //服务ID
int32 result = 2; //1成功,2失败
string title = 3; //消息标题
string content = 4; //消息内容
}
message RedPacket
{
string sid = 1; //服务ID
int32 result = 2; //1成功,2失败
int32 price = 3; //红包金额单位分
string title = 4; //消息标题
string content = 5; //消息内容
}