EMQ X Rule Engine (以下简称规则引擎) 用于配置 EMQ X 消息流与设备事件的处理、响应规则。规则引擎不仅提供了清晰、灵活的 "配置式" 的业务集成方案,简化了业务开发流程,提升用户易用性,降低业务系统与 EMQ X 的耦合度;也为 EMQ X 的私有功能定制提供了一个更优秀的基础架构。
EMQ X 在 消息发布或事件触发 时将触发规则引擎,满足触发条件的规则将执行各自的 SQL 语句筛选并处理消息和事件的上下文信息。
消息发布
规则引擎借助响应动作可将特定主题的消息处理结果存储到数据库,发送到 HTTP Server,转发到消息队列 Kafka 或 RabbitMQ,重新发布到新的主题甚至是另一个 Broker 集群中,每个规则可以配置多个响应动作。
选择发布到 t/# 主题的消息,并筛选出全部字段:
SELECT * FROM "t/#"
选择发布到 t/a 主题的消息,并从 JSON 格式的消息内容中筛选出 "x" 字段:
SELECT payload.x as x FROM "t/a"
事件触发
规则引擎使用 $events/ 开头的虚拟主题(事件主题)处理 EMQ X 内置事件,内置事件提供更精细的消息控制和客户端动作处理能力,可用在 QoS1 QoS2 的消息抵达记录、设备上下线记录等业务中。
选择客户端连接事件,筛选 Username 为 ‘emqx‘ 的设备并获取连接信息:
SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = ‘emqx‘
使用 EMQ X 的规则引擎可以灵活地处理消息和事件。使用规则引擎可以方便地实现诸如将消息转换成指定格式,然后存入数据库表,或者发送到消息队列等。
与 EMQ X 规则引擎相关的概念包括: 规则(rule)、动作(action)、资源(resource) 和 资源类型(resource-type)。
规则、动作、资源的关系:
规则: {
SQL 语句,
动作列表: [
{
动作1,
动作参数,
绑定资源: {
资源配置
}
},
{
动作2,
动作参数,
绑定资源: {
资源配置
}
}
]
}
动作和资源类型是由 emqx 或插件的代码提供的,不能通过 API 和 CLI 动态创建。
FROM、SELECT 和 WHERE 子句:
规则引擎的 SQL 语句基本格式为:
SELECT <字段名> FROM <主题> [WHERE <条件>]
FROM
子句将规则挂载到某个主题上SELECT
子句用于对数据进行变换,并选择出感兴趣的字段WHERE
子句用于对 SELECT 选择出来的某个字段施加条件过滤FOREACH、DO 和 INCASE 子句:
如果对于一个数组数据,想针对数组中的每个元素分别执行一些操作并执行 Actions,需要使用 FOREACH-DO-INCASE
语法。其基本格式为:
FOREACH <字段名> [DO <条件>] [INCASE <条件>] FROM <主题> [WHERE <条件>]
FOREACH
子句用于选择需要做 foreach 操作的字段,注意选择出的字段必须为数组类型DO
子句用于对 FOREACH 选择出来的数组中的每个元素进行变换,并选择出感兴趣的字段INCASE
子句用于对 DO 选择出来的某个字段施加条件过滤其中 DO 和 INCASE 子句都是可选的。DO 相当于针对当前循环中对象的 SELECT 子句,而 INCASE 相当于针对当前循环中对象的 WHERE 语句。
SELECT * FROM "t/a"
SELECT * FROM "t/a","t/b"
SELECT * FROM "t/#"
SELECT qos, username, clientid FROM "t/#"
SELECT username FROM "#" WHERE username=‘Steven‘
SELECT payload as p FROM "#" WHERE p.x = 1
SELECT payload as a FROM "#" WHERE a.x.y = 1
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = ‘c1‘
SELECT clientid FROM "$events/session_subscribed" WHERE topic = ‘t/#‘ and qos = 1
SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ ‘t/#‘ and qos = 1
- FROM 子句后面的主题需要用双引号
""
引起来。- WHERE 子句后面接筛选条件,如果使用到字符串需要用单引号
‘‘
引起来。- FROM 子句里如有多个主题,需要用逗号
","
分隔。例如 SELECT * FROM "t/1", "t/2" 。- 可以使用使用
"."
符号对 payload 进行嵌套选择。
假设有 ClientID 为 c_steve
、主题为 t/1
的消息,消息体为 JSON 格式,其中 sensors 字段为包含多个 Object 的数组:
{
"date": "2020-04-24",
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
示例1: 要求将 sensors 里的各个对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为 ${name}
。即最终规则引擎将会发出 3 条消息:
要完成这个规则,我们需要配置如下动作:
以及如下 SQL 语句:
FOREACH
payload.sensors
FROM "t/#"
示例解析:
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors,则选取结果为:
[
{
"name": "a",
"idx": 0
},
{
"name": "b",
"idx": 1
},
{
"name": "c",
"idx": 2
}
]
FOREACH 语句将会对于结果数组里的每个对象分别执行 "消息重新发布" 动作,所以将会执行重新发布动作 3 次。
示例2: 要求将 sensors 里的 idx
值大于或等于 1 的对象,分别作为数据输入重新发布消息到 sensors/${idx}
主题,内容为 clientid=${clientid},name=${name},date=${date}
。即最终规则引擎将会发出 2 条消息:
要完成这个规则,我们需要配置如下动作:
以及如下 SQL 语句:
FOREACH
payload.sensors
DO
clientid,
item.name as name,
item.idx as idx
INCASE
item.idx >= 1
FROM "t/#"
示例解析:
这个 SQL 中,FOREACH 子句指定需要进行遍历的数组 sensors
; DO 子句选取每次操作需要的字段,这里我们选了外层的 clientid
字段,以及当前 sensor 对象的 name
和 idx
两个字段,注意 item
代表 sensors 数组中本次循环的对象。INCASE 子句是针对 DO 语句中字段的筛选条件,仅仅当 idx >= 1 满足条件。所以 SQL 的选取结果为:
[
{
"name": "b",
"idx": 1,
"clientid": "c_emqx"
},
{
"name": "c",
"idx": 2,
"clientid": "c_emqx"
}
]
FOREACH 语句将会对于结果数组里的每个对象分别执行 "消息重新发布" 动作,所以将会执行重新发布动作 2 次。
在 DO 和 INCASE 语句里,可以使用 item
访问当前循环的对象,也可以通过在 FOREACH 使用 as
语法自定义一个变量名。所以本例中的 SQL 语句又可以写为:
FOREACH
payload.sensors as s
DO
clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"
示例3: 在示例2 的基础上,去掉 clientid 字段 c_steve
中的 c_
前缀
在 FOREACH 和 DO 语句中可以调用各类 SQL 函数,若要将 c_steve
变为 steve
,则可以把例2 中的 SQL 改为:
FOREACH
payload.sensors as s
DO
nth(2, tokens(clientid,‘_‘)) as clientid,
s.name as name,
s.idx as idx
INCASE
s.idx >= 1
FROM "t/#"
另外,FOREACH 子句中也可以放多个表达式,只要最后一个表达式是指定要遍历的数组即可。比如我们将消息体改一下,sensors 外面多套一层 Object:
{
"date": "2020-04-24",
"data": {
"sensors": [
{"name": "a", "idx":0},
{"name": "b", "idx":1},
{"name": "c", "idx":2}
]
}
}
则 FOREACH 中可以在决定要遍历的数组之前把 data 选取出来:
FOREACH
payload.data as data
data.sensors as s
...
示例1: 将消息中 x 字段的值范围限定在 0~7 之间。
SELECT
CASE WHEN payload.x < 0 THEN 0
WHEN payload.x > 7 THEN 7
ELSE payload.x
END as x
FROM "t/#"
假设消息为:
{"x": 8}
则上面的 SQL 输出为:
{"x": 7}
示例1: 创建一个数组,赋值给变量 a:
SELECT
[1,2,3] as a
FROM
"t/#"
下标从 1 开始,上面的 SQL 输出为:
{
"a": [1, 2, 3]
}
示例2: 从数组中取出第 N 个元素。下标为负数时,表示从数组的右边取:
SELECT
[1,2,3] as a,
a[2] as b,
a[-2] as c
FROM
"t/#"
上面的 SQL 输出为:
{
"b": 2,
"c": 2,
"a": [1, 2, 3]
}
示例3: 从 JSON 格式的 payload 中嵌套的获取值:
SELECT
payload.data[1].id as id
FROM
"t/#"
假设消息为:
{"data": [
{"id": 1, "name": "steve"},
{"id": 2, "name": "bill"}
]}
则上面的 SQL 输出为:
{"id": 1}
示例4: 数组范围(range)操作:
SELECT
[1..5] as a,
a[2..4] as b
FROM
"t/#"
上面的 SQL 输出为:
{
"b": [2, 3, 4],
"a": [1, 2, 3, 4, 5]
}
示例5: 使用下标语法修改数组中的某个元素:
SELECT
payload,
‘STEVE‘ as payload.data[1].name
FROM
"t/#"
假设消息为:
{"data": [
{"id": 1, "name": "steve"},
{"id": 2, "name": "bill"}
]}
则上面的 SQL 输出为:
{
"payload": {
"data": [
{"name": "STEVE", "id": 1},
{"name": "bill", "id": 2}
]
}
}
事件主题名 | 释义 |
---|---|
$events/message_delivered | 消息投递 |
$events/message_acked | 消息确认 |
$events/message_dropped | 消息丢弃 |
$events/client_connected | 连接完成 |
$events/client_disconnected | 连接断开 |
$events/session_subscribed | 订阅 |
$events/session_unsubscribed | 取消订阅 |
SELECT 和 WHERE 子句可用的字段与事件的类型相关。其中 clientid
, username
和 event
是通用字段,每种事件类型都有
event | 事件类型,固定为 "message.publish" |
---|---|
id | MQTT 消息 ID |
clientid | Client ID |
username | 用户名 |
payload | MQTT 消息体 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
flags | MQTT 消息的 Flags |
headers | MQTT 消息内部与流程处理相关的额外数据 |
timestamp | 事件触发时间 (ms) |
publish_received_at | PUBLISH 消息到达 Broker 的时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "message.delivered" |
---|---|
id | MQTT 消息 ID |
from_clientid | 消息来源 Client ID |
from_username | 消息来源用户名 |
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
payload | MQTT 消息体 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
flags | MQTT 消息的 Flags |
timestamp | 事件触发时间 (ms) |
publish_received_at | PUBLISH 消息到达 Broker 的时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "message.acked" |
---|---|
id | MQTT 消息 ID |
from_clientid | 消息来源 Client ID |
from_username | 消息来源用户名 |
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
payload | MQTT 消息体 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
flags | MQTT 消息的 Flags |
timestamp | 事件触发时间 (ms) |
publish_received_at | PUBLISH 消息到达 Broker 的时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "message.dropped" |
---|---|
id | MQTT 消息 ID |
reason | 消息丢弃原因 |
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
payload | MQTT 消息体 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
flags | MQTT 消息的 Flags |
timestamp | 事件触发时间 (ms) |
publish_received_at | PUBLISH 消息到达 Broker 的时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "client.connected" |
---|---|
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
mountpoint | 主题挂载点(主题前缀) |
peername | 终端的 IPAddress 和 Port |
sockname | emqx 监听的 IPAddress 和 Port |
proto_name | 协议名字 |
proto_ver | 协议版本 |
keepalive | MQTT 保活间隔 |
clean_start | MQTT clean_start |
expiry_interval | MQTT Session 过期时间 |
is_bridge | 是否为 MQTT bridge 连接 |
connected_at | 终端连接完成时间 (s) |
timestamp | 事件触发时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "client.disconnected" |
---|---|
reason | 终端连接断开原因: normal:客户端主动断开 kicked:服务端踢出,通过 REST API keepalive_timeout: keepalive 超时 not_authorized: 认证失败,或者 acl_nomatch = disconnect 时没有权限的 Pub/Sub 会主动断开客户端 tcp_closed: 协议错误 internal_error: 畸形报文解析出错 |
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
peername | 终端的 IPAddress 和 Port |
sockname | emqx 监听的 IPAddress 和 Port |
disconnected_at | 终端连接断开时间 (s) |
timestamp | 事件触发时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "session.subscribed" |
---|---|
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
timestamp | 事件触发时间 (ms) |
node | 事件触发所在节点 |
event | 事件类型,固定为 "session.unsubscribed" |
---|---|
clientid | 消息目的 Client ID |
username | 消息目的用户名 |
peerhost | 客户端的 IPAddress |
topic | MQTT 主题 |
qos | MQTT 消息的 QoS |
timestamp | 事件触发时间 (ms) |
node | 事件触发所在节点 |
SELECT 语句用于决定最终的输出结果里的字段。比如:
下面 SQL 的输出结果中将只有两个字段 "a" 和 "b":
SELECT a, b FROM "t/#"
WHERE 语句用于对本事件中可用字段,或 SELECT 语句中定义的字段进行条件过滤。比如:
# 选取 username 为 ‘abc‘ 的终端发来的消息,输出结果为所有可用字段:
SELECT * FROM "#" WHERE username = ‘abc‘
## 选取 clientid 为 ‘abc‘ 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意 cid 变量是在 SELECT 语句中定义的,故可在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE cid = ‘abc‘
## 选取 username 为 ‘abc‘ 的终端发来的消息,输出结果将只有 cid 一个字段。
## 注意虽然 SELECT 语句中只选取了 cid 一个字段,所有消息发布事件中的可用字段 (比如 clientid, username 等) 仍然可以在 WHERE 语句中使用:
SELECT clientid as cid FROM "#" WHERE username = ‘abc‘
## 但下面这个 SQL 语句就不能工作了,因为变量 xyz 既不是消息发布事件中的可用字段,又没有在 SELECT 语句中定义:
SELECT clientid as cid FROM "#" WHERE xyz = ‘abc‘
FROM 语句用于选择事件来源。如果是消息发布则填写消息的主题,如果是事件则填写对应的事件主题。
原文:https://www.cnblogs.com/wwjj4811/p/15093269.html