java连接imserver_java后端IM消息推送服务开发——协议

最近在一家saas企业使用Mqtt开发IM消息推送服务,把开发中的一些问题记录下来,项目仍在商用中,完整的消息服务包括4个模块---协议protocol,信令Signal,规则Rule,状态Status,这个主题主要是协议protocol部分。

主要技术涉及到MongoDB,webservice,httpclient,Mqtt等

protocol分为四个模块类来实现,当然这是为了以后的扩展性比较好

首先看一下我们的主类,主要是mqtt基础方法的一个框架

public class MqttProtocol

{

private static Logger logger = Logger.getLogger(MqttProtocol.class);

public static final String HOST = "tcp://xx.xx.xx.xx:1883";

private static final String CLIENTID = "yyyy";

private MqttClient client;

private MqttConnectOptions options = new MqttConnectOptions();

//private String userName = "admin";

//private String passWord = "public";

public MqttMessage message;

private PushCallback callback;

/**

* 用于初始化mqttclient客户端,设置回调函数,同时连接mqtt服务器

* @throws MqttException

*/

public MqttProtocol() throws MqttException

{

//MemoryPersistence设置clientid的保存形式,默认为以内存保存

client = new MqttClient(HOST, CLIENTID, new MemoryPersistence());

callback = new PushCallback();

client.setCallback(callback);

options = new MqttConnectOptions();

options.setCleanSession(false);

options.setKeepAliveInterval(60);

connect();

}

/**

* 连接mqtt消息服务器,同时设置了断开重连的功能,主要是为了高可用性考虑,在断网服务器崩溃时候我们的程序仍然不会终止

*/

private void connect()

{

SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);

System.out.println(sdf.format(System.currentTimeMillis()));

boolean tryConnecting = true;

while (tryConnecting) {

try {

client.connect(options);

} catch (Exception e1) {

System.out.println("Connection attempt failed with '"+e1.getCause()+

"'. Retrying.");

}

if (client.isConnected()) {

System.out.println("Connected.");

tryConnecting = false;

} else {

pause();

}

}

}

private void pause() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

// Error handling goes here...

}

}

/**

*

* @param topic

* @param qos

* @throws MqttPersistenceException

* @throws MqttException

* 订阅相关主题

*/

public void subscribe(String topic , int qos) throws MqttPersistenceException,

MqttException

{

client.subscribe(topic, qos);

}

/**

*

* @throws MqttPersistenceException

* @throws MqttException

* 断开连接服务器

*/

public void disconnect() throws MqttPersistenceException,

MqttException

{

client.disconnect();

}

/**

*

* @author binshi

*实现mqttcallback接口,主要用于接收消息后的处理方法

*/

