Prechádzať zdrojové kódy

feat(purge): clear delayed messages

Thales Macedo Garitezi 2 rokov pred
rodič
commit
f988de4ff4

+ 2 - 0
apps/emqx/priv/bpapi.versions

@@ -15,7 +15,9 @@
 {emqx_conf,3}.
 {emqx_dashboard,1}.
 {emqx_delayed,1}.
+{emqx_delayed,2}.
 {emqx_eviction_agent,1}.
+{emqx_eviction_agent,2}.
 {emqx_exhook,1}.
 {emqx_ft_storage_exporter_fs,1}.
 {emqx_ft_storage_fs,1}.

+ 32 - 1
apps/emqx_eviction_agent/src/emqx_eviction_agent.erl

@@ -18,6 +18,7 @@
     disable/1,
     status/0,
     connection_count/0,
+    all_channels_count/0,
     session_count/0,
     session_count/1,
     evict_connections/1,
@@ -27,6 +28,9 @@
     evict_session_channel/3
 ]).
 
+%% RPC targets
+-export([all_local_channels_count/0]).
+
 -behaviour(gen_server).
 
 -export([
@@ -240,6 +244,33 @@ channel_with_session_table(RequiredConnState) ->
         RequiredConnState =:= ConnState
     ]).
 
+%% Cluster-wide channel count.
+%%
+%% Asks every running node for its local channel count (15 second timeout
+%% handed to the proto call) and returns the sum of the `{ok, Count}' replies.
+%% Any reply that is not an `{ok, _}' tuple is left out of the sum; if there is
+%% at least one such reply, all of them are reported in a single warning log
+%% entry keyed by node.
+-spec all_channels_count() -> non_neg_integer().
+all_channels_count() ->
+    RunningNodes = emqx:running_nodes(),
+    Replies = emqx_eviction_agent_proto_v2:all_channels_count(RunningNodes, 15_000),
+    %% Split the per-node replies into successful counts and failures in one pass.
+    {Counts, Failures} = lists:foldr(
+        fun
+            ({_Node, {ok, Count}}, {CountsAcc, FailuresAcc}) ->
+                {[Count | CountsAcc], FailuresAcc};
+            (Failure, {CountsAcc, FailuresAcc}) ->
+                {CountsAcc, [Failure | FailuresAcc]}
+        end,
+        {[], []},
+        lists:zip(RunningNodes, Replies)
+    ),
+    case Failures of
+        [] ->
+            ok;
+        [_ | _] ->
+            ?SLOG(
+                warning,
+                #{
+                    msg => "error_collecting_all_channels_count",
+                    errors => maps:from_list(Failures)
+                }
+            )
+    end,
+    lists:sum(Counts).
+
+%% RPC target: invoked on each node by
+%% `emqx_eviction_agent_proto_v2:all_channels_count/2'.
+%% Returns `table_count/1' applied to the table obtained from
+%% `emqx_cm:all_channels_table(?CONN_MODULES)' on the node where it runs.
+-spec all_local_channels_count() -> non_neg_integer().
+all_local_channels_count() ->
+    table_count(emqx_cm:all_channels_table(?CONN_MODULES)).
+
 session_count() ->
     session_count(any).
 
@@ -303,7 +334,7 @@ evict_session_channel(Nodes, ClientId, ConnInfo, ClientInfo) ->
             client_info => ClientInfo
         }
     ),
-    case emqx_eviction_agent_proto_v1:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
+    case emqx_eviction_agent_proto_v2:evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) of
         {badrpc, Reason} ->
             ?SLOG(
                 error,

+ 35 - 0
apps/emqx_eviction_agent/src/proto/emqx_eviction_agent_proto_v2.erl

@@ -0,0 +1,35 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_eviction_agent_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+
+    evict_session_channel/4,
+
+    %% Introduced in v2:
+    all_channels_count/2
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+%% emqx_bpapi callback (this module declares `-behaviour(emqx_bpapi)').
+%% Returns the version string "5.2.1" -- presumably the EMQX release in which
+%% this protocol version first shipped (confirm against emqx_bpapi).
+introduced_in() ->
+    "5.2.1".
+
+%% Runs `emqx_eviction_agent:evict_session_channel(ClientId, ConnInfo, ClientInfo)'
+%% on `Node' through `rpc:call/4' and returns whatever that call returns,
+%% including a `{badrpc, _}' tuple when the remote call itself fails.
+-spec evict_session_channel(
+    node(),
+    emqx_types:clientid(),
+    emqx_types:conninfo(),
+    emqx_types:clientinfo()
+) -> supervisor:startchild_err() | emqx_rpc:badrpc().
+evict_session_channel(Node, ClientId, ConnInfo, ClientInfo) ->
+    rpc:call(Node, emqx_eviction_agent, evict_session_channel, [ClientId, ConnInfo, ClientInfo]).
+
+%% Introduced in v2:
+%% Calls `emqx_eviction_agent:all_local_channels_count/0' on every node in
+%% `Nodes' through `erpc:multicall/5'; `Timeout' is handed to erpc unchanged.
+%% The result is a list of per-node replies that callers pair with `Nodes'
+%% positionally; a successful reply has the shape `{ok, Count}'.
+%%
+%% The timeout argument is typed with the built-in `timeout()'
+%% (`non_neg_integer() | infinity'); the previous `time:time()' referred to a
+%% type that no module defines.
+-spec all_channels_count([node()], timeout()) -> emqx_rpc:erpc_multicall(non_neg_integer()).
+all_channels_count(Nodes, Timeout) ->
+    erpc:multicall(Nodes, emqx_eviction_agent, all_local_channels_count, [], Timeout).

