|
|
@@ -25,14 +25,14 @@
|
|
|
|
|
|
-logger_header("[Retainer]").
|
|
|
|
|
|
--export([start_link/1]).
|
|
|
+-export([start_link/0]).
|
|
|
|
|
|
--export([ load/1
|
|
|
+-export([ load/0
|
|
|
, unload/0
|
|
|
]).
|
|
|
|
|
|
-export([ on_session_subscribed/3
|
|
|
- , on_message_publish/2
|
|
|
+ , on_message_publish/1
|
|
|
]).
|
|
|
|
|
|
-export([clean/1]).
|
|
|
@@ -51,15 +51,25 @@
|
|
|
|
|
|
-record(state, {stats_fun, stats_timer, expiry_timer}).
|
|
|
|
|
|
+-define(STATS_INTERVAL, timer:seconds(1)).
|
|
|
+-define(DEF_STORAGE_TYPE, ram).
|
|
|
+-define(DEF_MAX_RETAINED_MESSAGES, 0).
|
|
|
+-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
|
|
|
+-define(DEF_EXPIRY_INTERVAL, 0).
|
|
|
+
|
|
|
+%% convenient to generate stats_timer/expiry_timer
|
|
|
+-define(MAKE_TIMER(State, Timer, Interval, Msg),
|
|
|
+ State#state{Timer = erlang:send_after(Interval, self(), Msg)}).
|
|
|
+
|
|
|
-rlog_shard({?RETAINER_SHARD, ?TAB}).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Load/Unload
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-load(Env) ->
|
|
|
+load() ->
|
|
|
_ = emqx:hook('session.subscribed', {?MODULE, on_session_subscribed, []}),
|
|
|
- _ = emqx:hook('message.publish', {?MODULE, on_message_publish, [Env]}),
|
|
|
+ _ = emqx:hook('message.publish', {?MODULE, on_message_publish, []}),
|
|
|
ok.
|
|
|
|
|
|
unload() ->
|
|
|
@@ -85,15 +95,15 @@ dispatch(Pid, Topic) ->
|
|
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
|
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
|
|
topic = Topic,
|
|
|
- payload = <<>>}, _Env) ->
|
|
|
+ payload = <<>>}) ->
|
|
|
ekka_mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
|
|
{ok, Msg};
|
|
|
|
|
|
-on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
|
|
+on_message_publish(Msg = #message{flags = #{retain := true}}) ->
|
|
|
Msg1 = emqx_message:set_header(retained, true, Msg),
|
|
|
- store_retained(Msg1, Env),
|
|
|
+ store_retained(Msg1),
|
|
|
{ok, Msg};
|
|
|
-on_message_publish(Msg, _Env) ->
|
|
|
+on_message_publish(Msg) ->
|
|
|
{ok, Msg}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -101,9 +111,9 @@ on_message_publish(Msg, _Env) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
%% @doc Start the retainer
|
|
|
--spec(start_link(Env :: list()) -> emqx_types:startlink_ret()).
|
|
|
-start_link(Env) ->
|
|
|
- gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
|
|
|
+-spec(start_link() -> emqx_types:startlink_ret()).
|
|
|
+start_link() ->
|
|
|
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
|
|
|
|
|
-spec(clean(emqx_types:topic()) -> non_neg_integer()).
|
|
|
clean(Topic) when is_binary(Topic) ->
|
|
|
@@ -124,8 +134,10 @@ clean(Topic) when is_binary(Topic) ->
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-init([Env]) ->
|
|
|
- Copies = case proplists:get_value(storage_type, Env, disc) of
|
|
|
+init([]) ->
|
|
|
+ StorageType = emqx_config:get([?MODULE, storage_type], ?DEF_STORAGE_TYPE),
|
|
|
+ ExpiryInterval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
|
|
+ Copies = case StorageType of
|
|
|
ram -> ram_copies;
|
|
|
disc -> disc_copies;
|
|
|
disc_only -> disc_only_copies
|
|
|
@@ -149,17 +161,15 @@ init([Env]) ->
|
|
|
ok
|
|
|
end,
|
|
|
StatsFun = emqx_stats:statsfun('retained.count', 'retained.max'),
|
|
|
- {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
|
|
- State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
|
|
|
- {ok, start_expire_timer(proplists:get_value(expiry_interval, Env, 0), State)}.
|
|
|
+ State = ?MAKE_TIMER(#state{stats_fun = StatsFun}, stats_timer, ?STATS_INTERVAL, stats),
|
|
|
+ {ok, start_expire_timer(ExpiryInterval, State)}.
|
|
|
|
|
|
start_expire_timer(0, State) ->
|
|
|
State;
|
|
|
start_expire_timer(undefined, State) ->
|
|
|
State;
|
|
|
start_expire_timer(Ms, State) ->
|
|
|
- {ok, Timer} = timer:send_interval(Ms, expire),
|
|
|
- State#state{expiry_timer = Timer}.
|
|
|
+ ?MAKE_TIMER(State, expiry_timer, Ms, expire).
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
|
|
@@ -171,19 +181,20 @@ handle_cast(Msg, State) ->
|
|
|
|
|
|
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
|
|
StatsFun(retained_count()),
|
|
|
- {noreply, State, hibernate};
|
|
|
+ {noreply, ?MAKE_TIMER(State, stats_timer, ?STATS_INTERVAL, stats), hibernate};
|
|
|
|
|
|
handle_info(expire, State) ->
|
|
|
ok = expire_messages(),
|
|
|
- {noreply, State, hibernate};
|
|
|
+ Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
|
|
+ {noreply, start_expire_timer(Interval, State), hibernate};
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, #state{stats_timer = TRef1, expiry_timer = TRef2}) ->
|
|
|
- _ = timer:cancel(TRef1),
|
|
|
- _ = timer:cancel(TRef2),
|
|
|
+ _ = erlang:cancel_timer(TRef1),
|
|
|
+ _ = erlang:cancel_timer(TRef2),
|
|
|
ok.
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
@@ -192,31 +203,33 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
-
|
|
|
sort_retained([]) -> [];
|
|
|
sort_retained([Msg]) -> [Msg];
|
|
|
sort_retained(Msgs) ->
|
|
|
lists:sort(fun(#message{timestamp = Ts1}, #message{timestamp = Ts2}) ->
|
|
|
- Ts1 =< Ts2
|
|
|
- end, Msgs).
|
|
|
+ Ts1 =< Ts2 end,
|
|
|
+ Msgs).
|
|
|
|
|
|
-store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
|
|
- case {is_table_full(Env), is_too_big(size(Payload), Env)} of
|
|
|
+store_retained(Msg = #message{topic = Topic, payload = Payload}) ->
|
|
|
+ case {is_table_full(), is_too_big(size(Payload))} of
|
|
|
{false, false} ->
|
|
|
ok = emqx_metrics:inc('messages.retained'),
|
|
|
ekka_mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic),
|
|
|
msg = Msg,
|
|
|
- expiry_time = get_expiry_time(Msg, Env)});
|
|
|
+ expiry_time = get_expiry_time(Msg)});
|
|
|
{true, false} ->
|
|
|
{atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD,
|
|
|
fun() ->
|
|
|
- case mnesia:read(?TAB, Topic) of
|
|
|
- [_] ->
|
|
|
- mnesia:write(?TAB, #retained{topic = topic2tokens(Topic),
|
|
|
- msg = Msg,
|
|
|
- expiry_time = get_expiry_time(Msg, Env)}, write);
|
|
|
- [] ->
|
|
|
- ?LOG(error, "Cannot retain message(topic=~s) for table is full!", [Topic])
|
|
|
+ case mnesia:read(?TAB, Topic) of
|
|
|
+ [_] ->
|
|
|
+ mnesia:write(?TAB,
|
|
|
+ #retained{topic = topic2tokens(Topic),
|
|
|
+ msg = Msg,
|
|
|
+ expiry_time = get_expiry_time(Msg)},
|
|
|
+ write);
|
|
|
+ [] ->
|
|
|
+ ?LOG(error,
|
|
|
+ "Cannot retain message(topic=~s) for table is full!", [Topic])
|
|
|
end
|
|
|
end),
|
|
|
ok;
|
|
|
@@ -227,22 +240,24 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
|
|
"for payload is too big!", [Topic, iolist_size(Payload)])
|
|
|
end.
|
|
|
|
|
|
-is_table_full(Env) ->
|
|
|
- Limit = proplists:get_value(max_retained_messages, Env, 0),
|
|
|
+is_table_full() ->
|
|
|
+ Limit = emqx_config:get([?MODULE, max_retained_messages], ?DEF_MAX_RETAINED_MESSAGES),
|
|
|
Limit > 0 andalso (retained_count() > Limit).
|
|
|
|
|
|
-is_too_big(Size, Env) ->
|
|
|
- Limit = proplists:get_value(max_payload_size, Env, 0),
|
|
|
+is_too_big(Size) ->
|
|
|
+ Limit = emqx_config:get([?MODULE, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
|
|
|
Limit > 0 andalso (Size > Limit).
|
|
|
|
|
|
-get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}, _Env) ->
|
|
|
+get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := 0}}}) ->
|
|
|
0;
|
|
|
-get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = Ts}, _Env) ->
|
|
|
+get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}},
|
|
|
+ timestamp = Ts}) ->
|
|
|
Ts + Interval * 1000;
|
|
|
-get_expiry_time(#message{timestamp = Ts}, Env) ->
|
|
|
- case proplists:get_value(expiry_interval, Env, 0) of
|
|
|
+get_expiry_time(#message{timestamp = Ts}) ->
|
|
|
+ Interval = emqx_config:get([?MODULE, expiry_interval], ?DEF_EXPIRY_INTERVAL),
|
|
|
+ case Interval of
|
|
|
0 -> 0;
|
|
|
- Interval -> Ts + Interval
|
|
|
+ _ -> Ts + Interval
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|