|
@@ -72,12 +72,9 @@ reload() ->
|
|
|
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
|
|
ok = emqx_rule_engine:load_hooks_for_rule(Rule)
|
|
|
end, emqx_rule_engine:get_rules()).
|
|
end, emqx_rule_engine:get_rules()).
|
|
|
|
|
|
|
|
-load(<<"$bridges/", _BridgeId/binary>> = BridgeTopic) ->
|
|
|
|
|
- emqx_hooks:put(BridgeTopic, {?MODULE, on_bridge_message_received,
|
|
|
|
|
- [#{bridge_event_name => BridgeTopic}]});
|
|
|
|
|
load(Topic) ->
|
|
load(Topic) ->
|
|
|
HookPoint = event_name(Topic),
|
|
HookPoint = event_name(Topic),
|
|
|
- emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{}]}).
|
|
|
|
|
|
|
+ emqx_hooks:put(HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}).
|
|
|
|
|
|
|
|
unload() ->
|
|
unload() ->
|
|
|
lists:foreach(fun(HookPoint) ->
|
|
lists:foreach(fun(HookPoint) ->
|
|
@@ -102,7 +99,7 @@ on_message_publish(Message = #message{topic = Topic}, _Env) ->
|
|
|
end,
|
|
end,
|
|
|
{ok, Message}.
|
|
{ok, Message}.
|
|
|
|
|
|
|
|
-on_bridge_message_received(Message, Env = #{bridge_event_name := BridgeTopic}) ->
|
|
|
|
|
|
|
+on_bridge_message_received(Message, Env = #{event_topic := BridgeTopic}) ->
|
|
|
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
|
|
apply_event(BridgeTopic, fun() -> with_basic_columns(BridgeTopic, Message) end, Env).
|
|
|
|
|
|
|
|
on_client_connected(ClientInfo, ConnInfo, Env) ->
|
|
on_client_connected(ClientInfo, ConnInfo, Env) ->
|
|
@@ -451,7 +448,7 @@ event_info_session_unsubscribed() ->
|
|
|
).
|
|
).
|
|
|
event_info_bridge_mqtt()->
|
|
event_info_bridge_mqtt()->
|
|
|
event_info_common(
|
|
event_info_common(
|
|
|
- <<"$bridges/mqtt">>,
|
|
|
|
|
|
|
+ <<"$bridges/mqtt:my_mqtt_bridge">>,
|
|
|
{<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>},
|
|
{<<"MQTT bridge message">>, <<"MQTT 桥接消息"/utf8>>},
|
|
|
{<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>},
|
|
{<<"received a message from MQTT bridge">>, <<"收到来自 MQTT 桥接的消息"/utf8>>},
|
|
|
<<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">>
|
|
<<"SELECT * FROM \"$bridges/mqtt:my_mqtt_bridge\" WHERE topic =~ 't/#'">>
|
|
@@ -467,51 +464,51 @@ event_info_common(Event, {TitleEN, TitleZH}, {DescrEN, DescrZH}, SqlExam) ->
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
test_columns('message.dropped') ->
|
|
test_columns('message.dropped') ->
|
|
|
- [ {<<"reason">>, <<"no_subscribers">>}
|
|
|
|
|
|
|
+ [ {<<"reason">>, [<<"no_subscribers">>, <<"the reason of dropping">>]}
|
|
|
] ++ test_columns('message.publish');
|
|
] ++ test_columns('message.publish');
|
|
|
test_columns('message.publish') ->
|
|
test_columns('message.publish') ->
|
|
|
- [ {<<"clientid">>, <<"c_emqx">>}
|
|
|
|
|
- , {<<"username">>, <<"u_emqx">>}
|
|
|
|
|
- , {<<"topic">>, <<"t/a">>}
|
|
|
|
|
- , {<<"qos">>, 1}
|
|
|
|
|
- , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
|
|
|
|
+ [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid of the sender">>]}
|
|
|
|
|
+ , {<<"username">>, [<<"u_emqx">>, <<"the username of the sender">>]}
|
|
|
|
|
+ , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
|
|
|
|
|
+ , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
|
|
|
|
|
+ , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
|
|
|
];
|
|
];
|
|
|
test_columns('delivery.dropped') ->
|
|
test_columns('delivery.dropped') ->
|
|
|
- [ {<<"reason">>, <<"queue_full">>}
|
|
|
|
|
|
|
+ [ {<<"reason">>, [<<"queue_full">>, <<"the reason of dropping">>]}
|
|
|
] ++ test_columns('message.delivered');
|
|
] ++ test_columns('message.delivered');
|
|
|
test_columns('message.acked') ->
|
|
test_columns('message.acked') ->
|
|
|
test_columns('message.delivered');
|
|
test_columns('message.delivered');
|
|
|
test_columns('message.delivered') ->
|
|
test_columns('message.delivered') ->
|
|
|
- [ {<<"from_clientid">>, <<"c_emqx_1">>}
|
|
|
|
|
- , {<<"from_username">>, <<"u_emqx_1">>}
|
|
|
|
|
- , {<<"clientid">>, <<"c_emqx_2">>}
|
|
|
|
|
- , {<<"username">>, <<"u_emqx_2">>}
|
|
|
|
|
- , {<<"topic">>, <<"t/a">>}
|
|
|
|
|
- , {<<"qos">>, 1}
|
|
|
|
|
- , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
|
|
|
|
+ [ {<<"from_clientid">>, [<<"c_emqx_1">>, <<"the clientid of the sender">>]}
|
|
|
|
|
+ , {<<"from_username">>, [<<"u_emqx_1">>, <<"the username of the sender">>]}
|
|
|
|
|
+ , {<<"clientid">>, [<<"c_emqx_2">>, <<"the clientid of the receiver">>]}
|
|
|
|
|
+ , {<<"username">>, [<<"u_emqx_2">>, <<"the username of the receiver">>]}
|
|
|
|
|
+ , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
|
|
|
|
|
+ , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
|
|
|
|
|
+ , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
|
|
|
];
|
|
];
|
|
|
test_columns('client.connected') ->
|
|
test_columns('client.connected') ->
|
|
|
- [ {<<"clientid">>, <<"c_emqx">>}
|
|
|
|
|
- , {<<"username">>, <<"u_emqx">>}
|
|
|
|
|
- , {<<"peername">>, <<"127.0.0.1:52918">>}
|
|
|
|
|
|
|
+ [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
|
|
|
|
|
+ , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
|
|
|
|
|
+ , {<<"peername">>, [<<"127.0.0.1:52918">>, <<"the IP address and port of the client">>]}
|
|
|
];
|
|
];
|
|
|
test_columns('client.disconnected') ->
|
|
test_columns('client.disconnected') ->
|
|
|
- [ {<<"clientid">>, <<"c_emqx">>}
|
|
|
|
|
- , {<<"username">>, <<"u_emqx">>}
|
|
|
|
|
- , {<<"reason">>, <<"normal">>}
|
|
|
|
|
|
|
+ [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
|
|
|
|
|
+ , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
|
|
|
|
|
+ , {<<"reason">>, [<<"normal">>, <<"the reason for shutdown">>]}
|
|
|
];
|
|
];
|
|
|
test_columns('session.unsubscribed') ->
|
|
test_columns('session.unsubscribed') ->
|
|
|
test_columns('session.subscribed');
|
|
test_columns('session.subscribed');
|
|
|
test_columns('session.subscribed') ->
|
|
test_columns('session.subscribed') ->
|
|
|
- [ {<<"clientid">>, <<"c_emqx">>}
|
|
|
|
|
- , {<<"username">>, <<"u_emqx">>}
|
|
|
|
|
- , {<<"topic">>, <<"t/a">>}
|
|
|
|
|
- , {<<"qos">>, 1}
|
|
|
|
|
|
|
+ [ {<<"clientid">>, [<<"c_emqx">>, <<"the clientid if the client">>]}
|
|
|
|
|
+ , {<<"username">>, [<<"u_emqx">>, <<"the username if the client">>]}
|
|
|
|
|
+ , {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
|
|
|
|
|
+ , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
|
|
|
];
|
|
];
|
|
|
-test_columns(<<"$bridges/mqtt">>) ->
|
|
|
|
|
- [ {<<"topic">>, <<"t/a">>}
|
|
|
|
|
- , {<<"qos">>, 1}
|
|
|
|
|
- , {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
|
|
|
|
+test_columns(<<"$bridges/mqtt", _/binary>>) ->
|
|
|
|
|
+ [ {<<"topic">>, [<<"t/a">>, <<"the topic of the MQTT message">>]}
|
|
|
|
|
+ , {<<"qos">>, [1, <<"the QoS of the MQTT message">>]}
|
|
|
|
|
+ , {<<"payload">>, [<<"{\"msg\": \"hello\"}">>, <<"the payload of the MQTT message">>]}
|
|
|
].
|
|
].
|
|
|
|
|
|
|
|
columns_with_exam('message.publish') ->
|
|
columns_with_exam('message.publish') ->
|
|
@@ -606,8 +603,8 @@ columns_with_exam('session.subscribed') ->
|
|
|
columns_with_exam('session.unsubscribed') ->
|
|
columns_with_exam('session.unsubscribed') ->
|
|
|
[ columns_example_props(unsub_props)
|
|
[ columns_example_props(unsub_props)
|
|
|
] ++ columns_message_sub_unsub('session.unsubscribed');
|
|
] ++ columns_message_sub_unsub('session.unsubscribed');
|
|
|
-columns_with_exam(<<"$bridges/mqtt">>) ->
|
|
|
|
|
- [ {<<"event">>, <<"$bridges/mqtt">>}
|
|
|
|
|
|
|
+columns_with_exam(<<"$bridges/mqtt", _/binary>> = EventName) ->
|
|
|
|
|
+ [ {<<"event">>, EventName}
|
|
|
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
, {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())}
|
|
|
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
, {<<"payload">>, <<"{\"msg\": \"hello\"}">>}
|
|
|
, {<<"peerhost">>, <<"192.168.0.10">>}
|
|
, {<<"peerhost">>, <<"192.168.0.10">>}
|
|
@@ -687,6 +684,8 @@ columns_example_props_specific(unsub_props) ->
|
|
|
%% Helper functions
|
|
%% Helper functions
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
+hook_fun(<<"$bridges/", _/binary>>) ->
|
|
|
|
|
+ on_bridge_message_received;
|
|
|
hook_fun(Event) ->
|
|
hook_fun(Event) ->
|
|
|
case string:split(atom_to_list(Event), ".") of
|
|
case string:split(atom_to_list(Event), ".") of
|
|
|
[Prefix, Name] ->
|
|
[Prefix, Name] ->
|