+ 22 - 6
apps/emqx_modules/src/emqx_delayed.erl

@@ -45,18 +45,22 @@
     code_change/3
 ]).
 
-%% gen_server callbacks
+%% API
 -export([
     load/0,
     unload/0,
     load_or_unload/1,
     get_conf/1,
     update_config/1,
+    delayed_count/0,
     list/1,
     get_delayed_message/1,
     get_delayed_message/2,
     delete_delayed_message/1,
     delete_delayed_message/2,
+    clear_all/0,
+    %% rpc target
+    clear_all_local/0,
     cluster_list/1
 ]).
 
@@ -167,6 +171,9 @@ unload() ->
 load_or_unload(Bool) ->
     gen_server:call(?SERVER, {do_load_or_unload, Bool}).
 
+%% Size of the delayed-message table `?TAB', as reported by
+%% `mnesia:table_info(?TAB, size)'.
+-spec delayed_count() -> non_neg_integer().
+delayed_count() -> mnesia:table_info(?TAB, size).
+
 list(Params) ->
     emqx_mgmt_api:paginate(?TAB, Params, ?FORMAT_FUN).
 
@@ -243,7 +250,7 @@ get_delayed_message(Id) ->
 get_delayed_message(Node, Id) when Node =:= node() ->
     get_delayed_message(Id);
 get_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v1:get_delayed_message(Node, Id).
+    emqx_delayed_proto_v2:get_delayed_message(Node, Id).
 
 -spec delete_delayed_message(binary()) -> with_id_return().
 delete_delayed_message(Id) ->
@@ -258,7 +265,19 @@ delete_delayed_message(Id) ->
 delete_delayed_message(Node, Id) when Node =:= node() ->
     delete_delayed_message(Id);
 delete_delayed_message(Node, Id) ->
-    emqx_delayed_proto_v1:delete_delayed_message(Node, Id).
+    emqx_delayed_proto_v2:delete_delayed_message(Node, Id).
+
+%% Asks every running node to clear its delayed-message table via
+%% `emqx_delayed_proto_v2:clear_all/1'.
+%% The per-node results are deliberately discarded: this always returns `ok',
+%% even if the call failed on some node.
+-spec clear_all() -> ok.
+clear_all() ->
+    _ = emqx_delayed_proto_v2:clear_all(emqx:running_nodes()),
+    ok.
+
+%% rpc target
+%% Invoked on each node by `emqx_delayed_proto_v2:clear_all/1'.
+%% Clears the delayed-message table `?TAB' with `mria:clear_table/1'; the
+%% result of that call is ignored and `ok' is always returned.
+-spec clear_all_local() -> ok.
+clear_all_local() ->
+    _ = mria:clear_table(?TAB),
+    ok.
 
 update_config(Config) ->
     emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
@@ -408,9 +427,6 @@ do_publish(Key = {Ts, _Id}, Now, Acc) when Ts =< Now ->
     end,
     do_publish(mnesia:dirty_next(?TAB, Key), Now, [Key | Acc]).
 
--spec delayed_count() -> non_neg_integer().
-delayed_count() -> mnesia:table_info(?TAB, size).
-
 do_load_or_unload(true, State) ->
     emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_DELAY_PUB),
     State;

+ 48 - 0
apps/emqx_modules/src/proto/emqx_delayed_proto_v2.erl

@@ -0,0 +1,48 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_delayed_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    get_delayed_message/2,
+    delete_delayed_message/2,
+
+    %% Introduced in v2:
+    clear_all/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+
+%% emqx_bpapi callback (this module declares `-behaviour(emqx_bpapi)').
+%% Returns the version string "5.2.1" -- presumably the EMQX release in which
+%% this protocol version first shipped (confirm against emqx_bpapi).
+introduced_in() ->
+    "5.2.1".
+
+%% Runs `emqx_delayed:get_delayed_message(Id)' on `Node' through `rpc:call/4'
+%% and returns its result, or a `{badrpc, _}' tuple when the remote call fails.
+-spec get_delayed_message(node(), binary()) ->
+    emqx_delayed:with_id_return(map()) | emqx_rpc:badrpc().
+get_delayed_message(Node, Id) ->
+    rpc:call(Node, emqx_delayed, get_delayed_message, [Id]).
+
+%% Runs `emqx_delayed:delete_delayed_message(Id)' on `Node' through `rpc:call/4'
+%% and returns its result, or a `{badrpc, _}' tuple when the remote call fails.
+-spec delete_delayed_message(node(), binary()) -> emqx_delayed:with_id_return() | emqx_rpc:badrpc().
+delete_delayed_message(Node, Id) ->
+    rpc:call(Node, emqx_delayed, delete_delayed_message, [Id]).
+
+%% Introduced in v2:

