|
|
@@ -39,7 +39,7 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
--record(retained_message, {topic, msg}).
|
|
|
+-record(mqtt_retained, {topic, msg}).
|
|
|
|
|
|
-record(state, {stats_fun, expired_after, stats_timer, expire_timer}).
|
|
|
|
|
|
@@ -65,13 +65,13 @@ on_message_publish(Msg = #mqtt_message{retain = false}, _Env) ->
|
|
|
|
|
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
|
|
on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <<>>}, _Env) ->
|
|
|
- mnesia:dirty_delete(retained_message, Topic),
|
|
|
+ mnesia:dirty_delete(mqtt_retained, Topic),
|
|
|
{stop, Msg};
|
|
|
|
|
|
on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) ->
|
|
|
case {is_table_full(Env), is_too_big(size(Payload), Env)} of
|
|
|
{false, false} ->
|
|
|
- mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}),
|
|
|
+ mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}),
|
|
|
emqttd_metrics:set('messages/retained', retained_count());
|
|
|
{true, _}->
|
|
|
lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]);
|
|
|
@@ -111,20 +111,19 @@ start_link(Env) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([Env]) ->
|
|
|
- Copy = case proplists:get_value(storage, Env, disc) of
|
|
|
+ Copy = case proplists:get_value(storage_type, Env, disc) of
|
|
|
disc -> disc_copies;
|
|
|
ram -> ram_copies
|
|
|
end,
|
|
|
- ok = emqttd_mnesia:create_table(retained_message, [
|
|
|
+ ok = emqttd_mnesia:create_table(mqtt_retained, [
|
|
|
{type, ordered_set},
|
|
|
{Copy, [node()]},
|
|
|
- {record_name, retained_message},
|
|
|
- {attributes, record_info(fields, retained_message)},
|
|
|
+ {record_name, mqtt_retained},
|
|
|
+ {attributes, record_info(fields, mqtt_retained)},
|
|
|
{storage_properties, [{ets, [compressed]},
|
|
|
{dets, [{auto_save, 1000}]}]}]),
|
|
|
- ok = emqttd_mnesia:copy_table(retained_message),
|
|
|
+ ok = emqttd_mnesia:copy_table(mqtt_retained),
|
|
|
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
|
|
|
- %% One second
|
|
|
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
|
|
State = #state{stats_fun = StatsFun, stats_timer = StatsTimer},
|
|
|
{ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}.
|
|
|
@@ -171,33 +170,33 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
|
-spec(read_messages(binary()) -> [mqtt_message()]).
|
|
|
read_messages(Topic) ->
|
|
|
- [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
|
|
|
+ [Msg || #mqtt_retained{msg = Msg} <- mnesia:dirty_read(mqtt_retained, Topic)].
|
|
|
|
|
|
-spec(match_messages(binary()) -> [mqtt_message()]).
|
|
|
match_messages(Filter) ->
|
|
|
%% TODO: optimize later...
|
|
|
- Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
|
|
|
+ Fun = fun(#mqtt_retained{topic = Name, msg = Msg}, Acc) ->
|
|
|
case emqttd_topic:match(Name, Filter) of
|
|
|
true -> [Msg|Acc];
|
|
|
false -> Acc
|
|
|
end
|
|
|
end,
|
|
|
- mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
|
|
|
+ mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], mqtt_retained]).
|
|
|
|
|
|
-spec(expire_messages(pos_integer()) -> any()).
|
|
|
expire_messages(Time) when is_integer(Time) ->
|
|
|
mnesia:transaction(
|
|
|
fun() ->
|
|
|
Match = ets:fun2ms(
|
|
|
- fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = Ts}})
|
|
|
+ fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}})
|
|
|
when Time > Ts -> Topic
|
|
|
end),
|
|
|
- Topics = mnesia:select(retained_message, Match, write),
|
|
|
+ Topics = mnesia:select(mqtt_retained, Match, write),
|
|
|
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
|
|
|
- (Topic) -> mnesia:delete({retained_message, Topic})
|
|
|
+ (Topic) -> mnesia:delete({mqtt_retained, Topic})
|
|
|
end, Topics)
|
|
|
end).
|
|
|
|
|
|
-spec(retained_count() -> non_neg_integer()).
|
|
|
-retained_count() -> mnesia:table_info(retained_message, size).
|
|
|
+retained_count() -> mnesia:table_info(mqtt_retained, size).
|
|
|
|