|
|
@@ -43,8 +43,7 @@
|
|
|
]).
|
|
|
|
|
|
%% gen_server callbacks
|
|
|
--export([ get_status/0
|
|
|
- , enable/0
|
|
|
+-export([ enable/0
|
|
|
, disable/0
|
|
|
]).
|
|
|
|
|
|
@@ -90,7 +89,11 @@ on_message_publish(Msg = #message{
|
|
|
end,
|
|
|
PubMsg = Msg#message{topic = Topic1},
|
|
|
Headers = PubMsg#message.headers,
|
|
|
- ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}),
|
|
|
+ case store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}) of
|
|
|
+ ok -> ok;
|
|
|
+ {error, Error} ->
|
|
|
+ ?LOG(error, "Store delayed message fail: ~p", [Error])
|
|
|
+ end,
|
|
|
{stop, PubMsg#message{headers = Headers#{allow_publish => false}}};
|
|
|
|
|
|
on_message_publish(Msg) ->
|
|
|
@@ -109,9 +112,6 @@ start_link() ->
|
|
|
store(DelayedMsg) ->
|
|
|
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
|
|
|
|
|
|
-get_status() ->
|
|
|
- gen_server:call(?SERVER, get_status).
|
|
|
-
|
|
|
enable() ->
|
|
|
gen_server:call(?SERVER, enable).
|
|
|
|
|
|
@@ -122,17 +122,31 @@ disable() ->
|
|
|
%% gen_server callback
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-init([_Opts]) ->
|
|
|
+init([Opts]) ->
|
|
|
+ MaxDelayedMessages = maps:get(max_delayed_messages, Opts, 0),
|
|
|
{ok, ensure_stats_event(
|
|
|
ensure_publish_timer(#{timer => undefined,
|
|
|
publish_at => 0,
|
|
|
- enabled => false}))}.
|
|
|
+ max_delayed_messages => MaxDelayedMessages}))}.
|
|
|
|
|
|
-handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
|
|
+handle_call({store, DelayedMsg = #delayed_message{key = Key}},
|
|
|
+ _From, State = #{max_delayed_messages := 0}) ->
|
|
|
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
|
|
|
emqx_metrics:inc('messages.delayed'),
|
|
|
{reply, ok, ensure_publish_timer(Key, State)};
|
|
|
|
|
|
+handle_call({store, DelayedMsg = #delayed_message{key = Key}},
|
|
|
+ _From, State = #{max_delayed_messages := Val}) ->
|
|
|
+ Size = mnesia:table_info(?TAB, size),
|
|
|
+ case Size > Val of
|
|
|
+ true ->
|
|
|
+ {reply, {error, max_delayed_messages_full}, State};
|
|
|
+ false ->
|
|
|
+ ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
|
|
|
+ emqx_metrics:inc('messages.delayed'),
|
|
|
+ {reply, ok, ensure_publish_timer(Key, State)}
|
|
|
+ end;
|
|
|
+
|
|
|
handle_call(enable, _From, State) ->
|
|
|
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
|
|
|
{reply, ok, State};
|
|
|
@@ -141,9 +155,6 @@ handle_call(disable, _From, State) ->
|
|
|
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
|
|
|
{reply, ok, State};
|
|
|
|
|
|
-handle_call(get_status, _From, State = #{enabled := Enabled}) ->
|
|
|
- {reply, Enabled, State};
|
|
|
-
|
|
|
handle_call(Req, _From, State) ->
|
|
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|