Просмотр исходного кода

fix: remove atom leaks

Both emqx_resource_managers and emqx_resource_workers leaked atoms as they
created an unique atoms to use as registered names. This is fixed by
removing the need to register the names.

Fixes: https://emqx.atlassian.net/browse/EMQX-8583
Kjell Winblad 3 лет назад
Родитель
Сommit
8c482e03d1

+ 16 - 11
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -54,7 +54,7 @@
 
 % State record
 -record(data, {
-    id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error
+    id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error, pid
 }).
 -type data() :: #data{}.
 
@@ -296,17 +296,16 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         state = undefined,
         error = undefined
     },
-    Module = atom_to_binary(?MODULE),
-    ProcName = binary_to_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
-    gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []).
+    gen_statem:start_link(?MODULE, {Data, Opts}, []).
 
 init({Data, Opts}) ->
     process_flag(trap_exit, true),
     %% init the cache so that lookup/1 will always return something
-    insert_cache(Data#data.id, Data#data.group, Data),
+    DataWithPid = Data#data{pid = self()},
+    insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid),
     case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
-        true -> {ok, connecting, Data, {next_event, internal, start_resource}};
-        false -> {ok, stopped, Data}
+        true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}};
+        false -> {ok, stopped, DataWithPid}
     end.
 
 terminate(_Reason, _State, Data) ->
@@ -429,6 +428,14 @@ read_cache(ResId) ->
         [] -> not_found
     end.
 
+read_manager_pid_from_cache(ResId) ->
+    case read_cache(ResId) of
+        not_found ->
+            erlang:error(badarg);
+        {_, #data{pid = ManagerPid}} ->
+            ManagerPid
+    end.
+
 delete_cache(ResId, MgrId) ->
     case get_owner(ResId) of
         MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId ->
@@ -649,10 +656,8 @@ do_wait_for_ready(ResId, Retry) ->
 
 safe_call(ResId, Message, Timeout) ->
     try
-        Module = atom_to_binary(?MODULE),
-        MgrId = get_owner(ResId),
-        ProcName = binary_to_existing_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
-        gen_statem:call(ProcName, Message, {clean_timeout, Timeout})
+        ManagerPid = read_manager_pid_from_cache(ResId),
+        gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout})
     catch
         error:badarg ->
             {error, not_found};

+ 68 - 73
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -76,7 +76,7 @@
 -type data() :: #{
     id => id(),
     index => index(),
-    name => atom(),
+    inflight_tid => ets:tid(),
     batch_size => pos_integer(),
     batch_time => timer:time(),
     queue => replayq:q(),
@@ -87,7 +87,7 @@
 callback_mode() -> [state_functions, state_enter].
 
 start_link(Id, Index, Opts) ->
-    gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
+    gen_statem:start_link(?MODULE, {Id, Index, Opts}, []).
 
 -spec sync_query(id(), request(), query_opts()) -> Result :: term().
 sync_query(Id, Request, Opts) ->
@@ -133,11 +133,11 @@ simple_async_query(Id, Request, ReplyFun) ->
     _ = handle_query_result(Id, Result, false, false),
     Result.
 
--spec block(pid() | atom()) -> ok.
+-spec block(pid()) -> ok.
 block(ServerRef) ->
     gen_statem:cast(ServerRef, block).
 
--spec resume(pid() | atom()) -> ok.
+-spec resume(pid()) -> ok.
 resume(ServerRef) ->
     gen_statem:cast(ServerRef, resume).
 
@@ -145,7 +145,6 @@ resume(ServerRef) ->
 init({Id, Index, Opts}) ->
     process_flag(trap_exit, true),
     true = gproc_pool:connect_worker(Id, {Id, Index}),
-    Name = name(Id, Index),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE),
     TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE),
@@ -165,12 +164,12 @@ init({Id, Index, Opts}) ->
     emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)),
     emqx_resource_metrics:inflight_set(Id, Index, 0),
     InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
