emqx_session_router.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_session_router).
  17. -behaviour(gen_server).
  18. -include("emqx.hrl").
  19. -include("logger.hrl").
  20. -include("types.hrl").
  21. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  22. -export([
  23. create_init_tab/0,
  24. create_router_tab/1,
  25. start_link/2
  26. ]).
  27. %% Route APIs
  28. -export([
  29. delete_routes/2,
  30. do_add_route/2,
  31. do_delete_route/2,
  32. match_routes/1
  33. ]).
  34. -export([
  35. buffer/3,
  36. pending/2,
  37. resume_begin/2,
  38. resume_end/2
  39. ]).
  40. -export([print_routes/1]).
  41. %% gen_server callbacks
  42. -export([
  43. init/1,
  44. handle_call/3,
  45. handle_cast/2,
  46. handle_info/2,
  47. terminate/2,
  48. code_change/3
  49. ]).
  50. -type dest() :: node() | {emqx_types:group(), node()}.
  51. -define(ROUTE_RAM_TAB, emqx_session_route_ram).
  52. -define(ROUTE_DISC_TAB, emqx_session_route_disc).
  53. -define(SESSION_INIT_TAB, session_init_tab).
  54. %%--------------------------------------------------------------------
  55. %% Mnesia bootstrap
  56. %%--------------------------------------------------------------------
  57. create_router_tab(disc) ->
  58. create_table(?ROUTE_DISC_TAB, disc_copies);
  59. create_router_tab(ram) ->
  60. create_table(?ROUTE_RAM_TAB, ram_copies).
  61. create_table(Tab, Storage) ->
  62. ok = mria:create_table(Tab, [
  63. {type, bag},
  64. {rlog_shard, ?ROUTE_SHARD},
  65. {storage, Storage},
  66. {record_name, route},
  67. {attributes, record_info(fields, route)},
  68. {storage_properties, [
  69. {ets, [
  70. {read_concurrency, true},
  71. {write_concurrency, true}
  72. ]}
  73. ]}
  74. ]).
  75. %%--------------------------------------------------------------------
  76. %% Start a router
  77. %%--------------------------------------------------------------------
  78. create_init_tab() ->
  79. emqx_utils_ets:new(?SESSION_INIT_TAB, [
  80. public,
  81. {read_concurrency, true},
  82. {write_concurrency, true}
  83. ]).
  84. -spec start_link(atom(), pos_integer()) -> startlink_ret().
  85. start_link(Pool, Id) ->
  86. gen_server:start_link(
  87. {local, emqx_utils:proc_name(?MODULE, Id)},
  88. ?MODULE,
  89. [Pool, Id],
  90. [{hibernate_after, 1000}]
  91. ).
  92. %%--------------------------------------------------------------------
  93. %% Route APIs
  94. %%--------------------------------------------------------------------
  95. -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  96. do_add_route(Topic, SessionID) when is_binary(Topic) ->
  97. Route = #route{topic = Topic, dest = SessionID},
  98. case lists:member(Route, lookup_routes(Topic)) of
  99. true ->
  100. ok;
  101. false ->
  102. case emqx_topic:wildcard(Topic) of
  103. true ->
  104. Fun = fun emqx_router_utils:insert_session_trie_route/2,
  105. emqx_router_utils:maybe_trans(
  106. Fun,
  107. [route_tab(), Route],
  108. ?PERSISTENT_SESSION_SHARD
  109. );
  110. false ->
  111. emqx_router_utils:insert_direct_route(route_tab(), Route)
  112. end
  113. end.
  114. %% @doc Match routes
  115. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
  116. match_routes(Topic) when is_binary(Topic) ->
  117. case match_trie(Topic) of
  118. [] -> lookup_routes(Topic);
  119. Matched -> lists:append([lookup_routes(To) || To <- [Topic | Matched]])
  120. end.
  121. %% Optimize: routing table will be replicated to all router nodes.
  122. match_trie(Topic) ->
  123. case emqx_trie:empty_session() of
  124. true -> [];
  125. false -> emqx_trie:match_session(Topic)
  126. end.
  127. %% Async
  128. delete_routes(SessionID, Subscriptions) ->
  129. cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}).
  130. -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  131. do_delete_route(Topic, SessionID) ->
  132. Route = #route{topic = Topic, dest = SessionID},
  133. case emqx_topic:wildcard(Topic) of
  134. true ->
  135. Fun = fun emqx_router_utils:delete_session_trie_route/2,
  136. emqx_router_utils:maybe_trans(Fun, [route_tab(), Route], ?PERSISTENT_SESSION_SHARD);
  137. false ->
  138. emqx_router_utils:delete_direct_route(route_tab(), Route)
  139. end.
  140. %% @doc Print routes to a topic
  141. -spec print_routes(emqx_types:topic()) -> ok.
  142. print_routes(Topic) ->
  143. lists:foreach(
  144. fun(#route{topic = To, dest = SessionID}) ->
  145. io:format("~s -> ~p~n", [To, SessionID])
  146. end,
  147. match_routes(Topic)
  148. ).
  149. %%--------------------------------------------------------------------
  150. %% Session APIs
  151. %%--------------------------------------------------------------------
  152. pending(SessionID, MarkerIDs) ->
  153. call(pick(SessionID), {pending, SessionID, MarkerIDs}).
  154. buffer(SessionID, STopic, Msg) ->
  155. case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
  156. undefined -> ok;
  157. Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg)
  158. end.
  159. -spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}].
  160. resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) ->
  161. call(pick(SessionID), {resume_begin, From, SessionID}).
  162. -spec resume_end(pid(), binary()) ->
  163. {'ok', [emqx_types:message()]} | {'error', term()}.
  164. resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) ->
  165. case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
  166. undefined ->
  167. ?tp(ps_session_not_found, #{sid => SessionID}),
  168. {error, not_found};
  169. Pid ->
  170. Res = emqx_session_router_worker:resume_end(From, Pid, SessionID),
  171. cast(pick(SessionID), {resume_end, SessionID, Pid}),
  172. Res
  173. end.
  174. %%--------------------------------------------------------------------
  175. %% Worker internals
  176. %%--------------------------------------------------------------------
  177. call(Router, Msg) ->
  178. gen_server:call(Router, Msg, infinity).
  179. cast(Router, Msg) ->
  180. gen_server:cast(Router, Msg).
  181. pick(#route{dest = SessionID}) ->
  182. gproc_pool:pick_worker(session_router_pool, SessionID);
  183. pick(SessionID) when is_binary(SessionID) ->
  184. gproc_pool:pick_worker(session_router_pool, SessionID).
  185. %%--------------------------------------------------------------------
  186. %% gen_server callbacks
  187. %%--------------------------------------------------------------------
  188. init([Pool, Id]) ->
  189. true = gproc_pool:connect_worker(Pool, {Pool, Id}),
  190. {ok, #{pool => Pool, id => Id, pmon => emqx_pmon:new()}}.
  191. handle_call({resume_begin, RemotePid, SessionID}, _From, State) ->
  192. case init_resume_worker(RemotePid, SessionID, State) of
  193. error ->
  194. {reply, error, State};
  195. {ok, Pid, State1} ->
  196. ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}),
  197. MarkerID = emqx_persistent_session:mark_resume_begin(SessionID),
  198. {reply, {ok, MarkerID}, State1}
  199. end;
  200. handle_call({pending, SessionID, MarkerIDs}, _From, State) ->
  201. Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs),
  202. {reply, Res, State};
  203. handle_call(Req, _From, State) ->
  204. ?SLOG(error, #{msg => "unexpected_call", req => Req}),
  205. {reply, ignored, State}.
  206. handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
  207. %% TODO: Make a batch for deleting all routes.
  208. Fun = fun(Topic, _) -> do_delete_route(Topic, SessionID) end,
  209. ok = maps:foreach(Fun, Subscriptions),
  210. {noreply, State};
  211. handle_cast({resume_end, SessionID, Pid}, State) ->
  212. case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
  213. undefined -> skip;
  214. P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID);
  215. P when is_pid(P) -> skip
  216. end,
  217. Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)),
  218. _ = emqx_session_router_worker_sup:abort_worker(Pid),
  219. {noreply, State#{pmon => Pmon}};
  220. handle_cast(Msg, State) ->
  221. ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
  222. {noreply, State}.
  223. handle_info(Info, State) ->
  224. ?SLOG(error, #{msg => "unexpected_info", info => Info}),
  225. {noreply, State}.
  226. terminate(_Reason, #{pool := Pool, id := Id}) ->
  227. gproc_pool:disconnect_worker(Pool, {Pool, Id}).
  228. code_change(_OldVsn, State, _Extra) ->
  229. {ok, State}.
  230. %%--------------------------------------------------------------------
  231. %% Resume worker. A process that buffers the persisted messages during
  232. %% initialisation of a resuming session.
  233. %%--------------------------------------------------------------------
  234. init_resume_worker(RemotePid, SessionID, #{pmon := Pmon} = State) ->
  235. case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of
  236. {error, What} ->
  237. ?SLOG(error, #{msg => "failed_to_start_resume_worker", reason => What}),
  238. error;
  239. {ok, Pid} ->
  240. Pmon1 = emqx_pmon:monitor(Pid, Pmon),
  241. case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
  242. undefined ->
  243. {ok, Pid, State#{pmon => Pmon1}};
  244. {_, OldPid} ->
  245. Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1),
  246. _ = emqx_session_router_worker_sup:abort_worker(OldPid),
  247. {ok, Pid, State#{pmon => Pmon2}}
  248. end
  249. end.
  250. %%--------------------------------------------------------------------
  251. %% Internal functions
  252. %%--------------------------------------------------------------------
  253. lookup_routes(Topic) ->
  254. ets:lookup(route_tab(), Topic).
  255. route_tab() ->
  256. case emqx_persistent_session:storage_type() of
  257. disc -> ?ROUTE_DISC_TAB;
  258. ram -> ?ROUTE_RAM_TAB
  259. end.