Przeglądaj źródła

Merge pull request #12996 from savonarola/0508-fix-cursor-usage

fix(retainer): fix qlc cursor cleanup
Ilia Averianov 1 rok temu
rodzic
commit
a1aedee446

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer.app.src

@@ -2,7 +2,7 @@
 {application, emqx_retainer, [
     {description, "EMQX Retainer"},
     % strict semver, bump manually!
-    {vsn, "5.0.22"},
+    {vsn, "5.0.23"},
     {modules, []},
     {registered, [emqx_retainer_sup]},
     {applications, [kernel, stdlib, emqx, emqx_ctl]},

+ 3 - 1
apps/emqx_retainer/src/emqx_retainer.erl

@@ -47,6 +47,7 @@
     retained_count/0,
     backend_module/0,
     backend_module/1,
+    backend_state/1,
     enabled/0
 ]).
 
@@ -103,6 +104,7 @@
 -callback page_read(backend_state(), emqx_maybe:t(topic()), non_neg_integer(), non_neg_integer()) ->
     {ok, has_next(), list(message())}.
 -callback match_messages(backend_state(), topic(), cursor()) -> {ok, list(message()), cursor()}.
+-callback delete_cursor(backend_state(), cursor()) -> ok.
 -callback clear_expired(backend_state()) -> ok.
 -callback clean(backend_state()) -> ok.
 -callback size(backend_state()) -> non_neg_integer().
@@ -339,7 +341,7 @@ count(Context) ->
 clear_expired(Context) ->
     Mod = backend_module(Context),
     BackendState = backend_state(Context),
-    Mod:clear_expired(BackendState).
+    ok = Mod:clear_expired(BackendState).
 
 -spec store_retained(context(), message()) -> ok.
 store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) ->

+ 104 - 59
apps/emqx_retainer/src/emqx_retainer_dispatcher.erl

@@ -46,15 +46,26 @@
 -type limiter() :: emqx_htb_limiter:limiter().
 -type context() :: emqx_retainer:context().
 -type topic() :: emqx_types:topic().
--type cursor() :: emqx_retainer:cursor().
 
 -define(POOL, ?MODULE).
 
+%% For tests
+-export([
+    dispatch/3
+]).
+
+%% This module is a companion to `emqx_retainer`
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
 %%%===================================================================
 %%% API
 %%%===================================================================
+
 dispatch(Context, Topic) ->
-    cast({?FUNCTION_NAME, Context, self(), Topic}).
+    dispatch(Context, Topic, self()).
+
+dispatch(Context, Topic, Pid) ->
+    cast({dispatch, Context, Pid, Topic}).
 
 %% reset the client's limiter after updated the limiter's config
 refresh_limiter() ->
@@ -156,7 +167,7 @@ handle_call(Req, _From, State) ->
     | {noreply, NewState :: term(), hibernate}
     | {stop, Reason :: term(), NewState :: term()}.
 handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
