Subscription有两种模式,一种是Reporting,另一种是Sampling。
如果定义为Sampling,则这个Subscription是一个Triggered Item,即被激发的订阅,需要一个定义为Reporting的Subscription(称为Triggering Item)与它连接。这样当Triggering Item更新时,会激发Triggered Item更新。代码如下:
public class TriggeringExample implements UaClient {
// public static void main(String[] args) throws Exception {
// TriggeringExample example = new TriggeringExample();
//
// new ClientExampleRunner(example).run();
// }
private final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicLong clientHandles = new AtomicLong(1L);
@Override
public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
// synchronous connect
client.connect().get();
// create a subscription @ 1000ms 一个订阅可以包含多个监控item
UaSubscription subscription = client.getSubscriptionManager()
.createSubscription(1000.0)
.get();
// subscribe to a static value that reports
ReadValueId readValueId1 = new ReadValueId(
new NodeId(2, "ch1.d1.tag1"),
AttributeId.Value.uid(),
null,
QualifiedName.NULL_VALUE
);
//创建监控item, 第一个为Reporting mode
UaMonitoredItem reportingItem = createMonitoredItem(subscription, readValueId1, MonitoringMode.Reporting);
//创建第二个监控item, 它是Sampling mode,需要第一个项激发
ReadValueId readValueId2 = new ReadValueId(
new NodeId(2, "ch1.d1.tag2"),
AttributeId.Value.uid(),
null,
QualifiedName.NULL_VALUE
);
UaMonitoredItem samplingItem = createMonitoredItem(subscription, readValueId2, MonitoringMode.Sampling);
//将Triggering Item与Triggered item(可以有多个)连接起来(注意这里用了异步模式)
subscription.addTriggeringLinks(reportingItem, newArrayList(samplingItem)).get();
// trigger reporting of both by writing to the static item and changing its value
//VariableNode node = client.getAddressSpace().createVariableNode(
// new NodeId(2, "ch1.d1.tag1"));
//通过改变Triggering Item来激发Triggered item向我发送消息,注意这里writeValue使用了异步模式,如果没有这个get(),只能得到一两次消息更新
for(int i=0; i<10000; i++){
Variant newVal = new Variant(Unsigned.ushort(i));
DataValue va = new DataValue(newVal, null, null);
client.writeValue(new NodeId(2, "ch1.d1.tag1"), va).get();
}
//node.writeValue(va);
// client.writeValue(
// new NodeId(2, "ch1.d1.tag1"),
// va
// ).get();
// let the example run for 5 seconds then terminate
Thread.sleep(500000);
future.complete(client);
}
private UaMonitoredItem createMonitoredItem(
UaSubscription subscription,
ReadValueId readValueId,
MonitoringMode monitoringMode
) throws ExecutionException, InterruptedException {
// important: client handle must be unique per item
UInteger clientHandle = uint(clientHandles.getAndIncrement());
MonitoringParameters parameters = new MonitoringParameters(
clientHandle,
1000.0,
null,
uint(10),
true
);
MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId,
monitoringMode,
parameters
);
BiConsumer<UaMonitoredItem, Integer> onItemCreated =
(item, id) -> item.setValueConsumer(this::onSubscriptionValue);
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
newArrayList(request),
onItemCreated
).get();
return items.get(0);
}
private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
logger.info(
"回调:subscription value received: item={}, value={}",
item.getReadValueId().getNodeId(), value.getValue());
}
}
Milo-OPC UA处理Subscription和Triggering
原文:https://www.cnblogs.com/myboat/p/11891148.html