emqx_persistent_session_ds_router.erl 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  %% @doc Routing table for persistent sessions.
  %%
  %% Routes map a topic or topic filter to a destination (`dest()'). They are kept
  %% in the two tables created by `init_tables/0': `?PS_ROUTER_TAB' holds
  %% `#ps_route{}' records and `?PS_FILTERS_TAB' holds `#ps_routeidx{}' index keys.
  -module(emqx_persistent_session_ds_router).
  -include("emqx.hrl").
  -include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
  %% Table setup
  -export([init_tables/0]).
  %% Route APIs
  -export([
      do_add_route/2,
      do_delete_route/2,
      has_any_route/1,
      match_routes/1,
      lookup_routes/1,
      foldr_routes/2,
      foldl_routes/2
  ]).
  %% Topics API
  -export([
      stream/1,
      stats/1
  ]).
  %% Removal of all routes that belong to one session
  -export([cleanup_routes/1]).
  %% Prints the routes matching a topic to standard output
  -export([print_routes/1]).
  %% Listing of all known topics and topic filters
  -export([topics/0]).
  %% has_route/2 is internal; it is exported in test builds only.
  -ifdef(TEST).
  -export([has_route/2]).
  -endif.
  %% A route destination is the id of a persistent session.
  -type dest() :: emqx_persistent_session_ds:id().
  -export_type([dest/0]).
  43. %%--------------------------------------------------------------------
  44. %% Table Initialization
  45. %%--------------------------------------------------------------------
  %% @doc Create both route tables and wait until they are available.
  %%
  %% The `?PS_ROUTER_SHARD' shard is first marked as a dirty shard through
  %% `mria_config:set_dirty_shard/2'. Then two `disc_copies' tables are created in
  %% that shard:
  %%   - `?PS_ROUTER_TAB': a `bag' of `#ps_route{}' records;
  %%   - `?PS_FILTERS_TAB': an `ordered_set' of `#ps_routeidx{}' records.
  %%
  %% Any result other than `ok' from table creation or from waiting for the tables
  %% crashes the caller with `badmatch'.
  -spec init_tables() -> ok.
  init_tables() ->
      mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true),
      RouterTabOpts = [
          {type, bag},
          {rlog_shard, ?PS_ROUTER_SHARD},
          {storage, disc_copies},
          {record_name, ps_route},
          {attributes, record_info(fields, ps_route)},
          {storage_properties, [
              {ets, [
                  {read_concurrency, true},
                  {write_concurrency, true}
              ]}
          ]}
      ],
      FiltersTabOpts = [
          {type, ordered_set},
          {rlog_shard, ?PS_ROUTER_SHARD},
          {storage, disc_copies},
          {record_name, ps_routeidx},
          {attributes, record_info(fields, ps_routeidx)},
          {storage_properties, [
              {ets, [
                  {read_concurrency, true},
                  {write_concurrency, auto}
              ]}
          ]}
      ],
      ok = mria:create_table(?PS_ROUTER_TAB, RouterTabOpts),
      ok = mria:create_table(?PS_FILTERS_TAB, FiltersTabOpts),
      ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]),
      ok.
  76. %%--------------------------------------------------------------------
  77. %% Route APIs
  78. %%--------------------------------------------------------------------
  %% @doc Add a route from `Topic' to `Dest'.
  %% Nothing is written when `has_route/2' reports that this exact route is
  %% already present. `Topic' must be a binary.
  -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  do_add_route(Topic, Dest) when is_binary(Topic) ->
      AlreadyRouted = has_route(Topic, Dest),
      case AlreadyRouted of
          false -> mria_insert_route(Topic, Dest);
          true -> ok
      end.
  %% @doc Delete the route from `Topic' to `Dest'.
  %% When `emqx_trie_search:filter/1' returns `false' the `#ps_route{}' object is
  %% removed from `?PS_ROUTER_TAB'; when it returns a word list the corresponding
  %% index key is removed from `?PS_FILTERS_TAB'. Both deletions are dirty.
  -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  do_delete_route(Topic, Dest) ->
      case emqx_trie_search:filter(Topic) of
          false ->
              mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest});
          Words when is_list(Words) ->
              IndexKey = emqx_topic_index:make_key(Words, Dest),
              mria:dirty_delete(?PS_FILTERS_TAB, IndexKey)
      end.
  %% @doc Takes a real topic (not a filter) as input and returns whether there are
  %% any matching routes, i.e. an entry for exactly this topic in `?PS_ROUTER_TAB'
  %% or a matching topic filter in `?PS_FILTERS_TAB'.
  -spec has_any_route(emqx_types:topic()) -> boolean().
  has_any_route(Topic) ->
      %% `orelse' short-circuits: the filter index is only searched when the
      %% exact-topic lookup finds nothing, since a direct hit already decides the
      %% answer.
      lookup_route_tab(Topic) =/= [] orelse
          emqx_topic_index:match(Topic, ?PS_FILTERS_TAB) =/= false.
  %% @doc Take a real topic (not a filter) as input and return every route that
  %% applies to it: first the routes stored for exactly this topic, then one route
  %% per matching topic filter. All elements are `#ps_route{}' records.
  -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
  match_routes(Topic) when is_binary(Topic) ->
      DirectRoutes = lookup_route_tab(Topic),
      FilterRoutes = [match_to_route(Match) || Match <- match_filters(Topic)],
      DirectRoutes ++ FilterRoutes.
  %% @doc Take a topic or filter as input, and return the existing routes with exactly
  %% this topic or filter.
  %%
  %% Both branches return `#ps_route{}' records. For a wildcard filter the
  %% destinations are read from the index keys in `?PS_FILTERS_TAB' and paired with
  %% `Topic'; for a plain topic the records come straight from `?PS_ROUTER_TAB'.
  -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
  lookup_routes(Topic) ->
      case emqx_topic:wildcard(Topic) of
          true ->
              %% '$1' captures the destination part of every key built from this filter.
              Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
              %% Wrap each destination into a route record so that callers get the
              %% same element shape as from the non-wildcard branch (previously bare
              %% destinations were returned here).
              [
                  #ps_route{topic = Topic, dest = Dest}
               || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)
              ];
          false ->
              lookup_route_tab(Topic)
      end.
  %% @doc Tell whether the exact route `Topic' -> `Dest' is stored.
  %% Wildcard topics are checked by key membership in `?PS_FILTERS_TAB', all other
  %% topics through `has_route_tab_entry/2'.
  -spec has_route(emqx_types:topic(), dest()) -> boolean().
  has_route(Topic, Dest) ->
      IsWildcard = emqx_topic:wildcard(Topic),
      case IsWildcard of
          false ->
              has_route_tab_entry(Topic, Dest);
          true ->
              IndexKey = emqx_topic_index:make_key(Topic, Dest),
              ets:member(?PS_FILTERS_TAB, IndexKey)
      end.
  %% @doc List the keys of `?PS_ROUTER_TAB' followed by the topic of every index
  %% key stored in `?PS_FILTERS_TAB'.
  -spec topics() -> list(emqx_types:topic()).
  topics() ->
      IndexKeys = ets:match(?PS_FILTERS_TAB, #ps_routeidx{entry = '$1'}),
      FilterTopics = [emqx_topic_index:get_topic(Key) || [Key] <- IndexKeys],
      list_route_tab_topics() ++ FilterTopics.
  %% @doc Print the routes matching a topic to standard output, one
  %% `Topic -> Dest' line per route returned by `match_routes/1'.
  -spec print_routes(emqx_types:topic()) -> ok.
  print_routes(Topic) ->
      PrintRoute = fun(#ps_route{topic = To, dest = Dest}) ->
          io:format("~ts -> ~ts~n", [To, Dest])
      end,
      lists:foreach(PrintRoute, match_routes(Topic)).
  %% @doc Delete every route whose destination resolves (via
  %% `get_dest_session_id/1') to `DSSessionId'. Both `?PS_FILTERS_TAB' and
  %% `?PS_ROUTER_TAB' are traversed in full, filters table first.
  -spec cleanup_routes(emqx_persistent_session_ds:id()) -> ok.
  cleanup_routes(DSSessionId) ->
      %% NOTE
      %% No point in transaction here because all the operations on filters table are dirty.
      DropIndexEntry = fun(#ps_routeidx{entry = Key}, ok) ->
          Owner = get_dest_session_id(emqx_topic_index:get_id(Key)),
          case Owner =:= DSSessionId of
              true -> mria:dirty_delete(?PS_FILTERS_TAB, Key);
              false -> ok
          end
      end,
      DropRoute = fun(#ps_route{dest = Dest} = Route, ok) ->
          Owner = get_dest_session_id(Dest),
          case Owner =:= DSSessionId of
              true -> mria:dirty_delete_object(?PS_ROUTER_TAB, Route);
              false -> ok
          end
      end,
      %% The accumulator is pinned to `ok' in both funs, so a delete that returns
      %% anything else aborts the traversal with a crash.
      ok = ets:foldl(DropIndexEntry, ok, ?PS_FILTERS_TAB),
      ok = ets:foldl(DropRoute, ok, ?PS_ROUTER_TAB).
  %% @doc Fold `Fun' over all routes using `ets:foldl/3' on each route table,
  %% starting from `Acc0'. See `fold_routes/3'.
  -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
  foldl_routes(Fun, Acc0) ->
      fold_routes(foldl, Fun, Acc0).
  %% @doc Fold `Fun' over all routes using `ets:foldr/3' on each route table,
  %% starting from `Acc0'. See `fold_routes/3'.
  -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
  foldr_routes(Fun, Acc0) ->
      fold_routes(foldr, Fun, Acc0).
  181. %%--------------------------------------------------------------------
  182. %% Topic API
  183. %%--------------------------------------------------------------------
  %% @doc Build an `emqx_utils_stream:stream(#route{})' over the router state,
  %% optionally restricted to a topic or topic filter (`'_'' means no restriction).
  %% The stream yields `#route{}' records because that is what
  %% `emqx_mgmt_api_topics' knows how to deal with. Entries of `?PS_ROUTER_TAB'
  %% come first, followed by entries of `?PS_FILTERS_TAB'.
  -spec stream(_MTopic :: '_' | emqx_types:topic()) ->
      emqx_utils_stream:stream(emqx_types:route()).
  stream(MTopic) ->
      RouterStream = stream(?PS_ROUTER_TAB, MTopic),
      FiltersStream = stream(?PS_FILTERS_TAB, MTopic),
      emqx_utils_stream:chain(RouterStream, FiltersStream).
  %% @doc Retrieve router stats.
  %% n_routes: total number of routes, i.e. the summed sizes of both route tables;
  %% should be equal to the length of `stream('_')'. A size that `ets:info/2'
  %% reports as `undefined' is counted as 0.
  -spec stats(n_routes) -> non_neg_integer().
  stats(n_routes) ->
      Sizes = [ets:info(Tab, size) || Tab <- [?PS_ROUTER_TAB, ?PS_FILTERS_TAB]],
      lists:sum([emqx_maybe:define(Size, 0) || Size <- Sizes]).
  198. %%--------------------------------------------------------------------
  199. %% Internal fns
  200. %%--------------------------------------------------------------------
  %% Store the route `Topic' -> `Dest' with a dirty write.
  %% When `emqx_trie_search:filter/1' returns `false' a `#ps_route{}' record goes
  %% into `?PS_ROUTER_TAB'; when it returns a word list an index key built from
  %% those words and `Dest' goes into `?PS_FILTERS_TAB'.
  mria_insert_route(Topic, Dest) ->
      case emqx_trie_search:filter(Topic) of
          false ->
              mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest});
          Words when is_list(Words) ->
              IndexKey = emqx_topic_index:make_key(Words, Dest),
              mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = IndexKey})
      end.
  %% Fold over both route tables with the `ets' function named `FunName' (`foldl'
  %% or `foldr'). `?PS_ROUTER_TAB' is folded first and its result seeds the fold
  %% over `?PS_FILTERS_TAB', whose index entries are converted to routes before
  %% `FoldFun' sees them.
  fold_routes(FunName, FoldFun, AccIn) ->
      RouterAcc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB),
      ets:FunName(mk_filtertab_fold_fun(FoldFun), RouterAcc, ?PS_FILTERS_TAB).
  %% Wrap `FoldFun' so it can be folded over `?PS_FILTERS_TAB': each
  %% `#ps_routeidx{}' entry is turned into a route via `match_to_route/1' first.
  mk_filtertab_fold_fun(FoldFun) ->
      fun(#ps_routeidx{entry = Key}, Acc) ->
          Route = match_to_route(Key),
          FoldFun(Route, Acc)
      end.
  %% Return the matches of `Topic' in the `?PS_FILTERS_TAB' index, using an empty
  %% option list.
  match_filters(Topic) ->
      Opts = [],
      emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, Opts).
  %% Extract the session id from a route destination. A 2-tuple destination carries
  %% the session id in its second element; any other term is taken to be the
  %% session id itself.
  get_dest_session_id(Dest) ->
      case Dest of
          {_, SessionId} -> SessionId;
          SessionId -> SessionId
      end.
  %% Convert an internal `#ps_route{}' record into a `#route{}' record with the
  %% same topic and destination.
  export_route(#ps_route{topic = T, dest = D}) ->
      #route{dest = D, topic = T}.
  %% Convert a `#ps_routeidx{}' entry into a `#route{}' record; topic and
  %% destination are read from the index key.
  export_routeidx(#ps_routeidx{entry = Key}) ->
      Topic = emqx_topic_index:get_topic(Key),
      Dest = emqx_topic_index:get_id(Key),
      #route{topic = Topic, dest = Dest}.
  %% Build a `#ps_route{}' record from a topic index key/match; topic and
  %% destination are read from the key.
  match_to_route(Match) ->
      Topic = emqx_topic_index:get_topic(Match),
      Dest = emqx_topic_index:get_id(Match),
      #ps_route{topic = Topic, dest = Dest}.
  %% Dirty-write one route record into `?PS_ROUTER_TAB'.
  mria_route_tab_insert(RouteRec) ->
      mria:dirty_write(?PS_ROUTER_TAB, RouteRec).
  %% Return all objects stored in `?PS_ROUTER_TAB' under the key `TopicKey'.
  lookup_route_tab(TopicKey) ->
      ets:lookup(?PS_ROUTER_TAB, TopicKey).
  %% Tell whether `?PS_ROUTER_TAB' holds a `#ps_route{}' with exactly this topic
  %% and destination.
  has_route_tab_entry(Topic, Dest) ->
      Pattern = #ps_route{topic = Topic, dest = Dest},
      ets:match(?PS_ROUTER_TAB, Pattern) =/= [].
  %% List all keys of `?PS_ROUTER_TAB' with a dirty mnesia read.
  list_route_tab_topics() ->
      Tab = ?PS_ROUTER_TAB,
      mnesia:dirty_all_keys(Tab).
  %% Dirty-delete one exact route object from `?PS_ROUTER_TAB'.
  mria_route_tab_delete(RouteRec) ->
      mria:dirty_delete_object(?PS_ROUTER_TAB, RouteRec).
  %% @doc Build an `emqx_utils_stream:stream(#route{})' from the contents of one of
  %% the two route tables, optionally restricted to a topic or topic filter.
  %% A wildcard filter cannot have entries in `?PS_ROUTER_TAB', and a non-wildcard
  %% topic cannot have entries in `?PS_FILTERS_TAB', so in those combinations the
  %% table scan is skipped and an empty stream is returned. `'_'' always scans.
  stream(?PS_ROUTER_TAB = Tab, MTopic) ->
      ScanRouterTab = (MTopic == '_') orelse (not emqx_topic:wildcard(MTopic)),
      case ScanRouterTab of
          false ->
              emqx_utils_stream:empty();
          true ->
              Pattern = #ps_route{topic = MTopic, _ = '_'},
              mk_tab_stream(Tab, Pattern, fun export_route/1)
      end;
  stream(?PS_FILTERS_TAB = Tab, MTopic) ->
      ScanFiltersTab = (MTopic == '_') orelse emqx_topic:wildcard(MTopic),
      case ScanFiltersTab of
          false ->
              emqx_utils_stream:empty();
          true ->
              KeyPattern = emqx_trie_search:make_pat(MTopic, '_'),
              Pattern = #ps_routeidx{entry = KeyPattern, _ = '_'},
              mk_tab_stream(Tab, Pattern, fun export_routeidx/1)
      end.
  %% Build a stream over the objects of `Tab' that match `MatchSpec', passing each
  %% one through `Mapper'. Objects are fetched one per step: the first step starts
  %% an `ets:match_object/3' scan with a limit of 1, later steps resume from the
  %% continuation.
  mk_tab_stream(Tab, MatchSpec, Mapper) ->
      %% NOTE: Currently relying on the fact that tables are backed by ETSes.
      Reader = fun
          (undefined) -> ets:match_object(Tab, MatchSpec, 1);
          (Cont) -> ets:match_object(Cont)
      end,
      TabStream = emqx_utils_stream:ets(Reader),
      emqx_utils_stream:map(Mapper, TabStream).