%% File: emqttd_server.erl (11 KB)
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqttd_server).
  17. -author("Feng Lee <feng@emqtt.io>").
  18. -behaviour(gen_server2).
  19. -include("emqttd.hrl").
  20. -include("emqttd_protocol.hrl").
  21. -include("emqttd_internal.hrl").
  22. -export([start_link/3]).
  23. %% PubSub API.
  24. -export([subscribe/1, subscribe/2, subscribe/3, publish/1,
  25. unsubscribe/1, unsubscribe/2]).
  26. %% Async PubSub API.
  27. -export([async_subscribe/1, async_subscribe/2, async_subscribe/3,
  28. async_unsubscribe/1, async_unsubscribe/2]).
  29. %% Management API.
  30. -export([setqos/3, subscriptions/1, subscribers/1, is_subscribed/2,
  31. subscriber_down/1]).
  32. %% Debug API
  33. -export([dump/0]).
  34. %% gen_server.
  35. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  36. terminate/2, code_change/3]).
  %% State kept by each server worker; populated in init/1.
  -record(state, {
            pool,                         %% pool name given to start_link/3
            id,                           %% worker id given to start_link/3
            env,                          %% environment given to start_link/3
            submon :: emqttd_pmon:pmon()  %% process monitors for subscriber pids
           }).
  %% @doc Start a server worker. The process is registered locally under the
  %% name built by ?PROC_NAME(?MODULE, Id) and receives [Pool, Id, Env] in init/1.
  -spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, any()}).
  start_link(Pool, Id, Env) ->
      RegName = {local, ?PROC_NAME(?MODULE, Id)},
      InitArgs = [Pool, Id, Env],
      gen_server2:start_link(RegName, ?MODULE, InitArgs, []).
  42. %%--------------------------------------------------------------------
  43. %% PubSub API
  44. %%--------------------------------------------------------------------
  %% @doc Subscribe the calling process to Topic (synchronous).
  -spec(subscribe(binary()) -> ok | emqttd:pubsub_error()).
  subscribe(Topic) when is_binary(Topic) ->
      Caller = self(),
      subscribe(Topic, Caller).
  %% @doc Subscribe Subscriber to Topic with an empty option list (synchronous).
  -spec(subscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
  subscribe(Topic, Subscriber) when is_binary(Topic) ->
      NoOptions = [],
      subscribe(Topic, Subscriber, NoOptions).
  %% @doc Subscribe Subscriber to Topic with Options. The request is sent with
  %% call/2 to the pool worker picked for Subscriber, so the caller blocks
  %% until that worker replies.
  -spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
      ok | emqttd:pubsub_error()).
  subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
      Server = pick(Subscriber),
      Request = {subscribe, Topic, Subscriber, Options},
      call(Server, Request).
  %% @doc Subscribe the calling process to Topic without waiting for a reply.
  -spec(async_subscribe(binary()) -> ok).
  async_subscribe(Topic) when is_binary(Topic) ->
      Caller = self(),
      async_subscribe(Topic, Caller).
  %% @doc Subscribe Subscriber to Topic with an empty option list, without
  %% waiting for a reply.
  -spec(async_subscribe(binary(), emqttd:subscriber()) -> ok).
  async_subscribe(Topic, Subscriber) when is_binary(Topic) ->
      NoOptions = [],
      async_subscribe(Topic, Subscriber, NoOptions).
  %% @doc Subscribe Subscriber to Topic with Options by casting the request to
  %% the pool worker picked for Subscriber; no result is reported back.
  -spec(async_subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
  async_subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
      Server = pick(Subscriber),
      Request = {subscribe, Topic, Subscriber, Options},
      cast(Server, Request).
  %% @doc Publish a message to its topic.
  %%
  %% The message is traced, then passed through the 'message.publish' hook:
  %%   - {stop, Msg}: a warning is logged and `ignore' is returned;
  %%   - {ok, Msg}:   the message is offered to emqttd_retainer:retain/1 and
  %%                  then handed to emqttd_pubsub:publish/2, whose result is
  %%                  returned.
  -spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
  publish(Message = #mqtt_message{from = Publisher}) ->
      trace(publish, Publisher, Message),
      case emqttd_hook:run('message.publish', [], Message) of
          {stop, Stopped} ->
              lager:warning("Stop publishing: ~s", [emqttd_message:format(Stopped)]),
              ignore;
          {ok, Hooked = #mqtt_message{topic = Topic}} ->
              %% Retain message first. Don't create retained topic.
              %% When the retainer answers `ok' the message goes through
              %% emqttd_message:unset_flag/1 before delivery.
              ToDeliver = case emqttd_retainer:retain(Hooked) of
                              ignore -> Hooked;
                              ok     -> emqttd_message:unset_flag(Hooked)
                          end,
              emqttd_pubsub:publish(Topic, ToDeliver)
      end.
  %% Log a PUBLISH at info level, depending on the shape of the publisher:
  %%   - atom:                  nothing is logged, `ignore' is returned;
  %%   - {ClientId, Username}:  logged with both identifiers;
  %%   - binary or list:        logged with the publisher as the client id.
  trace(publish, Publisher, _Message) when is_atom(Publisher) ->
      %% Don't trace '$SYS' publish
      ignore;
  trace(publish, {ClientId, Username}, #mqtt_message{topic = Topic, payload = Payload}) ->
      lager:info([{client, ClientId}, {topic, Topic}],
                 "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
  trace(publish, Publisher, #mqtt_message{topic = Topic, payload = Payload})
    when is_binary(Publisher) orelse is_list(Publisher) ->
      lager:info([{client, Publisher}, {topic, Topic}],
                 "~s PUBLISH to ~s: ~p", [Publisher, Topic, Payload]).
  %% @doc Unsubscribe the calling process from Topic (synchronous).
  -spec(unsubscribe(binary()) -> ok | emqttd:pubsub_error()).
  unsubscribe(Topic) when is_binary(Topic) ->
      Caller = self(),
      unsubscribe(Topic, Caller).
  %% @doc Unsubscribe Subscriber from Topic. The request is sent with call/2 to
  %% the pool worker picked for Subscriber, so the caller blocks for the reply.
  -spec(unsubscribe(binary(), emqttd:subscriber()) -> ok | emqttd:pubsub_error()).
  unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
      Server = pick(Subscriber),
      Request = {unsubscribe, Topic, Subscriber},
      call(Server, Request).
  %% @doc Unsubscribe the calling process from Topic without waiting for a reply.
  -spec(async_unsubscribe(binary()) -> ok).
  async_unsubscribe(Topic) when is_binary(Topic) ->
      Caller = self(),
      async_unsubscribe(Topic, Caller).
  %% @doc Unsubscribe Subscriber from Topic by casting the request to the pool
  %% worker picked for Subscriber; no result is reported back.
  -spec(async_unsubscribe(binary(), emqttd:subscriber()) -> ok).
  async_unsubscribe(Topic, Subscriber) when is_binary(Topic) ->
      Server = pick(Subscriber),
      Request = {unsubscribe, Topic, Subscriber},
      cast(Server, Request).
  %% @doc Change the qos option of an existing subscription (synchronous).
  %% The worker replies `ok', or {error, {subscription_not_found, Topic}} when
  %% no property entry exists for {Topic, Subscriber}.
  setqos(Topic, Subscriber, Qos) when is_binary(Topic) ->
      Server = pick(Subscriber),
      Request = {setqos, Topic, Subscriber, Qos},
      call(Server, Request).
  %% @doc List the subscriptions of Subscriber.
  %%
  %% Each element is {Topic, Subscriber, Options}, where Options is the value
  %% stored in mqtt_subproperty under {Topic, Subscriber}. The previous spec
  %% advertised a 2-tuple that the code never returned; the spec now states
  %% the real 3-tuple shape so callers and Dialyzer agree with the runtime.
  %%
  %% Reads the ETS tables directly, outside the server process.
  %% NOTE(review): ets:lookup_element/3 raises badarg when the property row is
  %% missing, e.g. if a subscription row is observed before its property row
  %% has been written - confirm whether callers need to tolerate that.
  -spec(subscriptions(emqttd:subscriber()) ->
      [{binary(), emqttd:subscriber(), list(emqttd:suboption())}]).
  subscriptions(Subscriber) ->
      lists:map(fun({_, Topic}) ->
                    subscription(Topic, Subscriber)
                end, ets:lookup(mqtt_subscription, Subscriber)).

  %% Build the {Topic, Subscriber, Options} triple for one subscription.
  -spec(subscription(binary(), emqttd:subscriber()) ->
      {binary(), emqttd:subscriber(), list(emqttd:suboption())}).
  subscription(Topic, Subscriber) ->
      {Topic, Subscriber, ets:lookup_element(mqtt_subproperty, {Topic, Subscriber}, 2)}.
  %% @doc Return the subscribers of Topic as reported by
  %% emqttd_pubsub:subscribers/1.
  subscribers(Topic) ->
      emqttd_pubsub:subscribers(Topic).
  %% @doc True when mqtt_subproperty holds an entry for {Topic, Subscriber}.
  -spec(is_subscribed(binary(), emqttd:subscriber()) -> boolean()).
  is_subscribed(Topic, Subscriber) when is_binary(Topic) ->
      Key = {Topic, Subscriber},
      ets:member(mqtt_subproperty, Key).
  %% @doc Tell the worker responsible for Subscriber that the subscriber is
  %% gone, so it can drop that subscriber's subscriptions. Asynchronous.
  -spec(subscriber_down(emqttd:subscriber()) -> ok).
  subscriber_down(Subscriber) ->
      Server = pick(Subscriber),
      cast(Server, {subscriber_down, Subscriber}).
  %% Synchronous request to a worker. The timeout is `infinity', so the caller
  %% waits for as long as the worker takes to reply.
  call(Server, Request) ->
      Timeout = infinity,
      gen_server2:call(Server, Request, Timeout).
  %% Asynchronous request to a worker. The is_pid/1 guard makes a non-pid
  %% Server fail with function_clause instead of being passed on.
  cast(Server, Request) when is_pid(Server) ->
      gen_server2:cast(Server, Request).
  %% Ask gproc_pool for the worker of pool `server' that corresponds to
  %% Subscriber; every API call for one subscriber goes through this lookup.
  pick(Subscriber) ->
      PoolName = server,
      gproc_pool:pick_worker(PoolName, Subscriber).
  %% @doc Debug helper: return {Table, Rows} for each of the three pubsub ETS
  %% tables, with Rows being the full table contents.
  dump() ->
      Tables = [mqtt_subproperty, mqtt_subscription, mqtt_subscriber],
      lists:map(fun(Tab) -> {Tab, ets:tab2list(Tab)} end, Tables).
  130. %%--------------------------------------------------------------------
  131. %% gen_server Callbacks
  132. %%--------------------------------------------------------------------
  %% Join the gproc pool (via the ?GPROC_POOL macro) and start with an empty
  %% subscriber monitor set.
  init([Pool, Id, Env]) ->
      ?GPROC_POOL(join, Pool, Id),
      State = #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()},
      {ok, State}.
  %% Synchronous requests.
  %%
  %%   {subscribe, Topic, Subscriber, Options}
  %%   {unsubscribe, Topic, Subscriber}
  %%       Reply `ok' or {error, Reason} as produced by do_subscribe_/4 and
  %%       do_unsubscribe_/3. On success the subscription stats are refreshed;
  %%       a successful unsubscribe also hibernates the process.
  %%   {setqos, Topic, Subscriber, Qos}
  %%       Rewrite the `qos' option stored in mqtt_subproperty, or reply
  %%       {error, {subscription_not_found, Topic}} when there is no entry.
  %%   anything else
  %%       Handed to the ?UNEXPECTED_REQ macro.
  handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
      case do_subscribe_(Topic, Subscriber, Options, State) of
          {ok, NewState} -> {reply, ok, setstats(NewState)};
          {error, Error} -> {reply, {error, Error}, State}
      end;
  handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
      case do_unsubscribe_(Topic, Subscriber, State) of
          {ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
          {error, Error} -> {reply, {error, Error}, State}
      end;
  handle_call({setqos, Topic, Subscriber, Qos}, _From, State) ->
      Key = {Topic, Subscriber},
      case ets:lookup(mqtt_subproperty, Key) of
          [{_, Opts}] ->
              %% Opts is stored exactly as the subscriber supplied it, so it is
              %% not guaranteed to be key-sorted. lists:ukeymerge/3 requires
              %% sorted input and could leave a stale second `qos' entry;
              %% lists:keystore/4 replaces the existing `qos' tuple in place,
              %% or appends one, regardless of ordering.
              Opts1 = lists:keystore(qos, 1, Opts, {qos, Qos}),
              ets:insert(mqtt_subproperty, {Key, Opts1}),
              {reply, ok, State};
          [] ->
              {reply, {error, {subscription_not_found, Topic}}, State}
      end;
  handle_call(Req, _From, State) ->
      ?UNEXPECTED_REQ(Req, State).
  %% Asynchronous requests. These mirror the synchronous subscribe and
  %% unsubscribe requests, but an {error, _} result is dropped because there
  %% is nobody to reply to. {subscriber_down, Subscriber} removes all of that
  %% subscriber's subscriptions. Anything else goes to ?UNEXPECTED_MSG.
  handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
      case do_subscribe_(Topic, Subscriber, Options, State) of
          {error, _Error} -> {noreply, State};
          {ok, Subscribed} -> {noreply, setstats(Subscribed)}
      end;
  handle_cast({unsubscribe, Topic, Subscriber}, State) ->
      case do_unsubscribe_(Topic, Subscriber, State) of
          {error, _Error} -> {noreply, State};
          {ok, Unsubscribed} -> {noreply, setstats(Unsubscribed), hibernate}
      end;
  handle_cast({subscriber_down, Subscriber}, State) ->
      subscriber_down_(Subscriber),
      {noreply, setstats(State)};
  handle_cast(Msg, State) ->
      ?UNEXPECTED_MSG(Msg, State).
  %% A monitored subscriber process died: drop all of its subscriptions,
  %% forget its monitor, refresh the stats and hibernate. Any other message
  %% goes to ?UNEXPECTED_INFO.
  handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #state{submon = PM}) ->
      subscriber_down_(DownPid),
      %% Explicit module call instead of the tuple call `PM:erase(DownPid)':
      %% tuple calls are only compiled with -compile(tuple_calls) since OTP 21.
      %% Assumes pmon() is a tuple tagged with `emqttd_pmon', as returned by
      %% emqttd_pmon:new/0, so both forms invoke emqttd_pmon:erase/2 - confirm.
      PM1 = emqttd_pmon:erase(DownPid, PM),
      {noreply, setstats(State#state{submon = PM1}), hibernate};
  handle_info(Info, State) ->
      ?UNEXPECTED_INFO(Info, State).
  %% Leave the gproc pool (via the ?GPROC_POOL macro) when the worker stops.
  terminate(_Reason, #state{id = Id, pool = Pool}) ->
      ?GPROC_POOL(leave, Pool, Id).
  %% No state migration on code upgrade: the state is returned unchanged.
  code_change(_OldVsn, CurrentState, _Extra) ->
      {ok, CurrentState}.
  182. %%--------------------------------------------------------------------
  183. %% Internal Functions
  184. %%--------------------------------------------------------------------
  %% Register a new subscription.
  %%
  %% Returns {error, {already_subscribed, Topic}} when a property entry for
  %% {Topic, Subscriber} already exists. Otherwise it asks emqttd_pubsub to
  %% subscribe asynchronously, records the subscription and its options in
  %% ETS, and returns {ok, State1} where State1 also monitors Subscriber when
  %% that is a pid.
  do_subscribe_(Topic, Subscriber, Options, State) ->
      Key = {Topic, Subscriber},
      case ets:lookup(mqtt_subproperty, Key) of
          [_] ->
              {error, {already_subscribed, Topic}};
          [] ->
              emqttd_pubsub:async_subscribe(Topic, Subscriber),
              ets:insert(mqtt_subscription, {Subscriber, Topic}),
              ets:insert(mqtt_subproperty, {Key, Options}),
              {ok, monitor_subpid(Subscriber, State)}
      end.
  %% Start monitoring the subscriber when it is a pid; subscribers that are
  %% not pids leave the state unchanged.
  monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
      %% Explicit module call instead of the tuple call `PMon:monitor(SubPid)':
      %% tuple calls are only compiled with -compile(tuple_calls) since OTP 21.
      %% Assumes pmon() is a tuple tagged with `emqttd_pmon', as returned by
      %% emqttd_pmon:new/0, so both forms invoke emqttd_pmon:monitor/2 - confirm.
      State#state{submon = emqttd_pmon:monitor(SubPid, PMon)};
  monitor_subpid(_SubPid, State) ->
      State.
  %% Remove one subscription.
  %%
  %% Returns {error, {subscription_not_found, Topic}} when there is no
  %% property entry for {Topic, Subscriber}. Otherwise it asks emqttd_pubsub
  %% to unsubscribe asynchronously, deletes both ETS rows, and returns
  %% {ok, State1}. The subscriber's monitor is dropped only when this was its
  %% last remaining subscription.
  do_unsubscribe_(Topic, Subscriber, State) ->
      Key = {Topic, Subscriber},
      case ets:lookup(mqtt_subproperty, Key) of
          [] ->
              {error, {subscription_not_found, Topic}};
          [_] ->
              emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
              ets:delete_object(mqtt_subscription, {Subscriber, Topic}),
              ets:delete(mqtt_subproperty, Key),
              NewState = case ets:member(mqtt_subscription, Subscriber) of
                             false -> demonitor_subpid(Subscriber, State);
                             true  -> State
                         end,
              {ok, NewState}
      end.
  %% Stop monitoring the subscriber when it is a pid; subscribers that are
  %% not pids leave the state unchanged.
  demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
      %% Explicit module call instead of the tuple call `PMon:demonitor(SubPid)':
      %% tuple calls are only compiled with -compile(tuple_calls) since OTP 21.
      %% Assumes pmon() is a tuple tagged with `emqttd_pmon', as returned by
      %% emqttd_pmon:new/0, so both forms invoke emqttd_pmon:demonitor/2 - confirm.
      State#state{submon = emqttd_pmon:demonitor(SubPid, PMon)};
  demonitor_subpid(_SubPid, State) ->
      State.
  %% Remove every subscription held by Subscriber: each topic is cleaned up
  %% through subscriber_down_/2, then the subscriber's rows are deleted from
  %% mqtt_subscription in one go.
  subscriber_down_(Subscriber) ->
      Entries = ets:lookup(mqtt_subscription, Subscriber),
      CleanUp = fun({_, Topic}) -> subscriber_down_(Subscriber, Topic) end,
      lists:foreach(CleanUp, Entries),
      ets:delete(mqtt_subscription, Subscriber).
  %% Clean up one subscription of a subscriber that has gone away: ask
  %% emqttd_pubsub to unsubscribe asynchronously and drop the property row.
  %%
  %% The unsubscribe is issued whether or not a property row exists, exactly
  %% as before. ets:delete/2 is a no-op for a missing key, so the former
  %% lookup and its two near-identical branches are unnecessary.
  subscriber_down_(Subscriber, Topic) ->
      emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
      ets:delete(mqtt_subproperty, {Topic, Subscriber}).
  %% Publish the current size of mqtt_subscription to emqttd_stats under
  %% 'subscriptions/count' / 'subscriptions/max', then return State unchanged
  %% so the call can be used inline in callback return tuples.
  setstats(State) ->
      Count = ets:info(mqtt_subscription, size),
      emqttd_stats:setstats('subscriptions/count', 'subscriptions/max', Count),
      State.