Browse Source

feat(event-topic): add more test cases

Turtle 4 năm trước cách đây
mục cha
commit
459d2154c7

+ 8 - 0
apps/emqx_modules/include/emqx_modules.hrl

@@ -3,3 +3,11 @@
 
 %% Interval for reporting telemetry data, Default: 7d
 -define(REPORT_INTERVAR, 604800).
+
+-define(BASE_TOPICS, [<<"$event/client_connected">>,
+                      <<"$event/client_disconnected">>,
+                      <<"$event/session_subscribed">>,
+                      <<"$event/session_unsubscribed">>,
+                      <<"$event/message_delivered">>,
+                      <<"$event/message_acked">>,
+                      <<"$event/message_dropped">>]).

+ 58 - 22
apps/emqx_modules/src/emqx_event_topic.erl

@@ -18,6 +18,7 @@
 
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include("emqx_modules.hrl").
 
 -export([ enable/0
         , disable/0
@@ -37,22 +38,50 @@
 -endif.
 
 enable() ->
-    emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}),
-    emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}),
-    emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []}),
-    emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []}),
-    emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}),
-    emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []}),
-    emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}).
+    Topics = emqx_config:get([event_topic, topics], []),
+    lists:foreach(fun(Topic) ->
+        case Topic of
+            <<"$event/client_connected">> ->
+                emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []});
+            <<"$event/client_disconnected">> ->
+                emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []});
+            <<"$event/session_subscribed">> ->
+                emqx_hooks:put('session.subscribed', {?MODULE, on_session_subscribed, []});
+            <<"$event/session_unsubscribed">> ->
+                emqx_hooks:put('session.unsubscribed', {?MODULE, on_session_unsubscribed, []});
+            <<"$event/message_delivered">> ->
+                emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []});
+            <<"$event/message_acked">> ->
+                emqx_hooks:put('message.acked', {?MODULE, on_message_acked, []});
+            <<"$event/message_dropped">> ->
+                emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []});
+            _ ->
+                ok
+        end
+    end, Topics).
 
 disable() ->
-    emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
-    emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}),
-    emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed}),
-    emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed}),
-    emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}),
-    emqx_hooks:del('message.acked', {?MODULE, on_message_acked}),
-    emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}).
+    Topics = emqx_config:get([event_topic, topics], []),
+    lists:foreach(fun(Topic) ->
+        case Topic of
+            <<"$event/client_connected">> ->
+                emqx_hooks:del('client.connected', {?MODULE, on_client_connected});
+            <<"$event/client_disconnected">> ->
+                emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected});
+            <<"$event/session_subscribed">> ->
+                emqx_hooks:del('session.subscribed', {?MODULE, on_session_subscribed});
+            <<"$event/session_unsubscribed">> ->
+                emqx_hooks:del('session.unsubscribed', {?MODULE, session_unsubscribed});
+            <<"$event/message_delivered">> ->
+                emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered});
+            <<"$event/message_acked">> ->
+                emqx_hooks:del('message.acked', {?MODULE, on_message_acked});
+            <<"$event/message_dropped">> ->
+                emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped});
+            _ ->
+                ok
+        end
+    end, ?BASE_TOPICS -- Topics).
 
 %%--------------------------------------------------------------------
 %% Callbacks
@@ -62,7 +91,8 @@ on_client_connected(ClientInfo, ConnInfo) ->
     Payload0 = connected_payload(ClientInfo, ConnInfo),
     emqx_broker:safe_publish(
               make_msg(<<"$event/client_connected">>,
-                       emqx_json:encode(Payload0))).
+                       emqx_json:encode(Payload0))),
+    ok.
 
 on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
                        Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) ->
@@ -73,8 +103,9 @@ on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Usernam
                  ts => erlang:system_time(millisecond)
                 },
     emqx_broker:safe_publish(
-              make_msg(<<"$event/client_connected">>,
-                       emqx_json:encode(Payload0))).
+              make_msg(<<"$event/client_disconnected">>,
+                       emqx_json:encode(Payload0))),
+    ok.
 
 on_session_subscribed(_ClientInfo = #{clientid := ClientId,
                                       username := Username},
