[华为北向网管NCE开发教程(6)消息订阅

1.作用

之前介绍的都是我们向网管NCE发起请求获取数据,消息订阅则反过来,是网管NCE系统给我们推送信息。其原理和MQ,JMS这些差不多,这里不过多累述。

2.场景

所支持订阅的场景有如下,以告警通知为例,当我们订阅告警通知以后,如果NCE网管有告警通知产生以后,就会给订阅的人发送一个通知(也就是实时告警推送)。那么我们就可以接收到如下的通知。

2024-06-06 00:09:30c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160140.0Z, X.733::EventType=securityAlarm, emsTime=20240605160142.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784751, isClearable=true, affectedTPList=21}
2024-06-06 00:09:36c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160147.0Z, X.733::EventType=securityAlarm, emsTime=20240605160149.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784752, isClearable=true, affectedTPList=21}
2024-06-06 00:09:43c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160155.0Z, X.733::EventType=securityAlarm, emsTime=20240605160156.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784753, isClearable=true, affectedTPList=21}
2024-06-06 00:09:50c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160202.0Z, X.733::EventType=securityAlarm, emsTime=20240605160203.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CLEARED, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784755, isClearable=true, affectedTPList=21}
2024-06-06 00:10:01c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_ALARM<告警通知>,通知参数:{X.733::ProposedRepairActions=, rcaiIndicator=false, probableCauseQualifier=0-2, serviceAffecting=SA_NON_SERVICE_AFFECTING, additionalText=Huawei/NCE;167772242, X.733::CorrelatedNotifications=21, neTime=20240605160213.0Z, X.733::EventType=securityAlarm, emsTime=20240605160214.0Z, objectType=OT_MANAGED_ELEMENT, objectTypeQualifier=, probableCause=UNIDENTIFIED, perceivedSeverity=PS_CRITICAL, nativeEMSName=Huawei/NCE;土默特, nativeProbableCause=NE_NOT_LOGIN, layerRate=1, additionalInfo=21, objectName=21, notificationId=11191929201784756, isClearable=true, affectedTPList=21}

同理,如果我们订阅了文件传输状态通知,当存在文件传输完成的时候会收到如下通知,通知信息中包含了,文件传输完成后,文件的存储地址。

2024-06-06 10:15:26c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786334, fileName=pm/sdh/0605-0606/3145740.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:39c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786335, fileName=pm/sdh/0605-0606/3145734.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
2024-06-06 10:15:42c.c.s.m.c.n.ConsumerNotice - 收到事件通知:NT_FILE_TRANSFER_STATUS<文件传输状态通知>,通知参数:{notificationId=11191929201786336, fileName=pm/sdh/0605-0606/3145739.txt, transferStatus=FT_COMPLETED, percentComplete=100, failureReason=}
通知类型说明
NT_ALARM告警通知
NT_ALARM_UPDATED告警更新通知
NT_TCA性能越限告警通知
NT_OBJECT_CREATION对象创建通知
NT_OBJECT_DELETION对象删除通知
NT_ATTRIBUTE_VALUE_CHANGE属性改变通知
NT_STATE_CHANGE状态改变通知
NT_ROUTE_CHANGE路由改变通知
NT_PROTECTION_SWITCH保护倒换通知
NT_FILE_TRANSFER_STATUS文件传输状态通知
NT_EPROTECTION_SWITCH设备保护倒换通知事件
NT_ASON_RESOURCE_CHANGE智能资源改变通知
NT_PRBSTEST_STATUS伪随机码测试状态通知
NT_WDMPROTECTION_SWITCH波分保护倒换通知
NT_ATMPROTECTION_SWITCH ATM保护倒换通知
NT_RPRPROTECTION_SWITCH RPR保护组倒换通知事件格式
NT_IPPROTECTION_SWITCH Tunnel保护组倒换通知事件格式

3.如何开订阅(SpringBoot为例)

3.1登录NCE

3.1.1CorbaLoginReq

配置文件的登录参数如下

huawei: nce: login: corba:host: 127.0.0.1port: 12001userName: 111111passWord: 111111

配置文件参数注入Spring Bean

import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;import lombok.Data;@Data
@SpringBootConfiguration
@ConfigurationProperties(prefix = "huawei.nce.login.corba")
public class CorbaLoginReq {private String host;private String port;private String userName;private String passWord;
}

3.1.2CorbaLoginRes

