Quellcode durchsuchen

Merge pull request #7273 from terry-xiaoyu/rule_test_columns

Add more test columns for events received from MQTT bridge
Xinyu Liu vor 4 Jahren
Ursprung
Commit
09404c555f

+ 9 - 10
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl

@@ -213,16 +213,15 @@ maybe_publish_to_local_broker(#{topic := Topic} = Msg, #{remote_topic := SubTopi
 
 format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
         qos := QoS, retain := Retain, topic := Topic}) ->
-    #{event => '$bridges/mqtt',
-      id => emqx_guid:to_hexstr(emqx_guid:gen()),
-      payload => Payload,
-      topic => Topic,
-      qos => QoS,
-      dup => Dup,
-      retain => Retain,
-      pub_props => printable_maps(Props),
-      timestamp => erlang:system_time(millisecond)
-    }.
+    #{ id => emqx_guid:to_hexstr(emqx_guid:gen())
+     , payload => Payload
+     , topic => Topic
+     , qos => QoS
+     , dup => Dup
+     , retain => Retain
+     , pub_props => printable_maps(Props)
+     , message_received_at => erlang:system_time(millisecond)
+     }.
 
 printable_maps(undefined) -> #{};
 printable_maps(Headers) ->

+ 1 - 1
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -537,7 +537,7 @@ t_ingress_mqtt_bridge_with_rules(_) ->
      } = jsx:decode(Rule1),
     %% we also check if the outputs of the rule is triggered
     ?assertMatch(#{inspect := #{
-        event := '$bridges/mqtt',
+        event := <<"$bridges/mqtt", _/binary>>,
         id := MsgId,
         payload := Payload,
         topic := RemoteTopic,

+ 112 - 38
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -72,12 +72,9 @@ reload() ->
             ok = emqx_rule_engine:load_hooks_for_rule(Rule)
         end, emqx_rule_engine:get_rules()).
 
-load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
-    emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
-        [#{bridge_topic => BridgeTopic}]});
 load(Topic) ->
     HookPoint = event_name(Topic),
-    emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [[]]}).
+    emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}).
 
 unload() ->
     lists:foreach(fun(HookPoint) ->
@@ -91,12 +88,6 @@ unload(Topic) ->
 %%--------------------------------------------------------------------
 %% Callbacks
 %%--------------------------------------------------------------------
-on_bridge_message_received(Message, #{bridge_topic := BridgeTopic}) ->
-    case emqx_rule_engine:get_rules_for_topic(BridgeTopic) of
-        [] -> ok;
-        Rules -> emqx_rule_runtime:apply_rules(Rules, Message)
-    end.
-
 on_message_publish(Message = #message{topic = Topic}, _Env) ->
     case ignore_sys_message(Message) of
         true -> ok;
@@ -108,6 +99,9 @@ on_message_publish(Message = #message{topic = Topic}, _Env) ->
     end,
     {ok, Message}.
 
+on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) ->
+    apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
+
 on_client_connected(ClientInfo, ConnInfo, Env) ->
     apply_event('client.connected',
         fun() -> eventmsg_connected(ClientInfo, ConnInfo) end, Env).
@@ -364,7 +358,9 @@ apply_event(EventName, GenEventMsg, _Env) ->
     EventTopic = event_topic(EventName),
     case emqx_rule_engine:get_rules_for_topic(EventTopic) of
         [] -> ok;
-        Rules -> emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
+        Rules ->
+            %% delay the generating of eventmsg after we have found some rules to apply
+            emqx_rule_runtime:apply_rules(Rules, GenEventMsg())
     end.
 
 %%--------------------------------------------------------------------
@@ -383,6 +379,7 @@ event_info() ->
     , event_info_session_subscribed()
     , event_info_session_unsubscribed()
     , event_info_delivery_dropped()
+    , event_info_bridge_mqtt()
     ].
 
 event_info_message_publish() ->
@@ -449,6 +446,13 @@ event_info_session_unsubscribed() ->
         {<<"session unsubscribed">>, <<"会话取消订阅完成"/utf8>>},
         <<"SELECT * FROM \"$events/session_unsubscribed\" WHERE topic =~ 't/#'">>
     ).
+event_info_bridge_mqtt()->
+    event_info_common(
+        <<"$bridges/mqtt:*">>,
+        {<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>},
+        {<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>},
+        <<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">>
+    ).
 
 event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
     #{event => event_topic(Event),
@@ -460,46 +464,51 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
     }.
 
 test_columns('message.dropped') ->
