|
|
@@ -87,7 +87,7 @@
|
|
|
-define(SERVER, ?MODULE).
|
|
|
|
|
|
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
|
|
--define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
|
|
|
+-define(IS_REMOTE_PID(Pid), (is_pid(Pid) andalso node(Pid) =/= node())).
|
|
|
-define(ACK, shared_sub_ack).
|
|
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
|
|
-define(NO_ACK, no_ack).
|
|
|
@@ -403,7 +403,6 @@ init([]) ->
|
|
|
{ok, _} = mnesia:subscribe({table, ?SHARED_SUBSCRIPTION, simple}),
|
|
|
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
|
|
|
ok = emqx_utils_ets:new(?SHARED_SUBSCRIBER, [protected, bag]),
|
|
|
- ok = emqx_utils_ets:new(?ALIVE_SHARED_SUBSCRIBERS, [protected, set, {read_concurrency, true}]),
|
|
|
ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [
|
|
|
public, set, {write_concurrency, true}
|
|
|
]),
|
|
|
@@ -428,7 +427,6 @@ handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon
|
|
|
_ = emqx_external_broker:add_shared_route(Topic, Group),
|
|
|
ok
|
|
|
end,
|
|
|
- ok = maybe_insert_alive_tab(SubPid),
|
|
|
ok = maybe_insert_round_robin_count({Group, Topic}),
|
|
|
true = ets:insert(?SHARED_SUBSCRIBER, {{Group, Topic}, SubPid}),
|
|
|
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
|
|
@@ -450,7 +448,6 @@ handle_info(
|
|
|
{mnesia_table_event, {write, #?SHARED_SUBSCRIPTION{subpid = SubPid}, _}},
|
|
|
State = #state{pmon = PMon}
|
|
|
) ->
|
|
|
- ok = maybe_insert_alive_tab(SubPid),
|
|
|
{noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
|
|
%% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
|
|
|
%% it `unsubscribed` the last topic.
|
|
|
@@ -509,14 +506,7 @@ if_no_more_subscribers(GroupTopic, Fn) ->
|
|
|
end,
|
|
|
ok.
|
|
|
|
|
|
-%% keep track of alive remote pids
|
|
|
-maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
|
|
|
-maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
|
|
|
- ets:insert(?ALIVE_SHARED_SUBSCRIBERS, {Pid}),
|
|
|
- ok.
|
|
|
-
|
|
|
cleanup_down(SubPid) ->
|
|
|
- ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SHARED_SUBSCRIBERS, SubPid),
|
|
|
lists:foreach(
|
|
|
fun(Record = #?SHARED_SUBSCRIPTION{topic = Topic, group = Group}) ->
|
|
|
ok = mria:dirty_delete_object(?SHARED_SUBSCRIPTION, Record),
|
|
|
@@ -542,10 +532,10 @@ is_active_sub(Pid, FailedSubs, All) ->
|
|
|
is_alive_sub(Pid).
|
|
|
|
|
|
%% erlang:is_process_alive/1 does not work with remote pid.
|
|
|
-is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
|
|
- erlang:is_process_alive(Pid);
|
|
|
is_alive_sub(Pid) ->
|
|
|
- [] =/= ets:lookup(?ALIVE_SHARED_SUBSCRIBERS, Pid).
|
|
|
+ %% When process is not local, the best guess is it's alive.
|
|
|
+ %% The race is when the pid is actually down cleanup_down is not evaluated yet
|
|
|
+ ?IS_REMOTE_PID(Pid) orelse erlang:is_process_alive(Pid).
|
|
|
|
|
|
delete_route_if_needed({Group, Topic} = GroupTopic) ->
|
|
|
if_no_more_subscribers(GroupTopic, fun() ->
|