emqx_persistent_session_ds_router.erl 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_persistent_session_ds_router).
  17. -include("emqx.hrl").
  18. -include("emqx_persistent_session_ds/emqx_ps_ds_int.hrl").
  19. -export([init_tables/0]).
  20. %% Route APIs
  21. -export([
  22. do_add_route/2,
  23. do_delete_route/2,
  24. has_any_route/1,
  25. match_routes/1,
  26. lookup_routes/1,
  27. foldr_routes/2,
  28. foldl_routes/2
  29. ]).
  30. -export([cleanup_routes/1]).
  31. -export([print_routes/1]).
  32. -export([topics/0]).
  33. -ifdef(TEST).
  34. -export([has_route/2]).
  35. -endif.
  36. -type dest() :: emqx_persistent_session_ds:id().
  37. -export_type([dest/0]).
  38. %%--------------------------------------------------------------------
  39. %% Table Initialization
  40. %%--------------------------------------------------------------------
  41. init_tables() ->
  42. mria_config:set_dirty_shard(?PS_ROUTER_SHARD, true),
  43. ok = mria:create_table(?PS_ROUTER_TAB, [
  44. {type, bag},
  45. {rlog_shard, ?PS_ROUTER_SHARD},
  46. {storage, disc_copies},
  47. {record_name, ps_route},
  48. {attributes, record_info(fields, ps_route)},
  49. {storage_properties, [
  50. {ets, [
  51. {read_concurrency, true},
  52. {write_concurrency, true}
  53. ]}
  54. ]}
  55. ]),
  56. ok = mria:create_table(?PS_FILTERS_TAB, [
  57. {type, ordered_set},
  58. {rlog_shard, ?PS_ROUTER_SHARD},
  59. {storage, disc_copies},
  60. {record_name, ps_routeidx},
  61. {attributes, record_info(fields, ps_routeidx)},
  62. {storage_properties, [
  63. {ets, [
  64. {read_concurrency, true},
  65. {write_concurrency, auto}
  66. ]}
  67. ]}
  68. ]),
  69. ok = mria:wait_for_tables([?PS_ROUTER_TAB, ?PS_FILTERS_TAB]),
  70. ok.
  71. %%--------------------------------------------------------------------
  72. %% Route APIs
  73. %%--------------------------------------------------------------------
  74. -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  75. do_add_route(Topic, Dest) when is_binary(Topic) ->
  76. case has_route(Topic, Dest) of
  77. true ->
  78. ok;
  79. false ->
  80. mria_insert_route(Topic, Dest)
  81. end.
  82. -spec do_delete_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
  83. do_delete_route(Topic, Dest) ->
  84. case emqx_trie_search:filter(Topic) of
  85. Words when is_list(Words) ->
  86. K = emqx_topic_index:make_key(Words, Dest),
  87. mria:dirty_delete(?PS_FILTERS_TAB, K);
  88. false ->
  89. mria_route_tab_delete(#ps_route{topic = Topic, dest = Dest})
  90. end.
  91. %% @doc Takes a real topic (not filter) as input, and returns whether there is any
  92. %% matching filters.
  93. -spec has_any_route(emqx_types:topic()) -> boolean().
  94. has_any_route(Topic) ->
  95. DirectTopicMatch = lookup_route_tab(Topic),
  96. WildcardMatch = emqx_topic_index:match(Topic, ?PS_FILTERS_TAB),
  97. case {DirectTopicMatch, WildcardMatch} of
  98. {[], false} ->
  99. false;
  100. {_, _} ->
  101. true
  102. end.
  103. %% @doc Take a real topic (not filter) as input, return the matching topics and topic
  104. %% filters associated with route destination.
  105. -spec match_routes(emqx_types:topic()) -> [emqx_types:route()].
  106. match_routes(Topic) when is_binary(Topic) ->
  107. lookup_route_tab(Topic) ++
  108. [match_to_route(M) || M <- match_filters(Topic)].
  109. %% @doc Take a topic or filter as input, and return the existing routes with exactly
  110. %% this topic or filter.
  111. -spec lookup_routes(emqx_types:topic()) -> [emqx_types:route()].
  112. lookup_routes(Topic) ->
  113. case emqx_topic:wildcard(Topic) of
  114. true ->
  115. Pat = #ps_routeidx{entry = emqx_topic_index:make_key(Topic, '$1')},
  116. [Dest || [Dest] <- ets:match(?PS_FILTERS_TAB, Pat)];
  117. false ->
  118. lookup_route_tab(Topic)
  119. end.
  120. -spec has_route(emqx_types:topic(), dest()) -> boolean().
  121. has_route(Topic, Dest) ->
  122. case emqx_topic:wildcard(Topic) of
  123. true ->
  124. ets:member(?PS_FILTERS_TAB, emqx_topic_index:make_key(Topic, Dest));
  125. false ->
  126. has_route_tab_entry(Topic, Dest)
  127. end.
  128. -spec topics() -> list(emqx_types:topic()).
  129. topics() ->
  130. Pat = #ps_routeidx{entry = '$1'},
  131. Filters = [emqx_topic_index:get_topic(K) || [K] <- ets:match(?PS_FILTERS_TAB, Pat)],
  132. list_route_tab_topics() ++ Filters.
  133. %% @doc Print routes to a topic
  134. -spec print_routes(emqx_types:topic()) -> ok.
  135. print_routes(Topic) ->
  136. lists:foreach(
  137. fun(#ps_route{topic = To, dest = Dest}) ->
  138. io:format("~ts -> ~ts~n", [To, Dest])
  139. end,
  140. match_routes(Topic)
  141. ).
  142. -spec cleanup_routes(emqx_persistent_session_ds:id()) -> ok.
  143. cleanup_routes(DSSessionId) ->
  144. %% NOTE
  145. %% No point in transaction here because all the operations on filters table are dirty.
  146. ok = ets:foldl(
  147. fun(#ps_routeidx{entry = K}, ok) ->
  148. case get_dest_session_id(emqx_topic_index:get_id(K)) of
  149. DSSessionId ->
  150. mria:dirty_delete(?PS_FILTERS_TAB, K);
  151. _ ->
  152. ok
  153. end
  154. end,
  155. ok,
  156. ?PS_FILTERS_TAB
  157. ),
  158. ok = ets:foldl(
  159. fun(#ps_route{dest = Dest} = Route, ok) ->
  160. case get_dest_session_id(Dest) of
  161. DSSessionId ->
  162. mria:dirty_delete_object(?PS_ROUTER_TAB, Route);
  163. _ ->
  164. ok
  165. end
  166. end,
  167. ok,
  168. ?PS_ROUTER_TAB
  169. ).
  170. -spec foldl_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
  171. foldl_routes(FoldFun, AccIn) ->
  172. fold_routes(foldl, FoldFun, AccIn).
  173. -spec foldr_routes(fun((emqx_types:route(), Acc) -> Acc), Acc) -> Acc.
  174. foldr_routes(FoldFun, AccIn) ->
  175. fold_routes(foldr, FoldFun, AccIn).
  176. %%--------------------------------------------------------------------
  177. %% Internal fns
  178. %%--------------------------------------------------------------------
  179. mria_insert_route(Topic, Dest) ->
  180. case emqx_trie_search:filter(Topic) of
  181. Words when is_list(Words) ->
  182. K = emqx_topic_index:make_key(Words, Dest),
  183. mria:dirty_write(?PS_FILTERS_TAB, #ps_routeidx{entry = K});
  184. false ->
  185. mria_route_tab_insert(#ps_route{topic = Topic, dest = Dest})
  186. end.
  187. fold_routes(FunName, FoldFun, AccIn) ->
  188. FilterFoldFun = mk_filtertab_fold_fun(FoldFun),
  189. Acc = ets:FunName(FoldFun, AccIn, ?PS_ROUTER_TAB),
  190. ets:FunName(FilterFoldFun, Acc, ?PS_FILTERS_TAB).
  191. mk_filtertab_fold_fun(FoldFun) ->
  192. fun(#ps_routeidx{entry = K}, Acc) -> FoldFun(match_to_route(K), Acc) end.
  193. match_filters(Topic) ->
  194. emqx_topic_index:matches(Topic, ?PS_FILTERS_TAB, []).
  195. get_dest_session_id({_, DSSessionId}) ->
  196. DSSessionId;
  197. get_dest_session_id(DSSessionId) ->
  198. DSSessionId.
  199. match_to_route(M) ->
  200. #ps_route{topic = emqx_topic_index:get_topic(M), dest = emqx_topic_index:get_id(M)}.
  201. mria_route_tab_insert(Route) ->
  202. mria:dirty_write(?PS_ROUTER_TAB, Route).
  203. lookup_route_tab(Topic) ->
  204. ets:lookup(?PS_ROUTER_TAB, Topic).
  205. has_route_tab_entry(Topic, Dest) ->
  206. [] =/= ets:match(?PS_ROUTER_TAB, #ps_route{topic = Topic, dest = Dest}).
  207. list_route_tab_topics() ->
  208. mnesia:dirty_all_keys(?PS_ROUTER_TAB).
  209. mria_route_tab_delete(Route) ->
  210. mria:dirty_delete_object(?PS_ROUTER_TAB, Route).