登录返回参数

import org.omg.DynamicAny.DynAnyFactory;import lombok.Data;
import mtnm.tmforum.org.emsSession.EmsSession_I;@Data
public class CorbaLoginRes {private org.omg.CORBA.ORB orb;private org.omg.PortableServer.POA rootPOA ;private EmsSession_I emsSession;private DynAnyFactory dynAnyFactory;
}

3.1.3TANmsSession_IImpl

import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.session.Session_I;
/*** NmsSession_IPOA for EMS(NCE) invoking. * @author**/
public class TANmsSession_IImpl extends NmsSession_IPOA {public void eventLossCleared(String endTime) {log("TANmsSession_IImpl.eventLossCleared(String endTime) is invoked by EMS(NCE).");log("endTime:"+endTime);}public void eventLossOccurred(String startTime, String notificationId) {log("TANmsSession_IImpl.eventLossOccurred(String startTime, String notificationId) is invoked by EMS.");log("startTime:"+startTime+", notificationId:"+notificationId);}public Session_I associatedSession() {log("TANmsSession_IImpl.associatedSession() is invoked by EMS(NCE).");return null;}public void endSession() {log("TANmsSession_IImpl.endSession() is invoked by EMS(NCE).");}public void ping() {log("TANmsSession_IImpl.ping() is invoked by EMS(NCE).");}private static void log(String str){System.out.println(str);}
}

3.1.4BaseCorbaService