-    {ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
+    {ok, Limiter2} = dispatch(Context, Pid, Topic, Limiter),
     {noreply, State#{limiter := Limiter2}};
 handle_cast({refresh_limiter, Conf}, State) ->
     BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
@@ -234,86 +245,120 @@ format_status(_Opt, Status) ->
 cast(Msg) ->
     gen_server:cast(worker(), Msg).
 
--spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
-dispatch(Context, Pid, Topic, Cursor, Limiter) ->
+-spec dispatch(context(), pid(), topic(), limiter()) -> {ok, limiter()}.
+dispatch(Context, Pid, Topic, Limiter) ->
     Mod = emqx_retainer:backend_module(Context),
-    case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
-        false ->
-            {ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
-            deliver(Result, Context, Pid, Topic, undefined, Limiter);
+    State = emqx_retainer:backend_state(Context),
+    case emqx_topic:wildcard(Topic) of
         true ->
-            {ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
-            deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
+            {ok, Messages, Cursor} = Mod:match_messages(State, Topic, undefined),
+            dispatch_with_cursor(Context, Messages, Cursor, Pid, Topic, Limiter);
+        false ->
+            {ok, Messages} = Mod:read_message(State, Topic),
+            dispatch_at_once(Messages, Pid, Topic, Limiter)
+    end.
+
+dispatch_at_once(Messages, Pid, Topic, Limiter0) ->
+    case deliver(Messages, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Limiter1};
+        {drop, Limiter1} ->
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            {ok, Limiter0}
     end.
 
--spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
-    {ok, limiter()}.
-deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
+dispatch_with_cursor(Context, [], Cursor, _Pid, _Topic, Limiter) ->
+    ok = delete_cursor(Context, Cursor),
     {ok, Limiter};
-deliver([], Context, Pid, Topic, Cursor, Limiter) ->
-    dispatch(Context, Pid, Topic, Cursor, Limiter);
-deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
+dispatch_with_cursor(Context, Messages0, Cursor0, Pid, Topic, Limiter0) ->
+    case deliver(Messages0, Pid, Topic, Limiter0) of
+        {ok, Limiter1} ->
+            {ok, Messages1, Cursor1} = match_next(Context, Topic, Cursor0),
+            dispatch_with_cursor(Context, Messages1, Cursor1, Pid, Topic, Limiter1);
+        {drop, Limiter1} ->
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter1};
+        no_receiver ->
+            ?tp(debug, retainer_dispatcher_no_receiver, #{topic => Topic}),
+            ok = delete_cursor(Context, Cursor0),
+            {ok, Limiter0}
+    end.
+
+match_next(_Context, _Topic, undefined) ->
+    {ok, [], undefined};
+match_next(Context, Topic, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:match_messages(State, Topic, Cursor).
+
+delete_cursor(_Context, undefined) ->
+    ok;
+delete_cursor(Context, Cursor) ->
+    Mod = emqx_retainer:backend_module(Context),
+    State = emqx_retainer:backend_state(Context),
+    Mod:delete_cursor(State, Cursor).
+
+-spec deliver([emqx_types:message()], pid(), topic(), limiter()) ->
+    {ok, limiter()} | {drop, limiter()} | no_receiver.
+deliver(Messages, Pid, Topic, Limiter) ->
     case erlang:is_process_alive(Pid) of
         false ->
-            {ok, Limiter};
+            no_receiver;
         _ ->
-            DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
-            case DeliverNum of
+            BatchSize = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
+            case BatchSize of
                 0 ->
-                    do_deliver(Result, Pid, Topic),
+                    deliver_to_client(Messages, Pid, Topic),
                     {ok, Limiter};
                 _ ->
-                    case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
-                        {ok, Limiter2} ->
-                            deliver([], Context, Pid, Topic, Cursor, Limiter2);
-                        {drop, Limiter2} ->
-                            {ok, Limiter2}
-                    end
+                    deliver_in_batches(Messages, BatchSize, Pid, Topic, Limiter)
             end
     end.
 
-do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
+deliver_in_batches([], _BatchSize, _Pid, _Topic, Limiter) ->
     {ok, Limiter};
-do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
-    {Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
-    case emqx_htb_limiter:consume(Num, Limiter) of
-        {ok, Limiter2} ->
-            do_deliver(ToDelivers, Pid, Topic),
-            do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
-        {drop, _} = Drop ->
+deliver_in_batches(Msgs, BatchSize, Pid, Topic, Limiter0) ->
+    {BatchActualSize, Batch, RestMsgs} = take(BatchSize, Msgs),
+    case emqx_htb_limiter:consume(BatchActualSize, Limiter0) of
+        {ok, Limiter1} ->
+            ok = deliver_to_client(Batch, Pid, Topic),
+            deliver_in_batches(RestMsgs, BatchSize, Pid, Topic, Limiter1);
+        {drop, _Limiter1} = Drop ->
             ?SLOG(debug, #{
                 msg => "retained_message_dropped",
                 reason => "reached_ratelimit",
-                dropped_count => length(ToDelivers)
+                dropped_count => BatchActualSize
             }),
             Drop
     end.
 
-do_deliver([Msg | T], Pid, Topic) ->
-    case emqx_banned:look_up({clientid, Msg#message.from}) of
-        [] ->
-            Pid ! {deliver, Topic, Msg},
-            ok;
-        _ ->
-            ?tp(
-                notice,
-                ignore_retained_message_deliver,
-                #{
-                    reason => "client is banned",
-                    clientid => Msg#message.from
-                }
-            )
-    end,
-    do_deliver(T, Pid, Topic);
-do_deliver([], _, _) ->
+deliver_to_client([Msg | T], Pid, Topic) ->
+    _ =
+        case emqx_banned:look_up({clientid, Msg#message.from}) of
+            [] ->
+                Pid ! {deliver, Topic, Msg};
+            _ ->
+                ?tp(
+                    notice,
+                    ignore_retained_message_deliver,
+                    #{
+                        reason => "client is banned",
+                        clientid => Msg#message.from
+                    }
+                )
+        end,
+    deliver_to_client(T, Pid, Topic);
+deliver_to_client([], _, _) ->
     ok.
 
-safe_split(N, List) ->
-    safe_split(N, List, 0, []).
+take(N, List) ->
+    take(N, List, 0, []).
 
-safe_split(0, List, Count, Acc) ->
+take(0, List, Count, Acc) ->
     {Count, lists:reverse(Acc), List};
-safe_split(_N, [], Count, Acc) ->
+take(_N, [], Count, Acc) ->
     {Count, lists:reverse(Acc), []};
-safe_split(N, [H | T], Count, Acc) ->
-    safe_split(N - 1, T, Count + 1, [H | Acc]).
+take(N, [H | T], Count, Acc) ->
+    take(N - 1, T, Count + 1, [H | Acc]).

+ 9 - 2
apps/emqx_retainer/src/emqx_retainer_mnesia.erl

@@ -35,6 +35,7 @@
     read_message/2,
     page_read/4,
     match_messages/3,
+    delete_cursor/2,
     clear_expired/1,
     clean/1,
     size/1
@@ -205,7 +206,7 @@ delete_message(_State, Topic) ->
 read_message(_State, Topic) ->
     {ok, read_messages(Topic)}.
 
-match_messages(_State, Topic, undefined) ->
+match_messages(State, Topic, undefined) ->
     Tokens = topic_to_tokens(Topic),
     Now = erlang:system_time(millisecond),
     QH = msg_table(search_table(Tokens, Now)),
@@ -214,7 +215,7 @@ match_messages(_State, Topic, undefined) ->
             {ok, qlc:eval(QH), undefined};
         BatchNum when is_integer(BatchNum) ->
             Cursor = qlc:cursor(QH),
-            match_messages(undefined, Topic, {Cursor, BatchNum})
+            match_messages(State, Topic, {Cursor, BatchNum})
     end;
 match_messages(_State, _Topic, {Cursor, BatchNum}) ->
     case qlc_next_answers(Cursor, BatchNum) of
@@ -224,6 +225,11 @@ match_messages(_State, _Topic, {Cursor, BatchNum}) ->
             {ok, Rows, {Cursor, BatchNum}}
     end.
 
+delete_cursor(_State, {Cursor, _}) ->
+    qlc:delete_cursor(Cursor);
+delete_cursor(_State, undefined) ->
+    ok.
+
 page_read(_State, Topic, Page, Limit) ->
     Now = erlang:system_time(millisecond),
     QH =
@@ -562,6 +568,7 @@ reindex(NewIndices, Force, StatusFun) when
 
             %% Fill index records in batches.
             QH = qlc:q([Topic || #retained_message{topic = Topic} <- ets:table(?TAB_MESSAGE)]),
+
             ok = reindex_batch(qlc:cursor(QH), 0, StatusFun),
 
             %% Enable read indices and unlock reindexing.

+ 108 - 34
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -21,6 +21,7 @@
 
 -include("emqx_retainer.hrl").
 
+-include_lib("emqx/include/asserts.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -96,14 +97,19 @@ end_per_group(_Group, Config) ->
     emqx_retainer_mnesia:populate_index_meta(),
     Config.
 
-init_per_testcase(t_get_basic_usage_info, Config) ->
+init_per_testcase(_TestCase, Config) ->
     mnesia:clear_table(?TAB_INDEX),
     mnesia:clear_table(?TAB_MESSAGE),
     emqx_retainer_mnesia:populate_index_meta(),
-    Config;
-init_per_testcase(_TestCase, Config) ->
     Config.
 
+end_per_testcase(t_flow_control, _Config) ->
+    restore_delivery();
+end_per_testcase(t_cursor_cleanup, _Config) ->
+    restore_delivery();
+end_per_testcase(_TestCase, _Config) ->
+    ok.
+
 app_spec() ->
     {emqx_retainer, ?BASE_CONF}.
 
@@ -405,19 +411,7 @@ t_stop_publish_clear_msg(_) ->
     ok = emqtt:disconnect(C1).
 
 t_flow_control(_) ->
-    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
-    LimiterCfg = make_limiter_cfg(Rate),
-    JsonCfg = make_limiter_json(<<"1/1s">>),
-    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
-    emqx_retainer:update_config(#{
-        <<"delivery_rate">> => <<"1/1s">>,
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1,
-                <<"batch_deliver_limiter">> => JsonCfg
-            }
-    }),
+    setup_slow_delivery(),
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(
@@ -442,23 +436,60 @@ t_flow_control(_) ->
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
     ?assertEqual(3, length(receive_messages(3))),
     End = erlang:system_time(millisecond),
+
     Diff = End - Begin,
 
     ?assert(
-        Diff > timer:seconds(2.5) andalso Diff < timer:seconds(3.9),
+        Diff > timer:seconds(2.1) andalso Diff < timer:seconds(3.9),
         lists:flatten(io_lib:format("Diff is :~p~n", [Diff]))
     ),
 
     ok = emqtt:disconnect(C1),
+    ok.
+
+t_cursor_cleanup(_) ->
+    setup_slow_delivery(),
+    {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C1),
+    lists:foreach(
+        fun(I) ->
+            emqtt:publish(
+                C1,
+                <<"retained/", (integer_to_binary(I))/binary>>,
+                <<"this is a retained message">>,
+                [{qos, 0}, {retain, true}]
+            )
+        end,
+        lists:seq(1, 5)
+    ),
+    {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
+
+    snabbkaffe:start_trace(),
+
+    ?assertWaitEvent(
+        emqtt:disconnect(C1),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/#">>},
+        2000
+    ),
+
+    ?assertEqual(0, qlc_process_count()),
+
+    {Pid, Ref} = spawn_monitor(fun() -> ok end),
+    receive
+        {'DOWN', Ref, _, _, _} -> ok
+    after 1000 -> ct:fail("should receive 'DOWN' message")
+    end,
+
+    ?assertWaitEvent(
+        emqx_retainer_dispatcher:dispatch(emqx_retainer:context(), <<"retained/1">>, Pid),
+        #{?snk_kind := retainer_dispatcher_no_receiver, topic := <<"retained/1">>},
+        2000
+    ),
+
+    ?assertEqual(0, qlc_process_count()),
+
+    snabbkaffe:stop(),
 
-    emqx_limiter_server:del_bucket(emqx_retainer, internal),
-    emqx_retainer:update_config(#{
-        <<"flow_control">> =>
-            #{
-                <<"batch_read_number">> => 1,
-                <<"batch_deliver_number">> => 1
-            }
-    }),
     ok.
 
 t_clear_expired(_) ->
@@ -849,15 +880,21 @@ with_conf(ConfMod, Case) ->
     end.
 
 make_limiter_cfg(Rate) ->
-    Client = #{
-        rate => Rate,
-        initial => 0,
-        burst => 0,
-        low_watermark => 1,
-        divisible => false,
-        max_retry_time => timer:seconds(5),
-        failure_strategy => force
-    },
+    make_limiter_cfg(Rate, #{}).
+
+make_limiter_cfg(Rate, ClientOpts) ->
+    Client = maps:merge(
+        #{
+            rate => Rate,
+            initial => 0,
+            burst => 0,
+            low_watermark => 1,
+            divisible => false,
+            max_retry_time => timer:seconds(5),
+            failure_strategy => force
+        },
+        ClientOpts
+    ),
     #{client => Client, rate => Rate, initial => 0, burst => 0}.
 
 make_limiter_json(Rate) ->
@@ -909,3 +946,40 @@ do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
     Res = emqtt:publish(Client, Topic, Payload, Opts),
     ct:sleep(Time),
     Res.
+
+setup_slow_delivery() ->
+    Rate = emqx_ratelimiter_SUITE:to_rate("1/1s"),
+    LimiterCfg = make_limiter_cfg(Rate),
+    JsonCfg = make_limiter_json(<<"1/1s">>),
+    emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg),
+    emqx_retainer:update_config(#{
+        <<"delivery_rate">> => <<"1/1s">>,
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1,
+                <<"batch_deliver_limiter">> => JsonCfg
+            }
+    }).
+
+restore_delivery() ->
+    emqx_limiter_server:del_bucket(emqx_retainer, internal),
+    emqx_retainer:update_config(#{
+        <<"flow_control">> =>
+            #{
+                <<"batch_read_number">> => 1,
+                <<"batch_deliver_number">> => 1
+            }
+    }).
+
+qlc_processes() ->
+    lists:filter(
+        fun(Pid) ->
+            {current_function, {qlc, wait_for_request, 3}} =:=
+                erlang:process_info(Pid, current_function)
+        end,
+        erlang:processes()
+    ).
+
+qlc_process_count() ->
+    length(qlc_processes()).

+ 1 - 0
changes/ce/fix-12996.en.md

@@ -0,0 +1 @@
+Fix a process leak in the `emqx_retainer` application. Previously, a client disconnecting while it was still receiving retained messages could leak the QLC cursor process serving that delivery.