ソースを参照

fix: do not crash on replicant node

zmstone 1 年前
コミット
ccd973d13e

+ 23 - 3
apps/emqx/src/emqx_cm_registry_keeper.erl

@@ -20,7 +20,8 @@
 
 -export([
     start_link/0,
-    count/1
+    count/1,
+    purge/0
 ]).
 
 %% gen_server callbacks
@@ -48,7 +49,10 @@ start_link() ->
 init(_) ->
     case mria_config:whoami() =:= replicant of
         true ->
-            ignore;
+            %% Do not run delete loops on replicant nodes,
+            %% because the core nodes will do it anyway.
+            %% The process is still started to serve the 'count' calls.
+            {ok, #{no_deletes => true}};
         false ->
             ok = send_delay_start(),
             {ok, #{next_clientid => undefined}}
@@ -71,6 +75,19 @@ count(Since) ->
             gen_server:call(?MODULE, {count, Since}, infinity)
     end.
 
+%% @doc Delete all retained history. Only for tests.
+-spec purge() -> ok.
+purge() ->
+    purge_loop(undefined).
+
+purge_loop(StartId) ->
+    case cleanup_one_chunk(StartId, _IsPurge = true) of
+        '$end_of_table' ->
+            ok;
+        NextId ->
+            purge_loop(NextId)
+    end.
+
 handle_call({count, Since}, _From, State) ->
     {LastCountTime, LastCount} =
         case State of
@@ -128,10 +145,13 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 cleanup_one_chunk(NextClientId) ->
+    cleanup_one_chunk(NextClientId, false).
+
+cleanup_one_chunk(NextClientId, IsPurge) ->
     Retain = retain_duration(),
     Now = now_ts(),
     IsExpired = fun(#channel{pid = Ts}) ->
-        is_integer(Ts) andalso (Ts < Now - Retain)
+        IsPurge orelse (is_integer(Ts) andalso (Ts < Now - Retain))
     end,
     cleanup_loop(NextClientId, ?CLEANUP_CHUNK_SIZE, IsExpired).
 

+ 13 - 4
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -423,7 +423,10 @@ schema("/sessions_count") ->
             responses => #{
                 200 => hoconsc:mk(binary(), #{
                     desc => <<"Number of sessions">>
-                })
+                }),
+                400 => emqx_dashboard_swagger:error_codes(
+                    ['BAD_REQUEST'], <<"Node {name} cannot handle this request.">>
+                )
             }
         }
     }.
@@ -1498,6 +1501,12 @@ message_example() ->
     }.
 
 sessions_count(get, #{query_string := QString}) ->
-    Since = maps:get(<<"since">>, QString, 0),
-    Count = emqx_cm_registry_keeper:count(Since),
-    {200, integer_to_binary(Count)}.
+    try
+        Since = maps:get(<<"since">>, QString, 0),
+        Count = emqx_cm_registry_keeper:count(Since),
+        {200, integer_to_binary(Count)}
+    catch
+        exit:{noproc, _} ->
+            Msg = io_lib:format("Node (~s) cannot handle this request.", [node()]),
+            {400, 'BAD_REQUEST', iolist_to_binary(Msg)}
+    end.

+ 38 - 0
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -798,6 +798,44 @@ t_client_id_not_found(_Config) ->
     %% Inflight messages
     ?assertMatch({error, {Http, _, Body}}, ReqFun(get, PathFun(["inflight_messages"]))).
 
+t_sessions_count(_Config) ->
+    ClientId = atom_to_binary(?FUNCTION_NAME),
+    Topic = <<"t/test_sessions_count">>,
+    Conf0 = emqx_config:get([broker]),
+    Conf1 = hocon_maps:deep_merge(Conf0, #{session_history_retain => 5}),
+    %% from 1 second ago, which is for sure less than the history retain
+    %% duration, hence forcing a call to the gen_server emqx_cm_registry_keeper
+    Since = erlang:system_time(seconds) - 1,
+    ok = emqx_config:put(#{broker => Conf1}),
+    {ok, Client} = emqtt:start_link([
+        {proto_ver, v5},
+        {clientid, ClientId},
+        {clean_start, true}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+    Path = emqx_mgmt_api_test_util:api_path(["sessions_count"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ?assertMatch(
+        {ok, "1"},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "since=" ++ integer_to_list(Since), AuthHeader
+        )
+    ),
+    ok = emqtt:disconnect(Client),
+    %% simulate the situation in which the process is not running
+    ok = supervisor:terminate_child(emqx_cm_sup, emqx_cm_registry_keeper),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        emqx_mgmt_api_test_util:request_api(
+            get, Path, "since=" ++ integer_to_list(Since), AuthHeader
+        )
+    ),
+    %% restore default value
+    ok = emqx_config:put(#{broker => Conf0}),
+    ok = emqx_cm_registry_keeper:purge(),
+    ok.
+
 t_mqueue_messages(Config) ->
     ClientId = atom_to_binary(?FUNCTION_NAME),
     Topic = <<"t/test_mqueue_msgs">>,

+ 1 - 1
changes/ce/feat-12326.en.md

@@ -11,4 +11,4 @@ A new gauge `cluster_sessions` is added to the metrics collection. Exposed to pr
 emqx_cluster_sessions_count 1234
 ```
 
-The counter can only be used for an approximate estimation as the collection and calculations are async.
+NOTE: The counter can only be used for a rough estimate, because the collection and calculations are asynchronous.