Просмотр исходного кода

Merge pull request #6578 from lafirest/fix/retainer_config

fix(emqx_retainer): fix config update error
lafirest 4 лет назад
Родитель
Сommit
922fc0ad9f

+ 16 - 10
apps/emqx_retainer/src/emqx_retainer.erl

@@ -36,7 +36,8 @@
         , update_config/1
         , update_config/1
         , clean/0
         , clean/0
         , delete/1
         , delete/1
-        , page_read/3]).
+        , page_read/3
+        , post_config_update/5]).
 
 
 %% gen_server callbacks
 %% gen_server callbacks
 -export([ init/1
 -export([ init/1
@@ -165,24 +166,31 @@ get_expiry_time(#message{timestamp = Ts}) ->
 get_stop_publish_clear_msg() ->
 get_stop_publish_clear_msg() ->
     emqx_conf:get([?APP, stop_publish_clear_msg], false).
     emqx_conf:get([?APP, stop_publish_clear_msg], false).
 
 
--spec update_config(hocon:config()) -> ok.
+-spec update_config(hocon:config()) -> {ok, _} | {error, _}.
 update_config(Conf) ->
 update_config(Conf) ->
-    gen_server:call(?MODULE, {?FUNCTION_NAME, Conf}).
+    emqx_conf:update([emqx_retainer], Conf, #{override_to => cluster}).
 
 
 clean() ->
 clean() ->
-    gen_server:call(?MODULE, ?FUNCTION_NAME).
+    call(?FUNCTION_NAME).
 
 
 delete(Topic) ->
 delete(Topic) ->
-    gen_server:call(?MODULE, {?FUNCTION_NAME, Topic}).
+    call({?FUNCTION_NAME, Topic}).
 
 
 page_read(Topic, Page, Limit) ->
 page_read(Topic, Page, Limit) ->
-    gen_server:call(?MODULE, {?FUNCTION_NAME, Topic, Page, Limit}).
+    call({?FUNCTION_NAME, Topic, Page, Limit}).
+
+post_config_update(_, _UpdateReq, NewConf, OldConf, _AppEnvs) ->
+    call({update_config, NewConf, OldConf}).
+
+call(Req) ->
+    gen_server:call(?MODULE, Req, infinity).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 init([]) ->
 init([]) ->
+    emqx_conf:add_handler([emqx_retainer], ?MODULE),
     init_shared_context(),
     init_shared_context(),
     State = new_state(),
     State = new_state(),
     #{enable := Enable} = Cfg = emqx:get_config([?APP]),
     #{enable := Enable} = Cfg = emqx:get_config([?APP]),
@@ -194,9 +202,7 @@ init([]) ->
              State
              State
      end}.
      end}.
 
 
-handle_call({update_config, Conf}, _, State) ->
-    OldConf = emqx:get_config([?APP]),
-    {ok, #{config := NewConf}} = emqx:update_config([?APP], Conf),
+handle_call({update_config, NewConf, OldConf}, _, State) ->
     State2 = update_config(State, NewConf, OldConf),
     State2 = update_config(State, NewConf, OldConf),
     {reply, ok, State2};
     {reply, ok, State2};
 
 
@@ -326,7 +332,7 @@ require_semaphore(Semaphore, Id) ->
 
 
 -spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
 -spec wait_semaphore(non_neg_integer(), pos_integer()) -> boolean().
 wait_semaphore(X, Id) when X < 0 ->
 wait_semaphore(X, Id) when X < 0 ->
-    gen_server:call(?MODULE, {?FUNCTION_NAME, Id}, infinity);
+    call({?FUNCTION_NAME, Id});
 wait_semaphore(_, _) ->
 wait_semaphore(_, _) ->
     true.
     true.
 
 

+ 1 - 1
apps/emqx_retainer/src/emqx_retainer_api.erl

@@ -130,7 +130,7 @@ config(get, _) ->
 
 
 config(put, #{body := Body}) ->
 config(put, #{body := Body}) ->
     try
     try
-        ok = emqx_retainer:update_config(Body),
+        {ok, _} = emqx_retainer:update_config(Body),
         {200,  emqx:get_raw_config([emqx_retainer])}
         {200,  emqx:get_raw_config([emqx_retainer])}
     catch _:Reason:_ ->
     catch _:Reason:_ ->
             {400,
             {400,

+ 27 - 0
apps/emqx_retainer/test/emqx_retainer_SUITE.erl

@@ -20,6 +20,7 @@
 -compile(nowarn_export_all).
 -compile(nowarn_export_all).
 
 
 -define(APP, emqx_retainer).
 -define(APP, emqx_retainer).
+-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
@@ -49,13 +50,39 @@ emqx_retainer {
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 init_per_suite(Config) ->
 init_per_suite(Config) ->
+    application:load(emqx_conf),
+    ok = ekka:start(),
+    ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
+    meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
+    meck:expect(emqx_alarm, activate, 3, ok),
+    meck:expect(emqx_alarm, deactivate, 3, ok),
+
     ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
     ok = emqx_config:init_load(emqx_retainer_schema, ?BASE_CONF),
     emqx_common_test_helpers:start_apps([emqx_retainer]),
     emqx_common_test_helpers:start_apps([emqx_retainer]),
     Config.
     Config.
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
+    ekka:stop(),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    meck:unload(emqx_alarm),
+
     emqx_common_test_helpers:stop_apps([emqx_retainer]).
     emqx_common_test_helpers:stop_apps([emqx_retainer]).
 
 
+init_per_testcase(_, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(),
+    timer:sleep(200),
+    Config.
+
+end_per_testcase(_, Config) ->
+    case erlang:whereis(node()) of
+        undefined -> ok;
+        P ->
+            erlang:unlink(P),
+            erlang:exit(P, kill)
+    end,
+    Config.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Test Cases
 %% Test Cases
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------