|
|
@@ -75,15 +75,19 @@ mnesia(copy) ->
|
|
|
start_link() ->
|
|
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
|
|
|
|
|
+-spec(subscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
|
|
|
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
|
|
gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
|
|
|
|
|
|
+-spec(unsubscribe(emqx_topic:group(), emqx_topic:topic(), pid()) -> ok).
|
|
|
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
|
|
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
|
|
|
|
|
record(Group, Topic, SubPid) ->
|
|
|
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
|
|
|
|
|
+-spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
|
|
|
+ -> emqx_types:delivery()).
|
|
|
dispatch(Group, Topic, Delivery) ->
|
|
|
dispatch(Group, Topic, Delivery, _FailedSubs = []).
|
|
|
|
|
|
@@ -173,12 +177,12 @@ get_ack_ref(Msg) ->
|
|
|
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
|
|
is_ack_required(Msg) -> ?no_ack =/= get_ack_ref(Msg).
|
|
|
|
|
|
-%% @doc Negative ack dropped message due to message queue being full.
|
|
|
+%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
|
|
-spec(maybe_nack_dropped(emqx_types:message()) -> ok).
|
|
|
maybe_nack_dropped(Msg) ->
|
|
|
case get_ack_ref(Msg) of
|
|
|
?no_ack -> ok;
|
|
|
- {Sender, Ref} -> nack(Sender, Ref, drpped)
|
|
|
+ {Sender, Ref} -> nack(Sender, Ref, dropped)
|
|
|
end.
|
|
|
|
|
|
%% @doc Negative ack message due to connection down.
|
|
|
@@ -325,9 +329,9 @@ terminate(_Reason, _State) ->
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
-%%--------------------------------------------------------------------
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
-%%--------------------------------------------------------------------
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
|
|
|
%% keep track of alive remote pids
|
|
|
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
|