|
|
@@ -18,6 +18,7 @@
|
|
|
-module(emqx_modules_conf).
|
|
|
|
|
|
-behaviour(emqx_config_handler).
|
|
|
+-behaviour(emqx_config_backup).
|
|
|
|
|
|
%% Load/Unload
|
|
|
-export([
|
|
|
@@ -37,6 +38,11 @@
|
|
|
post_config_update/5
|
|
|
]).
|
|
|
|
|
|
+%% Data backup
|
|
|
+-export([
|
|
|
+ import_config/1
|
|
|
+]).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Load/Unload
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -78,6 +84,20 @@ remove_topic_metrics(Topic) ->
|
|
|
{error, Reason} -> {error, Reason}
|
|
|
end.
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% Data backup (Topic-Metrics)
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+import_config(#{<<"topic_metrics">> := Topics}) ->
|
|
|
+ case emqx_conf:update([topic_metrics], {merge_topics, Topics}, #{override_to => cluster}) of
|
|
|
+ {ok, _} ->
|
|
|
+ {ok, #{root_key => topic_metrics, changed => []}};
|
|
|
+ Error ->
|
|
|
+ {error, #{root_key => topic_metrics, reason => Error}}
|
|
|
+ end;
|
|
|
+import_config(_RawConf) ->
|
|
|
+ {ok, #{root_key => topic_metrics, changed => []}}.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Config Handler
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -103,7 +123,13 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
|
|
|
{ok, RawConf -- [Topic]};
|
|
|
_ ->
|
|
|
{error, not_found}
|
|
|
- end.
|
|
|
+ end;
|
|
|
+pre_config_update(_, {merge_topics, NewConf}, OldConf) ->
|
|
|
+ KeyFun = fun(#{<<"topic">> := T}) -> T end,
|
|
|
+ MergedConf = emqx_utils:merge_lists(OldConf, NewConf, KeyFun),
|
|
|
+ {ok, MergedConf};
|
|
|
+pre_config_update(_, NewConf, _OldConf) ->
|
|
|
+ {ok, NewConf}.
|
|
|
|
|
|
-spec post_config_update(
|
|
|
list(atom()),
|
|
|
@@ -113,7 +139,6 @@ pre_config_update(_, {remove_topic_metrics, Topic0}, RawConf) ->
|
|
|
emqx_config:app_envs()
|
|
|
) ->
|
|
|
ok | {ok, Result :: any()} | {error, Reason :: term()}.
|
|
|
-
|
|
|
post_config_update(
|
|
|
_,
|
|
|
{add_topic_metrics, Topic},
|
|
|
@@ -135,6 +160,19 @@ post_config_update(
|
|
|
case emqx_topic_metrics:deregister(Topic) of
|
|
|
ok -> ok;
|
|
|
{error, Reason} -> {error, Reason}
|
|
|
+ end;
|
|
|
+post_config_update(_, _UpdateReq, NewConfig, OldConfig, _AppEnvs) ->
|
|
|
+ #{
|
|
|
+ removed := Removed,
|
|
|
+ added := Added
|
|
|
+ } = emqx_utils:diff_lists(NewConfig, OldConfig, fun(#{topic := T}) -> T end),
|
|
|
+ Deregistered = [emqx_topic_metrics:deregister(T) || #{topic := T} <- Removed],
|
|
|
+ Registered = [emqx_topic_metrics:register(T) || #{topic := T} <- Added],
|
|
|
+ DeregisteredErrs = [Res || Res <- Deregistered, Res =/= ok, Res =/= {error, topic_not_found}],
|
|
|
+ RegisteredErrs = [Res || Res <- Registered, Res =/= ok, Res =/= {error, already_existed}],
|
|
|
+ case DeregisteredErrs ++ RegisteredErrs of
|
|
|
+ [] -> ok;
|
|
|
+ Errs -> {error, Errs}
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|