-    ok = inflight_new(Name, InfltWinSZ, Id, Index),
+    {ok, InflightTID} = inflight_new(InfltWinSZ, Id, Index),
     HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
     St = #{
         id => Id,
         index => Index,
-        name => Name,
+        inflight_tid => InflightTID,
         batch_size => BatchSize,
         batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
         queue => Queue,
@@ -283,14 +282,14 @@ pick_cast(Id, Key, Query) ->
         ok
     end).
 
-do_resume(#{id := Id, name := Name} = Data) ->
-    case inflight_get_first(Name) of
+do_resume(#{id := Id, inflight_tid := InflightTID} = Data) ->
+    case inflight_get_first(InflightTID) of
         empty ->
             retry_queue(Data);
         {Ref, FirstQuery} ->
             %% We retry msgs in inflight window sync, as if we send them
             %% async, they will be appended to the end of inflight window again.
-            retry_inflight_sync(Id, Ref, FirstQuery, Name, Data)
+            retry_inflight_sync(Id, Ref, FirstQuery, InflightTID, Data)
     end.
 
 retry_queue(
@@ -299,7 +298,7 @@ retry_queue(
         id := Id,
         index := Index,
         batch_size := 1,
-        name := Name,
+        inflight_tid := InflightTID,
         resume_interval := ResumeT
     } = Data0
 ) ->
@@ -308,7 +307,7 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} ->
-            QueryOpts = #{inflight_name => Name},
+            QueryOpts = #{inflight_name => InflightTID},
             Result = call_query(configured, Id, Index, Query, QueryOpts),
             Reply = ?REPLY(undefined, Request, HasBeenSent, Result),
             case reply_caller(Id, Reply) of
@@ -327,7 +326,7 @@ retry_queue(
         id := Id,
         index := Index,
         batch_size := BatchSize,
-        name := Name,
+        inflight_tid := InflightTID,
         resume_interval := ResumeT
     } = Data0
 ) ->
@@ -336,7 +335,7 @@ retry_queue(
         empty ->
             {next_state, running, Data0};
         {Q1, QAckRef, Batch0} ->
-            QueryOpts = #{inflight_name => Name},
+            QueryOpts = #{inflight_name => InflightTID},
             Result = call_query(configured, Id, Index, Batch0, QueryOpts),
             %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue,
             %% we now change the 'from' field to 'undefined' so it will not reply the caller again.
@@ -361,7 +360,7 @@ retry_inflight_sync(
     Id,
     Ref,
     QueryOrBatch,
-    Name,
+    InflightTID,
     #{index := Index, resume_interval := ResumeT} = Data0
 ) ->
     QueryOpts = #{},
@@ -375,7 +374,7 @@ retry_inflight_sync(
             {keep_state, Data0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
-            inflight_drop(Name, Ref, Id, Index),
+            inflight_drop(InflightTID, Ref, Id, Index),
             do_resume(Data0)
     end.
 
@@ -451,11 +450,11 @@ do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_que
     #{
         id := Id,
         index := Index,
-        name := Name
+        inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
     [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
-    QueryOpts = #{inflight_name => Name},
+    QueryOpts = #{inflight_name => InflightTID},
     Result = call_query(configured, Id, Index, Request, QueryOpts),
     IsAsync = is_async(Id),
     Data1 = cancel_flush_timer(Data0),
@@ -489,9 +488,9 @@ do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queu
         id := Id,
         index := Index,
         batch_size := BatchSize,
-        name := Name
+        inflight_tid := InflightTID
     } = Data0,
-    QueryOpts = #{inflight_name => Name},
+    QueryOpts = #{inflight_name => InflightTID},
     Result = call_query(configured, Id, Index, Batch, QueryOpts),
     IsAsync = is_async(Id),
     Data1 = cancel_flush_timer(Data0),
@@ -639,17 +638,17 @@ apply_query_fun(sync, Mod, Id, _Index, ?QUERY(_, Request, _) = _Query, ResSt, _Q
     ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
 apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
-    Name = maps:get(inflight_name, QueryOpts, undefined),
+    InflightTID = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_query_async,
-        case is_inflight_full(Name) of
+        case is_inflight_full(InflightTID) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:reply_after_query/7,
                 Ref = make_message_ref(),
-                Args = [self(), Id, Index, Name, Ref, Query],
-                ok = inflight_append(Name, Ref, Query, Id, Index),
+                Args = [self(), Id, Index, InflightTID, Ref, Query],
+                ok = inflight_append(InflightTID, Ref, Query, Id, Index),
                 Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt),
                 {async_return, Result}
         end,
@@ -661,25 +660,25 @@ apply_query_fun(sync, Mod, Id, _Index, [?QUERY(_, _, _) | _] = Batch, ResSt, _Qu
     ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
 apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, QueryOpts) ->
     ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}),
-    Name = maps:get(inflight_name, QueryOpts, undefined),
+    InflightTID = maps:get(inflight_name, QueryOpts, undefined),
     ?APPLY_RESOURCE(
         call_batch_query_async,
-        case is_inflight_full(Name) of
+        case is_inflight_full(InflightTID) of
             true ->
                 {async_return, inflight_full};
             false ->
                 ReplyFun = fun ?MODULE:batch_reply_after_query/7,
                 Ref = make_message_ref(),
-                ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]},
+                ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]},
                 Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
