|
|
@@ -47,6 +47,7 @@
|
|
|
, list/1
|
|
|
, get_delayed_message/1
|
|
|
, delete_delayed_message/1
|
|
|
+ , post_config_update/5
|
|
|
]).
|
|
|
|
|
|
-export([format_delayed/1]).
|
|
|
@@ -189,11 +190,30 @@ delete_delayed_message(Id0) ->
|
|
|
update_config(Config) ->
|
|
|
emqx_conf:update([delayed], Config, #{rawconf_with_defaults => true, override_to => cluster}).
|
|
|
|
|
|
+post_config_update(_KeyPath, Config, _NewConf, _OldConf, _AppEnvs) ->
|
|
|
+ case maps:get(<<"enable">>, Config, undefined) of
|
|
|
+ undefined ->
|
|
|
+ ignore;
|
|
|
+ true ->
|
|
|
+ emqx_delayed:enable();
|
|
|
+ false ->
|
|
|
+ emqx_delayed:disable()
|
|
|
+ end,
|
|
|
+ case maps:get(<<"max_delayed_messages">>, Config, undefined) of
|
|
|
+ undefined ->
|
|
|
+ ignore;
|
|
|
+ Max ->
|
|
|
+ ok = emqx_delayed:set_max_delayed_messages(Max)
|
|
|
+ end,
|
|
|
+ ok.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callback
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([Opts]) ->
|
|
|
+ erlang:process_flag(trap_exit, true),
|
|
|
+ emqx_conf:add_handler([delayed], ?MODULE),
|
|
|
MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0),
|
|
|
{ok, ensure_stats_event(
|
|
|
ensure_publish_timer(#{timer => undefined,
|
|
|
@@ -252,6 +272,7 @@ handle_info(Info, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, #{timer := TRef}) ->
|
|
|
+ emqx_conf:remove_handler([delayed]),
|
|
|
emqx_misc:cancel_timer(TRef).
|
|
|
|
|
|
code_change(_Vsn, State, _Extra) ->
|