Просмотр исходного кода

Merge pull request #6580 from lafirest/fix/slow_subs_config

fix(emqx_slow_subs): fix config update error
lafirest 4 лет назад
Родитель
Сommit
88bd6e9629

+ 10 - 4
apps/emqx_slow_subs/src/emqx_slow_subs.erl

@@ -23,7 +23,7 @@
 -include_lib("emqx_slow_subs/include/emqx_slow_subs.hrl").
 
 -export([ start_link/0, on_stats_update/2, update_settings/1
-        , clear_history/0, init_topk_tab/0
+        , clear_history/0, init_topk_tab/0, post_config_update/5
         ]).
 
 %% gen_server callbacks
@@ -123,8 +123,8 @@ on_stats_update(#{clientid := ClientId,
 clear_history() ->
     gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
 
-update_settings(Enable) ->
-    gen_server:call(?MODULE, {?FUNCTION_NAME, Enable}, ?DEF_CALL_TIMEOUT).
+update_settings(Conf) ->
+    emqx_conf:update([emqx_slow_subs], Conf, #{override_to => cluster}).
 
 init_topk_tab() ->
     case ets:whereis(?TOPK_TAB) of
@@ -138,11 +138,16 @@ init_topk_tab() ->
             ?TOPK_TAB
     end.
 
+post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
+    gen_server:call(?MODULE, {update_settings, NewConf}, ?DEF_CALL_TIMEOUT).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
 init([]) ->
+    emqx_conf:add_handler([emqx_slow_subs], ?MODULE),
+
     InitState = #{enable => false,
                   last_tick_at => 0,
                   expire_timer => undefined,
@@ -152,7 +157,8 @@ init([]) ->
     Enable = emqx:get_config([emqx_slow_subs, enable]),
     {ok, check_enable(Enable, InitState)}.
 
-handle_call({update_settings, Enable}, _From, State) ->
+handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
+    emqx_config:put([emqx_slow_subs], Conf),
     State2 = check_enable(Enable, State),
     {reply, ok, State2};
 

+ 1 - 2
apps/emqx_slow_subs/src/emqx_slow_subs_api.erl

@@ -107,6 +107,5 @@ settings(get, _) ->
     {200, emqx:get_raw_config([?APP_NAME], #{})};
 
 settings(put, #{body := Body}) ->
-    {ok, #{config := #{enable := Enable}}} = emqx:update_config([?APP], Body),
-    _ = emqx_slow_subs:update_settings(Enable),
+    _ = emqx_slow_subs:update_settings(Body),
     {200, emqx:get_raw_config([?APP_NAME], #{})}.

+ 22 - 0
apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl

@@ -32,6 +32,7 @@
 
 -define(BASE_PATH, "api").
 -define(NOW, erlang:system_time(millisecond)).
+-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
 
 -define(CONF_DEFAULT, <<"""
 emqx_slow_subs
@@ -49,23 +50,42 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
+    application:load(emqx_conf),
+    ok = ekka:start(),
+    ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
+    meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
+    meck:expect(emqx_alarm, activate, 3, ok),
+    meck:expect(emqx_alarm, deactivate, 3, ok),
+
     ok = emqx_config:init_load(emqx_slow_subs_schema, ?CONF_DEFAULT),
     emqx_mgmt_api_test_util:init_suite([emqx_slow_subs]),
     {ok, _} = application:ensure_all_started(emqx_authn),
     Config.
 
 end_per_suite(Config) ->
+    ekka:stop(),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    meck:unload(emqx_alarm),
+
     application:stop(emqx_authn),
     emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]),
     Config.
 
 init_per_testcase(_, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(),
     application:ensure_all_started(emqx_slow_subs),
     timer:sleep(500),
     Config.
 
 end_per_testcase(_, Config) ->
     application:stop(emqx_slow_subs),
+    case erlang:whereis(node()) of
+        undefined -> ok;
+        P ->
+            erlang:unlink(P),
+            erlang:exit(P, kill)
+    end,
     Config.
 
 t_get_history(_) ->
@@ -119,6 +139,8 @@ t_settting(_) ->
                                 auth_header_()
                             ),
 
+    timer:sleep(1000),
+
     GetReturn = decode_json(GetData),
 
     ?assertEqual(Conf2, GetReturn),