|
|
@@ -21,6 +21,7 @@
|
|
|
-include("emqx_schema.hrl").
|
|
|
-include("emqx.hrl").
|
|
|
-include("emqx_mqtt.hrl").
|
|
|
+-include("emqx_shared_sub.hrl").
|
|
|
-include("logger.hrl").
|
|
|
-include("types.hrl").
|
|
|
|
|
|
@@ -84,10 +85,7 @@
|
|
|
| hash_topic.
|
|
|
|
|
|
-define(SERVER, ?MODULE).
|
|
|
--define(TAB, emqx_shared_subscription).
|
|
|
--define(SHARED_SUBS_ROUND_ROBIN_COUNTER, emqx_shared_subscriber_round_robin_counter).
|
|
|
--define(SHARED_SUBS, emqx_shared_subscriber).
|
|
|
--define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
|
|
+
|
|
|
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
|
|
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
|
|
|
-define(ACK, shared_sub_ack).
|
|
|
@@ -99,21 +97,21 @@
|
|
|
|
|
|
-record(state, {pmon}).
|
|
|
|
|
|
--record(emqx_shared_subscription, {group, topic, subpid}).
|
|
|
+-record(?SHARED_SUBSCRIPTION, {group, topic, subpid}).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Mnesia bootstrap
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
create_tables() ->
|
|
|
- ok = mria:create_table(?TAB, [
|
|
|
+ ok = mria:create_table(?SHARED_SUBSCRIPTION, [
|
|
|
{type, bag},
|
|
|
{rlog_shard, ?SHARED_SUB_SHARD},
|
|
|
{storage, ram_copies},
|
|
|
- {record_name, emqx_shared_subscription},
|
|
|
- {attributes, record_info(fields, emqx_shared_subscription)}
|
|
|
+ {record_name, ?SHARED_SUBSCRIPTION},
|
|
|
+ {attributes, record_info(fields, ?SHARED_SUBSCRIPTION)}
|
|
|
]),
|
|
|
- [?TAB].
|
|
|
+ [?SHARED_SUBSCRIPTION].
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% API
|
|
|
@@ -132,7 +130,7 @@ unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
|
|
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
|
|
|
|
|
record(Group, Topic, SubPid) ->
|
|
|
- #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
|
|
+ #?SHARED_SUBSCRIPTION{group = Group, topic = Topic, subpid = SubPid}.
|
|
|
|
|
|
-spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
|
|
|
emqx_types:deliver_result().
|
|
|
@@ -394,18 +392,18 @@ subscribers(Group, Topic, FailedSubs) ->
|
|
|
|
|
|
%% Select ETS table to get all subscriber pids.
|
|
|
subscribers(Group, Topic) ->
|
|
|
- ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
|
|
+ ets:select(?SHARED_SUBSCRIPTION, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init([]) ->
|
|
|
- ok = mria:wait_for_tables([?TAB]),
|
|
|
- {ok, _} = mnesia:subscribe({table, ?TAB, simple}),
|
|
|
+ ok = mria:wait_for_tables([?SHARED_SUBSCRIPTION]),
|
|
|
+ {ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}),
|
|
|
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
|
|
|
- ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]),
|
|
|
- ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
|
|
|
+ ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]),
|
|
|
+ ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]),
|
|
|
ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [
|
|
|
public, set, {write_concurrency, true}
|
|
|
]),
|
|
|
@@ -413,26 +411,26 @@ init([]) ->
|
|
|
|
|
|
init_monitors() ->
|
|
|
mnesia:foldl(
|
|
|
- fun(#emqx_shared_subscription{subpid = SubPid}, Mon) ->
|
|
|
+ fun(#?SHARED_SUBSCRIPTION{subpid = SubPid}, Mon) ->
|
|
|
emqx_pmon:monitor(SubPid, Mon)
|
|
|
end,
|
|
|
emqx_pmon:new(),
|
|
|
- ?TAB
|
|
|
+ ?SHARED_SUBSCRIPTION
|
|
|
).
|
|
|
|
|
|
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
|
|
- mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
|
|
- case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
|
|
+ mria:dirty_write(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)),
|
|
|
+ case ets:member(?SHARED_SUBSCRIBER, {Group, Topic}) of
|
|
|
true -> ok;
|
|
|
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
|
|
|
end,
|
|
|
ok = maybe_insert_alive_tab(SubPid),
|
|
|
ok = maybe_insert_round_robin_count({Group, Topic}),
|
|
|
- true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
|
|
+ true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
|
|
|
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
|
|
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
|
|
- mria:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
|
|
- true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
|
|
+ mria:dirty_delete_object(?SHARED_SUBSCRIPTION, record(Group, Topic, SubPid)),
|
|
|
+ true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
|
|
|
delete_route_if_needed({Group, Topic}),
|
|
|
maybe_delete_round_robin_count({Group, Topic}),
|
|
|
{reply, ok, update_stats(State)};
|
|
|
@@ -445,7 +443,7 @@ handle_cast(Msg, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
handle_info(
|
|
|
- {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}},
|
|
|
+ {mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}},
|
|
|
State = #state{pmon = PMon}
|
|
|
) ->
|
|
|
ok = maybe_insert_alive_tab(SubPid),
|
|
|
@@ -455,7 +453,7 @@ handle_info(
|
|
|
%% The trick is we don't demonitor the subscriber here, and (after a long time) it will eventually
|
|
|
%% be disconnected.
|
|
|
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
|
|
-% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
|
|
+% #?SHARED_SUBSCRIPTION{subpid = SubPid} = OldRecord,
|
|
|
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
|
|
|
|
|
handle_info({mnesia_table_event, _Event}, State) ->
|
|
|
@@ -468,7 +466,7 @@ handle_info(_Info, State) ->
|
|
|
{noreply, State}.
|
|
|
|
|
|
terminate(_Reason, _State) ->
|
|
|
- mnesia:unsubscribe({table, ?TAB, simple}).
|
|
|
+ mnesia:unsubscribe({table, ?SHARED_SUBSCRIPTION, simple}).
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
@@ -501,7 +499,7 @@ maybe_delete_round_robin_count({Group, _Topic} = GroupTopic) ->
|
|
|
ok.
|
|
|
|
|
|
if_no_more_subscribers(GroupTopic, Fn) ->
|
|
|
- case ets:member(?SHARED_SUBS, GroupTopic) of
|
|
|
+ case ets:member(?SHARED_SUBSCRIBER, GroupTopic) of
|
|
|
true -> ok;
|
|
|
false -> Fn()
|
|
|
end,
|
|
|
@@ -510,26 +508,26 @@ if_no_more_subscribers(GroupTopic, Fn) ->
|
|
|
%% keep track of alive remote pids
|
|
|
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
|
|
|
maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
|
|
|
- ets:insert(?ALIVE_SUBS, {Pid}),
|
|
|
+ ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}),
|
|
|
ok.
|
|
|
|
|
|
cleanup_down(SubPid) ->
|
|
|
- ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
|
|
|
+ ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid),
|
|
|
lists:foreach(
|
|
|
- fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
|
|
- ok = mria:dirty_delete_object(?TAB, Record),
|
|
|
- true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
|
|
+ fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) ->
|
|
|
+ ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record),
|
|
|
+ true = ets:delete_object(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
|
|
|
maybe_delete_round_robin_count({Group, Topic}),
|
|
|
delete_route_if_needed({Group, Topic})
|
|
|
end,
|
|
|
- mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})
|
|
|
+ mnesia:dirty_match_object(#?SHARED_SUBSCRIPTION{_ = '_', subpid = SubPid})
|
|
|
).
|
|
|
|
|
|
update_stats(State) ->
|
|
|
emqx_stats:setstat(
|
|
|
'subscriptions.shared.count',
|
|
|
'subscriptions.shared.max',
|
|
|
- ets:info(?TAB, size)
|
|
|
+ ets:info(?SHARED_SUBSCRIPTION, size)
|
|
|
),
|
|
|
State.
|
|
|
|
|
|
@@ -543,7 +541,7 @@ is_active_sub(Pid, FailedSubs, All) ->
|
|
|
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
|
|
erlang:is_process_alive(Pid);
|
|
|
is_alive_sub(Pid) ->
|
|
|
- [] =/= ets:lookup(?ALIVE_SUBS, Pid).
|
|
|
+ [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid).
|
|
|
|
|
|
delete_route_if_needed({Group, Topic} = GroupTopic) ->
|
|
|
if_no_more_subscribers(GroupTopic, fun() ->
|