|
@@ -25,6 +25,7 @@
|
|
|
-define(HOOK_POINT, 'client.connected').
|
|
-define(HOOK_POINT, 'client.connected').
|
|
|
|
|
|
|
|
-define(MAX_AUTO_SUBSCRIBE, 20).
|
|
-define(MAX_AUTO_SUBSCRIBE, 20).
|
|
|
|
|
+-define(ROOT_KEY, auto_subscribe).
|
|
|
|
|
|
|
|
-export([load/0, unload/0]).
|
|
-export([load/0, unload/0]).
|
|
|
|
|
|
|
@@ -47,22 +48,22 @@
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
load() ->
|
|
load() ->
|
|
|
- ok = emqx_conf:add_handler([auto_subscribe], ?MODULE),
|
|
|
|
|
|
|
+ ok = emqx_conf:add_handler([?ROOT_KEY], ?MODULE),
|
|
|
update_hook().
|
|
update_hook().
|
|
|
|
|
|
|
|
unload() ->
|
|
unload() ->
|
|
|
- emqx_conf:remove_handler([auto_subscribe]).
|
|
|
|
|
|
|
+ emqx_conf:remove_handler([?ROOT_KEY]).
|
|
|
|
|
|
|
|
max_limit() ->
|
|
max_limit() ->
|
|
|
?MAX_AUTO_SUBSCRIBE.
|
|
?MAX_AUTO_SUBSCRIBE.
|
|
|
|
|
|
|
|
list() ->
|
|
list() ->
|
|
|
- format(emqx_conf:get([auto_subscribe, topics], [])).
|
|
|
|
|
|
|
+ format(emqx_conf:get([?ROOT_KEY, topics], [])).
|
|
|
|
|
|
|
|
update(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
|
update(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
|
|
case
|
|
case
|
|
|
emqx_conf:update(
|
|
emqx_conf:update(
|
|
|
- [auto_subscribe],
|
|
|
|
|
|
|
+ [?ROOT_KEY],
|
|
|
#{<<"topics">> => Topics},
|
|
#{<<"topics">> => Topics},
|
|
|
#{rawconf_with_defaults => true, override_to => cluster}
|
|
#{rawconf_with_defaults => true, override_to => cluster}
|
|
|
)
|
|
)
|
|
@@ -75,7 +76,7 @@ update(Topics) when length(Topics) =< ?MAX_AUTO_SUBSCRIBE ->
|
|
|
update(_Topics) ->
|
|
update(_Topics) ->
|
|
|
{error, quota_exceeded}.
|
|
{error, quota_exceeded}.
|
|
|
|
|
|
|
|
-post_config_update([auto_subscribe], _Req, NewConf, _OldConf, _AppEnvs) ->
|
|
|
|
|
|
|
+post_config_update([?ROOT_KEY], _Req, NewConf, _OldConf, _AppEnvs) ->
|
|
|
update_hook(NewConf).
|
|
update_hook(NewConf).
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
@@ -99,26 +100,26 @@ on_client_connected(_, _, _) ->
|
|
|
|
|
|
|
|
-spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}.
|
|
-spec get_basic_usage_info() -> #{auto_subscribe_count => non_neg_integer()}.
|
|
|
get_basic_usage_info() ->
|
|
get_basic_usage_info() ->
|
|
|
- AutoSubscribe = emqx_conf:get([auto_subscribe, topics], []),
|
|
|
|
|
|
|
+ AutoSubscribe = emqx_conf:get([?ROOT_KEY, topics], []),
|
|
|
#{auto_subscribe_count => length(AutoSubscribe)}.
|
|
#{auto_subscribe_count => length(AutoSubscribe)}.
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Data backup
|
|
%% Data backup
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
-import_config(#{<<"auto_subscribe">> := #{<<"topics">> := Topics}} = AutoSubscribe) ->
|
|
|
|
|
- ConfPath = [auto_subscribe],
|
|
|
|
|
- OldTopics = emqx:get_raw_config(ConfPath, []),
|
|
|
|
|
|
|
+import_config(#{<<"auto_subscribe">> := #{<<"topics">> := Topics} = AutoSubscribe}) ->
|
|
|
|
|
+ ConfPath = [?ROOT_KEY],
|
|
|
|
|
+ OldTopics = emqx:get_raw_config(ConfPath ++ [topics], []),
|
|
|
KeyFun = fun(#{<<"topic">> := T}) -> T end,
|
|
KeyFun = fun(#{<<"topic">> := T}) -> T end,
|
|
|
MergedTopics = emqx_utils:merge_lists(OldTopics, Topics, KeyFun),
|
|
MergedTopics = emqx_utils:merge_lists(OldTopics, Topics, KeyFun),
|
|
|
Conf = AutoSubscribe#{<<"topics">> => MergedTopics},
|
|
Conf = AutoSubscribe#{<<"topics">> => MergedTopics},
|
|
|
case emqx_conf:update(ConfPath, Conf, #{override_to => cluster}) of
|
|
case emqx_conf:update(ConfPath, Conf, #{override_to => cluster}) of
|
|
|
- {ok, #{raw_config := NewTopics}} ->
|
|
|
|
|
|
|
+ {ok, #{raw_config := #{<<"topics">> := NewTopics}}} ->
|
|
|
Changed = maps:get(changed, emqx_utils:diff_lists(NewTopics, OldTopics, KeyFun)),
|
|
Changed = maps:get(changed, emqx_utils:diff_lists(NewTopics, OldTopics, KeyFun)),
|
|
|
Changed1 = [ConfPath ++ [T] || {#{<<"topic">> := T}, _} <- Changed],
|
|
Changed1 = [ConfPath ++ [T] || {#{<<"topic">> := T}, _} <- Changed],
|
|
|
- {ok, #{root_key => auto_subscribe, changed => Changed1}};
|
|
|
|
|
|
|
+ {ok, #{root_key => ?ROOT_KEY, changed => Changed1}};
|
|
|
Error ->
|
|
Error ->
|
|
|
- {error, #{root_key => auto_subscribe, reason => Error}}
|
|
|
|
|
|
|
+ {error, #{root_key => ?ROOT_KEY, reason => Error}}
|
|
|
end;
|
|
end;
|
|
|
import_config(_RawConf) ->
|
|
import_config(_RawConf) ->
|
|
|
{ok, #{root_key => auto_subscribe, changed => []}}.
|
|
{ok, #{root_key => auto_subscribe, changed => []}}.
|
|
@@ -139,7 +140,7 @@ format(Rule = #{topic := Topic}) when is_map(Rule) ->
|
|
|
}.
|
|
}.
|
|
|
|
|
|
|
|
update_hook() ->
|
|
update_hook() ->
|
|
|
- update_hook(emqx_conf:get([auto_subscribe], #{topics => []})).
|
|
|
|
|
|
|
+ update_hook(emqx_conf:get([?ROOT_KEY], #{topics => []})).
|
|
|
|
|
|
|
|
update_hook(Config) ->
|
|
update_hook(Config) ->
|
|
|
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
|
|
{TopicHandler, Options} = emqx_auto_subscribe_handler:init(Config),
|