-                ok = inflight_append(Name, Ref, Batch, Id, Index),
+                ok = inflight_append(InflightTID, Ref, Batch, Id, Index),
                 Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt),
                 {async_return, Result}
         end,
         Batch
     ).
 
-reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
+reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
@@ -687,10 +686,10 @@ reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent),
         true ->
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+            drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
     end.
 
-batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
+batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) ->
     %% NOTE: 'inflight' is the count of messages that were sent async
     %% but received no ACK, NOT the number of messages queued in the
     %% inflight window.
@@ -698,16 +697,16 @@ batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) ->
         true ->
             ?MODULE:block(Pid);
         false ->
-            drop_inflight_and_resume(Pid, Name, Ref, Id, Index)
+            drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index)
     end.
 
-drop_inflight_and_resume(Pid, Name, Ref, Id, Index) ->
-    case is_inflight_full(Name) of
+drop_inflight_and_resume(Pid, InflightTID, Ref, Id, Index) ->
+    case is_inflight_full(InflightTID) of
         true ->
-            inflight_drop(Name, Ref, Id, Index),
+            inflight_drop(InflightTID, Ref, Id, Index),
             ?MODULE:resume(Pid);
         false ->
-            inflight_drop(Name, Ref, Id, Index)
+            inflight_drop(InflightTID, Ref, Id, Index)
     end.
 
 %%==============================================================================
@@ -757,82 +756,85 @@ get_first_n_from_queue(Q, N) ->
 %% the inflight queue for async query
 -define(MAX_SIZE_REF, -1).
 -define(SIZE_REF, -2).
-inflight_new(Name, InfltWinSZ, Id, Index) ->
-    _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
+inflight_new(InfltWinSZ, Id, Index) ->
+    TableId = ets:new(
+        emqx_resource_worker_inflight_tab,
+        [ordered_set, public, {write_concurrency, true}]
+    ),
+    inflight_append(TableId, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index),
     %% we use this counter because we might deal with batches as
     %% elements.
-    inflight_append(Name, ?SIZE_REF, 0, Id, Index),
-    ok.
+    inflight_append(TableId, ?SIZE_REF, 0, Id, Index),
+    {ok, TableId}.
 
-inflight_get_first(Name) ->
-    case ets:next(Name, ?MAX_SIZE_REF) of
+inflight_get_first(InflightTID) ->
+    case ets:next(InflightTID, ?MAX_SIZE_REF) of
         '$end_of_table' ->
             empty;
         Ref ->
-            case ets:lookup(Name, Ref) of
+            case ets:lookup(InflightTID, Ref) of
                 [Object] ->
                     Object;
                 [] ->
                     %% it might have been dropped
