|
|
@@ -20,7 +20,7 @@
|
|
|
|
|
|
-export([start_link/2]).
|
|
|
-export([subscribe/1, subscribe/2, subscribe/3]).
|
|
|
--export([unsubscribe/1, unsubscribe/2]).
|
|
|
+-export([unsubscribe/1]).
|
|
|
-export([subscriber_down/1]).
|
|
|
-export([publish/1, safe_publish/1]).
|
|
|
-export([dispatch/2]).
|
|
|
@@ -35,6 +35,8 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
code_change/3]).
|
|
|
|
|
|
+-import(emqx_tables, [lookup_value/2, lookup_value/3]).
|
|
|
+
|
|
|
-ifdef(TEST).
|
|
|
-compile(export_all).
|
|
|
-compile(nowarn_export_all).
|
|
|
@@ -42,8 +44,7 @@
|
|
|
|
|
|
-define(BROKER, ?MODULE).
|
|
|
|
|
|
-%% ETS tables
|
|
|
--define(SUBID, emqx_subid).
|
|
|
+%% ETS tables for PubSub
|
|
|
-define(SUBOPTION, emqx_suboption).
|
|
|
-define(SUBSCRIBER, emqx_subscriber).
|
|
|
-define(SUBSCRIPTION, emqx_subscription).
|
|
|
@@ -65,9 +66,6 @@ start_link(Pool, Id) ->
|
|
|
create_tabs() ->
|
|
|
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
|
|
|
|
|
|
- %% SubId: SubId -> SubPid
|
|
|
- ok = emqx_tables:new(?SUBID, [set | TabOpts]),
|
|
|
-
|
|
|
%% SubOption: {SubPid, Topic} -> SubOption
|
|
|
ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
|
|
|
|
|
|
@@ -76,7 +74,7 @@ create_tabs() ->
|
|
|
ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
|
|
|
|
|
|
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
|
|
|
- %% bag: o(n) insert
|
|
|
+ %% bag: o(n) insert:(
|
|
|
ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -98,28 +96,37 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map
|
|
|
SubPid = self(),
|
|
|
case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
|
|
false ->
|
|
|
- ok = emqx_broker_helper:monitor(SubPid, SubId),
|
|
|
- %% true = ets:insert(?SUBID, {SubId, SubPid}),
|
|
|
- true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
- case maps:get(share, SubOpts, undefined) of
|
|
|
- undefined ->
|
|
|
- Shard = emqx_broker_helper:get_shard(SubPid, Topic),
|
|
|
- case Shard of
|
|
|
- 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
|
|
|
- I ->
|
|
|
- true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
- true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}})
|
|
|
- end,
|
|
|
- SubOpts1 = maps:put(shard, Shard, SubOpts),
|
|
|
- true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
|
|
|
- call(pick({Topic, Shard}), {subscribe, Topic});
|
|
|
- Group -> %% Shard subscription
|
|
|
- true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
- emqx_shared_sub:subscribe(Group, Topic, SubPid)
|
|
|
- end;
|
|
|
+ ok = emqx_broker_helper:monitor_sub(SubPid, SubId),
|
|
|
+ do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
|
|
|
true -> ok
|
|
|
end.
|
|
|
|
|
|
+with_subid(undefined, SubOpts) ->
|
|
|
+ SubOpts;
|
|
|
+with_subid(SubId, SubOpts) ->
|
|
|
+ maps:put(subid, SubId, SubOpts).
|
|
|
+
|
|
|
+%% @private
|
|
|
+do_subscribe(Topic, SubPid, SubOpts) ->
|
|
|
+ true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
+ Group = maps:get(share, SubOpts, undefined),
|
|
|
+ do_subscribe(Group, Topic, SubPid, SubOpts).
|
|
|
+
|
|
|
+do_subscribe(undefined, Topic, SubPid, SubOpts) ->
|
|
|
+ case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
|
|
|
+ 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
+ call(pick(Topic), {subscribe, Topic});
|
|
|
+ I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}),
|
|
|
+ call(pick({Topic, I}), {subscribe, Topic, I})
|
|
|
+ end;
|
|
|
+
|
|
|
+%% Shared subscription
|
|
|
+do_subscribe(Group, Topic, SubPid, SubOpts) ->
|
|
|
+ true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
+ emqx_shared_sub:subscribe(Group, Topic, SubPid).
|
|
|
+
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Unsubscribe API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -130,33 +137,26 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
|
|
[{_, SubOpts}] ->
|
|
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
|
|
- case maps:get(share, SubOpts, undefined) of
|
|
|
- undefined ->
|
|
|
- case maps:get(shard, SubOpts, 0) of
|
|
|
- 0 ->
|
|
|
- true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
- ok = cast(pick(Topic), {unsubscribed, Topic});
|
|
|
- I ->
|
|
|
- true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
- case ets:member(emqx_subscriber, {shard, Topic, I}) of
|
|
|
- true -> ok;
|
|
|
- false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
|
|
|
- end,
|
|
|
- ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
|
|
- end;
|
|
|
- Group ->
|
|
|
- ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid)
|
|
|
- end,
|
|
|
- true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
- %%true = ets:delete_object(?SUBID, {SubId, SubPid}),
|
|
|
- true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
- ok;
|
|
|
+ do_unsubscribe(Topic, SubPid, SubOpts);
|
|
|
[] -> ok
|
|
|
end.
|
|
|
|
|
|
--spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok).
|
|
|
-unsubscribe(Topic, _SubId) when is_binary(Topic) ->
|
|
|
- unsubscribe(Topic).
|
|
|
+do_unsubscribe(Topic, SubPid, SubOpts) ->
|
|
|
+ true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
+ true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
+ Group = maps:get(share, SubOpts, undefined),
|
|
|
+ do_unsubscribe(Group, Topic, SubPid, SubOpts).
|
|
|
+
|
|
|
+do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
|
|
|
+ case maps:get(shard, SubOpts, 0) of
|
|
|
+ 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
+ cast(pick(Topic), {unsubscribed, Topic});
|
|
|
+ I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
+ cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
|
|
+ end;
|
|
|
+
|
|
|
+do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
|
|
|
+ emqx_shared_sub:unsubscribe(Group, Topic, SubPid).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Publish
|
|
|
@@ -241,23 +241,28 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
|
|
|
inc_dropped_cnt(Topic),
|
|
|
Delivery;
|
|
|
[Sub] -> %% optimize?
|
|
|
- dispatch(Sub, Topic, Msg),
|
|
|
- Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
|
|
|
+ Cnt = dispatch(Sub, Topic, Msg),
|
|
|
+ Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]};
|
|
|
Subs ->
|
|
|
- Count = lists:foldl(
|
|
|
- fun(Sub, Acc) ->
|
|
|
- dispatch(Sub, Topic, Msg), Acc + 1
|
|
|
- end, 0, Subs),
|
|
|
- Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
|
|
|
+ Cnt = lists:foldl(
|
|
|
+ fun(Sub, Acc) ->
|
|
|
+ dispatch(Sub, Topic, Msg) + Acc
|
|
|
+ end, 0, Subs),
|
|
|
+ Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}
|
|
|
end.
|
|
|
|
|
|
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
|
|
- SubPid ! {dispatch, Topic, Msg};
|
|
|
+ case erlang:is_process_alive(SubPid) of
|
|
|
+ true ->
|
|
|
+ SubPid ! {dispatch, Topic, Msg},
|
|
|
+ 1;
|
|
|
+ false -> 0
|
|
|
+ end;
|
|
|
dispatch({shard, I}, Topic, Msg) ->
|
|
|
-
|
|
|
- lists:foreach(fun(SubPid) ->
|
|
|
- SubPid ! {dispatch, Topic, Msg}
|
|
|
- end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])).
|
|
|
+ lists:foldl(
|
|
|
+ fun(SubPid, Cnt) ->
|
|
|
+ dispatch(SubPid, Topic, Msg) + Cnt
|
|
|
+ end, 0, subscribers({shard, Topic, I})).
|
|
|
|
|
|
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
|
|
ok;
|
|
|
@@ -265,8 +270,10 @@ inc_dropped_cnt(_Topic) ->
|
|
|
emqx_metrics:inc('messages/dropped').
|
|
|
|
|
|
-spec(subscribers(emqx_topic:topic()) -> [pid()]).
|
|
|
-subscribers(Topic) ->
|
|
|
- safe_lookup_element(?SUBSCRIBER, Topic, []).
|
|
|
+subscribers(Topic) when is_binary(Topic) ->
|
|
|
+ lookup_value(?SUBSCRIBER, Topic, []);
|
|
|
+subscribers(Shard = {shard, _Topic, _I}) ->
|
|
|
+ lookup_value(?SUBSCRIBER, Shard, []).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Subscriber is down
|
|
|
@@ -275,27 +282,21 @@ subscribers(Topic) ->
|
|
|
-spec(subscriber_down(pid()) -> true).
|
|
|
subscriber_down(SubPid) ->
|
|
|
lists:foreach(
|
|
|
- fun(Sub = {_Pid, Topic}) ->
|
|
|
- case ets:lookup(?SUBOPTION, Sub) of
|
|
|
- [{_, SubOpts}] ->
|
|
|
+ fun(Topic) ->
|
|
|
+ case lookup_value(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ SubOpts when is_map(SubOpts) ->
|
|
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
|
|
+ true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
case maps:get(shard, SubOpts, 0) of
|
|
|
- 0 ->
|
|
|
- true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
- ok = cast(pick(Topic), {unsubscribed, Topic});
|
|
|
- I ->
|
|
|
- true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
- case ets:member(emqx_subscriber, {shard, Topic, I}) of
|
|
|
- true -> ok;
|
|
|
- false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
|
|
|
- end,
|
|
|
- ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
|
|
- end,
|
|
|
- ets:delete(?SUBOPTION, Sub);
|
|
|
- [] -> ok
|
|
|
+ 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
|
|
+ ok = cast(pick(Topic), {unsubscribed, Topic});
|
|
|
+ I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
|
|
+ ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
|
|
+ end;
|
|
|
+ undefined -> ok
|
|
|
end
|
|
|
- end, ets:lookup(?SUBSCRIPTION, SubPid)),
|
|
|
- true = ets:delete(?SUBSCRIPTION, SubPid).
|
|
|
+ end, lookup_value(?SUBSCRIPTION, SubPid, [])),
|
|
|
+ ets:delete(?SUBSCRIPTION, SubPid).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Management APIs
|
|
|
@@ -303,20 +304,32 @@ subscriber_down(SubPid) ->
|
|
|
|
|
|
-spec(subscriptions(pid() | emqx_types:subid())
|
|
|
-> [{emqx_topic:topic(), emqx_types:subopts()}]).
|
|
|
-subscriptions(SubPid) ->
|
|
|
- [{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
|
- || Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])].
|
|
|
+subscriptions(SubPid) when is_pid(SubPid) ->
|
|
|
+ [{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
|
+ || Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])];
|
|
|
+subscriptions(SubId) ->
|
|
|
+ case emqx_broker_helper:lookup_subpid(SubId) of
|
|
|
+ SubPid when is_pid(SubPid) ->
|
|
|
+ subscriptions(SubPid);
|
|
|
+ undefined -> []
|
|
|
+ end.
|
|
|
|
|
|
-spec(subscribed(pid(), emqx_topic:topic()) -> boolean()).
|
|
|
subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
|
|
ets:member(?SUBOPTION, {SubPid, Topic});
|
|
|
subscribed(SubId, Topic) when ?is_subid(SubId) ->
|
|
|
- %%FIXME:... SubId -> SubPid
|
|
|
- ets:member(?SUBOPTION, {SubId, Topic}).
|
|
|
+ SubPid = emqx_broker_helper:lookup_subpid(SubId),
|
|
|
+ ets:member(?SUBOPTION, {SubPid, Topic}).
|
|
|
|
|
|
--spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()).
|
|
|
+-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts() | undefined).
|
|
|
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
|
|
- safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}).
|
|
|
+ lookup_value(?SUBOPTION, {SubPid, Topic});
|
|
|
+get_subopts(SubId, Topic) when ?is_subid(SubId) ->
|
|
|
+ case emqx_broker_helper:lookup_subpid(SubId) of
|
|
|
+ SubPid when is_pid(SubPid) ->
|
|
|
+ get_subopts(SubPid, Topic);
|
|
|
+ undefined -> undefined
|
|
|
+ end.
|
|
|
|
|
|
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
|
|
|
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
|
@@ -331,9 +344,6 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
|
topics() ->
|
|
|
emqx_router:topics().
|
|
|
|
|
|
-safe_lookup_element(Tab, Key, Def) ->
|
|
|
- try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end.
|
|
|
-
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Stats fun
|
|
|
%%------------------------------------------------------------------------------
|
|
|
@@ -372,10 +382,15 @@ init([Pool, Id]) ->
|
|
|
{ok, #{pool => Pool, id => Id}}.
|
|
|
|
|
|
handle_call({subscribe, Topic}, _From, State) ->
|
|
|
- Ok = case get(Topic) of
|
|
|
+ Ok = emqx_router:do_add_route(Topic),
|
|
|
+ {reply, Ok, State};
|
|
|
+
|
|
|
+handle_call({subscribe, Topic, I}, _From, State) ->
|
|
|
+ Ok = case get(Shard = {Topic, I}) of
|
|
|
undefined ->
|
|
|
- _ = put(Topic, true),
|
|
|
- emqx_router:do_add_route(Topic);
|
|
|
+ _ = put(Shard, true),
|
|
|
+ true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
|
|
|
+ cast(pick(Topic), {subscribe, Topic});
|
|
|
true -> ok
|
|
|
end,
|
|
|
{reply, Ok, State};
|
|
|
@@ -384,11 +399,18 @@ handle_call(Req, _From, State) ->
|
|
|
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
|
|
|
|
+handle_cast({subscribe, Topic}, State) ->
|
|
|
+ case emqx_router:do_add_route(Topic) of
|
|
|
+ ok -> ok;
|
|
|
+ {error, Reason} ->
|
|
|
+ emqx_logger:error("[Broker] Failed to add route: ~p", [Reason])
|
|
|
+ end,
|
|
|
+ {noreply, State};
|
|
|
+
|
|
|
handle_cast({unsubscribed, Topic}, State) ->
|
|
|
case ets:member(?SUBSCRIBER, Topic) of
|
|
|
false ->
|
|
|
- _ = erase(Topic),
|
|
|
- emqx_router:do_delete_route(Topic);
|
|
|
+ _ = emqx_router:do_delete_route(Topic);
|
|
|
true -> ok
|
|
|
end,
|
|
|
{noreply, State};
|
|
|
@@ -396,6 +418,7 @@ handle_cast({unsubscribed, Topic}, State) ->
|
|
|
handle_cast({unsubscribed, Topic, I}, State) ->
|
|
|
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
|
|
|
false ->
|
|
|
+ _ = erase({Topic, I}),
|
|
|
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
|
|
|
cast(pick(Topic), {unsubscribed, Topic});
|
|
|
true -> ok
|