Parcourir la source

Merge pull request #9327 from JimMoen/fix-disconnect-hook-by-takenover

fix(channel): session takenover & discarded maybe call disconnect hook
Zaiming (Stone) Shi il y a 3 ans
Parent
commit
c19462cb87

+ 7 - 3
apps/emqx/src/emqx_channel.erl

@@ -2098,7 +2098,7 @@ parse_topic_filters(TopicFilters) ->
     lists:map(fun emqx_topic:parse/1, TopicFilters).
     lists:map(fun emqx_topic:parse/1, TopicFilters).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
-%% Ensure disconnected
+%% Maybe & Ensure disconnected
 
 
 ensure_disconnected(
 ensure_disconnected(
     Reason,
     Reason,
@@ -2205,6 +2205,7 @@ shutdown(success, Reply, Packet, Channel) ->
 shutdown(Reason, Reply, Packet, Channel) ->
 shutdown(Reason, Reply, Packet, Channel) ->
     {shutdown, Reason, Reply, Packet, Channel}.
     {shutdown, Reason, Reply, Packet, Channel}.
 
 
+%% mqtt v5 connected sessions
 disconnect_and_shutdown(
 disconnect_and_shutdown(
     Reason,
     Reason,
     Reply,
     Reply,
@@ -2214,9 +2215,12 @@ disconnect_and_shutdown(
 ) when
 ) when
     ConnState =:= connected orelse ConnState =:= reauthenticating
     ConnState =:= connected orelse ConnState =:= reauthenticating
 ->
 ->
-    shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), Channel);
+    NChannel = ensure_disconnected(Reason, Channel),
+    shutdown(Reason, Reply, ?DISCONNECT_PACKET(reason_code(Reason)), NChannel);
+%% mqtt v3/v4 sessions, mqtt v5 other conn_state sessions
 disconnect_and_shutdown(Reason, Reply, Channel) ->
 disconnect_and_shutdown(Reason, Reply, Channel) ->
-    shutdown(Reason, Reply, Channel).
+    NChannel = ensure_disconnected(Reason, Channel),
+    shutdown(Reason, Reply, NChannel).
 
 
 sp(true) -> 1;
 sp(true) -> 1;
 sp(false) -> 0.
 sp(false) -> 0.

+ 1 - 0
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -33,6 +33,7 @@
 all() -> emqx_common_test_helpers:all(?MODULE).
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
+    emqx_common_test_helpers:boot_modules(all),
     emqx_channel_SUITE:set_test_listener_confs(),
     emqx_channel_SUITE:set_test_listener_confs(),
     ?check_trace(
     ?check_trace(
         ?wait_async_action(
         ?wait_async_action(

+ 166 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -91,7 +91,13 @@ groups() ->
             t_sqlparse_new_map,
             t_sqlparse_new_map,
             t_sqlparse_invalid_json
             t_sqlparse_invalid_json
         ]},
         ]},
-        {events, [], [t_events]},
+        {events, [], [
+            t_events,
+            t_event_client_disconnected_normal,
+            t_event_client_disconnected_kicked,
+            t_event_client_disconnected_discarded,
+            t_event_client_disconnected_takenover
+        ]},
         {telemetry, [], [
         {telemetry, [], [
             t_get_basic_usage_info_0,
             t_get_basic_usage_info_0,
             t_get_basic_usage_info_1
             t_get_basic_usage_info_1
@@ -474,6 +480,165 @@ t_events(_Config) ->
     client_connack_failed(),
     client_connack_failed(),
     ok.
     ok.
 
 
+t_event_client_disconnected_normal(_Config) ->
+    SQL =
+        "select * "
+        "from \"$events/client_disconnected\" ",
+    RepubT = <<"repub/to/disconnected/normal">>,
+
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [republish_action(RepubT, <<>>)]
+        }
+    ),
+
+    {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
+    ct:sleep(200),
+    {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
+    {ok, _} = emqtt:connect(Client1),
+    emqtt:disconnect(Client1),
+
+    receive
+        {publish, #{topic := T, payload := Payload}} ->
+            ?assertEqual(RepubT, T),
+            ?assertMatch(#{<<"reason">> := <<"normal">>}, emqx_json:decode(Payload, [return_maps]))
+    after 1000 ->
+        ct:fail(wait_for_repub_disconnected_normal)
+    end,
+    emqtt:stop(Client),
+
+    delete_rule(TopicRule).
+
+t_event_client_disconnected_kicked(_Config) ->
+    SQL =
+        "select * "
+        "from \"$events/client_disconnected\" ",
+    RepubT = <<"repub/to/disconnected/kicked">>,
+
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [republish_action(RepubT, <<>>)]
+        }
+    ),
+
+    {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
+    ct:sleep(200),
+
+    {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
+    {ok, _} = emqtt:connect(Client1),
+    %% the process will receive {'EXIT',{shutdown,tcp_closed}}
+    unlink(Client1),
+
+    emqx_cm:kick_session(<<"emqx">>),
+
+    receive
+        {publish, #{topic := T, payload := Payload}} ->
+            ?assertEqual(RepubT, T),
+            ?assertMatch(#{<<"reason">> := <<"kicked">>}, emqx_json:decode(Payload, [return_maps]))
+    after 1000 ->
+        ct:fail(wait_for_repub_disconnected_kicked)
+    end,
+
+    emqtt:stop(Client),
+    delete_rule(TopicRule).
+
+t_event_client_disconnected_discarded(_Config) ->
+    SQL =
+        "select * "
+        "from \"$events/client_disconnected\" ",
+    RepubT = <<"repub/to/disconnected/discarded">>,
+
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [republish_action(RepubT, <<>>)]
+        }
+    ),
+
+    {ok, Client} = emqtt:start_link([{clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, RepubT, 0),
+    ct:sleep(200),
+
+    {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
+    {ok, _} = emqtt:connect(Client1),
+    %% the process will receive {'EXIT',{shutdown,tcp_closed}}
+    unlink(Client1),
+
+    {ok, Client2} = emqtt:start_link([
+        {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, true}
+    ]),
+    {ok, _} = emqtt:connect(Client2),
+
+    receive
+        {publish, #{topic := T, payload := Payload}} ->
+            ?assertEqual(RepubT, T),
+            ?assertMatch(
+                #{<<"reason">> := <<"discarded">>}, emqx_json:decode(Payload, [return_maps])
+            )
+    after 1000 ->
+        ct:fail(wait_for_repub_disconnected_discarded)
+    end,
+    emqtt:stop(Client),
+    emqtt:stop(Client2),
+
+    delete_rule(TopicRule).
+
+t_event_client_disconnected_takenover(_Config) ->
+    SQL =
+        "select * "
+        "from \"$events/client_disconnected\" ",
+    RepubT = <<"repub/to/disconnected/takenover">>,
+
+    {ok, TopicRule} = emqx_rule_engine:create_rule(
+        #{
+            sql => SQL,
+            id => ?TMP_RULEID,
+            actions => [republish_action(RepubT, <<>>)]
+        }
+    ),
+
+    {ok, ClientRecv} = emqtt:start_link([
+        {clientid, <<"get_repub_client">>}, {username, <<"emqx0">>}
+    ]),
+    {ok, _} = emqtt:connect(ClientRecv),
+    {ok, _, _} = emqtt:subscribe(ClientRecv, RepubT, 0),
+    ct:sleep(200),
+
+    {ok, Client1} = emqtt:start_link([{clientid, <<"emqx">>}, {username, <<"emqx">>}]),
+    {ok, _} = emqtt:connect(Client1),
+    %% the process will receive {'EXIT',{shutdown,tcp_closed}}
+    unlink(Client1),
+
+    {ok, Client2} = emqtt:start_link([
+        {clientid, <<"emqx">>}, {username, <<"emqx">>}, {clean_start, false}
+    ]),
+    {ok, _} = emqtt:connect(Client2),
+
+    receive
+        {publish, #{topic := T, payload := Payload}} ->
+            ?assertEqual(RepubT, T),
+            ?assertMatch(
+                #{<<"reason">> := <<"takenover">>}, emqx_json:decode(Payload, [return_maps])
+            )
+    after 1000 ->
+        ct:fail(wait_for_repub_disconnected_discarded)
+    end,
+
+    emqtt:stop(ClientRecv),
+    emqtt:stop(Client2),
+
+    delete_rule(TopicRule).
+
 client_connack_failed() ->
 client_connack_failed() ->
     {ok, Client} = emqtt:start_link(
     {ok, Client} = emqtt:start_link(
         [
         [