private class PushCallback implements MqttCallback {

/**

* 断开后 系统会自动调用这个函数,同时在这个函数里进行重连操作

*/

public void connectionLost(Throwable cause) {

// 连接丢失后,一般在这里面进行重连

System.out.println("连接断开,可以做重连");

connect();

try {

subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);

} catch (MqttPersistenceException e) {

// TODO Auto-generated catch block

e.printStackTrace();

} catch (MqttException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

/**

* 消息成功传送后,系统会自动调用此函数,表明成功向topic发送消息

*/

@Override

public void deliveryComplete(IMqttDeliveryToken arg0) {

// TODO Auto-generated method stub

System.out.println("deliveryComplete---------" + arg0.isComplete());

}

/**

* 连接mongo数据库,返回关于具体collection的Mongocollection

* @param collectionname

* @return

*/

public void messageArrived(String topic, MqttMessage message) throws Exception

{

System.out.println(topic);

SimpleDateFormat sdf= new SimpleDateFormat(Constant.DATE_FORMAT_MDYHMS);

System.out.println(sdf.format(System.currentTimeMillis()));

System.out.println("接收消息主题 : " + topic);

System.out.println("接收消息Qos : " + message.getQos());

System.out.println("接收消息内容 : " + new String(message.getPayload()));

//1 抽取事件信令消息

String messagejudge=new String(message.getPayload());

System.out.println("忽略所有robot消息以及offline离线消息");

JSONObject jo=new JSONObject();

try {

jo=JSONObject.fromObject(messagejudge);

} catch (Exception e) {

e.printStackTrace();

}

String from=jo.getString("from");

System.out.println("获得from"+from);

System.out.println("确定消息是否包含offline,如果包含取得offline,为1就不处理");

String offline=null;

if(messagejudge.contains("offline"))

{

offline=jo.getString("offline");

}

if((offline==null)&&(!from.contains("robot")))

{

System.out.println("处理非系统消息和非离线消息");

String type=jo.getString("type");

System.out.println("获得type"+type);

if(type.equals("shakehand"))

{

System.out.println("处理shakehand消息");

String admin="doyounkowwhy";

if(jo.toString().contains("admin"))

{

admin=jo.getString("admin");

}

System.out.println("取得admin 如果为1定义为客服,否则为普通用户 admin为"+admin);

if(admin.equals("1"))

{

System.out.println("处理客服握手消息");

System.out.println("发送握手成功消息");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.sendSysMsgToClient(from,"0", "1005", "握手成功", null,retopic);

System.out.println("向客户端发送离线未接收的消息");

String convid=jo.getString("convid");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().sendOfflineMsgToClient(from, convid,retopic,database,collection);

}

else

{

System.out.println("处理普通用户的握手消息");

String appid=jo.getString("appid");

String pageid=jo.getString("pageid");

String convid=jo.getString("convid");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.sendShakeHandInfo(from,convid,appid,pageid,retopic);

}

}

else if(type.equals("text")||type.equals("image"))

{

System.out.println("处理图片和文字消息");

String tmpindex=jo.getString("tmpindex");

String convid=jo.getString("convid");

MqttTopic retopic=client.getTopic(topic);

MsgOperation.getTextMsg( tmpindex, from, convid, retopic);

System.out.println("保存图片文字消息");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().saveTextMsg(database,collection,jo);

}

else if(type.equals("ack"))

{

System.out.println("处理ack消息");

String tmpindex=jo.getString("tmpindex");

String convid=jo.getString("convid");

String database="dolina";

String collection="messages";

MongoDBDao.getMongoDBDaoInstance().getAck(tmpindex,convid,from,database,collection);

}

}

}

}

/**

*

* @param args

* @throws MqttException

* 整个工程从这里开始执行,生成可执行jar包,这个设置为主类。

*/

public static void main(String[] args) throws MqttException

{

MqttProtocol signal = new MqttProtocol();

signal.message = new MqttMessage();

/**

server.message.setQos(2);

server.message.setRetained(false);

server.message.setPayload("给客户端124推送的信息".getBytes());

server.subscribe("/engyne/1/7/169573fcbc96a816281192222", 2);

*/

signal.subscribe(Constant.TOPIC_MQTT_PROTOCOL, 2);

System.out.println(signal.message.isRetained() + "------ratained状态");

}

}

接下来使我们的远程连接模块,主要是通过给定的url调用远程接口

public class RemoteOperation

{

private static Logger logger = Logger.getLogger(MqttProtocol.class);

public static JSONObject remoteCall(String url) throws HttpException, IOException

{

HttpClient httpClient = new HttpClient();

GetMethod method =null ;

method=new GetMethod(url);

int retcode = httpClient.executeMethod(method);

if (retcode != HttpStatus.SC_OK)

{// 发送不成功

logger.info("远程调用出错");

return null;

}

else

{

String body = method.getResponseBodyAsString();

logger.info(body+"远程调用php成功");

JSONObject jsonObject=new JSONObject();

try {

jsonObject=JSONObject.fromObject(body);

} catch (Exception e) {

e.printStackTrace();

}

if (method != null)

{

method.releaseConnection();

}

return jsonObject;

}

}

}