+%% Calls `emqx_delayed:clear_all_local/0' on every node in `Nodes' through
+%% `erpc:multicall/4' and returns the list of per-node results.
+%% NOTE(review): no explicit timeout is passed here, unlike
+%% `emqx_eviction_agent_proto_v2:all_channels_count/2' -- confirm that waiting
+%% with erpc's default timeout is intended.
+-spec clear_all([node()]) -> emqx_rpc:erpc_multicall(ok).
+clear_all(Nodes) ->
+    erpc:multicall(Nodes, emqx_delayed, clear_all_local, []).

+ 3 - 3
apps/emqx_modules/test/emqx_delayed_SUITE.erl

@@ -164,15 +164,15 @@ t_cluster(_) ->
 
     ?assertMatch(
         {ok, _},
-        emqx_delayed_proto_v1:get_delayed_message(node(), Id)
+        emqx_delayed_proto_v2:get_delayed_message(node(), Id)
     ),
 
     %% The 'local' and the 'fake-remote' values should be the same,
     %% however there is a race condition, so we just assert that they are both 'ok' tuples
     ?assertMatch({ok, _}, emqx_delayed:get_delayed_message(Id)),
-    ?assertMatch({ok, _}, emqx_delayed_proto_v1:get_delayed_message(node(), Id)),
+    ?assertMatch({ok, _}, emqx_delayed_proto_v2:get_delayed_message(node(), Id)),
 
