|
|
@@ -19,150 +19,130 @@
|
|
|
-include("emqx.hrl").
|
|
|
|
|
|
-export([start_link/2]).
|
|
|
--export([subscribe/1, subscribe/2, subscribe/3, subscribe/4]).
|
|
|
--export([multi_subscribe/1, multi_subscribe/2, multi_subscribe/3]).
|
|
|
+-export([subscribe/1, subscribe/2, subscribe/3]).
|
|
|
+-export([unsubscribe/1, unsubscribe/2]).
|
|
|
+-export([subscriber_down/1]).
|
|
|
-export([publish/1, safe_publish/1]).
|
|
|
--export([unsubscribe/1, unsubscribe/2, unsubscribe/3]).
|
|
|
--export([multi_unsubscribe/1, multi_unsubscribe/2, multi_unsubscribe/3]).
|
|
|
-export([dispatch/2, dispatch/3]).
|
|
|
-export([subscriptions/1, subscribers/1, subscribed/2]).
|
|
|
--export([get_subopts/2, set_subopts/3]).
|
|
|
+-export([get_subopts/2, set_subopts/2]).
|
|
|
-export([topics/0]).
|
|
|
+%% Stats fun
|
|
|
+-export([stats_fun/0]).
|
|
|
|
|
|
%% gen_server callbacks
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
|
|
code_change/3]).
|
|
|
|
|
|
--ifdef(TEST).
|
|
|
--compile(export_all).
|
|
|
--compile(nowarn_export_all).
|
|
|
--endif.
|
|
|
-
|
|
|
--record(state, {pool, id, submap, submon}).
|
|
|
--record(subscribe, {topic, subpid, subid, subopts = #{}}).
|
|
|
--record(unsubscribe, {topic, subpid, subid}).
|
|
|
-
|
|
|
-%% The default request timeout
|
|
|
+-define(SHARD, 1024).
|
|
|
-define(TIMEOUT, 60000).
|
|
|
-define(BROKER, ?MODULE).
|
|
|
|
|
|
%% ETS tables
|
|
|
--define(SUBOPTION, emqx_suboption).
|
|
|
--define(SUBSCRIBER, emqx_subscriber).
|
|
|
+-define(SUBID, emqx_subid).
|
|
|
+-define(SUBOPTION, emqx_suboption).
|
|
|
+-define(SUBSCRIBER, emqx_subscriber).
|
|
|
-define(SUBSCRIPTION, emqx_subscription).
|
|
|
|
|
|
+%% Gards
|
|
|
-define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))).
|
|
|
|
|
|
--spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
|
|
+-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
|
|
start_link(Pool, Id) ->
|
|
|
- gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE,
|
|
|
- [Pool, Id], [{hibernate_after, 2000}]).
|
|
|
+ _ = create_tabs(),
|
|
|
+ gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, ?MODULE, [Pool, Id], []).
|
|
|
+
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Create tabs
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+
|
|
|
+create_tabs() ->
|
|
|
+ TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
|
|
|
+
|
|
|
+ %% SubId: SubId -> SubPid1, SubPid2,...
|
|
|
+ _ = emqx_tables:new(?SUBID, [bag | TabOpts]),
|
|
|
+ %% SubOption: {SubPid, Topic} -> SubOption
|
|
|
+ _ = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
|
|
|
+
|
|
|
+ %% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
|
|
|
+ %% duplicate_bag: o(1) insert
|
|
|
+ _ = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
|
|
|
+
|
|
|
+ %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
|
|
|
+ %% duplicate_bag: o(1) insert
|
|
|
+ emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-%% Subscribe
|
|
|
+%% Subscribe API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-spec(subscribe(emqx_topic:topic()) -> ok).
|
|
|
subscribe(Topic) when is_binary(Topic) ->
|
|
|
- subscribe(Topic, self()).
|
|
|
+ subscribe(Topic, undefined).
|
|
|
|
|
|
--spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
|
|
|
-subscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
|
|
- subscribe(Topic, SubPid, undefined);
|
|
|
+-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
|
|
|
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
|
|
- subscribe(Topic, self(), SubId).
|
|
|
-
|
|
|
--spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(),
|
|
|
- emqx_types:subid() | emqx_types:subopts()) -> ok).
|
|
|
-subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
|
|
|
- subscribe(Topic, SubPid, SubId, #{qos => 0});
|
|
|
-subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) ->
|
|
|
- subscribe(Topic, SubPid, undefined, SubOpts);
|
|
|
+ subscribe(Topic, SubId, #{});
|
|
|
+subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
|
|
|
+ subscribe(Topic, undefined, SubOpts).
|
|
|
+
|
|
|
+-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
|
|
subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
|
|
|
- subscribe(Topic, self(), SubId, SubOpts).
|
|
|
-
|
|
|
--spec(subscribe(emqx_topic:topic(), pid(), emqx_types:subid(), emqx_types:subopts()) -> ok).
|
|
|
-subscribe(Topic, SubPid, SubId, SubOpts) when is_binary(Topic), is_pid(SubPid),
|
|
|
- ?is_subid(SubId), is_map(SubOpts) ->
|
|
|
- Broker = pick(SubPid),
|
|
|
- SubReq = #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts},
|
|
|
- wait_for_reply(async_call(Broker, SubReq), ?TIMEOUT).
|
|
|
-
|
|
|
--spec(multi_subscribe(emqx_types:topic_table()) -> ok).
|
|
|
-multi_subscribe(TopicTable) when is_list(TopicTable) ->
|
|
|
- multi_subscribe(TopicTable, self()).
|
|
|
-
|
|
|
--spec(multi_subscribe(emqx_types:topic_table(), pid() | emqx_types:subid()) -> ok).
|
|
|
-multi_subscribe(TopicTable, SubPid) when is_pid(SubPid) ->
|
|
|
- multi_subscribe(TopicTable, SubPid, undefined);
|
|
|
-multi_subscribe(TopicTable, SubId) when ?is_subid(SubId) ->
|
|
|
- multi_subscribe(TopicTable, self(), SubId).
|
|
|
-
|
|
|
--spec(multi_subscribe(emqx_types:topic_table(), pid(), emqx_types:subid()) -> ok).
|
|
|
-multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
|
|
|
- Broker = pick(SubPid),
|
|
|
- SubReq = fun(Topic, SubOpts) ->
|
|
|
- #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}
|
|
|
- end,
|
|
|
- wait_for_replies([async_call(Broker, SubReq(Topic, SubOpts))
|
|
|
- || {Topic, SubOpts} <- TopicTable], ?TIMEOUT).
|
|
|
+ SubPid = self(),
|
|
|
+ case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ false ->
|
|
|
+ ok = emqx_broker_helper:monitor(SubPid, SubId),
|
|
|
+ Group = maps:get(share, SubOpts, undefined),
|
|
|
+ %% true = ets:insert(?SUBID, {SubId, SubPid}),
|
|
|
+ true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
+ %% SeqId = emqx_broker_helper:create_seq(Topic),
|
|
|
+ true = ets:insert(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
|
|
+ true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
|
|
+ ok = emqx_shared_sub:subscribe(Group, Topic, SubPid),
|
|
|
+ call(pick(Topic), {subscribe, Group, Topic});
|
|
|
+ true -> ok
|
|
|
+ end.
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-%% Unsubscribe
|
|
|
+%% Unsubscribe API
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-spec(unsubscribe(emqx_topic:topic()) -> ok).
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
- unsubscribe(Topic, self()).
|
|
|
-
|
|
|
--spec(unsubscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
|
|
|
-unsubscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
|
|
- unsubscribe(Topic, SubPid, undefined);
|
|
|
-unsubscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
|
|
- unsubscribe(Topic, self(), SubId).
|
|
|
-
|
|
|
--spec(unsubscribe(emqx_topic:topic(), pid(), emqx_types:subid()) -> ok).
|
|
|
-unsubscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
|
|
|
- Broker = pick(SubPid),
|
|
|
- UnsubReq = #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId},
|
|
|
- wait_for_reply(async_call(Broker, UnsubReq), ?TIMEOUT).
|
|
|
-
|
|
|
--spec(multi_unsubscribe([emqx_topic:topic()]) -> ok).
|
|
|
-multi_unsubscribe(Topics) ->
|
|
|
- multi_unsubscribe(Topics, self()).
|
|
|
-
|
|
|
--spec(multi_unsubscribe([emqx_topic:topic()], pid() | emqx_types:subid()) -> ok).
|
|
|
-multi_unsubscribe(Topics, SubPid) when is_pid(SubPid) ->
|
|
|
- multi_unsubscribe(Topics, SubPid, undefined);
|
|
|
-multi_unsubscribe(Topics, SubId) when ?is_subid(SubId) ->
|
|
|
- multi_unsubscribe(Topics, self(), SubId).
|
|
|
-
|
|
|
--spec(multi_unsubscribe([emqx_topic:topic()], pid(), emqx_types:subid()) -> ok).
|
|
|
-multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
|
|
|
- Broker = pick(SubPid),
|
|
|
- UnsubReq = fun(Topic) ->
|
|
|
- #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}
|
|
|
- end,
|
|
|
- wait_for_replies([async_call(Broker, UnsubReq(Topic)) || Topic <- Topics], ?TIMEOUT).
|
|
|
+ SubPid = self(),
|
|
|
+ case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
|
|
+ [{_, SubOpts}] ->
|
|
|
+ Group = maps:get(share, SubOpts, undefined),
|
|
|
+ true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
|
|
+ true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
|
|
+ true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
|
|
+ ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid),
|
|
|
+ call(pick(Topic), {unsubscribe, Group, Topic});
|
|
|
+ [] -> ok
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok).
|
|
|
+unsubscribe(Topic, _SubId) when is_binary(Topic) ->
|
|
|
+ unsubscribe(Topic).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Publish
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
--spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
|
|
|
+-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
|
|
|
publish(Msg) when is_record(Msg, message) ->
|
|
|
_ = emqx_tracer:trace(publish, Msg),
|
|
|
- {ok, case emqx_hooks:run('message.publish', [], Msg) of
|
|
|
- {ok, Msg1 = #message{topic = Topic}} ->
|
|
|
- Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
|
|
|
- Delivery#delivery.results;
|
|
|
- {stop, _} ->
|
|
|
- emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
|
|
|
- []
|
|
|
- end}.
|
|
|
+ case emqx_hooks:run('message.publish', [], Msg) of
|
|
|
+ {ok, Msg1 = #message{topic = Topic}} ->
|
|
|
+ Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
|
|
|
+ Delivery#delivery.results;
|
|
|
+ {stop, _} ->
|
|
|
+ emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
|
|
|
+ []
|
|
|
+ end.
|
|
|
|
|
|
--spec(safe_publish(emqx_types:message()) -> ok).
|
|
|
%% Called internally
|
|
|
+-spec(safe_publish(emqx_types:message()) -> ok).
|
|
|
safe_publish(Msg) when is_record(Msg, message) ->
|
|
|
try
|
|
|
publish(Msg)
|
|
|
@@ -227,98 +207,113 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
|
|
|
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
|
|
inc_dropped_cnt(Topic),
|
|
|
Delivery;
|
|
|
- [Sub] -> %% optimize?
|
|
|
- dispatch(Sub, Topic, Msg),
|
|
|
+ [SubPid] -> %% optimize?
|
|
|
+ dispatch(SubPid, Topic, Msg),
|
|
|
Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
|
|
|
- Subscribers ->
|
|
|
- Count = lists:foldl(fun(Sub, Acc) ->
|
|
|
- dispatch(Sub, Topic, Msg), Acc + 1
|
|
|
- end, 0, Subscribers),
|
|
|
+ SubPids ->
|
|
|
+ Count = lists:foldl(fun(SubPid, Acc) ->
|
|
|
+ dispatch(SubPid, Topic, Msg), Acc + 1
|
|
|
+ end, 0, SubPids),
|
|
|
Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
|
|
|
end.
|
|
|
|
|
|
-dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) ->
|
|
|
- SubPid ! {dispatch, Topic, Msg};
|
|
|
-dispatch({share, _Group, _Sub}, _Topic, _Msg) ->
|
|
|
- ignored.
|
|
|
+dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
|
|
+ SubPid ! {dispatch, Topic, Msg},
|
|
|
+ true;
|
|
|
+%% TODO: how to optimize the share sub?
|
|
|
+dispatch({share, _Group, _SubPid}, _Topic, _Msg) ->
|
|
|
+ false.
|
|
|
|
|
|
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
|
|
ok;
|
|
|
inc_dropped_cnt(_Topic) ->
|
|
|
emqx_metrics:inc('messages/dropped').
|
|
|
|
|
|
--spec(subscribers(emqx_topic:topic()) -> [emqx_types:subscriber()]).
|
|
|
+-spec(subscribers(emqx_topic:topic()) -> [pid()]).
|
|
|
subscribers(Topic) ->
|
|
|
- try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end.
|
|
|
+ safe_lookup_element(?SUBSCRIBER, Topic, []).
|
|
|
|
|
|
--spec(subscriptions(emqx_types:subscriber())
|
|
|
- -> [{emqx_topic:topic(), emqx_types:subopts()}]).
|
|
|
-subscriptions(Subscriber) ->
|
|
|
- lists:map(fun({_, {share, _Group, Topic}}) ->
|
|
|
- subscription(Topic, Subscriber);
|
|
|
- ({_, Topic}) ->
|
|
|
- subscription(Topic, Subscriber)
|
|
|
- end, ets:lookup(?SUBSCRIPTION, Subscriber)).
|
|
|
-
|
|
|
-subscription(Topic, Subscriber) ->
|
|
|
- {Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
|
|
|
-
|
|
|
--spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
|
|
|
-subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
|
|
- case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of
|
|
|
- {Match, _} ->
|
|
|
- length(Match) >= 1;
|
|
|
- '$end_of_table' ->
|
|
|
- false
|
|
|
- end;
|
|
|
-subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
|
|
|
- case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of
|
|
|
- {Match, _} ->
|
|
|
- length(Match) >= 1;
|
|
|
- '$end_of_table' ->
|
|
|
- false
|
|
|
- end;
|
|
|
-subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
|
|
|
- ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
|
|
|
-
|
|
|
--spec(get_subopts(emqx_topic:topic(), emqx_types:subscriber()) -> emqx_types:subopts()).
|
|
|
-get_subopts(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
- try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)
|
|
|
- catch error:badarg -> []
|
|
|
- end.
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Subscriber is down
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+
|
|
|
+-spec(subscriber_down(pid()) -> true).
|
|
|
+subscriber_down(SubPid) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun(Sub = {_, Topic}) ->
|
|
|
+ case ets:lookup(?SUBOPTION, Sub) of
|
|
|
+ [{_, SubOpts}] ->
|
|
|
+ Group = maps:get(share, SubOpts, undefined),
|
|
|
+ true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
|
|
+ true = ets:delete(?SUBOPTION, Sub),
|
|
|
+ gen_server:cast(pick(Topic), {unsubscribe, Group, Topic});
|
|
|
+ [] -> ok
|
|
|
+ end
|
|
|
+ end, ets:lookup(?SUBSCRIPTION, SubPid)),
|
|
|
+ ets:delete(?SUBSCRIPTION, SubPid).
|
|
|
+
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Management APIs
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
|
|
|
--spec(set_subopts(emqx_topic:topic(), emqx_types:subscriber(), emqx_types:subopts()) -> boolean()).
|
|
|
-set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_map(Opts) ->
|
|
|
- case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
|
|
+-spec(subscriptions(pid() | emqx_types:subid())
|
|
|
+ -> [{emqx_topic:topic(), emqx_types:subopts()}]).
|
|
|
+subscriptions(SubPid) ->
|
|
|
+ [{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})}
|
|
|
+ || Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])].
|
|
|
+
|
|
|
+-spec(subscribed(pid(), emqx_topic:topic()) -> boolean()).
|
|
|
+subscribed(SubPid, Topic) when is_pid(SubPid) ->
|
|
|
+ ets:member(?SUBOPTION, {SubPid, Topic});
|
|
|
+subscribed(SubId, Topic) when ?is_subid(SubId) ->
|
|
|
+ %%FIXME:... SubId -> SubPid
|
|
|
+ ets:member(?SUBOPTION, {SubId, Topic}).
|
|
|
+
|
|
|
+-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()).
|
|
|
+get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
|
|
|
+ safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}).
|
|
|
+
|
|
|
+-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
|
|
|
+set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
|
|
|
+ Sub = {self(), Topic},
|
|
|
+ case ets:lookup(?SUBOPTION, Sub) of
|
|
|
[{_, OldOpts}] ->
|
|
|
- ets:insert(?SUBOPTION, {{Topic, Subscriber}, maps:merge(OldOpts, Opts)});
|
|
|
+ ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});
|
|
|
[] -> false
|
|
|
end.
|
|
|
|
|
|
-async_call(Broker, Req) ->
|
|
|
- From = {self(), Tag = make_ref()},
|
|
|
- ok = gen_server:cast(Broker, {From, Req}),
|
|
|
- Tag.
|
|
|
+-spec(topics() -> [emqx_topic:topic()]).
|
|
|
+topics() ->
|
|
|
+ emqx_router:topics().
|
|
|
|
|
|
-wait_for_replies(Tags, Timeout) ->
|
|
|
- lists:foreach(
|
|
|
- fun(Tag) ->
|
|
|
- wait_for_reply(Tag, Timeout)
|
|
|
- end, Tags).
|
|
|
-
|
|
|
-wait_for_reply(Tag, Timeout) ->
|
|
|
- receive
|
|
|
- {Tag, Reply} -> Reply
|
|
|
- after Timeout ->
|
|
|
- exit(timeout)
|
|
|
+safe_lookup_element(Tab, Key, Def) ->
|
|
|
+ try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end.
|
|
|
+
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Stats fun
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+
|
|
|
+stats_fun() ->
|
|
|
+ safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'),
|
|
|
+ safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'),
|
|
|
+ safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max').
|
|
|
+
|
|
|
+safe_update_stats(Tab, Stat, MaxStat) ->
|
|
|
+ case ets:info(Tab, size) of
|
|
|
+ undefined -> ok;
|
|
|
+ Size -> emqx_stats:setstat(Stat, MaxStat, Size)
|
|
|
end.
|
|
|
|
|
|
-%% Pick a broker
|
|
|
-pick(SubPid) when is_pid(SubPid) ->
|
|
|
- gproc_pool:pick_worker(broker, SubPid).
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
+%% Pick and call
|
|
|
+%%------------------------------------------------------------------------------
|
|
|
|
|
|
--spec(topics() -> [emqx_topic:topic()]).
|
|
|
-topics() -> emqx_router:topics().
|
|
|
+call(Broker, Req) ->
|
|
|
+ gen_server:call(Broker, Req, ?TIMEOUT).
|
|
|
+
|
|
|
+%% Pick a broker
|
|
|
+pick(Topic) ->
|
|
|
+ gproc_pool:pick_worker(broker, Topic).
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
@@ -326,61 +321,32 @@ topics() -> emqx_router:topics().
|
|
|
|
|
|
init([Pool, Id]) ->
|
|
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
|
- {ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
|
|
|
+ {ok, #{pool => Pool, id => Id}}.
|
|
|
+
|
|
|
+handle_call({subscribe, Group, Topic}, _From, State) ->
|
|
|
+ Ok = emqx_router:add_route(Topic, dest(Group)),
|
|
|
+ {reply, Ok, State};
|
|
|
+
|
|
|
+handle_call({unsubscribe, Group, Topic}, _From, State) ->
|
|
|
+ Ok = case ets:member(?SUBSCRIBER, Topic) of
|
|
|
+ false -> emqx_router:delete_route(Topic, dest(Group));
|
|
|
+ true -> ok
|
|
|
+ end,
|
|
|
+ {reply, Ok, State};
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
|
|
{reply, ignored, State}.
|
|
|
|
|
|
-handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
|
|
|
- Subscriber = {SubPid, SubId},
|
|
|
- case ets:member(?SUBOPTION, {Topic, Subscriber}) of
|
|
|
- false ->
|
|
|
- resubscribe(From, {Subscriber, SubOpts, Topic}, State);
|
|
|
- true ->
|
|
|
- case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of
|
|
|
- true ->
|
|
|
- gen_server:reply(From, ok),
|
|
|
- {noreply, State};
|
|
|
- false ->
|
|
|
- resubscribe(From, {Subscriber, SubOpts, Topic}, State)
|
|
|
- end
|
|
|
- end;
|
|
|
-
|
|
|
-handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->
|
|
|
- Subscriber = {SubPid, SubId},
|
|
|
- case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
|
|
|
- [{_, SubOpts}] ->
|
|
|
- Group = maps:get(share, SubOpts, undefined),
|
|
|
- true = do_unsubscribe(Group, Topic, Subscriber),
|
|
|
- emqx_shared_sub:unsubscribe(Group, Topic, SubPid),
|
|
|
- case ets:member(?SUBSCRIBER, Topic) of
|
|
|
- false -> emqx_router:del_route(From, Topic, dest(Group));
|
|
|
- true -> gen_server:reply(From, ok)
|
|
|
- end;
|
|
|
- [] -> gen_server:reply(From, ok)
|
|
|
- end,
|
|
|
- {noreply, State};
|
|
|
-
|
|
|
handle_cast(Msg, State) ->
|
|
|
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = SubMap}) ->
|
|
|
- case maps:find(SubPid, SubMap) of
|
|
|
- {ok, SubIds} ->
|
|
|
- lists:foreach(fun(SubId) -> subscriber_down({SubPid, SubId}) end, SubIds),
|
|
|
- {noreply, demonitor_subscriber(SubPid, State)};
|
|
|
- error ->
|
|
|
- emqx_logger:error("unexpected 'DOWN': ~p, reason: ~p", [SubPid, Reason]),
|
|
|
- {noreply, State}
|
|
|
- end;
|
|
|
-
|
|
|
handle_info(Info, State) ->
|
|
|
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
|
|
|
{noreply, State}.
|
|
|
|
|
|
-terminate(_Reason, #state{pool = Pool, id = Id}) ->
|
|
|
+terminate(_Reason, #{pool := Pool, id := Id}) ->
|
|
|
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
|
|
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
@@ -390,69 +356,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal functions
|
|
|
%%------------------------------------------------------------------------------
|
|
|
|
|
|
-resubscribe(From, {Subscriber, SubOpts, Topic}, State) ->
|
|
|
- {SubPid, _} = Subscriber,
|
|
|
- Group = maps:get(share, SubOpts, undefined),
|
|
|
- true = do_subscribe(Group, Topic, Subscriber, SubOpts),
|
|
|
- emqx_shared_sub:subscribe(Group, Topic, SubPid),
|
|
|
- emqx_router:add_route(From, Topic, dest(Group)),
|
|
|
- {noreply, monitor_subscriber(Subscriber, State)}.
|
|
|
-
|
|
|
-insert_subscriber(Group, Topic, Subscriber) ->
|
|
|
- Subscribers = subscribers(Topic),
|
|
|
- case lists:member(Subscriber, Subscribers) of
|
|
|
- false ->
|
|
|
- ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)});
|
|
|
- _ ->
|
|
|
- ok
|
|
|
- end.
|
|
|
-
|
|
|
-do_subscribe(Group, Topic, Subscriber, SubOpts) ->
|
|
|
- ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
|
|
- insert_subscriber(Group, Topic, Subscriber),
|
|
|
- ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
|
|
|
-
|
|
|
-do_unsubscribe(Group, Topic, Subscriber) ->
|
|
|
- ets:delete_object(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
|
|
|
- ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
|
|
|
- ets:delete(?SUBOPTION, {Topic, Subscriber}).
|
|
|
-
|
|
|
-subscriber_down(Subscriber) ->
|
|
|
- Topics = lists:map(fun({_, {share, Group, Topic}}) ->
|
|
|
- {Topic, Group};
|
|
|
- ({_, Topic}) ->
|
|
|
- {Topic, undefined}
|
|
|
- end, ets:lookup(?SUBSCRIPTION, Subscriber)),
|
|
|
- lists:foreach(fun({Topic, undefined}) ->
|
|
|
- true = do_unsubscribe(undefined, Topic, Subscriber),
|
|
|
- ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined));
|
|
|
- ({Topic, Group}) ->
|
|
|
- true = do_unsubscribe(Group, Topic, Subscriber),
|
|
|
- Groups = groups(Topic),
|
|
|
- case lists:member(Group, lists:usort(Groups)) of
|
|
|
- true -> ok;
|
|
|
- false -> emqx_router:del_route(Topic, dest(Group))
|
|
|
- end
|
|
|
- end, Topics).
|
|
|
-
|
|
|
-monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) ->
|
|
|
- UpFun = fun(SubIds) -> lists:usort([SubId|SubIds]) end,
|
|
|
- State#state{submap = maps:update_with(SubPid, UpFun, [SubId], SubMap),
|
|
|
- submon = emqx_pmon:monitor(SubPid, SubMon)}.
|
|
|
-
|
|
|
-demonitor_subscriber(SubPid, State = #state{submap = SubMap, submon = SubMon}) ->
|
|
|
- State#state{submap = maps:remove(SubPid, SubMap),
|
|
|
- submon = emqx_pmon:demonitor(SubPid, SubMon)}.
|
|
|
-
|
|
|
dest(undefined) -> node();
|
|
|
dest(Group) -> {Group, node()}.
|
|
|
|
|
|
shared(undefined, Name) -> Name;
|
|
|
shared(Group, Name) -> {share, Group, Name}.
|
|
|
|
|
|
-groups(Topic) ->
|
|
|
- lists:foldl(fun({_, {share, Group, _}}, Acc) ->
|
|
|
- [Group | Acc];
|
|
|
- ({_, _}, Acc) ->
|
|
|
- Acc
|
|
|
- end, [], ets:lookup(?SUBSCRIBER, Topic)).
|