-    [ {<<"reason">>, <<"no_subscribers">>}
+    [ {<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}
     ] ++ test_columns('message.publish');
 test_columns('message.publish') ->
-    [ {<<"clientid">>, <<"c_emqx">>}
-    , {<<"username">>, <<"u_emqx">>}
-    , {<<"topic">>, <<"t/a">>}
-    , {<<"qos">>, 1}
-    , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username of the sender">>]}
+    , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
+    , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
+    , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
     ];
 test_columns('delivery.dropped') ->
-    [ {<<"reason">>, <<"queue_full">>}
+    [ {<<"reason">>, [<<"queue_full">>, <<"the reason of dropping">>]}
     ] ++ test_columns('message.delivered');
 test_columns('message.acked') ->
     test_columns('message.delivered');
 test_columns('message.delivered') ->
-    [ {<<"from_clientid">>, <<"c_emqx_1">>}
-    , {<<"from_username">>, <<"u_emqx_1">>}
-    , {<<"clientid">>, <<"c_emqx_2">>}
-    , {<<"username">>, <<"u_emqx_2">>}
-    , {<<"topic">>, <<"t/a">>}
-    , {<<"qos">>, 1}
-    , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
+    [ {<<"from_clientid">>, [<<"c_emqx_1">>, <<"the clientid of the sender">>]}
+    , {<<"from_username">>, [<<"u_emqx_1">>, <<"the username of the sender">>]}
+    , {<<"clientid">>, [<<"c_emqx_2">>, <<"the clientid of the receiver">>]}
+    , {<<"username">>, [<<"u_emqx_2">>, <<"the username of the receiver">>]}
+    , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
+    , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
+    , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
     ];
 test_columns('client.connected') ->
-    [ {<<"clientid">>, <<"c_emqx">>}
-    , {<<"username">>, <<"u_emqx">>}
-    , {<<"peername">>, <<"127.0.0.1:52918">>}
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
+    , {<<"peername">>, [<<"127.0.0.1:52918">>, <<"the IP address and port of the client">>]}
     ];
 test_columns('client.disconnected') ->
-    [ {<<"clientid">>, <<"c_emqx">>}
-    , {<<"username">>, <<"u_emqx">>}
-    , {<<"reason">>, <<"normal">>}
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
+    , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]}
     ];
 test_columns('session.unsubscribed') ->
     test_columns('session.subscribed');
 test_columns('session.subscribed') ->
-    [ {<<"clientid">>, <<"c_emqx">>}
-    , {<<"username">>, <<"u_emqx">>}
-    , {<<"topic">>, <<"t/a">>}
-    , {<<"qos">>, 1}
+    [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
+    , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
+    , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
+    , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
+    ];
+test_columns(<<"$bridges/mqtt", _/binary>>) ->
+    [ {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
+    , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
+    , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
     ].
 
 columns_with_exam('message.publish') ->
@@ -514,12 +523,15 @@ columns_with_exam('message.publish') ->
     , {<<"flags">>, #{}}
     , {<<"headers">>, undefined}
     , {<<"publish_received_at">>, erlang:system_time(millisecond)}
+    , columns_example_props(pub_props)
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
 columns_with_exam('message.delivered') ->
     columns_message_ack_delivered('message.delivered');
 columns_with_exam('message.acked') ->
+    [ columns_example_props(puback_props)
+    ] ++
     columns_message_ack_delivered('message.acked');
 columns_with_exam('message.dropped') ->
     [ {<<"event">>, 'message.dropped'}
@@ -533,6 +545,7 @@ columns_with_exam('message.dropped') ->
     , {<<"qos">>, 1}
     , {<<"flags">>, #{}}
     , {<<"publish_received_at">>, erlang:system_time(millisecond)}
+    , columns_example_props(pub_props)
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
@@ -549,6 +562,7 @@ columns_with_exam('delivery.dropped') ->
     , {<<"topic">>, <<"t/a">>}
     , {<<"qos">>, 1}
     , {<<"flags">>, #{}}
+    , columns_example_props(pub_props)
     , {<<"publish_received_at">>, erlang:system_time(millisecond)}
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
@@ -566,6 +580,7 @@ columns_with_exam('client.connected') ->
     , {<<"clean_start">>, true}
     , {<<"expiry_interval">>, 3600}
     , {<<"is_bridge">>, false}
+    , columns_example_props(conn_props)
     , {<<"connected_at">>, erlang:system_time(millisecond)}
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
@@ -577,14 +592,33 @@ columns_with_exam('client.disconnected') ->
     , {<<"username">>, <<"u_emqx">>}
     , {<<"peername">>, <<"192.168.0.10:56431">>}
     , {<<"sockname">>, <<"0.0.0.0:1883">>}
+    , columns_example_props(disconn_props)
     , {<<"disconnected_at">>, erlang:system_time(millisecond)}
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ];
 columns_with_exam('session.subscribed') ->
-    columns_message_sub_unsub('session.subscribed');
+    [ columns_example_props(sub_props)
+    ] ++ columns_message_sub_unsub('session.subscribed');
 columns_with_exam('session.unsubscribed') ->
-    columns_message_sub_unsub('session.unsubscribed').
+    [ columns_example_props(unsub_props)
+    ] ++ columns_message_sub_unsub('session.unsubscribed');
+columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) ->
+    [ {<<"event">>, EventName}
+    , {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
+    , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
+    , {<<"peerhost">>, <<"192.168.0.10">>}
+    , {<<"topic">>, <<"t/a">>}
+    , {<<"qos">>, 1}
+    , {<<"dup">>, false}
+    , {<<"retain">>, false}
+    , columns_example_props(pub_props)
+    %% the time that we receiced the message from remote broker
+    , {<<"message_received_at">>, erlang:system_time(millisecond)}
+    %% the time that the rule is triggered
+    , {<<"timestamp">>, erlang:system_time(millisecond)}
+    , {<<"node">>, node()}
+    ].
 
 columns_message_sub_unsub(EventName) ->
     [ {<<"event">>, EventName}
@@ -610,14 +644,48 @@ columns_message_ack_delivered(EventName) ->
     , {<<"qos">>, 1}
     , {<<"flags">>, #{}}
     , {<<"publish_received_at">>, erlang:system_time(millisecond)}
+    , columns_example_props(pub_props)
     , {<<"timestamp">>, erlang:system_time(millisecond)}
     , {<<"node">>, node()}
     ].
 
+columns_example_props(PropType) ->
+    Props = columns_example_props_specific(PropType),
+    UserProps = #{
+        'User-Property' => #{<<"foo">> => <<"bar">>},
+        'User-Property-Pairs' => [
+            #{key => <<"foo">>}, #{value => <<"bar">>}
+        ]
+    },
+    {PropType, maps:merge(Props, UserProps)}.
+
+columns_example_props_specific(pub_props) ->
+    #{ 'Payload-Format-Indicator' => 0
+     , 'Message-Expiry-Interval' => 30
+     };
+columns_example_props_specific(puback_props) ->
+    #{ 'Reason-String' => <<"OK">>
+     };
+columns_example_props_specific(conn_props) ->
+    #{ 'Session-Expiry-Interval' => 7200
+     , 'Receive-Maximum' => 32
+     };
+columns_example_props_specific(disconn_props) ->
+    #{ 'Session-Expiry-Interval' => 7200
+     , 'Reason-String' => <<"Redirect to another server">>
+     , 'Server Reference' => <<"192.168.22.129">>
+     };
+columns_example_props_specific(sub_props) ->
+    #{};
+columns_example_props_specific(unsub_props) ->
+    #{}.
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
+hook_fun(<<"$bridges/", _/binary>>) ->
+    on_bridge_message_received;
 hook_fun(Event) ->
     case string:split(atom_to_list(Event), ".") of
         [Prefix, Name] ->
@@ -646,6 +714,7 @@ event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
 event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
 event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
 event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
+event_name(<<"$bridges/", _/binary>> = Topic) -> Topic;
 event_name(_) -> 'message.publish'.
 
 event_topic('client.connected') -> <<"$events/client_connected">>;
@@ -656,7 +725,8 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
 event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
-event_topic('message.publish') -> <<"$events/message_publish">>.
+event_topic('message.publish') -> <<"$events/message_publish">>;
+event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic.
 
 printable_maps(undefined) -> #{};
 printable_maps(Headers) ->
@@ -665,6 +735,10 @@ printable_maps(Headers) ->
                 AccIn#{K => ntoa(V0)};
             ('User-Property', V0, AccIn) when is_list(V0) ->
                 AccIn#{
+                    %% The 'User-Property' field is for the convenience of querying properties
+                    %% using the '.' syntax, e.g. "SELECT 'User-Property'.foo as foo"
+                    %% However, this does not allow duplicate property keys. To allow
+                    %% duplicate keys, we have to use the 'User-Property-Pairs' field instead.
                     'User-Property' => maps:from_list(V0),
                     'User-Property-Pairs' => [#{
                         key => Key,

+ 4 - 16
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -69,6 +69,7 @@ get_selected_data(Selected, _Envs, _Args) ->
     Selected.
 
 is_publish_topic(<<"$events/", _/binary>>) -> false;
+is_publish_topic(<<"$bridges/", _/binary>>) -> false;
 is_publish_topic(_Topic) -> true.
 
 flatten([]) -> [];
@@ -83,21 +84,8 @@ echo_action(Data, Envs) ->
 fill_default_values(Event, Context) ->
     maps:merge(envs_examp(Event), Context).
 
-envs_examp(<<"$events/", _/binary>> = EVENT_TOPIC) ->
-    EventName = emqx_rule_events:event_name(EVENT_TOPIC),
+envs_examp(EventTopic) ->
+    EventName = emqx_rule_events:event_name(EventTopic),
     emqx_rule_maps:atom_key_map(
         maps:from_list(
-            emqx_rule_events:columns_with_exam(EventName)));
-envs_examp(_) ->
-    #{id => emqx_guid:to_hexstr(emqx_guid:gen()),
-      clientid => <<"c_emqx">>,
-      username => <<"u_emqx">>,
-      payload => <<"{\"id\": 1, \"name\": \"ha\"}">>,
-      peerhost => <<"127.0.0.1">>,
-      topic => <<"t/a">>,
-      qos => 1,
-      flags => #{sys => true, event => true},
-      publish_received_at => emqx_plugin_libs_rule:now_ms(),
-      timestamp => emqx_plugin_libs_rule:now_ms(),
-      node => node()
-    }.
+            emqx_rule_events:columns_with_exam(EventName))).