-    ok = emqx_delayed_proto_v1:delete_delayed_message(node(), Id),
+    ok = emqx_delayed_proto_v2:delete_delayed_message(node(), Id),
 
     ?assertMatch(
         {error, _},

+ 11 - 11
apps/emqx_node_rebalance/src/emqx_node_rebalance_purge.erl

@@ -43,7 +43,7 @@
 %% gen_statem states
 -define(disabled, disabled).
 -define(purging, purging).
--define(cleaning_retained_messages, cleaning_retained_messages).
+-define(cleaning_data, cleaning_data).
 
 -type start_opts() :: #{
     purge_rate => pos_integer()
@@ -80,7 +80,7 @@ start_link() ->
 
 callback_mode() -> handle_event_function.
 
-%% states: disabled, purging, cleaning_retained_messages
+%% states: disabled, purging, cleaning_data
 
 init([]) ->
     {ok, disabled, #{}}.
@@ -130,35 +130,35 @@ handle_event(
         purge_rate := PurgeRate
     } = Data
 ) ->
-    case emqx_eviction_agent:status() of
-        {enabled, #{sessions := Sessions}} when Sessions > 0 ->
+    case emqx_eviction_agent:all_channels_count() of
+        Sessions when Sessions > 0 ->
             ok = purge_sessions(PurgeRate),
-            ?tp(debug, cluster_purge_evict_session, #{purge_rate => PurgeRate}),
-            ?SLOG(
+            ?tp(
                 warning,
+                "cluster_purge_evict_sessions",
                 #{
-                    msg => "cluster_purge_evict_sessions",
                     count => Sessions,
                     purge_rate => PurgeRate
                 }
             ),
             NewData = Data#{current_sessions => Sessions},
             {keep_state, NewData, [{state_timeout, ?EVICT_INTERVAL, purge}]};
-        {enabled, #{sessions := 0}} ->
+        _Sessions = 0 ->
             NewData = Data#{current_conns => 0},
             ?SLOG(warning, #{msg => "cluster_purge_evict_sessions_done"}),
-            {next_state, ?cleaning_retained_messages, NewData, [
+            {next_state, ?cleaning_data, NewData, [
                 {state_timeout, 0, clean_retained_messages}
             ]}
     end;
 handle_event(
     state_timeout,
     clean_retained_messages,
-    ?cleaning_retained_messages,
+    ?cleaning_data,
     Data
 ) ->
-    ?SLOG(warning, #{msg => "cluster_purge_cleaning_retained_messages"}),
+    ?SLOG(warning, #{msg => "cluster_purge_cleaning_data"}),
     ok = emqx_retainer:clean(),
+    ok = emqx_delayed:clear_all(),
     ?tp(warning, "cluster_purge_done", #{}),
     ok = disable_purge(),
     ?tp(warning, "cluster_purge_finished_successfully", #{}),

+ 2 - 9
apps/emqx_node_rebalance/test/emqx_node_rebalance_cli_SUITE.erl

@@ -368,14 +368,7 @@ emqx_node_rebalance_cli(Node, Args) ->
 with_some_sessions(Fn) ->
     emqx_common_test_helpers:with_mock(
         emqx_eviction_agent,
-        status,
-        fun() ->
-            case meck:passthrough([]) of
-                {enabled, Status = #{sessions := _}} ->
-                    {enabled, Status#{sessions := 100}};
-                Res ->
-                    Res
-            end
-        end,
+        all_channels_count,
+        fun() -> 100 end,
         Fn
     ).

+ 19 - 12
apps/emqx_node_rebalance/test/emqx_node_rebalance_purge_SUITE.erl

@@ -109,6 +109,10 @@ app_specs() ->
                         #{enable => true}
                 }
         }},
+        {emqx_modules, #{
+            config =>
+                #{delayed => #{enable => true}}
+        }},
         emqx_eviction_agent,
         emqx_node_rebalance
     ].
@@ -133,15 +137,8 @@ with_some_sessions(Node, Fn) ->
     erpc:call(Node, fun() ->
         emqx_common_test_helpers:with_mock(
             emqx_eviction_agent,
-            status,
-            fun() ->
-                case meck:passthrough([]) of
-                    {enabled, Status = #{sessions := _}} ->
-                        {enabled, Status#{sessions := 100}};
-                    Res ->
-                        Res
-                end
-            end,
+            all_channels_count,
+            fun() -> 100 end,
             Fn
         )
     end).
@@ -317,8 +314,13 @@ t_session_purged(Config) ->
     Port1 = get_mqtt_port(Node1, tcp),
     Port2 = get_mqtt_port(Node2, tcp),
 
-    Node1Clients = emqtt_connect_many(Port1, 20, _StartN1 = 1),
-    Node2Clients = emqtt_connect_many(Port2, 20, _StartN2 = 21),
+    %% N.B.: it's important to have an asymmetric number of clients for this test, as
+    %% otherwise the scenario might happen to finish successfully due to the wrong
+    %% reasons!
+    NumClientsNode1 = 5,
+    NumClientsNode2 = 35,
+    Node1Clients = emqtt_connect_many(Port1, NumClientsNode1, _StartN1 = 1),
+    Node2Clients = emqtt_connect_many(Port2, NumClientsNode2, _StartN2 = 21),
     lists:foreach(
         fun(C) ->
             ClientId = proplists:get_value(clientid, emqtt:info(C)),
@@ -327,6 +329,8 @@ t_session_purged(Config) ->
             Payload = ClientId,
             Opts = [{retain, true}],
             ok = emqtt:publish(C, Topic, Props, Payload, Opts),
+            DelayedTopic = emqx_topic:join([<<"$delayed/120">>, Topic]),
+            ok = emqtt:publish(C, DelayedTopic, Payload),
             {ok, _, [?RC_GRANTED_QOS_0]} = emqtt:subscribe(C, Topic),
             ok
         end,
@@ -334,12 +338,13 @@ t_session_purged(Config) ->
     ),
 
     ?assertEqual(40, erpc:call(Node2, emqx_retainer, retained_count, [])),
+    ?assertEqual(NumClientsNode1, erpc:call(Node1, emqx_delayed, delayed_count, [])),
+    ?assertEqual(NumClientsNode2, erpc:call(Node2, emqx_delayed, delayed_count, [])),
 
     {ok, SRef0} = snabbkaffe:subscribe(
         ?match_event(#{?snk_kind := "cluster_purge_done"}),
         15_000
     ),
-    %% ok = rpc:call(Node1, emqx_node_rebalance_purge, start_global, [Nodes, opts(Config)]),
     ok = rpc:call(Node1, emqx_node_rebalance_purge, start, [opts(Config)]),
     {ok, _} = snabbkaffe:receive_events(SRef0),
 
@@ -347,6 +352,8 @@ t_session_purged(Config) ->
     ?assertEqual([], erpc:call(Node2, emqx_cm, all_channels, [])),
     ?assertEqual(0, erpc:call(Node1, emqx_retainer, retained_count, [])),
     ?assertEqual(0, erpc:call(Node2, emqx_retainer, retained_count, [])),
+    ?assertEqual(0, erpc:call(Node1, emqx_delayed, delayed_count, [])),
+    ?assertEqual(0, erpc:call(Node2, emqx_delayed, delayed_count, [])),
 
     ok = drain_exits(Node1Clients ++ Node2Clients),