|
|
@@ -56,8 +56,9 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
load(Env) ->
|
|
|
- emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/3, []),
|
|
|
- emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]).
|
|
|
+ _ = emqx:hook('session.subscribed', fun ?MODULE:on_session_subscribed/3, []),
|
|
|
+ _ = emqx:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
|
|
|
+ ok.
|
|
|
|
|
|
unload() ->
|
|
|
emqx:unhook('message.publish', fun ?MODULE:on_message_publish/2),
|
|
|
@@ -169,15 +170,17 @@ handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
handle_info(expire, State) ->
|
|
|
- expire_messages(),
|
|
|
+ ok = expire_messages(),
|
|
|
{noreply, State, hibernate};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-terminate(_Reason, _State = #state{stats_timer = TRef1, expiry_timer = TRef2}) ->
|
|
|
- timer:cancel(TRef1), timer:cancel(TRef2).
|
|
|
+terminate(_Reason, #state{stats_timer = TRef1, expiry_timer = TRef2} = State) ->
|
|
|
+ _ = timer:cancel(TRef1),
|
|
|
+ _ = timer:cancel(TRef2),
|
|
|
+ ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
@@ -248,11 +251,12 @@ expire_messages() ->
|
|
|
NowMs = erlang:system_time(millisecond),
|
|
|
MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'},
|
|
|
Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}],
|
|
|
- mnesia:transaction(
|
|
|
+ {atomic, _} = mnesia:transaction(
|
|
|
fun() ->
|
|
|
Keys = mnesia:select(?TAB, Ms, write),
|
|
|
lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
|
|
|
- end).
|
|
|
+ end),
|
|
|
+ ok.
|
|
|
|
|
|
-spec(read_messages(emqx_types:topic())
|
|
|
-> [emqx_types:message()]).
|