@@ -87,7 +118,8 @@ on_session_subscribed(_ClientInfo = #{clientid := ClientId,
                 },
     emqx_broker:safe_publish(
               make_msg(<<"$event/session_subscribed">>,
-                       emqx_json:encode(Payload0))).
+                       emqx_json:encode(Payload0))),
+    ok.
 
 on_session_unsubscribed(_ClientInfo = #{clientid := ClientId,
                                         username := Username},
@@ -99,7 +131,8 @@ on_session_unsubscribed(_ClientInfo = #{clientid := ClientId,
                 },
     emqx_broker:safe_publish(
               make_msg(<<"$event/session_unsubscribed">>,
-                       emqx_json:encode(Payload0))).
+                       emqx_json:encode(Payload0))),
+    ok.
 
 on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
     case ignore_sys_message(Message) of
@@ -113,7 +146,8 @@ on_message_dropped(Message = #message{from = ClientId}, _, Reason) ->
                 peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined))
             },
             emqx_broker:safe_publish(
-                make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1)))
+                make_msg(<<"$event/message_dropped">>, emqx_json:encode(Payload1))),
+            ok
     end,
     {ok, Message}.
 
@@ -134,7 +168,8 @@ on_message_delivered(_ClientInfo = #{
                 peerhost => ntoa(PeerHost)
             },
             emqx_broker:safe_publish(
-                make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1)))
+                make_msg(<<"$event/message_delivered">>, emqx_json:encode(Payload1))),
+            ok
     end,
     {ok, Message}.
 
@@ -155,7 +190,8 @@ on_message_acked(_ClientInfo = #{
                 peerhost => ntoa(PeerHost)
             },
             emqx_broker:safe_publish(
-                make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1)))
+                make_msg(<<"$event/message_acked">>, emqx_json:encode(Payload1))),
+            ok
     end,
     {ok, Message}.
 

+ 143 - 0
apps/emqx_modules/test/emqx_event_topic_SUITE.erl

@@ -0,0 +1,143 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_event_topic_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_ct_helpers:boot_modules(all),
+    emqx_ct_helpers:start_apps([emqx_modules]),
+    meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(emqx_schema, includes, fun() -> ["event_topic"] end ),
+    meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_modules_schema:fields(FieldName) end),
+    ok = emqx_config:update([event_topic, topics], [<<"$event/client_connected">>,
+                                                    <<"$event/client_disconnected">>,
+                                                    <<"$event/session_subscribed">>,
+                                                    <<"$event/session_unsubscribed">>,
+                                                    <<"$event/message_delivered">>,
+                                                    <<"$event/message_acked">>,
+                                                    <<"$event/message_dropped">>]),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_helpers:stop_apps([emqx_modules]),
+    meck:unload(emqx_schema).
+
+t_event_topic(_) ->
+    ok = emqx_event_topic:enable(),
+    {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
+    {ok, _} = emqtt:connect(C1),
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_connected">>, qos1),
+    {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
+                                 {username, <<"username">>}]),
+    {ok, _} = emqtt:connect(C2),
+    ok = recv_connected(<<"clientid">>),
+
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_subscribed">>, qos1),
+    _ = receive_publish(100),
+    timer:sleep(50),
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C2, <<"test_sub">>, qos1),
+    ok = recv_subscribed(<<"clientid">>),
+    emqtt:unsubscribe(C1, <<"$event/session_subscribed">>),
+    timer:sleep(50),
+
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_delivered">>, qos1),
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_acked">>, qos1),
+    _ = emqx:publish(emqx_message:make(<<"test">>, ?QOS_1, <<"test_sub">>, <<"test">>)),
+    recv_message_publish(<<"clientid">>),
+    recv_message_delivered(<<"clientid">>),
+    recv_message_acked(<<"clientid">>),
+
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/message_dropped">>, qos1),
+    ok= emqtt:publish(C2, <<"test_sub1">>, <<"test">>),
+    recv_message_dropped(<<"clientid">>),
+
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/session_unsubscribed">>, qos1),
+    _ = emqtt:unsubscribe(C2, <<"test_sub">>),
+    ok = recv_unsubscribed(<<"clientid">>),
+
+    {ok, _, [?QOS_1]} = emqtt:subscribe(C1, <<"$event/client_disconnected">>, qos1),
+    ok = emqtt:disconnect(C2),
+    ok = recv_disconnected(<<"clientid">>),
+    ok = emqtt:disconnect(C1),
+    ok = emqx_event_topic:disable().
+
+t_reason(_) ->
+    ?assertEqual(normal, emqx_event_topic:reason(normal)),
+    ?assertEqual(discarded, emqx_event_topic:reason({shutdown, discarded})),
+    ?assertEqual(tcp_error, emqx_event_topic:reason({tcp_error, einval})),
+    ?assertEqual(internal_error, emqx_event_topic:reason(<<"unknown error">>)).
+
+recv_connected(ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
+    ?assertMatch(<<"$event/client_connected">>, Topic),
+    ?assertMatch(#{<<"clientid">> := ClientId,
+                           <<"username">> := <<"username">>,
+                           <<"ipaddress">> := <<"127.0.0.1">>,
+                           <<"proto_name">> := <<"MQTT">>,
+                           <<"proto_ver">> := ?MQTT_PROTO_V4,
+                           <<"connack">> := ?RC_SUCCESS,
+                           <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps])).
+
+recv_subscribed(_ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"$event/session_subscribed">>, Topic).
+
+recv_message_dropped(_ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"$event/message_dropped">>, Topic).
+
+recv_message_delivered(_ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"$event/message_delivered">>, Topic).
+
+recv_message_publish(_ClientId) ->
+    {ok, #{qos := ?QOS_1, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"test_sub">>, Topic).
+
+recv_message_acked(_ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"$event/message_acked">>, Topic).
+
+recv_unsubscribed(_ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic}} = receive_publish(100),
+    ?assertMatch(<<"$event/session_unsubscribed">>, Topic).
+
+recv_disconnected(ClientId) ->
+    {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
+    ?assertMatch(<<"$event/client_disconnected">>, Topic),
+    ?assertMatch(#{<<"clientid">> := ClientId,
+                           <<"username">> := <<"username">>,
+                           <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps])).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+receive_publish(Timeout) ->
+    receive
+        {publish, Publish} ->
+            {ok, Publish}
+    after
+        Timeout -> {error, timeout}
+    end.

