|
|
@@ -51,6 +51,8 @@
|
|
|
|
|
|
-record(state, {stats_fun, stats_timer, expiry_timer}).
|
|
|
|
|
|
+-rlog_shard({?RETAINER_SHARD, ?TAB}).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Load/Unload
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -84,7 +86,7 @@ dispatch(Pid, Topic) ->
|
|
|
on_message_publish(Msg = #message{flags = #{retain := true},
|
|
|
topic = Topic,
|
|
|
payload = <<>>}, _Env) ->
|
|
|
- mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
|
|
+ ekka_mnesia:dirty_delete(?TAB, topic2tokens(Topic)),
|
|
|
{ok, Msg};
|
|
|
|
|
|
on_message_publish(Msg = #message{flags = #{retain := true}}, Env) ->
|
|
|
@@ -115,7 +117,7 @@ clean(Topic) when is_binary(Topic) ->
|
|
|
[_M] -> mnesia:delete({?TAB, Tokens}), 1
|
|
|
end
|
|
|
end,
|
|
|
- {atomic, N} = mnesia:transaction(Fun), N
|
|
|
+ {atomic, N} = ekka_mnesia:transaction(?RETAINER_SHARD, Fun), N
|
|
|
end.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -139,6 +141,7 @@ init([Env]) ->
|
|
|
{attributes, record_info(fields, retained)},
|
|
|
{storage_properties, StoreProps}]),
|
|
|
ok = ekka_mnesia:copy_table(?TAB, Copies),
|
|
|
+ ok = ekka_rlog:wait_for_shards([?RETAINER_SHARD], infinity),
|
|
|
case mnesia:table_info(?TAB, storage_type) of
|
|
|
Copies -> ok;
|
|
|
_Other ->
|
|
|
@@ -201,11 +204,11 @@ store_retained(Msg = #message{topic = Topic, payload = Payload}, Env) ->
|
|
|
case {is_table_full(Env), is_too_big(size(Payload), Env)} of
|
|
|
{false, false} ->
|
|
|
ok = emqx_metrics:inc('messages.retained'),
|
|
|
- mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic),
|
|
|
- msg = Msg,
|
|
|
- expiry_time = get_expiry_time(Msg, Env)});
|
|
|
+ ekka_mnesia:dirty_write(?TAB, #retained{topic = topic2tokens(Topic),
|
|
|
+ msg = Msg,
|
|
|
+ expiry_time = get_expiry_time(Msg, Env)});
|
|
|
{true, false} ->
|
|
|
- {atomic, _} = mnesia:transaction(
|
|
|
+ {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD,
|
|
|
fun() ->
|
|
|
case mnesia:read(?TAB, Topic) of
|
|
|
[_] ->
|
|
|
@@ -256,7 +259,7 @@ expire_messages() ->
|
|
|
NowMs = erlang:system_time(millisecond),
|
|
|
MsHd = #retained{topic = '$1', msg = '_', expiry_time = '$3'},
|
|
|
Ms = [{MsHd, [{'=/=','$3',0}, {'<','$3',NowMs}], ['$1']}],
|
|
|
- {atomic, _} = mnesia:transaction(
|
|
|
+ {atomic, _} = ekka_mnesia:transaction(?RETAINER_SHARD,
|
|
|
fun() ->
|
|
|
Keys = mnesia:select(?TAB, Ms, write),
|
|
|
lists:foreach(fun(Key) -> mnesia:delete({?TAB, Key}) end, Keys)
|
|
|
@@ -293,7 +296,7 @@ match_delete_messages(Filter) ->
|
|
|
MsHd = #retained{topic = Cond, msg = '_', expiry_time = '_'},
|
|
|
Ms = [{MsHd, [], ['$_']}],
|
|
|
Rs = mnesia:dirty_select(?TAB, Ms),
|
|
|
- lists:foreach(fun(R) -> mnesia:dirty_delete_object(?TAB, R) end, Rs),
|
|
|
+ lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(?TAB, R) end, Rs),
|
|
|
length(Rs).
|
|
|
|
|
|
%% @private
|