Просмотр исходного кода

Merge pull request #9123 from lafirest/fix/kick_seesion_redispatch

fix(shared_sub): kick session should not cause session message redispatch
lafirest 3 лет назад
Родитель
Сommit
0546cd3d1a
2 измененных файлов с 82 добавлено и 3 удалено
  1. 9 3
      apps/emqx/src/emqx_session.erl
  2. 73 0
      apps/emqx/test/emqx_shared_sub_SUITE.erl

+ 9 - 3
apps/emqx/src/emqx_session.erl

@@ -47,6 +47,7 @@
 -include("emqx_mqtt.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -ifdef(TEST).
 -compile(export_all).
@@ -802,8 +803,7 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
 -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
 terminate(ClientInfo, Reason, Session) ->
     run_terminate_hooks(ClientInfo, Reason, Session),
-    Reason =/= takenover andalso
-        redispatch_shared_messages(Session),
+    maybe_redispatch_shared_messages(Reason, Session),
     ok.
 
 run_terminate_hooks(ClientInfo, discarded, Session) ->
@@ -813,7 +813,13 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
 run_terminate_hooks(ClientInfo, Reason, Session) ->
     run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
 
-redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
+maybe_redispatch_shared_messages(takenover, _Session) ->
+    ?tp(debug, ignore_redispatch_shared_messages, #{reason => takenover}),
+    ok;
+maybe_redispatch_shared_messages(kicked, _Session) ->
+    ?tp(debug, ignore_redispatch_shared_messages, #{reason => kicked}),
+    ok;
+maybe_redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) ->
     AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
     F = fun
         ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->

+ 73 - 0
apps/emqx/test/emqx_shared_sub_SUITE.erl

@@ -22,6 +22,7 @@
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -define(SUITE, ?MODULE).
 
@@ -817,6 +818,78 @@ t_dispatch_qos0(Config) when is_list(Config) ->
     emqtt:stop(ConnPid2),
     ok.
 
+t_redispatch_when_kicked({init, Config}) when is_list(Config) ->
+    Config;
+t_redispatch_when_kicked({'end', Config}) when is_list(Config) ->
+    snabbkaffe:stop(),
+    ok;
+t_redispatch_when_kicked(_) ->
+    ok = ensure_config(sticky, true),
+
+    Group = <<"group1">>,
+    Topic = <<"foo/bar">>,
+    ClientId1 = <<"ClientId1">>,
+    ClientId2 = <<"ClientId2">>,
+    Parent = self(),
+
+    %% emqx_cm:kick_session will cause the emqtt to exit abnormally
+    %% so here need to isolate emqtt with a separate process
+    MkSub = fun(ClientId) ->
+        {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}, {auto_ack, false}]),
+        {ok, _} = emqtt:connect(ConnPid),
+        {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {
+            <<"$share/", Group/binary, "/foo/bar">>, 1
+        }),
+
+        Loop = fun(Self) ->
+            receive
+                wait ->
+                    case last_message(<<"hello1">>, [ConnPid], 6000) of
+                        {true, _} ->
+                            Parent ! {got, ClientId};
+                        _ ->
+                            Parent ! nothing
+                    end,
+                    Self(Self);
+                stop ->
+                    stop
+            end
+        end,
+
+        Loop(Loop)
+    end,
+
+    Receive = fun() ->
+        receive
+            {got, _} = Got -> Got;
+            nothing -> nothing
+        after 6000 ->
+            nothing
+        end
+    end,
+
+    Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]],
+    timer:sleep(500),
+
+    ok = snabbkaffe:start_trace(),
+
+    Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
+    emqx:publish(Message),
+
+    [Sub ! wait || Sub <- Subs],
+    Res = Receive(),
+    ?assertMatch({got, _}, Res),
+
+    {got, ClientId} = Res,
+    emqx_cm:kick_session(ClientId),
+
+    Trace = snabbkaffe:collect_trace(500),
+    ?assertMatch([#{reason := kicked}], ?of_kind(ignore_redispatch_shared_messages, Trace)),
+
+    [Sub ! stop || Sub <- Subs],
+    snabbkaffe:stop(),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------