瀏覽代碼

fix: set load config done after update tnx_id

zhongwencool 2 年之前
父節點
當前提交
f7513b900a
共有 3 個文件被更改,包括 29 次插入37 次删除
  1. 10 19
      apps/emqx_conf/src/emqx_cluster_rpc.erl
  2. 14 13
      apps/emqx_conf/src/emqx_conf_app.erl
  3. 5 5
      apps/emqx_conf/src/emqx_conf_sup.erl

+ 10 - 19
apps/emqx_conf/src/emqx_cluster_rpc.erl

@@ -17,7 +17,7 @@
 -behaviour(gen_server).
 
 %% API
--export([start_link/1, mnesia/1]).
+-export([start_link/0, mnesia/1]).
 
 %% Note: multicall functions are statically checked by
 %% `emqx_bapi_trans' and `emqx_bpapi_static_checks' modules. Don't
@@ -30,7 +30,8 @@
     skip_failed_commit/1,
     fast_forward_to_commit/2,
     on_mria_stop/1,
-    wait_for_cluster_rpc/0
+    wait_for_cluster_rpc/0,
+    maybe_init_tnx_id/2
 ]).
 -export([
     commit/2,
@@ -69,12 +70,6 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
-start_link() ->
-    start_link(-1).
-
-start_link(Node, Name, RetryMs) ->
-    start_link(-1, Node, Name, RetryMs).
-
 -endif.
 
 -define(INITIATE(MFA), {initiate, MFA}).
@@ -115,11 +110,11 @@ mnesia(boot) ->
         {attributes, record_info(fields, cluster_rpc_commit)}
     ]).
 
-start_link(TnxId) ->
-    start_link(TnxId, node(), ?MODULE, get_retry_ms()).
+start_link() ->
+    start_link(node(), ?MODULE, get_retry_ms()).
 
-start_link(TnxId, Node, Name, RetryMs) ->
-    case gen_server:start_link({local, Name}, ?MODULE, [TnxId, Node, RetryMs], []) of
+start_link(Node, Name, RetryMs) ->
+    case gen_server:start_link({local, Name}, ?MODULE, [Node, RetryMs], []) of
         {ok, Pid} ->
             {ok, Pid};
         {error, {already_started, Pid}} ->
@@ -303,26 +298,22 @@ wait_for_cluster_rpc() ->
 %%%===================================================================
 
 %% @private
-init([TnxId, Node, RetryMs]) ->
+init([Node, RetryMs]) ->
     register_mria_stop_cb(fun ?MODULE:on_mria_stop/1),
     {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
     State = #{node => Node, retry_interval => RetryMs, is_leaving => false},
     %% Now continue with the normal catch-up process
     %% That is: apply the missing transactions after the config
     %% was copied until now.
-    {ok, State, {continue, {?CATCH_UP, TnxId}}}.
+    {ok, State, {continue, {?CATCH_UP, init}}}.
 
 %% @private
-handle_continue({?CATCH_UP, TnxId}, State = #{node := Node}) ->
+handle_continue({?CATCH_UP, init}, State) ->
     %% emqx app must be started before
     %% trying to catch up the rpc commit logs
     ok = wait_for_emqx_ready(),
     ok = wait_for_cluster_rpc(),
-    %% The init transaction ID is set in emqx_conf_app after
-    %% it has fetched the latest config from one of the core nodes
-    ok = maybe_init_tnx_id(Node, TnxId),
     {noreply, State, catch_up(State)};
-%% @private
 handle_continue(?CATCH_UP, State) ->
     {noreply, State, catch_up(State)}.
 

+ 14 - 13
apps/emqx_conf/src/emqx_conf_app.erl

@@ -31,17 +31,16 @@
 -define(DEFAULT_INIT_TXN_ID, -1).
 
 start(_StartType, _StartArgs) ->
-    {ok, TnxId} =
-        try
-            {ok, _} = init_conf()
-        catch
-            C:E:St ->
-                %% logger is not quite ready.
-                io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]),
-                init:stop(1)
-        end,
+    try
+        ok = init_conf()
+    catch
+        C:E:St ->
+            %% logger is not quite ready.
+            io:format(standard_error, "Failed to load config~n~p~n~p~n~p~n", [C, E, St]),
+            init:stop(1)
+    end,
     ok = emqx_config_logger:refresh_config(),
-    emqx_conf_sup:start_link(TnxId).
+    emqx_conf_sup:start_link().
 
 stop(_State) ->
     ok.
@@ -94,10 +93,12 @@ sync_data_from_node() ->
 %% Internal functions
 %% ------------------------------------------------------------------------------
 
-init_load() ->
+init_load(TnxId) ->
     case emqx_app:get_config_loader() of
         Module when Module == emqx; Module == emqx_conf ->
             ok = emqx_config:init_load(emqx_conf:schema_module()),
+            %% Set load config done after update(init) tnx_id.
+            ok = emqx_cluster_rpc:maybe_init_tnx_id(node(), TnxId),
             ok = emqx_app:set_config_loader(emqx_conf),
             ok;
         Module ->
@@ -115,8 +116,8 @@ init_load_done() ->
 init_conf() ->
     emqx_cluster_rpc:wait_for_cluster_rpc(),
     {ok, TnxId} = sync_cluster_conf(),
-    ok = init_load(),
-    {ok, TnxId}.
+    ok = init_load(TnxId),
+    ok.
 
 cluster_nodes() ->
     mria:cluster_nodes(cores) -- [node()].

+ 5 - 5
apps/emqx_conf/src/emqx_conf_sup.erl

@@ -18,16 +18,16 @@
 
 -behaviour(supervisor).
 
--export([start_link/1]).
+-export([start_link/0]).
 
 -export([init/1]).
 
 -define(SERVER, ?MODULE).
 
-start_link(TnxId) ->
-    supervisor:start_link({local, ?SERVER}, ?MODULE, [TnxId]).
+start_link() ->
+    supervisor:start_link({local, ?SERVER}, ?MODULE, []).
 
-init([TnxId]) ->
+init([]) ->
     SupFlags = #{
         strategy => one_for_all,
         intensity => 10,
@@ -35,7 +35,7 @@ init([TnxId]) ->
     },
     ChildSpecs =
         [
-            child_spec(emqx_cluster_rpc, [TnxId]),
+            child_spec(emqx_cluster_rpc, []),
             child_spec(emqx_cluster_rpc_cleaner, [])
         ],
     {ok, {SupFlags, ChildSpecs}}.