emqx_router.erl 6.9 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_router).
  17. -behaviour(gen_server).
  18. -include("emqx.hrl").
  19. -include("logger.hrl").
  20. -include("types.hrl").
  21. -include_lib("mria/include/mria.hrl").
  22. -include_lib("emqx/include/emqx_router.hrl").
  23. %% Mnesia bootstrap
  24. -export([mnesia/1]).
  25. -boot_mnesia({mnesia, [boot]}).
  26. -export([start_link/2]).
  27. %% Route APIs
  28. -export([
  29. add_route/1,
  30. add_route/2,
  31. do_add_route/1,
  32. do_add_route/2
  33. ]).
  34. -export([
  35. delete_route/1,
  36. delete_route/2,
  37. do_delete_route/1,
  38. do_delete_route/2
  39. ]).
  40. -export([
  41. match_routes/1,
  42. lookup_routes/1,
  43. has_routes/1
  44. ]).
  45. -export([print_routes/1]).
  46. -export([topics/0]).
  47. %% gen_server callbacks
  48. -export([
  49. init/1,
  50. handle_call/3,
  51. handle_cast/2,
  52. handle_info/2,
  53. terminate/2,
  54. code_change/3
  55. ]).
  56. -type group() :: binary().
  57. -type dest() :: node() | {group(), node()}.
  58. %%--------------------------------------------------------------------
  59. %% Mnesia bootstrap
  60. %%--------------------------------------------------------------------
  61. mnesia(boot) ->
  62. mria_config:set_dirty_shard(?ROUTE_SHARD, true),
  63. ok = mria:create_table(?ROUTE_TAB, [
  64. {type, bag},
  65. {rlog_shard, ?ROUTE_SHARD},
  66. {storage, ram_copies},
  67. {record_name, route},
  68. {attributes, record_info(fields, route)},
  69. {storage_properties, [
  70. {ets, [
  71. {read_concurrency, true},
  72. {write_concurrency, true}
  73. ]}
  74. ]}
  75. ]).
  76. %%--------------------------------------------------------------------
  77. %% Start a router
  78. %%--------------------------------------------------------------------
  79. -spec start_link(atom(), pos_integer()) -> startlink_ret().
  80. start_link(Pool, Id) ->
  81. gen_server:start_link(
  82. {local, emqx_utils:proc_name(?MODULE, Id)},
  83. ?MODULE,
  84. [Pool, Id],
  85. [{hibernate_after, 1000}]
  86. ).
  87. %%--------------------------------------------------------------------
  88. %% Route APIs
  89. %%--------------------------------------------------------------------
  90. -spec add_route(emqx_types:topic()) -> ok | {error, term()}.
  91. add_route(Topic) when is_binary(Topic) ->
  92. add_route(Topic, node()).
  93. -spec add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  94. add_route(Topic, Dest) when is_binary(Topic) ->
  95. call(pick(Topic), {add_route, Topic, Dest}).
  96. -spec do_add_route(emqx_types:topic()) -> ok | {error, term()}.
  97. do_add_route(Topic) when is_binary(Topic) ->
  98. do_add_route(Topic, node()).
  99. -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  100. do_add_route(Topic, Dest) when is_binary(Topic) ->
  101. Route = #route{topic = Topic, dest = Dest},
  102. case lists:member(Route, lookup_routes(Topic)) of
  103. true ->
  104. ok;
  105. false ->
  106. ok = emqx_router_helper:monitor(Dest),
  107. case emqx_topic:wildcard(Topic) of
  108. true ->
  109. Fun = fun emqx_router_utils:insert_trie_route/2,
  110. emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
  111. false ->
  112. emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
  113. end
  114. end.
  115. %% @doc Match routes
  116. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
  117. match_routes(Topic) when is_binary(Topic) ->
  118. case match_trie(Topic) of
  119. [] -> lookup_routes(Topic);
  120. Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
  121. end.
  122. %% Optimize: routing table will be replicated to all router nodes.
  123. match_trie(Topic) ->
  124. case emqx_trie:empty() of
  125. true -> [];
  126. false -> emqx_trie:match(Topic)
  127. end.
  128. -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
  129. lookup_routes(Topic) ->
  130. ets:lookup(?ROUTE_TAB, Topic).
  131. -spec has_routes(emqx_types:topic()) -> boolean().
  132. has_routes(Topic) when is_binary(Topic) ->
  133. ets:member(?ROUTE_TAB, Topic).
  134. -spec delete_route(emqx_types:topic()) -> ok | {error, term()}.
  135. delete_route(Topic) when is_binary(Topic) ->
  136. delete_route(Topic, node()).
  137. -spec delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  138. delete_route(Topic, Dest) when is_binary(Topic) ->
  139. call(pick(Topic), {delete_route, Topic, Dest}).
  140. -spec do_delete_route(emqx_types:topic()) -> ok | {error, term()}.
  141. do_delete_route(Topic) when is_binary(Topic) ->
  142. do_delete_route(Topic, node()).
  143. -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  144. do_delete_route(Topic, Dest) ->
  145. Route = #route{topic = Topic, dest = Dest},
  146. case emqx_topic:wildcard(Topic) of
  147. true ->
  148. Fun = fun emqx_router_utils:delete_trie_route/2,
  149. emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
  150. false ->
  151. emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
  152. end.
  153. -spec topics() -> list(emqx_types:topic()).
  154. topics() ->
  155. mnesia:dirty_all_keys(?ROUTE_TAB).
  156. %% @doc Print routes to a topic
  157. -spec print_routes(emqx_types:topic()) -> ok.
  158. print_routes(Topic) ->
  159. lists:foreach(
  160. fun(#route{topic = To, dest = Dest}) ->
  161. io:format("~ts -> ~ts~n", [To, Dest])
  162. end,
  163. match_routes(Topic)
  164. ).
  165. call(Router, Msg) ->
  166. gen_server:call(Router, Msg, infinity).
  167. pick(Topic) ->
  168. gproc_pool:pick_worker(router_pool, Topic).
  169. %%--------------------------------------------------------------------
  170. %% gen_server callbacks
  171. %%--------------------------------------------------------------------
  172. init([Pool, Id]) ->
  173. true = gproc_pool:connect_worker(Pool, {Pool, Id}),
  174. {ok, #{pool => Pool, id => Id}}.
  175. handle_call({add_route, Topic, Dest}, _From, State) ->
  176. Ok = do_add_route(Topic, Dest),
  177. {reply, Ok, State};
  178. handle_call({delete_route, Topic, Dest}, _From, State) ->
  179. Ok = do_delete_route(Topic, Dest),
  180. {reply, Ok, State};
  181. handle_call(Req, _From, State) ->
  182. ?SLOG(error, #{msg => "unexpected_call", call => Req}),
  183. {reply, ignored, State}.
  184. handle_cast(Msg, State) ->
  185. ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
  186. {noreply, State}.
  187. handle_info(Info, State) ->
  188. ?SLOG(error, #{msg => "unexpected_info", info => Info}),
  189. {noreply, State}.
  190. terminate(_Reason, #{pool := Pool, id := Id}) ->
  191. gproc_pool:disconnect_worker(Pool, {Pool, Id}).
  192. code_change(_OldVsn, State, _Extra) ->
  193. {ok, State}.