Преглед изворни кода

Merge pull request #7651 from HJianBo/hot-confs-sys-topics-limiter

Add APIs for System Topics
zhongwencool пре 3 година
родитељ
комит
ae005f6e69

+ 43 - 7
apps/emqx/src/emqx_sys.erl

@@ -52,6 +52,8 @@
     on_client_unsubscribed/3
 ]).
 
+-export([post_config_update/5]).
+
 -ifdef(TEST).
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -82,6 +84,8 @@
     ]
 ).
 
+-define(CONF_KEY_PATH, [sys_topics]).
+
 %%--------------------------------------------------------------------
 %% APIs
 %%--------------------------------------------------------------------
@@ -124,7 +128,7 @@ sys_heatbeat_interval() ->
     emqx:get_config([sys_topics, sys_heartbeat_interval]).
 
 sys_event_messages() ->
-    emqx:get_config([sys_topics, sys_event_messages]).
+    maps:to_list(emqx:get_config([sys_topics, sys_event_messages])).
 
 %% @doc Get sys info
 -spec info() -> list(tuple()).
@@ -136,13 +140,40 @@ info() ->
         {datetime, datetime()}
     ].
 
+%% Update the confgs at runtime
+post_config_update(_, _Req, NewSysConf, OldSysConf, _AppEnvs) ->
+    {Added, Removed} = diff_hooks(NewSysConf, OldSysConf),
+    unload_event_hooks(Removed),
+    load_event_hooks(Added).
+
+diff_hooks(NewSysConf, OldSysConf) ->
+    NewEvents = maps:to_list(maps:get(sys_event_messages, NewSysConf, #{})),
+    OldEvents = maps:to_list(maps:get(sys_event_messages, OldSysConf, #{})),
+    diff_hooks(NewEvents, OldEvents, [], []).
+
+diff_hooks([], [], Added, Removed) ->
+    {lists:reverse(Added), lists:reverse(Removed)};
+diff_hooks([H | T1], [H | T2], Added, Removed) ->
+    diff_hooks(T1, T2, Added, Removed);
+diff_hooks(
+    [New = {EventName, NewEnable} | T1],
+    [Old = {EventName, OldEnable} | T2],
+    Added,
+    Removed
+) ->
+    case {NewEnable, OldEnable} of
+        {true, false} -> diff_hooks(T1, T2, [New | Added], Removed);
+        {false, true} -> diff_hooks(T1, T2, Added, [Old | Removed])
+    end.
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------
 
 init([]) ->
+    ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
     State = #state{sysdescr = iolist_to_binary(sysdescr())},
-    load_event_hooks(),
+    load_event_hooks(sys_event_messages()),
     {ok, heartbeat(tick(State))}.
 
 heartbeat(State) ->
@@ -150,7 +181,9 @@ heartbeat(State) ->
 tick(State) ->
     State#state{ticker = start_timer(sys_interval(), tick)}.
 
-load_event_hooks() ->
+load_event_hooks([]) ->
+    ok;
+load_event_hooks(Events) ->
     lists:foreach(
         fun
             ({_, false}) ->
@@ -159,7 +192,7 @@ load_event_hooks() ->
                 {HookPoint, Fun} = hook_and_fun(K),
                 emqx_hooks:put(HookPoint, {?MODULE, Fun, []})
         end,
-        maps:to_list(sys_event_messages())
+        Events
     ).
 
 handle_call(Req, _From, State) ->
@@ -186,16 +219,19 @@ handle_info(Info, State) ->
     {noreply, State}.
 
 terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
-    unload_event_hooks(),
+    _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
+    unload_event_hooks(sys_event_messages()),
     lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
 
-unload_event_hooks() ->
+unload_event_hooks([]) ->
+    ok;
+unload_event_hooks(Event) ->
     lists:foreach(
         fun({K, _}) ->
             {HookPoint, Fun} = hook_and_fun(K),
             emqx_hooks:del(HookPoint, {?MODULE, Fun})
         end,
-        maps:to_list(sys_event_messages())
+        Event
     ).
 
 %%--------------------------------------------------------------------

+ 5 - 2
apps/emqx/test/props/prop_emqx_sys.erl

@@ -31,7 +31,8 @@
     emqx_stats,
     emqx_broker,
     mria_mnesia,
-    emqx_hooks
+    emqx_hooks,
+    emqx_config_handler
 ]).
 
 -define(ALL(Vars, Types, Exprs),
@@ -114,7 +115,9 @@ do_mock(emqx_metrics) ->
     meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end);
 do_mock(emqx_hooks) ->
     meck:expect(emqx_hooks, put, fun(_HookPoint, _MFA) -> ok end),
-    meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end).
+    meck:expect(emqx_hooks, del, fun(_HookPoint, _MF) -> ok end);
+do_mock(emqx_config_handler) ->
+    meck:expect(emqx_config_handler, add_handler, fun(_, _) -> ok end).
 
 %%--------------------------------------------------------------------
 %% MODEL

+ 2 - 1
apps/emqx_management/src/emqx_mgmt_api_configs.erl

@@ -60,7 +60,8 @@
         <<"delayed">>,
         <<"event_message">>,
         <<"prometheus">>,
-        <<"telemetry">>
+        <<"telemetry">>,
+        <<"sys_topics">>
     ] ++ global_zone_roots()
 ).
 

