|
|
@@ -37,8 +37,7 @@
|
|
|
async_unsubscribe/1, async_unsubscribe/2]).
|
|
|
|
|
|
%% Management API.
|
|
|
--export([setqos/3, subscriptions/1, subscribers/1, is_subscribed/2,
|
|
|
- subscriber_down/1]).
|
|
|
+-export([setqos/3, subscriptions/1, subscribers/1, subscribed/2]).
|
|
|
|
|
|
%% Debug API
|
|
|
-export([dump/0]).
|
|
|
@@ -47,10 +46,10 @@
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
--record(state, {pool, id, env, submon :: emqttd_pmon:pmon()}).
|
|
|
+-record(state, {pool, id, env, subids :: map(), submon :: emqttd_pmon:pmon()}).
|
|
|
|
|
|
-%% @doc Start server
|
|
|
--spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}).
|
|
|
+%% @doc Start the server
|
|
|
+-spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}).
|
|
|
start_link(Pool, Id, Env) ->
|
|
|
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id, Env], []).
|
|
|
|
|
|
@@ -58,21 +57,21 @@ start_link(Pool, Id, Env) ->
|
|
|
%% PubSub API
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-%% @doc Subscribe a Topic
|
|
|
--spec(subscribe(binary()) -> ok | emqttd:pubsub_error()).
|
|
|
+%% @doc Subscribe to a Topic.
|
|
|
+-spec(subscribe(binary()) -> ok | {error, term()}).
|
|
|
subscribe(Topic) when is_binary(Topic) ->
|
|
|
subscribe(Topic, self()).
|
|
|
|
|
|
--spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
|
|
+-spec(subscribe(binary(), emqttd:subscriber()) -> ok | {error, term()}).
|
|
|
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
subscribe(Topic, Subscriber, []).
|
|
|
|
|
|
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
|
|
|
- ok | emqttd:pubsub_error()).
|
|
|
+ ok | {error, term()}).
|
|
|
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
|
|
- call(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
|
|
+ call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
|
|
|
|
|
|
-%% @doc Subscribe a Topic Asynchronously
|
|
|
+%% @doc Subscribe to a Topic asynchronously.
|
|
|
-spec(async_subscribe(binary()) -> ok).
|
|
|
async_subscribe(Topic) when is_binary(Topic) ->
|
|
|
async_subscribe(Topic, self()).
|
|
|
@@ -83,7 +82,7 @@ async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
|
|
|
-spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
|
|
|
async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
|
|
- cast(pick(Subscriber), {subscribe, Topic, Subscriber, Options}).
|
|
|
+ cast(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
|
|
|
|
|
|
%% @doc Publish message to Topic.
|
|
|
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
|
|
|
@@ -109,14 +108,14 @@ trace(publish, From, #mqtt_message{topic = Topic, payload = Payload}) ->
|
|
|
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
|
|
|
|
|
%% @doc Unsubscribe
|
|
|
--spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
|
|
|
+-spec(unsubscribe(binary()) -> ok | {error, term()}).
|
|
|
unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
unsubscribe(Topic, self()).
|
|
|
|
|
|
%% @doc Unsubscribe
|
|
|
--spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
|
|
|
+-spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | {error, term()}).
|
|
|
unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
- call(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
|
|
+ call(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}).
|
|
|
|
|
|
%% @doc Async Unsubscribe
|
|
|
-spec(async_unsubscribe(binary()) -> ok).
|
|
|
@@ -125,32 +124,47 @@ async_unsubscribe(Topic) when is_binary(Topic) ->
|
|
|
|
|
|
-spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
|
|
|
async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
- cast(pick(Subscriber), {unsubscribe, Topic, Subscriber}).
|
|
|
+ cast(pick(Subscriber), {unsubscribe, Topic, with_subpid(Subscriber)}).
|
|
|
|
|
|
+-spec(setqos(binary(), emqttd:subscriber(), mqtt_qos()) -> ok).
|
|
|
setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
|
|
|
- call(pick(Subscriber), {setqos, Topic, Subscriber, Qos}).
|
|
|
-
|
|
|
--spec(subscriptions(emqttd:subscriber()) -> [{binary(), binary(), list(emqttd:suboption())}]).
|
|
|
-subscriptions(Subscriber) ->
|
|
|
- lists:map(fun({_, {_Share, Topic}}) ->
|
|
|
- subscription(Topic, Subscriber);
|
|
|
- ({_, Topic}) ->
|
|
|
- subscription(Topic, Subscriber)
|
|
|
- end, ets:lookup(mqtt_subscription, Subscriber)).
|
|
|
-
|
|
|
-subscription(Topic, Subscriber) ->
|
|
|
- {Topic, Subscriber, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
|
|
|
-
|
|
|
-subscribers(Topic) ->
|
|
|
+ call(pick(Subscriber), {setqos, Topic, with_subpid(Subscriber), Qos}).
|
|
|
+
|
|
|
+with_subpid(SubPid) when is_pid(SubPid) ->
|
|
|
+ SubPid;
|
|
|
+with_subpid(SubId) when is_binary(SubId) ->
|
|
|
+ {SubId, self()};
|
|
|
+with_subpid({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
|
|
|
+ {SubId, SubPid}.
|
|
|
+
|
|
|
+-spec(subscriptions(emqttd:subscriber()) -> [{emqttd:subscriber(), binary(), list(emqttd:suboption())}]).
|
|
|
+subscriptions(SubPid) when is_pid(SubPid) ->
|
|
|
+ with_subproperty(ets:lookup(mqtt_subscription, SubPid));
|
|
|
+
|
|
|
+subscriptions(SubId) when is_binary(SubId) ->
|
|
|
+ with_subproperty(ets:match_object(mqtt_subscription, {{SubId, '_'}, '_'}));
|
|
|
+
|
|
|
+subscriptions({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
|
|
|
+ with_subproperty(ets:lookup(mqtt_subscription, {SubId, SubPid})).
|
|
|
+
|
|
|
+with_subproperty({Subscriber, {share, _Share, Topic}}) ->
|
|
|
+ with_subproperty({Subscriber, Topic});
|
|
|
+with_subproperty({Subscriber, Topic}) ->
|
|
|
+ {Subscriber, Topic, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)};
|
|
|
+with_subproperty(Subscriptions) when is_list(Subscriptions) ->
|
|
|
+ [with_subproperty(Subscription) || Subscription <- Subscriptions].
|
|
|
+
|
|
|
+-spec(subscribers(binary()) -> list(emqttd:subscriber())).
|
|
|
+subscribers(Topic) when is_binary(Topic) ->
|
|
|
emqttd_pubsub:subscribers(Topic).
|
|
|
|
|
|
--spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()).
|
|
|
-is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
|
|
|
- ets:member(mqtt_subproperty, {Topic, Subscriber}).
|
|
|
-
|
|
|
--spec(subscriber_down(emqttd:subscriber()) -> ok).
|
|
|
-subscriber_down(Subscriber) ->
|
|
|
- cast(pick(Subscriber), {subscriber_down, Subscriber}).
|
|
|
+-spec(subscribed(binary(), emqttd:subscriber()) -> boolean()).
|
|
|
+subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
|
|
+ ets:member(mqtt_subproperty, {Topic, SubPid});
|
|
|
+subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
|
|
|
+ length(ets:match_object(mqtt_subproperty, {{Topic, {SubId, '_'}}, '_'}, 1)) == 1;
|
|
|
+subscribed(Topic, {SubId, SubPid}) when is_binary(Topic), is_binary(SubId), is_pid(SubPid) ->
|
|
|
+ ets:member(mqtt_subproperty, {Topic, {SubId, SubPid}}).
|
|
|
|
|
|
call(Server, Req) ->
|
|
|
gen_server2:call(Server, Req, infinity).
|
|
|
@@ -158,8 +172,12 @@ call(Server, Req) ->
|
|
|
cast(Server, Msg) when is_pid(Server) ->
|
|
|
gen_server2:cast(Server, Msg).
|
|
|
|
|
|
-pick(Subscriber) ->
|
|
|
- gproc_pool:pick_worker(server, Subscriber).
|
|
|
+pick(SubPid) when is_pid(SubPid) ->
|
|
|
+ gproc_pool:pick_worker(server, SubPid);
|
|
|
+pick(SubId) when is_binary(SubId) ->
|
|
|
+ gproc_pool:pick_worker(server, SubId);
|
|
|
+pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) ->
|
|
|
+ pick(SubId).
|
|
|
|
|
|
dump() ->
|
|
|
[{Tab, ets:tab2list(Tab)} || Tab <- [mqtt_subproperty, mqtt_subscription, mqtt_subscriber]].
|
|
|
@@ -170,18 +188,20 @@ dump() ->
|
|
|
|
|
|
init([Pool, Id, Env]) ->
|
|
|
?GPROC_POOL(join, Pool, Id),
|
|
|
- {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
|
|
|
+ State = #state{pool = Pool, id = Id, env = Env,
|
|
|
+ subids = #{}, submon = emqttd_pmon:new()},
|
|
|
+ {ok, State, hibernate, {backoff, 2000, 2000, 20000}}.
|
|
|
|
|
|
handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
|
|
|
- case do_subscribe_(Topic, Subscriber, Options, State) of
|
|
|
- {ok, NewState} -> {reply, ok, setstats(NewState)};
|
|
|
- {error, Error} -> {reply, {error, Error}, State}
|
|
|
+ case do_subscribe(Topic, Subscriber, Options, State) of
|
|
|
+ {ok, NewState} -> reply(ok, setstats(NewState));
|
|
|
+ {error, Error} -> reply({error, Error}, State)
|
|
|
end;
|
|
|
|
|
|
handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
|
|
|
- case do_unsubscribe_(Topic, Subscriber, State) of
|
|
|
- {ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
|
|
|
- {error, Error} -> {reply, {error, Error}, State}
|
|
|
+ case do_unsubscribe(Topic, Subscriber, State) of
|
|
|
+ {ok, NewState} -> reply(ok, setstats(NewState));
|
|
|
+ {error, Error} -> reply({error, Error}, State)
|
|
|
end;
|
|
|
|
|
|
handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
|
|
|
@@ -190,36 +210,37 @@ handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
|
|
|
[{_, Opts}] ->
|
|
|
Opts1 = lists:ukeymerge(1, [{qos, Qos}], Opts),
|
|
|
ets:insert(mqtt_subproperty, {Key, Opts1}),
|
|
|
- {reply, ok, State};
|
|
|
+ reply(ok, State);
|
|
|
[] ->
|
|
|
- {reply, {error, {subscription_not_found, Topic}}, State}
|
|
|
+ reply({error, {subscription_not_found, Topic}}, State)
|
|
|
end;
|
|
|
|
|
|
handle_call(Req, _From, State) ->
|
|
|
?UNEXPECTED_REQ(Req, State).
|
|
|
|
|
|
handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
|
|
|
- case do_subscribe_(Topic, Subscriber, Options, State) of
|
|
|
- {ok, NewState} -> {noreply, setstats(NewState)};
|
|
|
- {error, _Error} -> {noreply, State}
|
|
|
+ case do_subscribe(Topic, Subscriber, Options, State) of
|
|
|
+ {ok, NewState} -> noreply(setstats(NewState));
|
|
|
+ {error, _Error} -> noreply(State)
|
|
|
end;
|
|
|
|
|
|
handle_cast({unsubscribe, Topic, Subscriber}, State) ->
|
|
|
- case do_unsubscribe_(Topic, Subscriber, State) of
|
|
|
- {ok, NewState} -> {noreply, setstats(NewState), hibernate};
|
|
|
- {error, _Error} -> {noreply, State}
|
|
|
+ case do_unsubscribe(Topic, Subscriber, State) of
|
|
|
+ {ok, NewState} -> noreply(setstats(NewState));
|
|
|
+ {error, _Error} -> noreply(State)
|
|
|
end;
|
|
|
|
|
|
-handle_cast({subscriber_down, Subscriber}, State) ->
|
|
|
- subscriber_down_(Subscriber),
|
|
|
- {noreply, setstats(State)};
|
|
|
-
|
|
|
handle_cast(Msg, State) ->
|
|
|
?UNEXPECTED_MSG(Msg, State).
|
|
|
|
|
|
-handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
|
|
|
- subscriber_down_(DownPid),
|
|
|
- {noreply, setstats(State#state{submon = PM:erase(DownPid)}), hibernate};
|
|
|
+handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{subids = SubIds}) ->
|
|
|
+ case maps:find(DownPid, SubIds) of
|
|
|
+ {ok, SubId} ->
|
|
|
+ clean_subscriber({SubId, DownPid});
|
|
|
+ error ->
|
|
|
+ clean_subscriber(DownPid)
|
|
|
+ end,
|
|
|
+ noreply(setstats(demonitor_subscriber(DownPid, State)));
|
|
|
|
|
|
handle_info(Info, State) ->
|
|
|
?UNEXPECTED_INFO(Info, State).
|
|
|
@@ -234,62 +255,54 @@ code_change(_OldVsn, State, _Extra) ->
|
|
|
%% Internal Functions
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-do_subscribe_(Topic, Subscriber, Options, State) ->
|
|
|
+do_subscribe(Topic, Subscriber, Options, State) ->
|
|
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
|
|
[] ->
|
|
|
emqttd_pubsub:async_subscribe(Topic, Subscriber, Options),
|
|
|
Share = proplists:get_value(share, Options),
|
|
|
- add_subscription_(Share, Subscriber, Topic),
|
|
|
+ add_subscription(Share, Subscriber, Topic),
|
|
|
ets:insert(mqtt_subproperty, {{Topic, Subscriber}, Options}),
|
|
|
- {ok, monitor_subpid(Subscriber, State)};
|
|
|
+ {ok, monitor_subscriber(Subscriber, State)};
|
|
|
[_] ->
|
|
|
{error, {already_subscribed, Topic}}
|
|
|
end.
|
|
|
|
|
|
-add_subscription_(undefined, Subscriber, Topic) ->
|
|
|
+add_subscription(undefined, Subscriber, Topic) ->
|
|
|
ets:insert(mqtt_subscription, {Subscriber, Topic});
|
|
|
-add_subscription_(Share, Subscriber, Topic) ->
|
|
|
- ets:insert(mqtt_subscription, {Subscriber, {Share, Topic}}).
|
|
|
+add_subscription(Share, Subscriber, Topic) ->
|
|
|
+ ets:insert(mqtt_subscription, {Subscriber, {share, Share, Topic}}).
|
|
|
|
|
|
-monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
|
|
- State#state{submon = PMon:monitor(SubPid)};
|
|
|
-monitor_subpid(_SubPid, State) ->
|
|
|
- State.
|
|
|
+monitor_subscriber(SubPid, State = #state{submon = SubMon}) when is_pid(SubPid) ->
|
|
|
+ State#state{submon = SubMon:monitor(SubPid)};
|
|
|
+monitor_subscriber({SubId, SubPid}, State = #state{subids = SubIds, submon = SubMon}) ->
|
|
|
+ State#state{subids = maps:put(SubPid, SubId, SubIds), submon = SubMon:monitor(SubPid)}.
|
|
|
|
|
|
-do_unsubscribe_(Topic, Subscriber, State) ->
|
|
|
+do_unsubscribe(Topic, Subscriber, State) ->
|
|
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
|
|
[{_, Options}] ->
|
|
|
emqttd_pubsub:async_unsubscribe(Topic, Subscriber, Options),
|
|
|
Share = proplists:get_value(share, Options),
|
|
|
- del_subscription_(Share, Subscriber, Topic),
|
|
|
+ del_subscription(Share, Subscriber, Topic),
|
|
|
ets:delete(mqtt_subproperty, {Topic, Subscriber}),
|
|
|
- {ok, case ets:member(mqtt_subscription, Subscriber) of
|
|
|
- true -> State;
|
|
|
- false -> demonitor_subpid(Subscriber, State)
|
|
|
- end};
|
|
|
+ {ok, State};
|
|
|
[] ->
|
|
|
{error, {subscription_not_found, Topic}}
|
|
|
end.
|
|
|
|
|
|
-del_subscription_(undefined, Subscriber, Topic) ->
|
|
|
+del_subscription(undefined, Subscriber, Topic) ->
|
|
|
ets:delete_object(mqtt_subscription, {Subscriber, Topic});
|
|
|
-del_subscription_(Share, Subscriber, Topic) ->
|
|
|
- ets:delete_object(mqtt_subscription, {Subscriber, {Share, Topic}}).
|
|
|
+del_subscription(Share, Subscriber, Topic) ->
|
|
|
+ ets:delete_object(mqtt_subscription, {Subscriber, {share, Share, Topic}}).
|
|
|
|
|
|
-demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
|
|
|
- State#state{submon = PMon:demonitor(SubPid)};
|
|
|
-demonitor_subpid(_SubPid, State) ->
|
|
|
- State.
|
|
|
-
|
|
|
-subscriber_down_(Subscriber) ->
|
|
|
- lists:foreach(fun({_, {Share, Topic}}) ->
|
|
|
- subscriber_down_(Share, Subscriber, Topic);
|
|
|
+clean_subscriber(Subscriber) ->
|
|
|
+ lists:foreach(fun({_, {share, Share, Topic}}) ->
|
|
|
+ clean_subscriber(Share, Subscriber, Topic);
|
|
|
({_, Topic}) ->
|
|
|
- subscriber_down_(undefined, Subscriber, Topic)
|
|
|
+ clean_subscriber(undefined, Subscriber, Topic)
|
|
|
end, ets:lookup(mqtt_subscription, Subscriber)),
|
|
|
ets:delete(mqtt_subscription, Subscriber).
|
|
|
|
|
|
-subscriber_down_(Share, Subscriber, Topic) ->
|
|
|
+clean_subscriber(Share, Subscriber, Topic) ->
|
|
|
case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
|
|
|
[] ->
|
|
|
%% TODO:....???
|
|
|
@@ -300,7 +313,16 @@ subscriber_down_(Share, Subscriber, Topic) ->
|
|
|
ets:delete(mqtt_subproperty, {Topic, Subscriber})
|
|
|
end.
|
|
|
|
|
|
+demonitor_subscriber(SubPid, State = #state{subids = SubIds, submon = SubMon}) ->
|
|
|
+ State#state{subids = maps:remove(SubPid, SubIds), submon = SubMon:demonitor(SubPid)}.
|
|
|
+
|
|
|
setstats(State) ->
|
|
|
emqttd_stats:setstats('subscriptions/count', 'subscriptions/max',
|
|
|
ets:info(mqtt_subscription, size)), State.
|
|
|
|
|
|
+reply(Reply, State) ->
|
|
|
+ {reply, Reply, State, hibernate}.
|
|
|
+
|
|
|
+noreply(State) ->
|
|
|
+ {noreply, State, hibernate}.
|
|
|
+
|