下面是Mongo数据库的相关操作的一个封装,设计为单例模式,相当于每次都使用同一个client打开连接,类似于连接池的概念,当然业务逻辑部分可以更换

public class MongoDBDao

{

private static Logger logger = Logger.getLogger(MongoDBDao.class);

/**

* MongoClient的实例代表数据库连接池,是线程安全的,可以被多线程共享,客户端在多线程条件下仅维持一个实例即可

* Mongo是非线程安全的,目前mongodb API中已经建议用MongoClient替代Mongo

*/

private MongoClient mongoClient = null;

/**

*

* 私有的构造函数

* 作者:shibin

*/

private MongoDBDao(){

if(mongoClient == null){

String url = Constant.MONGO_MQTT_URL;

String user = Constant.MONGO_MQTT_USER;

String password = Constant.MONGO_MQTT_PASSWORD;

String database = Constant.MONGO_MQTT_DATABASE;

int port = 27017;

ServerAddress serverAddress = new ServerAddress(url, port);

ListserverAddresses = new ArrayList();

serverAddresses.add(serverAddress);

MongoCredential credential = MongoCredential.createCredential(user, database, password.toCharArray());

Listcredentials = new ArrayList();

credentials.add(credential);

mongoClient = new MongoClient(serverAddresses, credentials);

System.out.println(mongoClient);

System.out.println("初始化client完成");

}

}

/********单例模式声明开始,采用饿汉式方式生成,保证线程安全********************/

//类初始化时,自行实例化,饿汉式单例模式

private static final MongoDBDao mongoDBDao = new MongoDBDao();

/**

*

* 方法名:getMongoDBDaoImplInstance

* 作者:shibin

*

* 描述:单例的静态工厂方法

* @return

*/

public static MongoDBDao getMongoDBDaoInstance(){

return mongoDBDao;

}

public void sendOfflineMsgToClient(String from, String convid,MqttTopic retopic,String database,String collection) throws MqttPersistenceException, MqttException

{

System.out.println("获得message的连接");

MongoDatabase mongoDatabase = mongoClient.getDatabase(database);

MongoCollection mongoCollection = mongoDatabase.getCollection(collection);

System.out.println("取得convid所对应的msg列表");

BasicDBObject query = new BasicDBObject();

query.put("_id", convid);

FindIterableiterable=null;

iterable = mongoCollection.find(query);

if(iterable.first()!=null)

{

System.out.println(iterable.first());

String res= iterable.first().toJson();

JSONObject jo=new JSONObject();

try {

jo=JSONObject.fromObject(res);

} catch (Exception e) {

e.printStackTrace();

}

JSONArray jsonArray=jo.getJSONArray("msg");

for(int i=0;i

doc.put("read", read);

Document tdoc = new Document();

tdoc.put("msg", doc);

UpdateOptions updateOptions=new UpdateOptions();

updateOptions.upsert(true);

mongoCollection.updateOne(filter, new Document("$addToSet", tdoc), updateOptions);

iterable = mongoCollection.find(query);

System.out.println("更新message之后的值"+iterable.first());

}

public void getAck(String tmpindex,String convid,String from,String database,String collection)

{

System.out.println("接收到ack消息后更新message中的read字段");

MongoDatabase mongoDatabase = mongoClient.getDatabase(database);

MongoCollection mongoCollection = mongoDatabase.getCollection(collection);

BasicDBObject query = new BasicDBObject();

query.put("_id", convid);

query.put("msg.tmpindex", tmpindex);

BasicDBObject query1 = new BasicDBObject();

query1.put("_id", convid);

FindIterable iterable;

FindIterable iterable2;

iterable = mongoCollection.find(query1);

iterable2 = mongoCollection.find(query);

System.out.println("更新message满足id过滤条件之前的值"+iterable.first());

System.out.println("更新message满足id和tmpindex过滤条件之前的值"+iterable2.first());

if(iterable2.first()!=null)

{

Document doc = new Document();

doc.put("msg.$.read", from);

UpdateOptions updateOptions=new UpdateOptions();

updateOptions.upsert(true);

mongoCollection.updateOne(query, new Document("$addToSet", doc), updateOptions);

}

iterable = mongoCollection.find(query1);

System.out.println("更新messages之后的值"+iterable.first());

}

}

剩下的关于业务逻辑方面的就不多说了,主要是关于mqtt高可用性断开重连的功能以及mongo相关的操作

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/432471.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

好多邮箱的SMTP设置

http://731771490.diandian.com/post/2011-04-20/19576550转载于:https://www.cnblogs.com/mantian/p/3828837.html

新手向:如何理解OpenGL中着色器,渲染管线,光栅化等概念

首先,光栅化(Rasterize/rasteriztion)。 这个词儿Adobe官方翻译成栅格化或者像素化。没错,就是把矢量图形转化成像素点儿的过程。我们屏幕上显示的画面都是由像素组成,而三维物体都是点线面构成的。要让点线面&#xf…

tankwar的java坦克子弹撞墙_tankwar

tankwar 是使用java开发的一个单机版的小游戏 (未使用任何游戏引擎).和90经典版的坦克大战有些不同, 这里是纯坦克之间的战争, 英雄坦克并不用保护它的家.特色:1. 游戏共设6个关卡. (支持无限关卡)关卡设置中包括敌人坦克数量, 移动速度, 子弹速度, 以及子弹的杀伤力, 炸弹数量…

材质加载

UMaterial* Material LoadObject<UMaterial>(NULL, TEXT("/Game/VertexColorMaterial"), NULL, LOAD_None, NULL);

java如何添加进程_如何创建一个进程,如何进程调用进程

java一般用线程就够了&#xff0c;多进程优势在于每个进程互不干扰&#xff0c;劣势在于太耗费内存&#xff0c;任何事情都不是绝对的&#xff0c;在编写Java程序时&#xff0c;有时候需要在Java程序中执行另外一个程序。1、启动程序Java提供了两种方法用来启动其它程序&#x…

输入学生成绩,并按升序排列 Ascending.java

import java.util.Arrays;import java.util.Scanner;public class Ascending{ public static void main(String[] args){ Scanner innew Scanner(System.in); int[] arrnew int[10]; for(int i0;i<arr.length;i){ System.out.println("请输入第"(i1)"个学生的…

java判断ftp创建目录是否成功_Java判断Ftp服务器目录是否存在,若不存在创建目录 ....

解决方法1&#xff1a;package com.soft4j.log4j;import java.io.IOException;import sun.net.ftp.FtpClient;public class FtpTest{static String middle_ftpServer "10.103.2.250";static String middle_user "ora9iftp";static String middle_passwor…

第三方静态库的使用

1. 工程根目录下创建文件夹“ThirdParty”&#xff0c;然后在此文件夹下创建“includes”和“libs”文件夹 2. 拷贝静态库的头文件和库文件到上面的文件夹下 3. 修改工程的build.cs文件&#xff0c;增加下面代码1&#xff09;在类中private string ThirdPartyPath { …

UITableView:改变 TableHeaderView 的高度

参考&#xff1a;http://stackoverflow.com/a/526825 有这么一种需求&#xff0c;在列表顶端显示一些别样的数据&#xff0c;而这个别样的数据则需要通过一个别样的 View 来展现&#xff0c;它便是 UITableView 的 tableHeaderView。 倘若 tableHeaderView 里的内容很固定&…

perl 远程 mysql_写的一个perl脚本,用于发送远程MySQL命令 -电脑资料

想写一些简化管理操作的脚本&#xff0c;下面是基础脚本之一&#xff0c;对于一个从来没使用过perl脚本的我来说还是有些难度的&#xff0c;直接上代码。此脚本用于发送远程MySQL命令并且接收结果&#xff0c;功能比较简单&#xff0c;后面会渐渐完善。#!/usr/bin/perl use Get…

【翻译】使用Ext JS设计响应式应用程序

原文&#xff1a;Designing Responsive Applications with Ext JS在当今这个时代&#xff0c;用户都希望Web应用程序无论在形状还是大小上&#xff0c;既能在桌面电脑&#xff0c;也能在移动设备上使用。使应用程序能适应不同的需求渐成趋势。幸运的是&#xff0c;Ext JS 5提供…

java各种的不好_译文《最常见的10种Java异常问题》

封面&#xff1a;洛小汐译者&#xff1a;潘潘知彼知己&#xff0c;方能百战不殆。前言本文总结了有关Java异常的十大常见问题。目录检查型异常(checked) vs. 非检查型异常(Unchecked)异常管理的最佳实践箴言为什么在try代码块中声明的变量不能在catch或者finally中被引用&#…

Hadoop 开源调度系统zeus(二)

紧跟之前Hadoop 开源调度系统zeus(一) 本节主要介绍一下zeus的架构&#xff1a; 先给一个zeus的架构图 无论Master还是Worker都有一套WEB UI&#xff0c;无论从哪个上面去看&#xff0c;看到的结果都是一样的&#xff0c;实际上一般仅仅看主 Master&#xff1a;调度内核&#x…

ENQUEUE_UNIQUE_RENDER_COMMAND_ONEPARAMETER

该宏非常重要&#xff0c; 其作用是&#xff1a; 创建一个渲染命令任务&#xff0c;并将该任务压入渲染队列&#xff0c;待渲染线程执行. 主要用于多线程渲染&#xff01;#define ENQUEUE_UNIQUE_RENDER_COMMAND_ONEPARAMETER(TypeName,ParamType1,ParamName1,ParamValue1,Code…

python十条建议_十条建议帮你提高Python编程效率

程序员的时间很宝贵&#xff0c;Python这门语言虽然足够简单、优雅&#xff0c;但并不是说你使用Python编程&#xff0c;效率就一定会高。要想节省时间、提高效率&#xff0c;还是需要注意很多地方的。今天就与大家分享资深Python程序员总结的10点建议&#xff0c;帮助大家大幅…

Django:快速搭建简单的Blog

一&#xff0c;创建项目 1, 为blog创建名为mysite的工程项目&#xff1a; django-admin.py startproject mysite2, 项目结构如下&#xff1a; mysite├── manage.py└── mysite├── __init__.py├── settings.py├── urls.py└── wsgi.pymanage.py ----- Django项目…

添加纹理到材质

1. 导入一个图片资源作为纹理贴图 2. 新建一个材质 3. 打开材质编辑器&#xff0c;将“TextureSample”和“TextureCoordinate”两个材质表达式放到蓝图中 4. 设置“TextureSample”的纹理属性为第1步导入的纹理贴图 5. 将“TextureCoordinate”的输出与“TextureSample”的UVs…

c# 路径空格---ProcessStartInfo参数问题

今天在整合程序的时候&#xff0c;要从一个程序转到另一个程序 当然要使用&#xff1a; ProcessStartInfo startInfo new ProcessStartInfo("\\Program Files\\IE\\IE.exe", s); Process.Start(startInfo); 不过对于wm来说 不支持Process.St…

发票 ocr java_OCR识别技术—增值税发票识别

增值税发票识别产品背景在经济活动中产生大量的增值税发票需要进行扫描、数据录入、人工校对等工作。传统的人工录入方式&#xff0c;用户需要投入大量的人力成本和时间成本&#xff0c;不仅抬高了运营成本&#xff0c;而且录入速度难以提升&#xff0c;错误率难以降低&#xf…