-                    inflight_get_first(Name)
+                    inflight_get_first(InflightTID)
             end
     end.
 
 is_inflight_full(undefined) ->
     false;
-is_inflight_full(Name) ->
-    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?MAX_SIZE_REF),
+is_inflight_full(InflightTID) ->
+    [{_, {max_size, MaxSize}}] = ets:lookup(InflightTID, ?MAX_SIZE_REF),
     %% we consider number of batches rather than number of messages
     %% because one batch request may hold several messages.
-    Size = inflight_num_batches(Name),
+    Size = inflight_num_batches(InflightTID),
     Size >= MaxSize.
 
-inflight_num_batches(Name) ->
+inflight_num_batches(InflightTID) ->
     %% Note: we subtract 2 because there're 2 metadata rows that hold
     %% the maximum size value and the number of messages.
     MetadataRowCount = 2,
-    case ets:info(Name, size) of
+    case ets:info(InflightTID, size) of
         undefined -> 0;
         Size -> max(0, Size - MetadataRowCount)
     end.
 
-inflight_num_msgs(Name) ->
-    [{_, Size}] = ets:lookup(Name, ?SIZE_REF),
+inflight_num_msgs(InflightTID) ->
+    [{_, Size}] = ets:lookup(InflightTID, ?SIZE_REF),
     Size.
 
 inflight_append(undefined, _Ref, _Query, _Id, _Index) ->
     ok;
-inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
+inflight_append(InflightTID, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) ->
     Batch = mark_as_sent(Batch0),
-    ets:insert(Name, {Ref, Batch}),
+    ets:insert(InflightTID, {Ref, Batch}),
     BatchSize = length(Batch),
-    ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ets:update_counter(InflightTID, ?SIZE_REF, {2, BatchSize}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok;
-inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
+inflight_append(InflightTID, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) ->
     Query = mark_as_sent(Query0),
-    ets:insert(Name, {Ref, Query}),
-    ets:update_counter(Name, ?SIZE_REF, {2, 1}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    ets:insert(InflightTID, {Ref, Query}),
+    ets:update_counter(InflightTID, ?SIZE_REF, {2, 1}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok;
-inflight_append(Name, Ref, Data, _Id, _Index) ->
-    ets:insert(Name, {Ref, Data}),
+inflight_append(InflightTID, Ref, Data, _Id, _Index) ->
+    ets:insert(InflightTID, {Ref, Data}),
     %% this is a metadata row being inserted; therefore, we don't bump
     %% the inflight metric.
     ok.
 
 inflight_drop(undefined, _, _Id, _Index) ->
     ok;
-inflight_drop(Name, Ref, Id, Index) ->
+inflight_drop(InflightTID, Ref, Id, Index) ->
     Count =
-        case ets:take(Name, Ref) of
+        case ets:take(InflightTID, Ref) of
             [{Ref, ?QUERY(_, _, _)}] -> 1;
             [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch);
             _ -> 0
         end,
-    Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}),
-    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)),
+    Count > 0 andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
+    emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
     ok.
 
 %%==============================================================================
@@ -868,13 +870,6 @@ assert_ok_result(R) ->
 queue_count(Q) ->
     replayq:count(Q).
 
--spec name(id(), integer()) -> atom().
-name(Id, Index) ->
-    Mod = atom_to_list(?MODULE),
-    Id1 = binary_to_list(Id),
-    Index1 = integer_to_list(Index),
-    list_to_atom(lists:concat([Mod, ":", Id1, ":", Index1])).
-
 disk_queue_dir(Id, Index) ->
     QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
     filename:join([emqx:data_dir(), "resource_worker", node(), QDir]).

+ 1 - 0
changes/v5.0.14/fix-8583.en.md

@@ -0,0 +1 @@
+Potential leaks of atoms that could lead to a crash if a lot of resources were created have been removed.

+ 1 - 0
changes/v5.0.14/fix-8583.zh.md

@@ -0,0 +1 @@
+如果创建了大量的资源,可能会导致崩溃的潜在的原子泄漏已经被删除。