public interface BaseCorbaService {/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/CorbaLoginRes login();/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/void clearLogin();
}
import java.util.Arrays;
import java.util.List;import org.omg.CosNaming.NameComponent;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynAnyFactoryHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import com.collect.sdh.module.corba.entity.CorbaLoginReq;
import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.entity.TANmsSession_IImpl;
import com.collect.sdh.module.corba.service.BaseCorbaService;import mtnm.tmforum.org.common.Common_IHolder;
import mtnm.tmforum.org.emsMgr.EMSMgr_I;
import mtnm.tmforum.org.emsMgr.EMSMgr_IHelper;
import mtnm.tmforum.org.emsSession.EmsSession_I;
import mtnm.tmforum.org.emsSession.EmsSession_IHolder;
import mtnm.tmforum.org.emsSession.EmsSession_IPackage.managerNames_THolder;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_I;
import mtnm.tmforum.org.emsSessionFactory.EmsSessionFactory_IHelper;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_I;
import mtnm.tmforum.org.equipment.EquipmentInventoryMgr_IHelper;
import mtnm.tmforum.org.equipment.EquipmentOrHolderIterator_IHolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolderList_THolder;
import mtnm.tmforum.org.equipment.EquipmentOrHolder_T;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfoList_THolder;
import mtnm.tmforum.org.equipment.ObjectAdditionalInfo_T;
import mtnm.tmforum.org.equipment.PhysicalLocationInfoList_THolder;
import mtnm.tmforum.org.equipment.PhysicalLocationInfo_T;
import mtnm.tmforum.org.globaldefs.NameAndStringValue_T;
import mtnm.tmforum.org.globaldefs.NamingAttributesIterator_IHolder;
import mtnm.tmforum.org.globaldefs.NamingAttributesList_THolder;
import mtnm.tmforum.org.globaldefs.ProcessingFailureException;
import mtnm.tmforum.org.managedElement.ManagedElementIterator_IHolder;
import mtnm.tmforum.org.managedElement.ManagedElementList_THolder;
import mtnm.tmforum.org.managedElement.ManagedElement_T;
import mtnm.tmforum.org.managedElement.ManagedElement_THolder;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_I;
import mtnm.tmforum.org.managedElementManager.ManagedElementMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_I;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetworkMgr_IHelper;
import mtnm.tmforum.org.multiLayerSubnetwork.MultiLayerSubnetwork_T;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkIterator_IHolder;
import mtnm.tmforum.org.multiLayerSubnetwork.SubnetworkList_THolder;
import mtnm.tmforum.org.nmsSession.NmsSession_I;
import mtnm.tmforum.org.nmsSession.NmsSession_IPOA;
import mtnm.tmforum.org.subnetworkConnection.CCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnectList_THolder;
import mtnm.tmforum.org.subnetworkConnection.CrossConnect_T;
import mtnm.tmforum.org.subnetworkConnection.Route_THolder;
import mtnm.tmforum.org.subnetworkConnection.SNCIterator_IHolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnectionList_THolder;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_T;
import mtnm.tmforum.org.subnetworkConnection.SubnetworkConnection_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointIterator_IHolder;
import mtnm.tmforum.org.terminationPoint.TerminationPointList_THolder;
import mtnm.tmforum.org.terminationPoint.TerminationPoint_T;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkIterator_IHolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLinkList_THolder;
import mtnm.tmforum.org.topologicalLink.TopologicalLink_T;@Service
public class BaseCorbaServiceImpl implements BaseCorbaService {@Autowiredprivate CorbaLoginReq loginReq;private CorbaLoginRes login;/*** @description:清空登录* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年6月7日 下午3:24:02*/@Overridepublic void clearLogin() {login = null;}/*** @description:登录华为nce-corba* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年3月1日 下午4:19:59*/@Overridepublic CorbaLoginRes login() {if(login != null) {/*本应该检测登录是否可用,如果可用,则返回登录信息,不可用则重新登录,(不知道是否可以使用emsSession.ping()来判断)但是没找到华为有这个接口,因此如果出现不可抗力因素导致登录无效,例如网络中断则通过com.collect.sdh.module.test.TestCorbaController.cleanLogin()清空登录*/	return login;}try {login = new CorbaLoginRes();String[] argv = new String[2];argv[0] = "-ORBInitRef";argv[1] = "NameService=corbaloc::" + loginReq.getHost() + ":" + loginReq.getPort() + "/NameService";org.omg.CORBA.ORB orb = org.omg.CORBA.ORB.init(argv, null);org.omg.PortableServer.POA rootPOA = org.omg.PortableServer.POAHelper.narrow(orb.resolve_initial_references("RootPOA"));rootPOA.the_POAManager().activate();DynAnyFactory dynAnyFactory = DynAnyFactoryHelper.narrow(orb.resolve_initial_references("DynAnyFactory"));org.omg.CosNaming.NamingContextExt nc = org.omg.CosNaming.NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));org.omg.CosNaming.NameComponent[] name;name = new NameComponent[5];name[0] = new NameComponent("TMF_MTNM", "Class");name[1] = new NameComponent("HUAWEI", "Vendor");name[2] = new NameComponent("Huawei/NCE", "EmsInstance");name[3] = new NameComponent("2.0", "Version");name[4] = new NameComponent("Huawei/NCE", "EmsSessionFactory_I");EmsSessionFactory_I emsSessionFactory = EmsSessionFactory_IHelper.narrow(nc.resolve(name));NmsSession_IPOA pNmsSessionServant = new TANmsSession_IImpl();NmsSession_I nmsSession = pNmsSessionServant._this(orb);EmsSession_IHolder emsSessionInterfaceHolder = new EmsSession_IHolder();emsSessionFactory.getEmsSession(loginReq.getUserName(), loginReq.getPassWord(), nmsSession, emsSessionInterfaceHolder);EmsSession_I emsSession = emsSessionInterfaceHolder.value;login.setDynAnyFactory(dynAnyFactory);login.setOrb(orb);login.setRootPOA(rootPOA);login.setEmsSession(emsSession);return login;} catch (Exception e) {e.printStackTrace();return null;}}
}

3.2定制通知

3.2.1ConsumerNotice

需要实现接口:org.omg.CosNotifyComm.StructuredPushConsumerPOA