+ 90 - 0
apps/emqx_management/src/emqx_mgmt_api_sys.erl

@@ -0,0 +1,90 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_mgmt_api_sys).
+
+-behaviour(minirest_api).
+
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("typerefl/include/types.hrl").
+
+%% API
+-export([ api_spec/0
+        , paths/0
+        , schema/1
+        , namespace/0
+        ]).
+
+-export([ sys/2
+        ]).
+
+-define(TAGS, [<<"sys">>]).
+
+namespace() -> "sys".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    ["/mqtt/sys_topics"].
+
+sys(get, _Params) ->
+    {200, emqx_conf:get_raw([sys_topics], #{})};
+sys(put, #{body := Body}) ->
+    {ok, _} = emqx_conf:update([sys_topics], Body, #{override_to => cluster}),
+    {200, emqx_conf:get_raw([sys_topics], #{})}.
+
+%%--------------------------------------------------------------------
+%% Swagger defines
+%%--------------------------------------------------------------------
+
+schema("/mqtt/sys_topics") ->
+    #{
+        'operationId' => sys,
+        get =>
+            #{
+                tags => ?TAGS,
+                description => <<"Get System Topics config">>,
+                responses =>
+                    #{
+                        200 => schema_sys_topics()
+                     }
+             },
+        put =>
+            #{
+                tags => ?TAGS,
+                description => <<"Update System Topics config">>,
+                'requestBody' => schema_sys_topics(),
+                responses =>
+                    #{
+                        200 => schema_sys_topics()
+                     }
+             }
+     }.
+
+schema_sys_topics() ->
+    emqx_dashboard_swagger:schema_with_example(
+      hoconsc:ref(emqx_schema, "sys_topics"), example_sys_topics()).
+
+example_sys_topics() ->
+    #{<<"sys_event_messages">> =>
+      #{<<"client_connected">> => true,
+        <<"client_disconnected">> => true,
+        <<"client_subscribed">> => false,
+        <<"client_unsubscribed">> => false},
+      <<"sys_heartbeat_interval">> => <<"30s">>,
+      <<"sys_msg_interval">> => <<"1m">>
+     }.

+ 66 - 0
apps/emqx_management/test/emqx_mgmt_api_sys_SUITE.erl

@@ -0,0 +1,66 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_sys_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_mgmt_api_test_util:init_suite([emqx_conf]),
+    Config.
+
+end_per_suite(_) ->
+    emqx_mgmt_api_test_util:end_suite([emqx_conf]).
+
+t_get_put(_) ->
+    {ok, Default} = get_sys_topics_config(),
+    ?assertEqual(
+       #{<<"sys_event_messages">> =>
+         #{<<"client_connected">> => true,
+           <<"client_disconnected">> => true,
+           <<"client_subscribed">> => false,
+           <<"client_unsubscribed">> => false
+          },
+         <<"sys_heartbeat_interval">> => <<"30s">>,
+         <<"sys_msg_interval">> => <<"1m">>}, Default),
+
+   NConfig = Default#{
+               <<"sys_msg_interval">> => <<"4m">>,
+               <<"sys_event_messages">> => #{<<"client_subscribed">> => false}
+              },
+   {ok, ConfigResp} = put_sys_topics_config(NConfig),
+   ?assertEqual(NConfig, ConfigResp),
+   {ok, Default} = put_sys_topics_config(Default).
+
+get_sys_topics_config() ->
+    Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),
+    case emqx_mgmt_api_test_util:request_api(get, Path) of
+        {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])};
+        Error -> Error
+    end.
+
+put_sys_topics_config(Config) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Path = emqx_mgmt_api_test_util:api_path(["mqtt", "sys_topics"]),
+    case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, Config) of
+        {ok, Conf0} -> {ok, emqx_json:decode(Conf0, [return_maps])};
+        Error -> Error
+    end.