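This article shows how to process Coherence events by using map triggers. For the basic configuration and implementation of the Oracle Coherence API, the earlier article "Distributed Data Management in Oracle Coherence" is recommended reading.
Map triggers are one of the most important features Oracle Coherence offers for building a highly customized cache management system. A MapTrigger is a functional agent that can validate, reject, or modify mutating operations against an underlying map. Triggers can also prevent invalid transactions, enforce security, provide event logging and auditing, and gather statistics on data modifications.
For example, suppose we have code that works with a NamedCache and we want to change the behavior or contents of an entry before it is inserted into the map. By enabling a map trigger, this change can be made without modifying all of the existing code.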
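The idea of intercepting a write before it is committed can be sketched in plain Java, without a cluster. This is only a conceptual illustration: `Trigger`, `TriggeredMap`, and `newUserMap` are hypothetical names invented for the sketch and are not part of the Coherence API.

```java
import java.util.HashMap;
import java.util.Map;

/** Conceptual sketch only; none of these types belong to the Coherence API. */
public class TriggerSketch {

    /** Hypothetical hook that sees a value before it is stored. */
    interface Trigger<V> {
        V process(V value);
    }

    /** Hypothetical map wrapper that runs the trigger before committing a put. */
    static final class TriggeredMap<K, V> {
        private final Map<K, V> backing = new HashMap<K, V>();
        private final Trigger<V> trigger;

        TriggeredMap(Trigger<V> trigger) {
            this.trigger = trigger;
        }

        void put(K key, V value) {
            // The trigger may return a modified value or throw to reject the put.
            backing.put(key, trigger.process(value));
        }

        V get(K key) {
            return backing.get(key);
        }
    }

    /** Builds a map whose trigger trims names and rejects empty ones. */
    static TriggeredMap<String, String> newUserMap() {
        return new TriggeredMap<String, String>(new Trigger<String>() {
            public String process(String value) {
                String trimmed = value == null ? "" : value.trim();
                if (trimmed.isEmpty()) {
                    throw new IllegalArgumentException("empty name rejected");
                }
                return trimmed;
            }
        });
    }

    public static void main(String[] args) {
        TriggeredMap<String, String> users = newUserMap();
        users.put("user1", "  James Joyce ");    // the calling code is unchanged
        System.out.println(users.get("user1"));  // prints "James Joyce"
    }
}
```

The caller still just calls `put`; the validation and modification live in one place, which is the property the paragraph above attributes to map triggers.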
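There are two ways to add the map trigger feature to an application:
1) A MapTriggerListener can be used to register a MapTrigger with a named cache
2) The class-factory mechanism can be used in the coherence-cache-config.xml configuration file
In the following sample application, the MapTrigger functionality is implemented by following the first way. A new cluster called OTV is created, and User beans are distributed through the user-map NamedCache shared by the two members of the cluster.
Used technologies: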
JDK 1.6.0_35
Spring 3.1.2
Coherence 3.7.1
Maven 3.0.2
Step 1: Create the Maven project
Create a Maven project. (It can be created with Maven or an IDE plugin.)
Step 2: Coherence package
Download Coherence from the Coherence package.
Step 3: Libraries
First, the Spring dependencies are added to Maven's pom.xml.
<!-- Spring 3.1.2 dependencies -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>${spring.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${spring.version}</version>
</dependency>
The Coherence library is installed into the local Maven repository manually, and its dependency is added to pom.xml as follows. Alternatively, if Maven is not used to manage the project, the coherence.jar file can be added to the classpath.
<!-- Coherence library (from local repository) -->
<dependency>
    <groupId>com.tangosol</groupId>
    <artifactId>coherence</artifactId>
    <version>3.7.1</version>
</dependency>
The following Maven plugin can be used to create a runnable jar.
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.3.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation='org.apache.maven.plugins.shade.resource.ManifestResourceTransformer'>
                        <mainClass>com.otv.exe.Application</mainClass>
                    </transformer>
                    <transformer implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'>
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
Step 4: Create otv-coherence-cache-config.xml
The first Coherence configuration file is otv-coherence-cache-config.xml. It contains the caching schemes (distributed or replicated) and the mapping of cache names to schemes. The cache configuration created here should be added to coherence-cache-config.xml.
<?xml version='1.0'?>
<cache-config xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
              xmlns='http://xmlns.oracle.com/coherence/coherence-cache-config'
              xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd'>
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>user-map</cache-name>
            <scheme-name>MapDistCache</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>
    <caching-schemes>
        <distributed-scheme>
            <scheme-name>MapDistCache</scheme-name>
            <service-name>MapDistCache</service-name>
            <backing-map-scheme>
                <local-scheme>
                    <unit-calculator>BINARY</unit-calculator>
                </local-scheme>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>
    </caching-schemes>
</cache-config>
Step 5: Create tangosol-coherence-override.xml
The second Coherence configuration file is tangosol-coherence-override.xml. It contains the cluster, member-identity, and configurable cache factory configuration.
tangosol-coherence-override.xml for the first member of the cluster:
<?xml version='1.0'?>
<coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
           xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config'
           xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>
    <cluster-config>
        <member-identity>
            <cluster-name>OTV</cluster-name>
            <role-name>OTV1</role-name>
        </member-identity>
        <unicast-listener>
            <well-known-addresses>
                <socket-address id='1'>
                    <address>x.x.x.x</address>
                    <port>8089</port>
                </socket-address>
                <socket-address id='2'>
                    <address>x.x.x.x</address>
                    <port>8090</port>
                </socket-address>
            </well-known-addresses>
            <machine-id>1001</machine-id>
            <address>x.x.x.x</address>
            <port>8089</port>
            <port-auto-adjust>true</port-auto-adjust>
        </unicast-listener>
    </cluster-config>
    <configurable-cache-factory-config>
        <init-params>
            <init-param>
                <param-type>java.lang.String</param-type>
                <param-value system-property='tangosol.coherence.cacheconfig'>otv-coherence-cache-config.xml</param-value>
            </init-param>
        </init-params>
    </configurable-cache-factory-config>
</coherence>
tangosol-coherence-override.xml for the second member of the cluster:
<?xml version='1.0'?>
<coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
           xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config'
           xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'>
    <cluster-config>
        <member-identity>
            <cluster-name>OTV</cluster-name>
            <role-name>OTV2</role-name>
        </member-identity>
        <unicast-listener>
            <well-known-addresses>
                <socket-address id='1'>
                    <address>x.x.x.x</address>
                    <port>8090</port>
                </socket-address>
                <socket-address id='2'>
                    <address>x.x.x.x</address>
                    <port>8089</port>
                </socket-address>
            </well-known-addresses>
            <machine-id>1002</machine-id>
            <address>x.x.x.x</address>
            <port>8090</port>
            <port-auto-adjust>true</port-auto-adjust>
        </unicast-listener>
    </cluster-config>
    <configurable-cache-factory-config>
        <init-params>
            <init-param>
                <param-type>java.lang.String</param-type>
                <param-value system-property='tangosol.coherence.cacheconfig'>otv-coherence-cache-config.xml</param-value>
            </init-param>
        </init-params>
    </configurable-cache-factory-config>
</coherence>
Step 6: Create applicationContext.xml
The Spring configuration file, applicationContext.xml, is created.
<beans xmlns='http://www.springframework.org/schema/beans'
       xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'
       xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd'>

    <!-- Beans Declaration -->
    <bean id='userCacheService' class='com.otv.srv.UserCacheService'></bean>

    <bean id='userCacheUpdater' class='com.otv.exe.UserCacheUpdater'>
        <property name='userCacheService' ref='userCacheService' />
    </bean>
</beans>
Step 7: Create the User class
A new User bean is created. This bean will be distributed between the two nodes of the OTV cluster. For serialization, the java.io.Serializable interface is implemented, but PortableObject can be implemented instead for better performance.
package com.otv.user;

import java.io.Serializable;

/**
 * User Bean
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class User implements Serializable {

    private static final long serialVersionUID = -1963764656789800896L;

    private String id;
    private String name;
    private String surname;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSurname() {
        return surname;
    }

    public void setSurname(String surname) {
        this.surname = surname;
    }

    @Override
    public String toString() {
        StringBuilder strBuff = new StringBuilder();
        strBuff.append("id : ").append(id);
        strBuff.append(", name : ").append(name);
        strBuff.append(", surname : ").append(surname);
        return strBuff.toString();
    }
}
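Because the bean travels between cluster members, it has to survive being turned into bytes and back. The following is a minimal sketch of that round trip using only standard Java serialization; `SketchUser`, `toBytes`, and `fromBytes` are hypothetical names for this illustration, and the sketch does not show how Coherence itself moves entries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    /** Stand-in for the User bean above, reduced to its three fields. */
    static class SketchUser implements Serializable {
        private static final long serialVersionUID = 1L;
        String id;
        String name;
        String surname;
    }

    /** Serializes a value into a byte array with standard Java serialization. */
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(buffer);
        try {
            out.writeObject(value);
        } finally {
            out.close();
        }
        return buffer.toByteArray();
    }

    /** Rebuilds an object from bytes produced by toBytes. */
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return in.readObject();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchUser user = new SketchUser();
        user.name = "James";
        user.surname = "Joyce";

        SketchUser copy = (SketchUser) fromBytes(toBytes(user));
        System.out.println(copy.name + " " + copy.surname); // prints "James Joyce"
    }
}
```

A class that fails this kind of round trip (for example, because a field type is not serializable) would be a likely source of errors once it is placed in a distributed cache.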
Step 8: Create the IUserCacheService interface
A new IUserCacheService interface is created for the service layer to expose the cache functionality.
package com.otv.srv;

import com.tangosol.net.NamedCache;

/**
 * IUserCacheService Interface exposes User Cache operations
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public interface IUserCacheService {

    /**
     * Adds user entries to cache
     *
     * @param key   cache key
     * @param value cache value
     */
    void addToUserCache(Object key, Object value);

    /**
     * Deletes user entries from cache
     *
     * @param key cache key
     */
    void deleteFromUserCache(Object key);

    /**
     * Gets user cache
     *
     * @return NamedCache Coherence named cache
     */
    NamedCache getUserCache();
}
Step 9: Create the UserCacheService implementation class
UserCacheService is created by implementing IUserCacheService.
package com.otv.srv;

import com.otv.listener.UserMapListener;
import com.otv.trigger.UserMapTrigger;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapTriggerListener;

/**
 * UserCacheService Class implements the IUserCacheService
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class UserCacheService implements IUserCacheService {

    private NamedCache userCache = null;
    private static final String USER_MAP = "user-map";
    // -1 makes lock() wait until the lock is obtained
    private static final long LOCK_TIMEOUT = -1;

    public UserCacheService() {
        setUserCache(CacheFactory.getCache(USER_MAP));
        getUserCache().addMapListener(new UserMapListener());
        // Registers the trigger with the cache (the first of the two ways described above)
        getUserCache().addMapListener(new MapTriggerListener(new UserMapTrigger()));
    }

    /**
     * Adds user entries to cache
     *
     * @param key   cache key
     * @param value cache value
     */
    public void addToUserCache(Object key, Object value) {
        // key is locked
        getUserCache().lock(key, LOCK_TIMEOUT);
        try {
            // application logic
            getUserCache().put(key, value);
        } finally {
            // key is unlocked
            getUserCache().unlock(key);
        }
    }

    /**
     * Deletes user entries from cache
     *
     * @param key cache key
     */
    public void deleteFromUserCache(Object key) {
        // key is locked
        getUserCache().lock(key, LOCK_TIMEOUT);
        try {
            // application logic
            getUserCache().remove(key);
        } finally {
            // key is unlocked
            getUserCache().unlock(key);
        }
    }

    /**
     * Gets user cache
     *
     * @return NamedCache Coherence named cache
     */
    public NamedCache getUserCache() {
        return userCache;
    }

    public void setUserCache(NamedCache userCache) {
        this.userCache = userCache;
    }
}
Step 10: Create the UserMapTrigger class
A new UserMapTrigger class is created by implementing the com.tangosol.util.MapTrigger interface. This trigger runs its logic before an entry is inserted into the user-map.
package com.otv.trigger;

import org.apache.log4j.Logger;

import com.otv.user.User;
import com.tangosol.util.MapTrigger;

/**
 * UserMapTrigger executes required logic before the operation is committed
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class UserMapTrigger implements MapTrigger {

    private static final long serialVersionUID = 5411263646665358790L;

    private static Logger logger = Logger.getLogger(UserMapTrigger.class);

    /**
     * Processes user cache entries
     *
     * @param entry the entry about to be committed
     */
    public void process(MapTrigger.Entry entry) {
        // Guard: an entry that is being removed has no value to normalize
        if (!entry.isPresent()) {
            return;
        }

        User user = (User) entry.getValue();
        String id = user.getId();

        String name = user.getName();
        String updatedName = name.toUpperCase();

        String surname = user.getSurname();
        String updatedSurname = surname.toUpperCase();

        if (!updatedName.equals(name)) {
            user.setName(updatedName);
        }

        if (!updatedSurname.equals(surname)) {
            user.setSurname(updatedSurname);
        }

        // The id is derived from the upper-cased name and surname
        user.setId(user.getName() + "_" + user.getSurname());

        entry.setValue(user);

        logger.debug("UserMapTrigger processes the entry before committing. "
                + "oldId : " + id
                + ", newId : " + ((User) entry.getValue()).getId()
                + ", oldName : " + name
                + ", newName : " + ((User) entry.getValue()).getName()
                + ", oldSurname : " + surname
                + ", newSurname : " + ((User) entry.getValue()).getSurname());
    }

    // All UserMapTrigger instances are considered equal
    public boolean equals(Object o) {
        return o != null && o.getClass() == this.getClass();
    }

    public int hashCode() {
        return getClass().getName().hashCode();
    }
}
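The transformation the trigger applies (upper-case the name and surname, then join them into an id) can be checked on its own, without a cluster. The sketch below is a plain-Java illustration; `NormalizeSketch` and `deriveId` are hypothetical names, and passing `Locale.ROOT` is an addition made here so the result does not depend on the JVM's default locale, whereas the trigger above calls `toUpperCase()` without a locale.

```java
import java.util.Locale;

public class NormalizeSketch {

    /** Upper-cases a name part independently of the default locale. */
    static String normalize(String part) {
        return part.toUpperCase(Locale.ROOT);
    }

    /** Derives the id the same way the trigger does: NAME_SURNAME. */
    static String deriveId(String name, String surname) {
        return normalize(name) + "_" + normalize(surname);
    }

    public static void main(String[] args) {
        System.out.println(deriveId("James", "Joyce"));  // prints "JAMES_JOYCE"
        System.out.println(deriveId("Thomas", "Moore")); // prints "THOMAS_MOORE"
    }
}
```

Keeping this logic in a small static method would also make it easy to unit test separately from the Coherence wiring.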
Step 11: Create the UserMapListener implementation class
A new UserMapListener class is created. This listener receives the distributed user-map events.
package com.otv.listener;

import org.apache.log4j.Logger;

import com.tangosol.util.MapEvent;
import com.tangosol.util.MapListener;

/**
 * UserMapListener Class listens to user cache events
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class UserMapListener implements MapListener {

    private static Logger logger = Logger.getLogger(UserMapListener.class);

    public void entryDeleted(MapEvent me) {
        logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
    }

    public void entryInserted(MapEvent me) {
        logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
    }

    public void entryUpdated(MapEvent me) {
        // logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
    }
}
Step 12: Create the UserCacheUpdater class
The UserCacheUpdater class is created to add new entries to the cache and to monitor the cache content.
package com.otv.exe;

import java.util.Collection;

import org.apache.log4j.Logger;

import com.otv.srv.IUserCacheService;
import com.otv.user.User;

/**
 * UserCacheUpdater Class updates and prints user cache entries
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class UserCacheUpdater implements Runnable {

    private static Logger logger = Logger.getLogger(UserCacheUpdater.class);

    private IUserCacheService userCacheService;

    /**
     * Runs the UserCacheUpdater Thread
     */
    public void run() {
        // A new User is created...
        User user = new User();

        // Only the name and surname are set; the id will be set by the trigger.
        user.setName("James");
        user.setSurname("Joyce");

        // Entries are added to cache...
        getUserCacheService().addToUserCache("user1", user);

        // The following block shows the entry inserted by the second member of the cluster.
        // For that member, uncomment it and comment out the block above before building.
        // user.setName("Thomas");
        // user.setSurname("Moore");
        // getUserCacheService().addToUserCache("user2", user);

        // Cache entries are being printed...
        printCacheEntries();
    }

    /**
     * Prints User Cache Entries once a minute
     */
    @SuppressWarnings("unchecked")
    private void printCacheEntries() {
        Collection<User> userCollection = null;
        try {
            while (true) {
                userCollection = (Collection<User>) getUserCacheService().getUserCache().values();
                for (User user : userCollection) {
                    logger.debug("Cache Content : " + user);
                }
                Thread.sleep(60000);
            }
        } catch (InterruptedException e) {
            logger.error("CacheUpdater is interrupted!", e);
        }
    }

    public IUserCacheService getUserCacheService() {
        return userCacheService;
    }

    public void setUserCacheService(IUserCacheService userCacheService) {
        this.userCacheService = userCacheService;
    }
}
Step 13: Create the Application class
The Application class is created to run the application.
package com.otv.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 29 Oct 2012
 * @version 1.0.0
 */
public class Application {

    /**
     * Starts the application
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");

        UserCacheUpdater cacheUpdater = (UserCacheUpdater) context.getBean("userCacheUpdater");
        new Thread(cacheUpdater).start();
    }
}
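Step 14: Build the project
After the OTV_Spring_Coherence_MapTrigger project is built, OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar is created.
Important note: the members of the cluster have different Coherence configurations, so the project should be built separately for each member.
Step 15: Run the project by starting the cluster members
After the OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar file is run on the cluster members, the following output logs are shown on the first member's console: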
--A new cluster is created and First Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:26:44 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : JAMES_JOYCE, oldName : James, newName : JAMES, oldSurname : Joyce, newSurname : JOYCE
29.10.2012 18:26:44 DEBUG (UserMapListener.java:25) - Inserted Key = user1, Value = id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:26:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
.......
--Second Member joins the cluster and adds a new entry to the cache.
29.10.2012 18:27:33 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : THOMAS_MOORE, oldName : Thomas, newName : THOMAS, oldSurname : Moore, newSurname : MOORE
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE
.......
--After second member adds a new entry, cache content is shown as below :
29.10.2012 18:27:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
The second member's console:
--After the second member joins the cluster and adds a new entry to the cache, the cache content is shown as below and both members have the same entries:
29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
Step 16: Download
https://github.com/erenavsarogullari/OTV_Spring_Coherence_MapTrigger
Reference: Coherence Event Processing by using Map Trigger Feature, from JCG partner Eren Avsarogullari at the Online Technology Vision blog.
Translated from: https://www.javacodegeeks.com/2012/11/coherence-event-processing-by-using-map-trigger-feature.html