import java.util.HashMap;
import java.util.Map;import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyComm.InvalidEventType;
import org.omg.CosNotifyComm.StructuredPushConsumerPOA;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.AnyUtil;import lombok.extern.log4j.Log4j2;/*** @description:消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:57:26*/
@Log4j2
public class ConsumerNotice extends StructuredPushConsumerPOA{private CorbaLoginRes loginRes;public ConsumerNotice(CorbaLoginRes loginRes) {super();this.loginRes = loginRes;}private static Map<String, String> noticeTypes = new HashMap<>();static {noticeTypes.put("NT_ALARM", "告警通知");noticeTypes.put("NT_ALARM_UPDATED", "告警更新通知");noticeTypes.put("NT_TCA", "性能越限告警通知");noticeTypes.put("NT_OBJECT_CREATION", "对象创建通知");noticeTypes.put("NT_OBJECT_DELETION", "对象删除通知");noticeTypes.put("NT_ATTRIBUTE_VALUE_CHANGE", "属性改变通知");noticeTypes.put("NT_STATE_CHANGE", "状态改变通知");noticeTypes.put("NT_ROUTE_CHANGE", "路由改变通知");noticeTypes.put("NT_PROTECTION_SWITCH", "保护倒换通知");noticeTypes.put("NT_FILE_TRANSFER_STATUS", "文件传输状态通知");noticeTypes.put("NT_EPROTECTION_SWITCH", "设备保护倒换通知事件");noticeTypes.put("NT_ASON_RESOURCE_CHANGE", "智能资源改变通知");noticeTypes.put("NT_PRBSTEST_STATUS", "伪随机码测试状态通知");noticeTypes.put("NT_WDMPROTECTION_SWITCH", "波分保护倒换通知");noticeTypes.put("NT_ATMPROTECTION_SWITCH", "ATM保护倒换通知");noticeTypes.put("NT_RPRPROTECTION_SWITCH", "RPR保护组倒换通知事件格式");noticeTypes.put("NT_IPPROTECTION_SWITCH", "Tunnel保护组倒换通知事件格式");}@Overridepublic void disconnect_structured_push_consumer() {log.info("Consumer disconnect_structured_push_consumer");}@Overridepublic void push_structured_event(StructuredEvent event) throws Disconnected {String eventType = event.header.fixed_header.event_type.type_name;Map<String, Object> eventData = new HashMap<>(event.filterable_data.length);for (int i = 0; i < event.filterable_data.length; i++) {if (!ObjectUtils.isEmpty(event.filterable_data[i])) {eventData.put(event.filterable_data[i].name, AnyUtil.parseAny( event.filterable_data[i].value, loginRes.getDynAnyFactory()));}}log.info("收到事件通知:{}<{}>,通知参数:{}",eventType, noticeTypes.get(eventType), eventData);}@Overridepublic void offer_change(EventType[] arg0, EventType[] arg1) throws InvalidEventType {}}

3.2.2AnyUtil

用于解析返回的信息。

import org.omg.CORBA.Any;
import org.omg.CORBA.TCKind;
import org.omg.DynamicAny.DynAnyFactory;
import org.omg.DynamicAny.DynArray;
import org.omg.DynamicAny.DynEnum;
import org.omg.DynamicAny.DynSequence;
import org.omg.DynamicAny.DynStruct;
import org.omg.DynamicAny.DynUnion;/*** @description:org.omg.DynamicAny格式化工具* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:33:17*/
public class AnyUtil {/*** @description:格式化数据* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午11:34:17*/public static String parseAny(Any any, DynAnyFactory factory){if( null==any ){return null;}StringBuilder result = new StringBuilder();try {switch (any.type().kind().value()) {case TCKind._tk_char:result.append(any.extract_char());break;case TCKind._tk_null:break;case TCKind._tk_boolean:result.append(any.extract_boolean());break;case TCKind._tk_short:result.append(any.extract_short());break;case TCKind._tk_long:result.append(any.extract_long());break;case TCKind._tk_double:result.append(any.extract_double());break;case TCKind._tk_float:result.append(any.extract_float());break;case TCKind._tk_octet:result.append(any.extract_octet());break;case TCKind._tk_ulong:result.append(any.extract_ulong());break;case TCKind._tk_string:result.append(any.extract_string());break;case TCKind._tk_enum:{DynEnum dynEnum = (DynEnum) factory.create_dyn_any(any);result.append(dynEnum.get_as_string());break;}case TCKind._tk_any:{any=factory.create_dyn_any(any).get_any();result.append(any);break;}case TCKind._tk_objref:{result.append(any.extract_Object());break;}case TCKind._tk_struct:case TCKind._tk_except:{DynStruct dynstruct = (DynStruct) factory.create_dyn_any(any);org.omg.DynamicAny.NameValuePair[] members = dynstruct.get_members();result.append("{");for (int i = 0; i < members.length; i++) {if(i>0){result.append(" ");}result.append(members[i].id).append(" ").append(parseAny(members[i].value, factory));}result.append("}");break;}case TCKind._tk_union:DynUnion dynunion = (DynUnion) factory.create_dyn_any(any);result.append(dynunion.member_name()).append(" ");result.append(parseAny(dynunion.member().to_any(), factory));break;case TCKind._tk_sequence:DynSequence dynseq = (DynSequence) factory.create_dyn_any(any);Any[] contents = dynseq.get_elements();result.append("{");for (int i = 0; i < contents.length; i++){result.append(parseAny(contents[i], factory));}result.append("}");break;case TCKind._tk_array:DynArray dynarray = (DynArray) factory.create_dyn_any(any);Any[] arrayContents = dynarray.get_elements();result.append("{");for (int i = 0; i < arrayContents.length; i++){result.append(parseAny(arrayContents[i], factory)).append("");}result.append("}");break;default:result.append(any.type().kind().value());}} catch (Exception ex) {ex.printStackTrace();}return new String(result.toString().getBytes(StandardCharsets.ISO_8859_1));}
}

