emqttd_router.erl 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. %% @doc MQTT Message Router
  17. -module(emqttd_router).
  18. -behaviour(gen_server2).
  19. -include("emqttd.hrl").
  20. -include("emqttd_protocol.hrl").
  21. -include("emqttd_internal.hrl").
  22. -export([start_link/4]).
  23. %% Route API
  24. -export([route/2]).
  25. %% Route Admin API
  26. -export([add_route/2, lookup_routes/1, has_route/1, delete_route/2]).
  27. %% Batch API
  28. -export([add_routes/2, delete_routes/2]).
  29. %% For Test
  30. -export([stop/1]).
  31. %% gen_server Callbacks
  32. -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
  33. terminate/2, code_change/3]).
  %% Aging bookkeeping for topics whose last local route was deleted.
  %%   topics - dict mapping Topic to the time (in seconds) it lost its last route
  %%   time   - aging period in seconds (the `route_aging' option, default 5)
  %%   tref   - reference of the interval timer that sends `{clean, aged}'
  -record(aging, {topics, time, tref}).
  %% Router worker state.
  %%   pool, id - pool name and worker id, used to join/leave the pool
  %%   statsfun - fun invoked with `route' after the route table is modified
  %%   aging    - the #aging{} bookkeeping described above
  -record(state, {pool, id, statsfun, aging :: #aging{}}).
  %% @doc Start a local router worker.
  %% The process is registered locally under the name produced by
  %% ?PROC_NAME(?MODULE, Id); all four arguments are forwarded to init/1.
  -spec start_link(atom(), pos_integer(), fun((atom()) -> ok), list()) -> {ok, pid()} | {error, any()}.
  start_link(Pool, Id, StatsFun, Env) ->
      RegName = {local, ?PROC_NAME(?MODULE, Id)},
      InitArgs = [Pool, Id, StatsFun, Env],
      gen_server2:start_link(RegName, ?MODULE, InitArgs, []).
  %% @doc Route Message on the local node.
  %%
  %% Topics starting with `$Q/' are treated as queues: the message is sent to
  %% exactly one subscriber, picked at random when there are several.
  %% Any other topic is broadcast to every subscriber found in the route table.
  %% When no route exists the 'messages/dropped' metric is incremented.
  -spec route(emqttd_topic:topic(), mqtt_message()) -> any().
  %% The tail segment must be typed `/binary': an untyped segment is an 8-bit
  %% integer and would only match queue topics that are exactly one byte long
  %% after the prefix, sending every other queue topic down the broadcast path.
  route(Queue = <<"$Q/", _Q/binary>>, Msg) ->
      case lookup_routes(Queue) of
          [] ->
              emqttd_metrics:inc('messages/dropped');
          [SubPid] ->
              SubPid ! {dispatch, Queue, Msg};
          Routes ->
              %% rand_uniform(Lo, Hi) yields Lo =< N < Hi, hence the +1.
              Idx = crypto:rand_uniform(1, length(Routes) + 1),
              SubPid = lists:nth(Idx, Routes),
              SubPid ! {dispatch, Queue, Msg}
      end;
  route(Topic, Msg) ->
      case lookup_routes(Topic) of
          [] ->
              emqttd_metrics:inc('messages/dropped');
          [SubPid] -> %% optimize: skip the foreach for a single subscriber
              SubPid ! {dispatch, Topic, Msg};
          Routes ->
              lists:foreach(fun(SubPid) ->
                                    SubPid ! {dispatch, Topic, Msg}
                            end, Routes)
      end.
  %% @doc Tell whether the `route' table holds at least one entry for Topic.
  -spec has_route(emqttd_topic:topic()) -> boolean().
  has_route(Topic) ->
      Table = route,
      ets:member(Table, Topic).
  %% @doc Return the subscriber pids stored for Topic in the `route' table,
  %% or an empty list when the topic has no entry.
  -spec lookup_routes(emqttd_topic:topic()) -> list(pid()).
  lookup_routes(Topic) when is_binary(Topic) ->
      fetch_routes(ets:member(route, Topic), Topic).

  %% @private Read element 2 of the entries only when the key was seen.
  %% The key may vanish between the member check and the lookup, in which
  %% case lookup_element raises badarg and the result is an empty list.
  fetch_routes(false, _Topic) ->
      [];
  fetch_routes(true, Topic) ->
      try
          ets:lookup_element(route, Topic, 2)
      catch
          error:badarg -> []
      end.
  %% @doc Add a route from Topic to the subscriber Pid.
  %% The request is a synchronous call to the router picked for the topic.
  -spec add_route(emqttd_topic:topic(), pid()) -> ok.
  add_route(Topic, Pid) when is_pid(Pid) ->
      Router = pick(Topic),
      call(Router, {add_route, Topic, Pid}).
  %% @doc Add routes from several topics to the subscriber Pid.
  %% Topics are grouped per router (see slice/1) and each group is sent to
  %% its router with one synchronous call.
  -spec add_routes(list(emqttd_topic:topic()), pid()) -> ok.
  add_routes([], _Pid) ->
      ok;
  add_routes([Topic], Pid) ->
      add_route(Topic, Pid);
  add_routes(Topics, Pid) ->
      AddSlice = fun({Router, Slice}) ->
                         call(Router, {add_routes, Slice, Pid})
                 end,
      lists:foreach(AddSlice, slice(Topics)).
  %% @doc Delete the route from Topic to Pid.
  %% The request is an asynchronous cast to the router picked for the topic.
  -spec delete_route(emqttd_topic:topic(), pid()) -> ok.
  delete_route(Topic, Pid) ->
      Router = pick(Topic),
      cast(Router, {delete_route, Topic, Pid}).
  %% @doc Delete the routes from several topics to Pid.
  %% Topics are grouped per router (see slice/1) and each group is cast to
  %% its router.
  -spec delete_routes(list(emqttd_topic:topic()), pid()) -> ok.
  delete_routes([Topic], Pid) ->
      delete_route(Topic, Pid);
  delete_routes(Topics, Pid) ->
      DeleteSlice = fun({Router, Slice}) ->
                            cast(Router, {delete_routes, Slice, Pid})
                    end,
      lists:foreach(DeleteSlice, slice(Topics)).
  %% @private Group topics by the router that pick/1 selects for them.
  %% Returns a list of {Router, Topics} pairs.
  slice(Topics) ->
      Group = fun(Topic, ByRouter) ->
                      dict:append(pick(Topic), Topic, ByRouter)
              end,
      Grouped = lists:foldl(Group, dict:new(), Topics),
      dict:to_list(Grouped).
  %% @private Pick the worker of the `router' gproc pool for Topic.
  pick(Topic) ->
      PoolName = router,
      gproc_pool:pick_worker(PoolName, Topic).
  %% @doc Stop the router worker with the given integer Id (for tests).
  stop(Id) when is_integer(Id) ->
      Router = ?PROC_NAME(?MODULE, Id),
      gen_server2:call(Router, stop).
  %% @private Synchronous request to a router; waits without a timeout.
  call(Router, Request) ->
      Timeout = infinity,
      gen_server2:call(Router, Request, Timeout).
  %% @private Asynchronous request to a router.
  cast(Router, Request) ->
      gen_server2:cast(Router, Request).
  %% @private gen_server2 init callback.
  %% Raises the process priority, joins the pool and starts the aging timer.
  init([Pool, Id, StatsFun, Opts]) ->
      %% NOTE(review): presumably seeds the `random' generator used below - confirm.
      emqttd_time:seed(),
      %% Calls from pubsub should be scheduled first?
      process_flag(priority, high),
      ?GPROC_POOL(join, Pool, Id),
      Secs = proplists:get_value(route_aging, Opts, 5),
      %% Aging timer: the interval is the aging period plus a random
      %% 1..Secs seconds on top.
      Interval = Secs + random:uniform(Secs),
      {ok, TRef} = start_tick(Interval),
      State = #state{pool = Pool,
                     id = Id,
                     statsfun = StatsFun,
                     aging = #aging{topics = dict:new(), time = Secs, tref = TRef}},
      {ok, State}.
  %% @private Start an interval timer that sends `{clean, aged}' to the
  %% calling process every Secs seconds. Returns timer:send_interval/2's result.
  start_tick(Secs) ->
      Millis = timer:seconds(Secs),
      timer:send_interval(Millis, {clean, aged}).
  %% @private gen_server2 call callback.
  %%   stop                     - terminate normally, replying ok
  %%   {add_route, Topic, Pid}  - insert one {Topic, Pid} entry
  %%   {add_routes, Topics, Pid} - insert one entry per topic
  %% Anything else is handed to ?UNEXPECTED_REQ.
  handle_call(stop, _From, St) ->
      {stop, normal, ok, St};
  handle_call({add_route, Topic, Pid}, _From, St) ->
      Entry = {Topic, Pid},
      ets:insert(route, Entry),
      {reply, ok, setstats(St)};
  handle_call({add_routes, Topics, Pid}, _From, St) ->
      Entries = lists:map(fun(Topic) -> {Topic, Pid} end, Topics),
      ets:insert(route, Entries),
      {reply, ok, setstats(St)};
  handle_call(Req, _From, State) ->
      ?UNEXPECTED_REQ(Req, State).
  %% @private gen_server2 cast callback.
  %%   {delete_route, Topic, Pid}   - remove one {Topic, Pid} entry
  %%   {delete_routes, Topics, Pid} - remove the entry of every listed topic
  %% A topic left without any route is recorded in the aging dict.
  %% Anything else is handed to ?UNEXPECTED_MSG.
  handle_cast({delete_route, Topic, Pid}, State = #state{aging = Aging}) ->
      NewAging = drop_route(Topic, Pid, Aging),
      {noreply, setstats(State#state{aging = NewAging})};
  handle_cast({delete_routes, Topics, Pid}, State) ->
      Drop = fun(Topic, Aging) -> drop_route(Topic, Pid, Aging) end,
      NewAging = lists:foldl(Drop, State#state.aging, Topics),
      {noreply, setstats(State#state{aging = NewAging})};
  handle_cast(Msg, State) ->
      ?UNEXPECTED_MSG(Msg, State).

  %% @private Delete the {Topic, Pid} entry and return the aging record,
  %% updated only when the topic has no route left afterwards.
  drop_route(Topic, Pid, Aging) ->
      ets:delete_object(route, {Topic, Pid}),
      case has_route(Topic) of
          true  -> Aging;
          false -> store_aged(Topic, Aging)
      end.
  %% @private gen_server2 info callback.
  %% On the `{clean, aged}' tick, runs try_clean/2 over the aging dict with a
  %% cut-off of "now minus the aging period", keeps what it returns as the new
  %% dict, and hibernates. Anything else is handed to ?UNEXPECTED_INFO.
  handle_info({clean, aged}, State = #state{aging = Aging}) ->
      #aging{topics = Topics, time = AgingSecs} = Aging,
      Deadline = emqttd_time:now_to_secs() - AgingSecs,
      Remaining = try_clean(Deadline, dict:to_list(Topics)),
      NewState = State#state{aging = Aging#aging{topics = dict:from_list(Remaining)}},
      {noreply, NewState, hibernate};
  handle_info(Info, State) ->
      ?UNEXPECTED_INFO(Info, State).
  %% @private gen_server2 terminate callback.
  %% Cancels the aging timer and leaves the pool.
  terminate(_Reason, #state{pool = Pool, id = Id, aging = #aging{tref = AgingTimer}}) ->
      timer:cancel(AgingTimer),
      ?GPROC_POOL(leave, Pool, Id).
  %% @private gen_server2 code_change callback; the state is kept as is.
  code_change(_OldVsn, State, _Extra) ->
      Unchanged = State,
      {ok, Unchanged}.
  %% @private Walk the aged {Topic, Timestamp} entries and return those that
  %% must stay in the aging dict.
  %% Entries whose topic has a route again are dropped from the result;
  %% the rest are passed to try_clean2/4.
  try_clean(ByTime, List) ->
      try_clean(ByTime, List, []).

  try_clean(_ByTime, [], Kept) ->
      Kept;
  try_clean(ByTime, [Entry = {Topic, _TS} | Rest], Kept) ->
      case has_route(Topic) of
          true ->
              %% Topic is routed again: nothing to age out.
              try_clean(ByTime, Rest, Kept);
          false ->
              try_clean2(ByTime, Entry, Rest, Kept)
      end.
  %% @private Handle one aged entry whose topic currently has no route.
  %% An entry newer than ByTime is kept for a later pass. An older one has its
  %% topic record for this node removed in a mnesia transaction and is dropped
  %% from the result, whether or not the transaction succeeded.
  try_clean2(ByTime, Entry = {Topic, TS}, Left, Acc) ->
      case TS > ByTime of
          true ->
              try_clean(ByTime, Left, [Entry | Acc]);
          false ->
              TopicR = #mqtt_topic{topic = Topic, node = node()},
              Result = mnesia:transaction(fun try_remove_topic/1, [TopicR]),
              case Result of
                  {atomic, _} ->
                      ok;
                  {aborted, Error} ->
                      lager:error("Clean Topic '~s' Error: ~p", [Topic, Error])
              end,
              try_clean(ByTime, Left, Acc)
      end.
  %% @private Remove the given topic record; must run inside a mnesia
  %% transaction. The trie entry is deleted too when this record is the only
  %% one stored under the topic.
  try_remove_topic(TopicR = #mqtt_topic{topic = Topic}) ->
      %% Lock topic first
      Stored = mnesia:wread({topic, Topic}),
      case Stored of
          [] ->
              ok; %% mnesia:abort(not_found);
          [Only] when Only =:= TopicR ->
              %% Remove topic and trie
              delete_topic(TopicR),
              emqttd_trie:delete(Topic);
          _Other ->
              %% Remove topic only
              delete_topic(TopicR)
      end.
  %% @private Delete exactly this record from the `topic' table under a
  %% write lock.
  delete_topic(TopicR) ->
      Table = topic,
      mnesia:delete_object(Table, TopicR, write).
  %% @private Record Topic in the aging dict with the current time in
  %% seconds, overwriting any earlier timestamp for the same topic.
  store_aged(Topic, Aging = #aging{topics = Dict}) ->
      Stamped = dict:store(Topic, emqttd_time:now_to_secs(), Dict),
      Aging#aging{topics = Stamped}.
  %% @private Invoke the stats fun with `route' and return the state unchanged.
  setstats(State = #state{statsfun = StatsFun}) ->
      StatsFun(route),
      State.