最近要弄一个消息推送的功能,在网上找了很多的关于pushlet的文章,虽然写的都很详细,但是本人看了以后却总觉得是模棱两可···不知道如何下手,最终参考了这些文章中的一些内容,并结合官网的源代码,做了自己的修改。
第一部分 修改的地方
首先修改了nl.justobjects.pushlet.core.Session,增加了even字段,增加了getEvent()方法,同时修改了create()方法,修改如下:
// Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
// Distributable under LGPL license. See terms of license at gnu.org.package nl.justobjects.pushlet.core;import nl.justobjects.pushlet.util.Log;
import nl.justobjects.pushlet.util.PushletException;/*** Represents client pushlet session state.** @author Just van den Broecke - Just Objects ©* @version $Id: Session.java,v 1.8 2007/11/23 14:33:07 justb Exp $*/
public class Session implements Protocol, ConfigDefs {private Controller controller;private Subscriber subscriber;/** 增加了even字段 */private Event event;private String userAgent;private long LEASE_TIME_MILLIS = Config.getLongProperty(SESSION_TIMEOUT_MINS) * 60 * 1000;private volatile long timeToLive = LEASE_TIME_MILLIS;public static String[] FORCED_PULL_AGENTS = Config.getProperty(LISTEN_FORCE_PULL_AGENTS).split(",");private String address = "unknown";private String format = FORMAT_XML;private String id;/*** Protected constructor as we create through factory method.*/protected Session() {}/*** Create instance through factory method.** @param anId* a session id* @return a Session object (or derived)* @throws PushletException* exception, usually misconfiguration*//** 修改前的create方法 */// public static Session create(String anId) throws PushletException {// Session session;// try {// session = (Session) Config.getClass(SESSION_CLASS,// "nl.justobjects.pushlet.core.Session").newInstance();// } catch (Throwable t) {// throw new PushletException(// "Cannot instantiate Session from config", t);// }//// // Init session// session.id = anId;// session.controller = Controller.create(session);// session.subscriber = Subscriber.create(session);// return session;// }public static Session create(String anId, Event anEvent)throws PushletException {Session session;try {session = (Session) Config.getClass(SESSION_CLASS,"nl.justobjects.pushlet.core.Session").newInstance();} catch (Throwable t) {throw new PushletException("Cannot instantiate Session from config", t);}// Init sessionsession.id = anId;session.controller = Controller.create(session);session.subscriber = Subscriber.create(session);session.event = anEvent;return session;}/*** 增加了getEVent方法*/public Event getEvent() {return event;}/*** Return (remote) Subscriber client's IP address.*/public String getAddress() {return address;}/*** Return command controller.*/public Controller getController() {return controller;}/*** Return Event format to send to client.*/public String getFormat() {return format;}/*** Return (remote) Subscriber client's unique id.*/public String getId() {return id;}/*** Return subscriber.*/public Subscriber getSubscriber() {return subscriber;}/*** Return remote HTTP User-Agent.*/public String getUserAgent() {return userAgent;}/*** Set address.*/protected void setAddress(String anAddress) {address = anAddress;}/*** Set event format to encode.*/protected void setFormat(String aFormat) {format = aFormat;}/*** Set client HTTP UserAgent.*/public void setUserAgent(String aUserAgent) {userAgent = aUserAgent;}/*** Decrease time to live.*/public void age(long aDeltaMillis) {timeToLive -= aDeltaMillis;}/*** Has session timed out?*/public boolean isExpired() {return timeToLive <= 0;}/*** Keep alive by resetting TTL.*/public void kick() {timeToLive = LEASE_TIME_MILLIS;}public void start() {SessionManager.getInstance().addSession(this);}public void stop() {subscriber.stop();SessionManager.getInstance().removeSession(this);}/*** Info.*/public void info(String s) {Log.info("S-" + this + ": " + s);}/*** Exceptional print util.*/public void warn(String s) {Log.warn("S-" + this + ": " + s);}/*** Exceptional print util.*/public void debug(String s) {Log.debug("S-" + this + ": " + s);}public String toString() {return getAddress() + "[" + getId() + "]";}
}/** $Log: Session.java,v $ Revision 1.8 2007/11/23 14:33:07 justb core classes* now configurable through factory* * Revision 1.7 2005/02/28 15:58:05 justb added SimpleListener example* * Revision 1.6 2005/02/28 12:45:59 justb introduced Command class* * Revision 1.5 2005/02/28 09:14:55 justb sessmgr/dispatcher factory/singleton* support* * Revision 1.4 2005/02/25 15:13:01 justb session id generation more robust* * Revision 1.3 2005/02/21 16:59:08 justb SessionManager and session lease* introduced* * Revision 1.2 2005/02/21 12:32:28 justb fixed publish event in Controller* * Revision 1.1 2005/02/21 11:50:46 justb ohase1 of refactoring Subscriber into* Session/Controller/Subscriber*/
然后修改了nl.justobjects.pushlet.core.SessionManager,修改了createSession()方法:
// Copyright (c) 2000 Just Objects B.V. <just@justobjects.nl>
// Distributable under LGPL license. See terms of license at gnu.org.package nl.justobjects.pushlet.core;import nl.justobjects.pushlet.util.Log;
import nl.justobjects.pushlet.util.PushletException;
import nl.justobjects.pushlet.util.Rand;
import nl.justobjects.pushlet.util.Sys;import java.rmi.server.UID;
import java.util.*;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;/*** Manages lifecycle of Sessions.** @author Just van den Broecke - Just Objects ©* @version $Id: SessionManager.java,v 1.12 2007/12/04 13:55:53 justb Exp $*/
public class SessionManager implements ConfigDefs {/*** Singleton pattern: single instance.*/private static SessionManager instance;static {// Singleton + factory pattern: create single instance// from configured class nametry {instance = (SessionManager) Config.getClass(SESSION_MANAGER_CLASS,"nl.justobjects.pushlet.core.SessionManager").newInstance();Log.info("SessionManager created className=" + instance.getClass());} catch (Throwable t) {Log.fatal("Cannot instantiate SessionManager from config", t);}}/*** Timer to schedule session leasing TimerTasks.*/private Timer timer;private final long TIMER_INTERVAL_MILLIS = 60000;/*** Map of active sessions, keyed by their id, all access is through mutex.*/private Map sessions = new HashMap(13);/*** Cache of Sessions for iteration and to allow concurrent modification.*/private Session[] sessionCache = new Session[0];/*** State of SessionCache, becomes true whenever sessionCache out of sync* with sessions Map.*/private boolean sessionCacheDirty = false;/*** Lock for any operation on Sessions (Session Map and/or -cache).*/private final Object mutex = new Object();/*** Singleton pattern: protected constructor needed for derived classes.*/protected SessionManager() {}/*** Visitor pattern implementation for Session iteration.* <p/>* This method can be used to iterate over all Sessions in a threadsafe way.* See Dispatcher.multicast and broadcast methods for examples.** @param visitor* the object that should implement method parm* @param method* the method to be called from visitor* @param args* arguments to be passed in visit method, args[0] will always be* Session object*/public void apply(Object visitor, Method method, Object[] args) {synchronized (mutex) {// Refresh Session cache if required// We use a cache for two reasons:// 1. to prevent concurrent modification from within visitor method// 2. some optimization (vs setting up Iterator for each apply()if (sessionCacheDirty) {// Clear out existing cachefor (int i = 0; i < sessionCache.length; i++) {sessionCache[i] = null;}// Refill cache and update statesessionCache = (Session[]) sessions.values().toArray(sessionCache);sessionCacheDirty = false;}// Valid session cache: loop and call supplied Visitor methodSession nextSession;for (int i = 0; i < sessionCache.length; i++) {nextSession = sessionCache[i];// Session cache may not be entirely filledif (nextSession == null) {break;}try {// First argument is always a Session objectargs[0] = nextSession;// Use Java reflection to call the method passed by the// Visitormethod.invoke(visitor, args);} catch (IllegalAccessException e) {Log.warn("apply: illegal method access: ", e);} catch (InvocationTargetException e) {Log.warn("apply: method invoke: ", e);}}}}/*** Create new Session (but add later).*//*** public Session createSession(Event anEvent) throws PushletException { * return Session.create(createSessionId());}* * * 这是原来的createSession方法,可以看到,虽然有anEvent参数··但是却没有在createSession的时候使用*/public Session createSession(Event anEvent) throws PushletException {// Trivialreturn Session.create(createSessionId(), anEvent);}/*** Singleton pattern: get single instance.*/public static SessionManager getInstance() {return instance;}/*** Get Session by session id.*/public Session getSession(String anId) {synchronized (mutex) {return (Session) sessions.get(anId);}}/*** Get copy of listening Sessions.*/public Session[] getSessions() {synchronized (mutex) {return (Session[]) sessions.values().toArray(new Session[0]);}}/*** Get number of listening Sessions.*/public int getSessionCount() {synchronized (mutex) {return sessions.size();}}/*** Get status info.*/public String getStatus() {Session[] sessions = getSessions();StringBuffer statusBuffer = new StringBuffer();statusBuffer.append("SessionMgr: " + sessions.length + " sessions \\n");for (int i = 0; i < sessions.length; i++) {statusBuffer.append(sessions[i] + "\\n");}return statusBuffer.toString();}/*** Is Session present?.*/public boolean hasSession(String anId) {synchronized (mutex) {return sessions.containsKey(anId);}}/*** Add session.*/public void addSession(Session session) {synchronized (mutex) {sessions.put(session.getId(), session);sessionCacheDirty = true;}// log(session.getId() + " at " + session.getAddress() + " adding ");info(session.getId() + " at " + session.getAddress() + " added ");}/*** Register session for removal.*/public Session removeSession(Session aSession) {synchronized (mutex) {Session session = (Session) sessions.remove(aSession.getId());if (session != null) {info(session.getId() + " at " + session.getAddress()+ " removed ");}sessionCacheDirty = true;return session;}}/*** Starts us.*/public void start() throws PushletException {if (timer != null) {stop();}timer = new Timer(false);timer.schedule(new AgingTimerTask(), TIMER_INTERVAL_MILLIS,TIMER_INTERVAL_MILLIS);info("started; interval=" + TIMER_INTERVAL_MILLIS + "ms");}/*** Stopis us.*/public void stop() {if (timer != null) {timer.cancel();timer = null;}synchronized (mutex) {sessions.clear();}info("stopped");}/*** Create unique Session id.*/protected String createSessionId() {// Use UUID if specified in config (thanks Uli Romahn)if (Config.hasProperty(SESSION_ID_GENERATION)&& Config.getProperty(SESSION_ID_GENERATION).equals(SESSION_ID_GENERATION_UUID)) {// We want to be Java 1.4 compatible so use UID class (1.5+ we may// use java.util.UUID).return new UID().toString();}// Other cases use random name// Create a unique session id// In 99.9999 % of the cases this should be generated at once// We need the mutext to prevent the chance of creating// same-valued ids (thanks Uli Romahn)synchronized (mutex) {String id;while (true) {id = Rand.randomName(Config.getIntProperty(SESSION_ID_SIZE));if (!hasSession(id)) {// Created unique session idbreak;}}return id;}}/*** Util: stdout printing.*/protected void info(String s) {Log.info("SessionManager: " + new Date() + " " + s);}/*** Util: stdout printing.*/protected void warn(String s) {Log.warn("SessionManager: " + s);}/*** Util: stdout printing.*/protected void debug(String s) {Log.debug("SessionManager: " + s);}/*** Manages Session timeouts.*/private class AgingTimerTask extends TimerTask {private long lastRun = Sys.now();private long delta;private Method visitMethod;public AgingTimerTask() throws PushletException {try {// Setup Visitor Methods for callback from SessionManagerClass[] argsClasses = { Session.class };visitMethod = this.getClass().getMethod("visit", argsClasses);} catch (NoSuchMethodException e) {throw new PushletException("Failed to setup AgingTimerTask", e);}}/*** Clock tick callback from Timer.*/public void run() {long now = Sys.now();delta = now - lastRun;lastRun = now;debug("AgingTimerTask: tick");// Use Visitor pattern to loop through Session objects (see visit()// below)getInstance().apply(this, visitMethod, new Object[1]);}/*** Callback from SessionManager during apply()*/public void visit(Session aSession) {try {// Age the leaseaSession.age(delta);debug("AgingTimerTask: visit: " + aSession);// Stop session if lease expiredif (aSession.isExpired()) {info("AgingTimerTask: Session expired: " + aSession);aSession.stop();}} catch (Throwable t) {warn("AgingTimerTask: Error in timer task : " + t);}}}
}/** $Log: SessionManager.java,v $ Revision 1.12 2007/12/04 13:55:53 justb* reimplement SessionManager concurrency (prev version was not thread-safe!)* * Revision 1.11 2007/11/23 14:33:07 justb core classes now configurable through* factory* * Revision 1.10 2007/11/10 14:47:45 justb make session key generation* configurable (can use uuid)* * Revision 1.9 2007/11/10 14:17:18 justb minor cosmetic changes just commit now* * Revision 1.8 2007/07/02 08:12:16 justb redo to original version of session* cache (with break, but nullify array first)* * Revision 1.7 2007/07/02 07:33:02 justb small fix in sessionmgr for holes in* sessioncache array (continue i.s.o. break)* * Revision 1.6 2006/11/18 12:13:47 justb made SessionManager constructor* protected to allow constructing derived classes* * Revision 1.5 2005/02/28 15:58:05 justb added SimpleListener example* * Revision 1.4 2005/02/28 12:45:59 justb introduced Command class* * Revision 1.3 2005/02/28 09:14:55 justb sessmgr/dispatcher factory/singleton* support* * Revision 1.2 2005/02/25 15:13:01 justb session id generation more robust* * Revision 1.1 2005/02/21 16:59:09 justb SessionManager and session lease* introduced*/
接着··就大胆的修改了nl.justobjects.pushlet.core.EventPullSource,这里我修改了
abstract protected Event pullEvent();
改为了
abstract protected void pullEvent();
83 public void run() {
84 Log.debug(getClass().getName() + ": starting...");
85 alive = true;
86 while (alive) {
87 try {
88
89 Thread.sleep(getSleepTime());
90
91 // Stopped during sleep: end loop.
92 if (!alive) {
93 break;
94 }
95
96 // If passivated wait until we get
97 // get notify()-ied. If there are no subscribers
98 // it wasts CPU to remain producing events...
99 synchronized (this) {
00 while (!active) {
01 Log.debug(getClass().getName() + ": waiting...");
02 wait();
03 }
04 }
05
06 } catch (InterruptedException e) {
07 break;
08 }
09
10 try {
11 // Derived class should produce an event.
12 Event event = pullEvent();
13
14 // Let the publisher push it to subscribers.
15 Dispatcher.getInstance().multicast(event);
16 } catch (Throwable t) {
17 Log.warn("EventPullSource exception while multicasting ", t);
18 t.printStackTrace();
19 }
20 }
21 Log.debug(getClass().getName() + ": stopped");
22 }
23}
改为了
public void run() {Log.debug(getClass().getName() + ": starting...");alive = true;while (alive) {try {Thread.sleep(getSleepTime());// Stopped during sleep: end loop.if (!alive) {break;}// If passivated wait until we get// get notify()-ied. If there are no subscribers// it wasts CPU to remain producing events...synchronized (this) {while (!active) {Log.debug(getClass().getName() + ": waiting...");wait();}}} catch (InterruptedException e) {break;}try {// Derived class should produce an event.pullEvent();// Let the publisher push it to subscribers.//Dispatcher.getInstance().multicast(event);} catch (Throwable t) {Log.warn("EventPullSource exception while multicasting ", t);t.printStackTrace();}}Log.debug(getClass().getName() + ": stopped");}
修改的原因··是原来的run线程启动以后会通过getEvent()来获得event然后通过 Dispatcher.getInstance().multicast(event); 将事件广播了出去;我这里的改造以后的思路是,pullEvent()不再返回event了,线程中也不去进行广播了,所有的操作,包括event的创建以及event的广播还是单播都在pullEvent()中进行。
最后就是js的修改了,修改ajax-pushlet-client.js
给PL增加字段parameters;修改后如下:
var PL = {NV_P_FORMAT: 'p_format=xml-strict',NV_P_MODE: 'p_mode=pull',pushletURL: null,webRoot: null,sessionId: null,STATE_ERROR: -2,STATE_ABORT: -1,STATE_NULL: 1,STATE_READY: 2,STATE_JOINED: 3,STATE_LISTENING: 3,state: 1,<span style="color:#cc0000;">parameters:[],</span>
......}
_doRequest方法修改:
_doRequest: function(anEvent, aQuery) {
// Check if we are not in any error state
if (PL.state < 0) {
PL._setStatus('died (' + PL.state + ')');
return;
}
// We may have (async) requests outstanding and thus
// may have to wait for them to complete and change state.
var waitForState = false;
if (anEvent == 'join' || anEvent == 'join-listen') {
// We can only join after initialization
waitForState = (PL.state < PL.STATE_READY);
} else if (anEvent == 'leave') {
PL.state = PL.STATE_READY;
} else if (anEvent == 'refresh') {
// We must be in the listening state
if (PL.state != PL.STATE_LISTENING) {
return;
}
} else if (anEvent == 'listen') {
// We must have joined before we can listen
waitForState = (PL.state < PL.STATE_JOINED);
} else if (anEvent == 'subscribe' || anEvent == 'unsubscribe') {
// We must be listeing for subscription mgmnt
waitForState = (PL.state < PL.STATE_LISTENING);
} else {
// All other requests require that we have at least joined
waitForState = (PL.state < PL.STATE_JOINED);
}
// May have to wait for right state to issue request
if (waitForState == true) {
PL._setStatus(anEvent + ' , waiting... state=' + PL.state);
setTimeout(function() {
PL._doRequest(anEvent, aQuery);
}, 100);
return;
}
// ASSERTION: PL.state is OK for this request
// Construct base URL for GET
var url = PL.pushletURL + '?p_event=' + anEvent;
// Optionally attach query string
if (aQuery) {
url = url + '&' + aQuery;
}
// Optionally attach session id
if (PL.sessionId != null) {
url = url + '&p_id=' + PL.sessionId;
if (anEvent == 'p_leave') {
PL.sessionId = null;
}
}
//这里是我修改的地方,我的数组中的偶数是参数名,奇数是参数内容,这里把我的参数拼接到了url中。
if(PL.parameters.length > 0) {
url+="&" + PL.parameters[0] + "=" + PL.parameters[1];
}
PL.debug('_doRequest', url);
PL._getXML(url, PL._onResponse);
// uncomment to use synchronous XmlHttpRequest
//var rsp = PL._getXML(url);
//PL._onResponse(rsp); */
},
额,修改完了,可以配置pushlet的相关参数,来使用pushlet了。
在web.xml中配置如下参数
<servlet><servlet-name>pushlet</servlet-name><servlet-class>nl.justobjects.pushlet.servlet.Pushlet</servlet-class><load-on-startup>3</load-on-startup></servlet><servlet-mapping><servlet-name>pushlet</servlet-name><url-pattern>/admin/pushlet.srv</url-pattern></servlet-mapping>
这是pushlet配置的基本参数,这里我配置的是/admin/pushlet.srv,是因为项目的路径是localhost:8080/项目名 ,而我的页面是在localhost:8080/项目名/admin/下面,所以加了/admin/pushlet.srv。如果你的页面就在项目目录下,就不用加前面的/admin了。
在sources.properties中配置如下参数
#
# Properties file for EventSource objects to be instantiated.
#
# Place this file in the CLASSPATH (e.g. WEB-INF/classes) or directly under WEB-INF.
#
# $Id: sources.properties,v 1.2 2007/11/10 14:12:16 justb Exp $
#
# Each EventSource is defined as <key>=<classname>
# 1. <key> should be unique within this file but may be any name
# 2. <classname> is the full class name
#
#
# Define Pull Sources here. These classes must be derived from
# nl.justobjects.pushlet.core.EventPullSource
# Inner classes are separated with a $ sign from the outer class.
source1=org.calonlan.soulpower.plug.HwPushlet$MessClazz
# TO BE DONE IN NEXT VERSION
# define Push Sources here. These must implement the interface
# nl.justobjects.pushlet.core.EventSource
这里把我的pushlet的实现类配置进去了。
最后就是如何使用了。
单播:
package org.calonlan.soulpower.plug;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;import nl.justobjects.pushlet.core.Dispatcher;
import nl.justobjects.pushlet.core.Event;
import nl.justobjects.pushlet.core.EventPullSource;
import nl.justobjects.pushlet.core.Session;
import nl.justobjects.pushlet.core.SessionManager;public class HwPushlet {private static String driver = "com.mysql.jdbc.Driver";private static String dbName = "dlw";private static String userName = "root";private static String passWord = "root";private static String url = "jdbc:mysql://localhost:3306/";static public class MessClazz extends EventPullSource {@Overrideprotected long getSleepTime() {return 1000 * 60 * 3;}@Overrideprotected void pullEvent() {Session[] sessions = SessionManager.getInstance().getSessions();for (int i = 0; i < sessions.length; i++) {String userId = sessions[i].getEvent().getField("uid");Event event = Event.createDataEvent("/mipc/he");Connection conn = null;String x = "";try {Class.forName(driver);conn = DriverManager.getConnection(url + dbName, userName,passWord);Statement statement = conn.createStatement();String sql = "select * from weiorder where mystate='0' and bname='"+ userId + "'";ResultSet rst = statement.executeQuery(sql);if (rst.next()) {x = "1";} else {x = "2";}} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {try {conn.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}event.setField("mess", x);Dispatcher.getInstance().unicast(event, sessions[i].getId());}}}}
页面js代码
<script type="text/javascript">
PL.parameters.push('uid');
PL.parameters.push('${user.username}');
PL._init();
PL.joinListen('/mipc/he');
function onData(event) {if(event.get("mess")=="1"){$.messager.show({title:'系统消息',msg:'您有新的订单请注意处理',timeout:0,showType:'slide'});document.getElementById('audio_player').play();}// 离开 // PL.leave();
}
</script>
广播:广播的还js页面的请求就和官网一样了,pullEvent中也只是简单的进行广播。代码如下
package org.calonlan.soulpower.plug;import nl.justobjects.pushlet.core.Dispatcher;
import nl.justobjects.pushlet.core.Event;
import nl.justobjects.pushlet.core.EventPullSource;public class HwPushlet {static public class MessClazz extends EventPullSource {@Overrideprotected long getSleepTime() {return 1000 * 60 * 3;}@Overrideprotected void pullEvent() {Event event = Event.createDataEvent("/mipc/he");String x = "";event.setField("mess", x);Dispatcher.getInstance().multicast(event);}}}
额,时间比较仓促···最近项目太忙了,唉。这里简单的记录一下。也不知道这种是不是很落后或是很复杂的修改方法。还忘高手们批评指正。修改后的jar包和js请在http://download.csdn.net/detail/u012613903/9483881下载。