Просмотр исходного кода

Merge pull request #6036 from emqx/persistent-session-fixes

Persistent session fixes
Tobias Lindahl 4 года назад
Родитель
Commit
62f9f6fd91

+ 4 - 0
apps/emqx/etc/emqx_cloud/vm.args

@@ -116,3 +116,7 @@
 
 ## patches dir
 -pa {{ platform_data_dir }}/patches
+
+## Mnesia thresholds
+-mnesia dump_log_write_threshold 5000
+-mnesia dump_log_time_threshold 60000

+ 4 - 0
apps/emqx/etc/emqx_edge/vm.args

@@ -114,3 +114,7 @@
 
 ## patches dir
 -pa {{ platform_data_dir }}/patches
+
+## Mnesia thresholds
+-mnesia dump_log_write_threshold 5000
+-mnesia dump_log_time_threshold 60000

+ 13 - 6
apps/emqx/src/emqx_channel.erl

@@ -1179,20 +1179,27 @@ terminate(_, #channel{conn_state = idle}) -> ok;
 terminate(normal, Channel) ->
     run_terminate_hook(normal, Channel);
 terminate({shutdown, kicked}, Channel) ->
-    _ = emqx_persistent_session:persist(Channel#channel.clientinfo,
-                                        Channel#channel.conninfo,
-                                        Channel#channel.session),
+    persist_if_session(Channel),
     run_terminate_hook(kicked, Channel);
 terminate({shutdown, Reason}, Channel) when Reason =:= discarded;
                                             Reason =:= takeovered ->
     run_terminate_hook(Reason, Channel);
 terminate(Reason, Channel = #channel{will_msg = WillMsg}) ->
     (WillMsg =/= undefined) andalso publish_will_msg(WillMsg),
-    _ = emqx_persistent_session:persist(Channel#channel.clientinfo,
-                                        Channel#channel.conninfo,
-                                        Channel#channel.session),
+    persist_if_session(Channel),
     run_terminate_hook(Reason, Channel).
 
+persist_if_session(#channel{session = Session} = Channel) ->
+    case emqx_session:is_session(Session) of
+        true ->
+            _ = emqx_persistent_session:persist(Channel#channel.clientinfo,
+                                                Channel#channel.conninfo,
+                                                Channel#channel.session),
+            ok;
+        false ->
+            ok
+    end.
+
 run_terminate_hook(_Reason, #channel{session = undefined}) -> ok;
 run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session}) ->
     emqx_session:terminate(ClientInfo, Reason, Session).

+ 8 - 1
apps/emqx/src/emqx_cm.erl

@@ -58,7 +58,9 @@
         , lookup_channels/2
         ]).
 
--export([all_channels/0]).
+-export([ all_channels/0
+        , all_client_ids/0
+        ]).
 
 %% gen_server callbacks
 -export([ init/1
@@ -400,6 +402,11 @@ all_channels() ->
     Pat = [{{'_', '$1'}, [], ['$1']}],
     ets:select(?CHAN_TAB, Pat).
 
+all_client_ids() ->
+    Pat = [{{'$1', '_'}, [], ['$1']}],
+    ets:select(?CHAN_TAB, Pat).
+
+
 %% @doc Lookup channels.
 -spec(lookup_channels(emqx_types:clientid()) -> list(chan_pid())).
 lookup_channels(ClientId) ->

+ 11 - 6
apps/emqx/src/emqx_persistent_session.erl

@@ -179,12 +179,17 @@ timestamp_from_conninfo(ConnInfo) ->
     end.
 
 lookup(ClientID) when is_binary(ClientID) ->
-    case lookup_session_store(ClientID) of
-        none -> none;
-        {value, #session_store{session = S} = SS} ->
-            case persistent_session_status(SS) of
-                expired        -> {expired, S};
-                persistent     -> {persistent, S}
+    case is_store_enabled() of
+        false ->
+            none;
+        true ->
+            case lookup_session_store(ClientID) of
+                none -> none;
+                {value, #session_store{session = S} = SS} ->
+                    case persistent_session_status(SS) of
+                        expired        -> {expired, S};
+                        persistent     -> {persistent, S}
+                    end
             end
     end.
 

+ 4 - 0
apps/emqx/src/emqx_session.erl

@@ -58,6 +58,7 @@
 
 -export([ info/1
         , info/2
+        , is_session/1
         , stats/1
         ]).
 
@@ -202,6 +203,9 @@ init(Opts) ->
 %% Info, Stats
 %%--------------------------------------------------------------------
 
+is_session(#session{}) -> true;
+is_session(_) -> false.
+
 %% @doc Get infos of the session.
 -spec(info(session()) -> emqx_types:infos()).
 info(Session) ->

+ 50 - 26
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -113,6 +113,8 @@ init_per_group(snabbkaffe, Config) ->
     [ {kill_connection_process, true} | Config];
 init_per_group(gc_tests, Config) ->
     %% We need to make sure the system does not interfere with this test group.
+    [maybe_kill_connection_process(ClientId, [{kill_connection_process, true}])
+     || ClientId <- emqx_cm:all_client_ids()],
     emqx_common_test_helpers:stop_apps([]),
     SessionMsgEts = gc_tests_session_store,
     MsgEts = gc_tests_msg_store,
@@ -230,50 +232,70 @@ receive_messages(Count, Msgs) ->
 maybe_kill_connection_process(ClientId, Config) ->
     case ?config(kill_connection_process, Config) of
         true ->
-            [ConnectionPid] = emqx_cm:lookup_channels(ClientId),
-            ?assert(is_pid(ConnectionPid)),
-            Ref = monitor(process, ConnectionPid),
-            ConnectionPid ! die_if_test,
-            receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
-            after 3000 -> error(process_did_not_die)
+            case emqx_cm:lookup_channels(ClientId) of
+                [] ->
+                    ok;
+                [ConnectionPid] ->
+                    ?assert(is_pid(ConnectionPid)),
+                    Ref = monitor(process, ConnectionPid),
+                    ConnectionPid ! die_if_test,
+                    receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok
+                    after 3000 -> error(process_did_not_die)
+                    end,
+                    wait_for_cm_unregister(ClientId)
             end;
         false ->
             ok
     end.
 
-snabbkaffe_sync_publish(Topic, Payloads, Config) ->
+wait_for_cm_unregister(ClientId) ->
+    wait_for_cm_unregister(ClientId, 10).
+
+wait_for_cm_unregister(_ClientId, 0) ->
+    error(cm_did_not_unregister);
+wait_for_cm_unregister(ClientId, N) ->
+    case emqx_cm:lookup_channels(ClientId) of
+        [] -> ok;
+        [_] -> timer:sleep(100), wait_for_cm_unregister(ClientId, N - 1)
+    end.
+
+snabbkaffe_sync_publish(Topic, Payloads) ->
     Fun = fun(Client, Payload) ->
                   ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
                                     , #{?snk_kind := ps_persist_msg, payload := Payload}
                                     )
           end,
-    do_publish(Payloads, Fun, Config).
+    do_publish(Payloads, Fun, true).
 
-publish(Topic, Payloads, Config) ->
+publish(Topic, Payloads) ->
     Fun = fun(Client, Payload) ->
                   {ok, _} = emqtt:publish(Client, Topic, Payload, 2)
           end,
-    do_publish(Payloads, Fun, Config).
+    do_publish(Payloads, Fun, false).
 
-do_publish(Payloads = [_|_], PublishFun, Config) ->
+do_publish(Payloads = [_|_], PublishFun, WaitForUnregister) ->
     %% Publish from another process to avoid connection confusion.
     {Pid, Ref} =
         spawn_monitor(
           fun() ->
                   %% For convenience, always publish using tcp.
                   %% The publish path is not what we are testing.
+                  ClientID = <<"ps_SUITE_publisher">>,
                   {ok, Client} = emqtt:start_link([ {proto_ver, v5}
+                                                  , {clientid, ClientID}
                                                   , {port, 1883} ]),
                   {ok, _} = emqtt:connect(Client),
                   lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads),
-                  ok = emqtt:disconnect(Client)
+                  ok = emqtt:disconnect(Client),
+                  %% Snabbkaffe sometimes fails unless all processes are gone.
+                  [wait_for_cm_unregister(ClientID) || WaitForUnregister]
           end),
     receive
         {'DOWN', Ref, process, Pid, normal} -> ok;
         {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What})
     end;
-do_publish(Payload, PublishFun, Config) ->
-    do_publish([Payload], PublishFun, Config).
+do_publish(Payload, PublishFun, WaitForUnregister) ->
+    do_publish([Payload], PublishFun, WaitForUnregister).
 
 %%--------------------------------------------------------------------
 %% Test Cases
@@ -297,7 +319,7 @@ t_connect_session_expiry_interval(Config) ->
 
     maybe_kill_connection_process(ClientId, Config),
 
-    publish(Topic, Payload, Config),
+    publish(Topic, Payload),
 
     {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
                                        {proto_ver, v5},
@@ -356,6 +378,8 @@ t_cancel_on_disconnect(Config) ->
     {ok, _} = emqtt:ConnFun(Client1),
     ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
 
+    wait_for_cm_unregister(ClientId),
+
     {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
                                        {proto_ver, v5},
                                        {clean_start, false},
@@ -424,7 +448,7 @@ t_process_dies_session_expires(Config) ->
 
     maybe_kill_connection_process(ClientId, Config),
 
-    ok = publish(Topic, [Payload], Config),
+    ok = publish(Topic, [Payload]),
 
     SessionId =
         case ?config(persistent_store_enabled, Config) of
@@ -498,7 +522,7 @@ t_publish_while_client_is_gone(Config) ->
     ok = emqtt:disconnect(Client1),
     maybe_kill_connection_process(ClientId, Config),
 
-    ok = publish(Topic, [Payload1, Payload2], Config),
+    ok = publish(Topic, [Payload1, Payload2]),
 
     {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
                                        {clientid, ClientId},
@@ -544,7 +568,7 @@ t_clean_start_drops_subscriptions(Config) ->
     maybe_kill_connection_process(ClientId, Config),
 
     %% 2.
-    ok = publish(Topic, Payload1, Config),
+    ok = publish(Topic, Payload1),
 
     %% 3.
     {ok, Client2} = emqtt:start_link([ {proto_ver, v5},
@@ -556,7 +580,7 @@ t_clean_start_drops_subscriptions(Config) ->
     ?assertEqual(0, client_info(session_present, Client2)),
     {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
 
-    ok = publish(Topic, Payload2, Config),
+    ok = publish(Topic, Payload2),
     [Msg1] = receive_messages(1),
     ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
 
@@ -571,7 +595,7 @@ t_clean_start_drops_subscriptions(Config) ->
                                      | Config]),
     {ok, _} = emqtt:ConnFun(Client3),
 
-    ok = publish(Topic, Payload3, Config),
+    ok = publish(Topic, Payload3),
     [Msg2] = receive_messages(1),
     ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
 
@@ -625,7 +649,7 @@ t_multiple_subscription_matches(Config) ->
 
     maybe_kill_connection_process(ClientId, Config),
 
-    publish(Topic, Payload, Config),
+    publish(Topic, Payload),
 
     {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
                                        {proto_ver, v5},
@@ -675,9 +699,9 @@ t_lost_messages_because_of_gc(Config) ->
     {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
     emqtt:disconnect(Client1),
     maybe_kill_connection_process(ClientId, Config),
-    publish(Topic, Payload1, Config),
+    publish(Topic, Payload1),
     timer:sleep(2 * Retain),
-    publish(Topic, Payload2, Config),
+    publish(Topic, Payload2),
     emqx_persistent_session_gc:message_gc_worker(),
     {ok, Client2} = emqtt:start_link([ {clientid, ClientId},
                                        {clean_start, false},
@@ -790,7 +814,7 @@ t_snabbkaffe_pending_messages(Config) ->
 
     ?check_trace(
        begin
-           snabbkaffe_sync_publish(Topic, Payloads, Config),
+           snabbkaffe_sync_publish(Topic, Payloads),
            {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
            {ok, _} = emqtt:ConnFun(Client2),
            Msgs = receive_messages(length(Payloads)),
@@ -829,7 +853,7 @@ t_snabbkaffe_buffered_messages(Config) ->
     ok = emqtt:disconnect(Client1),
     maybe_kill_connection_process(ClientId, Config),
 
-    publish(Topic, Payloads1, Config),
+    publish(Topic, Payloads1),
 
     ?check_trace(
        begin
@@ -838,7 +862,7 @@ t_snabbkaffe_buffered_messages(Config) ->
                             #{ ?snk_kind := ps_resume_end }),
            spawn_link(fun() ->
                               ?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000),
-                              publish(Topic, Payloads2, Config)
+                              publish(Topic, Payloads2)
                       end),
            {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]),
            {ok, _} = emqtt:ConnFun(Client2),