+ 0 - 85
apps/emqx_modules/test/emqx_presence_SUITE.erl

@@ -1,85 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_presence_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("emqx/include/emqx_mqtt.hrl").
--include_lib("eunit/include/eunit.hrl").
-
-all() -> emqx_ct:all(?MODULE).
-
-init_per_suite(Config) ->
-    emqx_ct_helpers:boot_modules(all),
-    emqx_ct_helpers:start_apps([emqx_modules]),
-    Config.
-
-end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([emqx_modules]).
-
-t_mod_presence(_) ->
-    ok = emqx_presence:enable(),
-    {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
-    {ok, _} = emqtt:connect(C1),
-    {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
-    %% Connected Presence
-    {ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
-                                 {username, <<"username">>}]),
-    {ok, _} = emqtt:connect(C2),
-    ok = recv_and_check_presence(<<"clientid">>, <<"connected">>),
-    %% Disconnected Presence
-    ok = emqtt:disconnect(C2),
-    ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
-    ok = emqtt:disconnect(C1),
-    ok = emqx_presence:disable().
-
-t_mod_presence_reason(_) ->
-    ?assertEqual(normal, emqx_presence:reason(normal)),
-    ?assertEqual(discarded, emqx_presence:reason({shutdown, discarded})),
-    ?assertEqual(tcp_error, emqx_presence:reason({tcp_error, einval})),
-    ?assertEqual(internal_error, emqx_presence:reason(<<"unknown error">>)).
-
-recv_and_check_presence(ClientId, Presence) ->
-    {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
-    ?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
-                 binary:split(Topic, <<"/">>, [global])),
-    case Presence of
-        <<"connected">> ->
-            ?assertMatch(#{<<"clientid">> := <<"clientid">>,
-                           <<"username">> := <<"username">>,
-                           <<"ipaddress">> := <<"127.0.0.1">>,
-                           <<"proto_name">> := <<"MQTT">>,
-                           <<"proto_ver">> := ?MQTT_PROTO_V4,
-                           <<"connack">> := ?RC_SUCCESS,
-                           <<"clean_start">> := true}, emqx_json:decode(Payload, [return_maps]));
-        <<"disconnected">> ->
-            ?assertMatch(#{<<"clientid">> := <<"clientid">>,
-                           <<"username">> := <<"username">>,
-                           <<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
-    end.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-receive_publish(Timeout) ->
-    receive
-        {publish, Publish} -> {ok, Publish}
-    after
-        Timeout -> {error, timeout}
-    end.