|
@@ -97,6 +97,7 @@
|
|
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
|
|
-define(NO_ACK, no_ack).
|
|
-define(NO_ACK, no_ack).
|
|
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
|
|
|
|
+-define(SUBSCRIBER_DOWN, noproc).
|
|
|
|
|
|
|
|
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
|
|
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
|
|
|
|
|
|
|
@@ -262,7 +263,8 @@ redispatch_shared_message(#message{} = Msg) ->
|
|
|
%% Note that dispatch is called with self() in failed subs
|
|
%% Note that dispatch is called with self() in failed subs
|
|
|
%% This is done to avoid dispatching back to caller
|
|
%% This is done to avoid dispatching back to caller
|
|
|
Delivery = #delivery{sender = self(), message = Msg},
|
|
Delivery = #delivery{sender = self(), message = Msg},
|
|
|
- FailedSubs = #{self() => sender},
|
|
|
|
|
|
|
+ %% Self is terminating, it makes no sense to loop-back the dispatch
|
|
|
|
|
+ FailedSubs = #{self() => ?SUBSCRIBER_DOWN},
|
|
|
dispatch(Group, Topic, Delivery, FailedSubs).
|
|
dispatch(Group, Topic, Delivery, FailedSubs).
|
|
|
|
|
|
|
|
%% @hidden Return the `redispatch_to` group-topic in the message header.
|
|
%% @hidden Return the `redispatch_to` group-topic in the message header.
|
|
@@ -308,7 +310,7 @@ maybe_ack(Msg) ->
|
|
|
|
|
|
|
|
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
|
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
|
|
Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
|
|
|
- All = subscribers(Group, Topic),
|
|
|
|
|
|
|
+ All = subscribers(Group, Topic, FailedSubs),
|
|
|
case is_active_sub(Sub0, FailedSubs, All) of
|
|
case is_active_sub(Sub0, FailedSubs, All) of
|
|
|
true ->
|
|
true ->
|
|
|
%% the old subscriber is still alive
|
|
%% the old subscriber is still alive
|
|
@@ -316,27 +318,25 @@ pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
|
{fresh, Sub0};
|
|
{fresh, Sub0};
|
|
|
false ->
|
|
false ->
|
|
|
%% randomly pick one for the first message
|
|
%% randomly pick one for the first message
|
|
|
- {Type, Sub} = do_pick(
|
|
|
|
|
- random,
|
|
|
|
|
- ClientId,
|
|
|
|
|
- SourceTopic,
|
|
|
|
|
- Group,
|
|
|
|
|
- Topic,
|
|
|
|
|
- FailedSubs#{Sub0 => noproc}
|
|
|
|
|
- ),
|
|
|
|
|
- %% stick to whatever pick result
|
|
|
|
|
- erlang:put({shared_sub_sticky, Group, Topic}, Sub),
|
|
|
|
|
- {Type, Sub}
|
|
|
|
|
|
|
+ FailedSubs1 = FailedSubs#{Sub0 => ?SUBSCRIBER_DOWN},
|
|
|
|
|
+ Res = do_pick(All, random, ClientId, SourceTopic, Group, Topic, FailedSubs1),
|
|
|
|
|
+ case Res of
|
|
|
|
|
+ {_, Sub} ->
|
|
|
|
|
+ %% stick to whatever pick result
|
|
|
|
|
+ erlang:put({shared_sub_sticky, Group, Topic}, Sub);
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ ok
|
|
|
|
|
+ end,
|
|
|
|
|
+ Res
|
|
|
end;
|
|
end;
|
|
|
pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
|
- do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
|
|
|
|
|
|
|
+ All = subscribers(Group, Topic, FailedSubs),
|
|
|
|
|
+ do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
|
|
|
|
|
|
|
|
-do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
|
|
|
- All = subscribers(Group, Topic),
|
|
|
|
|
|
|
+do_pick([], _Strategy, _ClientId, _SourceTopic, _Group, _Topic, _FailedSubs) ->
|
|
|
|
|
+ false;
|
|
|
|
|
+do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
|
|
case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
|
|
case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
|
|
|
- [] when All =:= [] ->
|
|
|
|
|
- %% Genuinely no subscriber
|
|
|
|
|
- false;
|
|
|
|
|
[] ->
|
|
[] ->
|
|
|
%% All offline? pick one anyway
|
|
%% All offline? pick one anyway
|
|
|
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
|
|
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
|
|
@@ -383,6 +383,16 @@ do_pick_subscriber(Group, Topic, round_robin_per_group, _ClientId, _SourceTopic,
|
|
|
{Group, Topic}, 0
|
|
{Group, Topic}, 0
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
|
|
+%% Select ETS table to get all subscriber pids which are not down.
|
|
|
|
|
+subscribers(Group, Topic, FailedSubs) ->
|
|
|
|
|
+ lists:filter(
|
|
|
|
|
+ fun(P) ->
|
|
|
|
|
+ ?SUBSCRIBER_DOWN =/= maps:get(P, FailedSubs, false)
|
|
|
|
|
+ end,
|
|
|
|
|
+ subscribers(Group, Topic)
|
|
|
|
|
+ ).
|
|
|
|
|
+
|
|
|
|
|
+%% Select ETS table to get all subscriber pids.
|
|
|
subscribers(Group, Topic) ->
|
|
subscribers(Group, Topic) ->
|
|
|
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
|
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
|
|
|
|
|
|
@@ -523,8 +533,8 @@ update_stats(State) ->
|
|
|
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
|
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
|
|
is_active_sub(Pid, FailedSubs, All) ->
|
|
is_active_sub(Pid, FailedSubs, All) ->
|
|
|
lists:member(Pid, All) andalso
|
|
lists:member(Pid, All) andalso
|
|
|
- is_alive_sub(Pid) andalso
|
|
|
|
|
- (not maps:is_key(Pid, FailedSubs)).
|
|
|
|
|
|
|
+ (not maps:is_key(Pid, FailedSubs)) andalso
|
|
|
|
|
+ is_alive_sub(Pid).
|
|
|
|
|
|
|
|
%% erlang:is_process_alive/1 does not work with remote pid.
|
|
%% erlang:is_process_alive/1 does not work with remote pid.
|
|
|
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|
|
is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
|