|
@@ -112,17 +112,17 @@ create_tabs() ->
|
|
|
%% Subscribe API
|
|
%% Subscribe API
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
--spec(subscribe(emqx_topic:topic()) -> ok).
|
|
|
|
|
|
|
+-spec(subscribe(emqx_types:topic()) -> ok).
|
|
|
subscribe(Topic) when is_binary(Topic) ->
|
|
subscribe(Topic) when is_binary(Topic) ->
|
|
|
subscribe(Topic, undefined).
|
|
subscribe(Topic, undefined).
|
|
|
|
|
|
|
|
--spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
|
|
|
|
|
|
|
+-spec(subscribe(emqx_types:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
|
|
|
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
|
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
|
|
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
|
|
subscribe(Topic, SubId, ?DEFAULT_SUBOPTS);
|
|
|
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
|
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
|
|
subscribe(Topic, undefined, SubOpts).
|
|
subscribe(Topic, undefined, SubOpts).
|
|
|
|
|
|
|
|
--spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
|
|
|
|
|
|
+-spec(subscribe(emqx_types:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
|
|
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) ->
|
|
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) ->
|
|
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
|
SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
|
|
|
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
|
|
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
|
|
@@ -165,7 +165,7 @@ do_subscribe(Group, Topic, SubPid, SubOpts) ->
|
|
|
%% Unsubscribe API
|
|
%% Unsubscribe API
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
--spec(unsubscribe(emqx_topic:topic()) -> ok).
|
|
|
|
|
|
|
+-spec(unsubscribe(emqx_types:topic()) -> ok).
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
SubPid = self(),
|
|
SubPid = self(),
|
|
|
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
|
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
|
@@ -279,7 +279,7 @@ forward(Node, To, Delivery, sync) ->
|
|
|
emqx_metrics:inc('messages.forward'), Result
|
|
emqx_metrics:inc('messages.forward'), Result
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
|
|
|
|
|
|
+-spec(dispatch(emqx_types:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
|
|
|
dispatch(Topic, #delivery{message = Msg}) ->
|
|
dispatch(Topic, #delivery{message = Msg}) ->
|
|
|
DispN = lists:foldl(
|
|
DispN = lists:foldl(
|
|
|
fun(Sub, N) ->
|
|
fun(Sub, N) ->
|
|
@@ -316,7 +316,7 @@ inc_dropped_cnt(Msg) ->
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-compile({inline, [subscribers/1]}).
|
|
-compile({inline, [subscribers/1]}).
|
|
|
--spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()})
|
|
|
|
|
|
|
+-spec(subscribers(emqx_types:topic() | {shard, emqx_types:topic(), non_neg_integer()})
|
|
|
-> [pid()]).
|
|
-> [pid()]).
|
|
|
subscribers(Topic) when is_binary(Topic) ->
|
|
subscribers(Topic) when is_binary(Topic) ->
|
|
|
lookup_value(?SUBSCRIBER, Topic, []);
|
|
lookup_value(?SUBSCRIBER, Topic, []);
|
|
@@ -351,7 +351,7 @@ subscriber_down(SubPid) ->
|
|
|
%%--------------------------------------------------------------------
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
-spec(subscriptions(pid() | emqx_types:subid())
|
|
-spec(subscriptions(pid() | emqx_types:subid())
|
|
|
- -> [{emqx_topic:topic(), emqx_types:subopts()}]).
|
|
|
|
|
|
|
+ -> [{emqx_types:topic(), emqx_types:subopts()}]).
|
|
|
subscriptions(SubPid) when is_pid(SubPid) ->
|
|
subscriptions(SubPid) when is_pid(SubPid) ->
|
|
|
[{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
[{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
|
|| Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])];
|
|
|| Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])];
|
|
@@ -362,14 +362,14 @@ subscriptions(SubId) ->
|
|
|
undefined -> []
|
|
undefined -> []
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()).
|
|
|
|
|
|
|
+-spec(subscribed(pid() | emqx_types:subid(), emqx_types:topic()) -> boolean()).
|
|
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
|
|
ets:member(?SUBOPTION, {SubPid, Topic});
|
|
ets:member(?SUBOPTION, {SubPid, Topic});
|
|
|
subscribed(SubId, Topic) when ?is_subid(SubId) ->
|
|
subscribed(SubId, Topic) when ?is_subid(SubId) ->
|
|
|
SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
|
SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
|
|
ets:member(?SUBOPTION, {SubPid, Topic}).
|
|
ets:member(?SUBOPTION, {SubPid, Topic}).
|
|
|
|
|
|
|
|
--spec(get_subopts(pid(), emqx_topic:topic()) -> maybe(emqx_types:subopts())).
|
|
|
|
|
|
|
+-spec(get_subopts(pid(), emqx_types:topic()) -> maybe(emqx_types:subopts())).
|
|
|
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
|
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
|
|
lookup_value(?SUBOPTION, {SubPid, Topic});
|
|
lookup_value(?SUBOPTION, {SubPid, Topic});
|
|
|
get_subopts(SubId, Topic) when ?is_subid(SubId) ->
|
|
get_subopts(SubId, Topic) when ?is_subid(SubId) ->
|
|
@@ -379,7 +379,7 @@ get_subopts(SubId, Topic) when ?is_subid(SubId) ->
|
|
|
undefined -> undefined
|
|
undefined -> undefined
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
|
|
|
|
|
|
|
+-spec(set_subopts(emqx_types:topic(), emqx_types:subopts()) -> boolean()).
|
|
|
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
|
set_subopts(self(), Topic, NewOpts).
|
|
set_subopts(self(), Topic, NewOpts).
|
|
|
|
|
|
|
@@ -392,7 +392,7 @@ set_subopts(SubPid, Topic, NewOpts) ->
|
|
|
[] -> false
|
|
[] -> false
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec(topics() -> [emqx_topic:topic()]).
|
|
|
|
|
|
|
+-spec(topics() -> [emqx_types:topic()]).
|
|
|
topics() ->
|
|
topics() ->
|
|
|
emqx_router:topics().
|
|
emqx_router:topics().
|
|
|
|
|
|