3.3订阅通知

SubscribeNotice 实现 Runnable,即订阅的时候,另起一个线程来订阅。该线程负责订阅。

3.3.1SubscribeNotice

import org.omg.CORBA.IntHolder;
import org.omg.CORBA.Object;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannelHolder;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyComm.StructuredPushConsumerHelper;
import org.springframework.util.ObjectUtils;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.utils.JsonUtils;import lombok.extern.log4j.Log4j2;/*** @description:订阅消费通知* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午9:33:18*/
@Log4j2
public class SubscribeNotice implements Runnable{/*** 登录corba成功后的参数*/private CorbaLoginRes loginRes;/*** 记录订阅通知的通道ID的存储文件地址*/private String poxyIdPath;public SubscribeNotice(CorbaLoginRes loginRes, String poxyIdPath) {super();this.loginRes = loginRes;this.poxyIdPath = poxyIdPath;}@Overridepublic void run() {try {//获取通道IntHolder poxyId = new IntHolder();poxyId.value = getPoxyId(poxyIdPath);EventChannelHolder eventChannel = new EventChannelHolder();loginRes.getEmsSession().getEventChannel(eventChannel);//ConsumerNotice extends StructuredPushConsumerPOA 为消费者ConsumerNotice consumerNotice = new ConsumerNotice(loginRes);ConsumerAdmin defaultConsumerAdmin = eventChannel.value.default_consumer_admin();//连接通道,如果发现通道已经打开,则先关闭之前的通道(已经打开的通道即使不可以,北向接口并未释放该接口的资源,但是会限制连接通道(数量 < 3))try {if (poxyId.value > 0){log.info("释放旧的消费通道:{}", poxyId.value);ProxySupplier oldSupplier = defaultConsumerAdmin.get_proxy_supplier(poxyId.value);assert (oldSupplier != null);StructuredProxyPushSupplier myOldPoxy = StructuredProxyPushSupplierHelper.narrow(oldSupplier);myOldPoxy.disconnect_structured_push_supplier();}}catch (Exception e) {e.printStackTrace();}ProxySupplier tmpSupplier = defaultConsumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, poxyId);StructuredProxyPushSupplier proxyPushSupplier = StructuredProxyPushSupplierHelper.narrow(tmpSupplier);Object servant = loginRes.getRootPOA().servant_to_reference(consumerNotice);proxyPushSupplier.connect_structured_push_consumer(StructuredPushConsumerHelper.narrow(servant));savePoxyId(poxyIdPath, poxyId.value);log.info("保存此次的消费通道:{}", poxyId.value);loginRes.getOrb().run();} catch (Exception e) {e.printStackTrace();}}/*** @description:获取已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:40:57*/public int getPoxyId(String path) {int poxyId = -1;//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库String str = JsonUtils.readStringFromSystemPath(path);if(!ObjectUtils.isEmpty(str)) {poxyId = Integer.parseInt(str);}return poxyId;}/*** @description:保存已经连接的消费通道ID* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 上午10:41:33*/public void savePoxyId(String path, int poxyId) {//备注,这里没有提供JsonUtils,这里你可以改为存储到数据库或者其他地方,这里我是将记录的poxyId 存储到文件中,因为我采集的程序不需要连接数据库JsonUtils.writeStringToSystemPath(path, String.valueOf(poxyId));}
}

3.3.2JsonUtils

为了保证代码完整性,如果你完全抄上面的代码,这里提供了代码需要的两个文件操作示例

public static String readStringFromSystemPath(String path) {String data = "";try {InputStream inputStream = new FileInputStream(path);byte[] bdata = FileCopyUtils.copyToByteArray(inputStream);data = new String(bdata, StandardCharsets.UTF_8);} catch (FileNotFoundException e) {log.info("文件不存在,文件地址:{}", path);} catch (Exception e) {log.info("读取文件失败,文件地址:{},失败原因:{}", path,e.getMessage());} return data;}public static void writeStringToSystemPath(String filePath, String str) {Writer write = null;try {File file = new File(filePath);if(file.exists()) {file.delete();}if (!file.getParentFile().exists()) {file.getParentFile().mkdirs();}if(file.createNewFile()) {write = new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8);write.write(str);write.flush();}} catch (Exception e) {e.printStackTrace();} finally {if(write !=null ) {try {write.close();} catch (IOException e) {e.printStackTrace();}}}}

3.4启动订阅

这里我们使用SpringBoot启动的时候启动订阅,即实现ApplicationRunner,然后使用线程池的单线程来启动上面我们编写的线程。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;import com.collect.sdh.module.corba.entity.CorbaLoginRes;
import com.collect.sdh.module.corba.service.BaseCorbaService;/*** @description:启动订阅corba的消费* @author:hutao* @mail:hutao1@epri.sgcc.com.cn* @date:2024年5月7日 下午4:18:16*/
@Component
public class SubscribeRunner implements ApplicationRunner  {@Value(value = "${file-save-path}")private String poxyIdPath;@Autowiredprivate BaseCorbaService baseCorbaService;@Overridepublic void run(ApplicationArguments args) throws Exception {poxyIdPath = poxyIdPath + "poxyId";CorbaLoginRes login = baseCorbaService.login();ExecutorService executor = Executors.newSingleThreadExecutor();executor.submit(new SubscribeNotice(login, poxyIdPath));}
}

4.效果展示

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/856874.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Talk|北京大学张嘉曌:NaVid - 视觉语言导航大模型

本期为TechBeat人工智能社区第602期线上Talk。 北京时间6月20日(周四)20:00&#xff0c;北京大学博士生—张嘉曌的Talk已经准时在TechBeat人工智能社区开播&#xff01; 他与大家分享的主题是: “NaVid - 视觉语言导航大模型”&#xff0c;NaVid是首个专为视觉语言导航&#xf…

深入理解Java并发锁

在Java中&#xff0c;并发锁是用来控制多个线程对共享资源的访问&#xff0c;确保数据的一致性和完整性。Java提供了多种并发锁机制&#xff0c;包括内置锁&#xff08;synchronized&#xff09;、显示锁&#xff08;如ReentrantLock&#xff09;、原子变量、并发容器以及一些高…

计算机考研|20所超高性价比院校,别错过!

这题我太会了&#xff0c;给大家推荐20所性价比非常高的计算机考研院校&#xff01; 985和211都有&#xff0c;这些学校不搞歧视&#xff0c;公平竞争&#xff0c;非常有能力的同学报考。 ✅厦门大学 (985)&#xff1a;不歧视双非&#xff0c;全靠实力&#xff0c;校园环境还…

vscode安装所需插件 个人记录版

vscode安装所需插件 个人记录版 仅做参考 设置

通信系统的最佳线性均衡器(2)---自适应滤波算法

本篇文章是博主在通信等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对通信等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在通信领域笔记&#xff1a;…

好用的便签是什么 电脑桌面上好用的便签

作为一名文字工作者&#xff0c;我经常需要在繁杂的思绪中捕捉灵感&#xff0c;记录下那些一闪而过的想法。在寻找一款适合电脑桌面的便签应用时&#xff0c;我偶然发现了敬业签便签软件简直是为我量身定制的&#xff0c;它不仅界面简洁&#xff0c;操作便捷&#xff0c;更重要…

`THREE.PointsMaterial` 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。

demo案例 THREE.PointsMaterial 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性&#xff0c;比如颜色、大小和透明度。下面是对其构造函数的参数、属性和方法的详细讲解。 构造函数 const material new THREE.PointsMaterial(parameters);参数&am…

阿里AI图片编辑新项目,人人都可做设计师。MimicBrush本地一键整合包下载

最近阿里巴巴联合香港大学开源了一个创新图像编辑工具&#xff1a;MimicBrush&#xff0c;这个工具相当于是一个局部重绘工具。它通过先进的AI技术&#xff0c;能够将一张图片的某一部分融合到另一张图片上。 MimicBrush&#xff0c;一款颠覆传统的图像编辑神器&#xff0c;不过…

深度学习 --- stanford cs231学习笔记四(训练神经网络的几个重要组成部分之一,激活函数)

训练神经网络的几个重要组成部分 一 1&#xff0c;激活函数&#xff08;activation functions&#xff09; 激活函数是神经网络之于线性分类器的最大进步&#xff0c;最大贡献&#xff0c;即&#xff0c;引入了非线性。这些非线性函数可以被分成两大类&#xff0c;饱和非线性函…

一站式家装服务管理系统的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;管理员管理&#xff0c;装修风格管理&#xff0c;主材管理&#xff0c;用户管理&#xff0c;基础数据管理 前台账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;装修风格&#xff0…

Chained Together卡顿严重怎么办 链在一起卡顿频繁的解决方法

链在一起/Chained Together这款游戏特别适合四个人一起玩&#xff0c;游戏里四个玩家将会被锁链“链在一起”&#xff0c;然后一起在各个地图闯关&#xff0c;不仅考验玩家们的默契配合&#xff0c;还考验智慧和勇气。在链在一起中&#xff0c;玩家需要根据地形和岩浆的分布情况…

淘宝订单物流信息获取:详细操作指南

一、前言 淘宝作为中国最大的电商平台之一&#xff0c;其订单管理系统为商家提供了丰富的接口&#xff0c;用于查询、管理和跟踪订单信息。本指南将详细介绍如何通过淘宝订单接口获取物流信息&#xff0c;帮助商家更好地管理订单和提供优质的物流服务。 二、获取物流信息的步…

php框架的文档和社区支持如何?

在选择 php 框架时&#xff0c;文档和社区支持至关重要,拥有全面的文档和庞大活跃的社区&#xff0c;而 symfony 的文档内容丰富但更适合高级开发人员。codeigniter 4 的文档易于理解&#xff0c;社区规模较小但活跃。yii 2 的文档全面深入&#xff0c;但格式可能令初学者困惑。…

牛拜克拉丝的wordpress免费企业模板

wordpress免费企业模板 挺简洁实用的wordpress免费企业模板&#xff0c;黄色模板搭建公司网站。 演示 https://www.wpniu.com/themes/40.html

allegro 打开 brd文件时提示 WARNING(SPMHDB-212) 告警 应该如何解决呢?

WARNING(SPMHDB-212) &#xff1a;This design has functionality disabled due tothe current product plus options selected. The following features aredisabled: Maximum Crosstalk, Maximum Peak Crosstalk. [help] 步骤 按下图 将allegro编辑器为可设计高速线相关的…

Elastisearch集群(单节点)

目录 一、文件下载 二、创建linux es用户 三、上传、解压canal、es、kibana 四、配置es通讯证书&#xff08;生成证书给es配置使用&#xff09; 五、配置elastisearch 六、修改系统配置 七、添加ik分词器支持&#xff08;可选&#xff09; 八、给文件赋值权限 九、设置…

71-TCP协议工作原理及实战

一 服务器端 #ifndef MAINWINDOW_H #define MAINWINDOW_H#include <QMainWindow> #include <QTcpServer> // 专门用于建立TCP连接并传输数据信息 #include <QtNetwork> // 此模块提供开发TCP/IP客户端和服务器的类QT_BEGIN_NAMESPACE namespace Ui { class M…

164万年后的日期解析引发的OOM

名词解释 商家销项发票业务&#xff08;平台给商家开票&#xff09;&#xff0c;是平台提供给商家的工具产品&#xff0c;商家购买了平台的服务&#xff0c;那么平台需要开票给商家。 前言 本文所描述的问题&#xff0c;是应用的OOM引发的接口成功率下跌&#xff0c;排查过程中…

【LLM之NL2SQL】DAIL-SQL论文阅读笔记

研究背景 该研究旨在提供一个全面、系统的评估框架&#xff0c;用于评估基于大型语言模型&#xff08;LLM&#xff09;的Text-to-SQL技术。特别强调了不同的提示工程策略的有效性和效率&#xff0c;以及开源LLM的可行性。研究的重点是评估在零样本和少样本场景下的不同问题表示…

webgis 之 地图投影

地图投影 什么是地图投影目的种类等角投影的分类墨卡托投影Web 墨卡托投影 参考小结 为了更好地展示地球上的数据&#xff0c;需要将地球投影到一个平面上。地图投影是一个数学问题&#xff0c;按照一定的几何关系&#xff0c;将地球上的经纬度坐标映射到一个平面上的坐标。地球…