|
|
@@ -22,6 +22,8 @@
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
|
|
+-logger_header("[Delayed]").
|
|
|
+
|
|
|
%% Mnesia bootstrap
|
|
|
-export([mnesia/1]).
|
|
|
|
|
|
@@ -90,7 +92,11 @@ description() ->
|
|
|
%% Hooks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>>, timestamp = Ts}) ->
|
|
|
+on_message_publish(Msg = #message{
|
|
|
+ id = Id,
|
|
|
+ topic = <<"$delayed/", Topic/binary>>,
|
|
|
+ timestamp = Ts
|
|
|
+ }) ->
|
|
|
[Delay, Topic1] = binary:split(Topic, <<"/">>),
|
|
|
PubAt = case binary_to_integer(Delay) of
|
|
|
Interval when Interval < ?MAX_INTERVAL ->
|
|
|
@@ -127,42 +133,57 @@ store(DelayedMsg) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([]) ->
|
|
|
- {ok, ensure_publish_timer(#{timer => undefined, publish_at => 0})}.
|
|
|
+ {ok, ensure_stats_event(
|
|
|
+ ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
|
|
|
|
|
|
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
|
|
|
ok = mnesia:dirty_write(?TAB, DelayedMsg),
|
|
|
- emqx_metrics:set('messages.delayed', delayed_count()),
|
|
|
+ emqx_metrics:inc('messages.delayed'),
|
|
|
{reply, ok, ensure_publish_timer(Key, State)};
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
- ?LOG(error, "[Delayed] Unexpected call: ~p", [Req]),
|
|
|
+ ?LOG(error, "Unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
|
|
|
|
handle_cast(Msg, State) ->
|
|
|
- ?LOG(error, "[Delayed] Unexpected cast: ~p", [Msg]),
|
|
|
+ ?LOG(error, "Unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
%% Do Publish...
|
|
|
handle_info({timeout, TRef, do_publish}, State = #{timer := TRef}) ->
|
|
|
DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
|
|
|
lists:foreach(fun(Key) -> mnesia:dirty_delete(?TAB, Key) end, DeletedKeys),
|
|
|
- emqx_metrics:set('messages.delayed', delayed_count()),
|
|
|
{noreply, ensure_publish_timer(State#{timer := undefined, publish_at := 0})};
|
|
|
|
|
|
+handle_info(stats, State = #{stats_fun := StatsFun}) ->
|
|
|
+ StatsFun(delayed_count()),
|
|
|
+ {noreply, State, hibernate};
|
|
|
+
|
|
|
handle_info(Info, State) ->
|
|
|
- ?LOG(error, "[Delayed] Unexpected info: ~p", [Info]),
|
|
|
+ ?LOG(error, "Unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, #{timer := TRef}) ->
|
|
|
emqx_misc:cancel_timer(TRef).
|
|
|
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
- {ok, State}.
|
|
|
+code_change({down, Vsn}, State, _Extra) when Vsn =:= "4.3.0" ->
|
|
|
+ NState = maps:with([timer, publish_at], State),
|
|
|
+ {ok, NState};
|
|
|
+
|
|
|
+code_change(Vsn, State, _Extra) when Vsn =:= "4.3.0" ->
|
|
|
+ NState = ensure_stats_event(State),
|
|
|
+ {ok, NState}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
+%% Ensure the stats
|
|
|
+ensure_stats_event(State) ->
|
|
|
+ StatsFun = emqx_stats:statsfun('delayed.count', 'delayed.max'),
|
|
|
+ {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
|
|
+ State#{stats_fun => StatsFun, stats_timer => StatsTimer}.
|
|
|
+
|
|
|
%% Ensure publish timer
|
|
|
ensure_publish_timer(State) ->
|
|
|
ensure_publish_timer(mnesia:dirty_first(?TAB), State).
|