emqx_topic_index_SUITE.erl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_topic_index_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("proper/include/proper.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  21. -import(emqx_proper_types, [scaled/2]).
  22. all() ->
  23. [
  24. {group, ets},
  25. {group, gb_tree}
  26. ].
  27. groups() ->
  28. All = emqx_common_test_helpers:all(?MODULE),
  29. [
  30. {ets, All},
  31. {gb_tree, All}
  32. ].
  33. init_per_group(ets, Config) ->
  34. [{index_module, emqx_topic_index} | Config];
  35. init_per_group(gb_tree, Config) ->
  36. [{index_module, emqx_topic_gbt_pterm} | Config].
  37. end_per_group(_Group, _Config) ->
  38. ok.
  39. get_module(Config) ->
  40. proplists:get_value(index_module, Config).
  41. t_insert(Config) ->
  42. M = get_module(Config),
  43. Tab = M:new(),
  44. true = M:insert(<<"sensor/1/metric/2">>, t_insert_1, <<>>, Tab),
  45. true = M:insert(<<"sensor/+/#">>, t_insert_2, <<>>, Tab),
  46. true = M:insert(<<"sensor/#">>, t_insert_3, <<>>, Tab),
  47. ?assertEqual(<<"sensor/#">>, topic(match(M, <<"sensor">>, Tab))),
  48. ?assertEqual(t_insert_3, id(match(M, <<"sensor">>, Tab))).
  49. t_insert_filter(Config) ->
  50. M = get_module(Config),
  51. Tab = M:new(),
  52. Topic = <<"sensor/+/metric//#">>,
  53. true = M:insert(Topic, 1, <<>>, Tab),
  54. true = M:insert(emqx_trie_search:filter(Topic), 2, <<>>, Tab),
  55. ?assertEqual(
  56. [Topic, Topic],
  57. [topic(X) || X <- matches(M, <<"sensor/1/metric//2">>, Tab)]
  58. ).
  59. t_match(Config) ->
  60. M = get_module(Config),
  61. Tab = M:new(),
  62. true = M:insert(<<"sensor/1/metric/2">>, t_match_1, <<>>, Tab),
  63. true = M:insert(<<"sensor/+/#">>, t_match_2, <<>>, Tab),
  64. true = M:insert(<<"sensor/#">>, t_match_3, <<>>, Tab),
  65. ?assertMatch(
  66. [<<"sensor/#">>, <<"sensor/+/#">>],
  67. [topic(X) || X <- matches(M, <<"sensor/1">>, Tab)]
  68. ).
  69. t_match2(Config) ->
  70. M = get_module(Config),
  71. Tab = M:new(),
  72. true = M:insert(<<"#">>, t_match2_1, <<>>, Tab),
  73. true = M:insert(<<"+/#">>, t_match2_2, <<>>, Tab),
  74. true = M:insert(<<"+/+/#">>, t_match2_3, <<>>, Tab),
  75. ?assertEqual(
  76. [<<"#">>, <<"+/#">>, <<"+/+/#">>],
  77. [topic(X) || X <- matches(M, <<"a/b/c">>, Tab)]
  78. ),
  79. ?assertEqual(
  80. false,
  81. M:match(<<"$SYS/broker/zenmq">>, Tab)
  82. ),
  83. ?assertEqual(
  84. [],
  85. matches(M, <<"$SYS/broker/zenmq">>, Tab)
  86. ).
  87. t_match3(Config) ->
  88. M = get_module(Config),
  89. Tab = M:new(),
  90. Records = [
  91. {<<"d/#">>, t_match3_1},
  92. {<<"a/b/+">>, t_match3_2},
  93. {<<"a/#">>, t_match3_3},
  94. {<<"#">>, t_match3_4},
  95. {<<"$SYS/#">>, t_match3_sys}
  96. ],
  97. lists:foreach(
  98. fun({Topic, ID}) -> M:insert(Topic, ID, <<>>, Tab) end,
  99. Records
  100. ),
  101. Matched = matches(M, <<"a/b/c">>, Tab),
  102. case length(Matched) of
  103. 3 -> ok;
  104. _ -> error({unexpected, Matched})
  105. end,
  106. ?assertEqual(
  107. t_match3_sys,
  108. id(match(M, <<"$SYS/a/b/c">>, Tab))
  109. ).
  110. t_match4(Config) ->
  111. M = get_module(Config),
  112. Tab = M:new(),
  113. Records = [{<<"/#">>, t_match4_1}, {<<"/+">>, t_match4_2}, {<<"/+/a/b/c">>, t_match4_3}],
  114. lists:foreach(
  115. fun({Topic, ID}) -> M:insert(Topic, ID, <<>>, Tab) end,
  116. Records
  117. ),
  118. ?assertEqual(
  119. [<<"/#">>, <<"/+">>],
  120. [topic(X) || X <- matches(M, <<"/">>, Tab)]
  121. ),
  122. ?assertEqual(
  123. [<<"/#">>, <<"/+/a/b/c">>],
  124. [topic(X) || X <- matches(M, <<"/0/a/b/c">>, Tab)]
  125. ).
  126. t_match5(Config) ->
  127. M = get_module(Config),
  128. Tab = M:new(),
  129. T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
  130. Records = [
  131. {<<"#">>, t_match5_1},
  132. {<<T/binary, "/#">>, t_match5_2},
  133. {<<T/binary, "/+">>, t_match5_3}
  134. ],
  135. lists:foreach(
  136. fun({Topic, ID}) -> M:insert(Topic, ID, <<>>, Tab) end,
  137. Records
  138. ),
  139. ?assertEqual(
  140. [<<"#">>, <<T/binary, "/#">>],
  141. [topic(X) || X <- matches(M, T, Tab)]
  142. ),
  143. ?assertEqual(
  144. [<<"#">>, <<T/binary, "/#">>, <<T/binary, "/+">>],
  145. [topic(X) || X <- matches(M, <<T/binary, "/1">>, Tab)]
  146. ).
  147. t_match6(Config) ->
  148. M = get_module(Config),
  149. Tab = M:new(),
  150. T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
  151. W = <<"+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/+/#">>,
  152. M:insert(W, ID = t_match6, <<>>, Tab),
  153. ?assertEqual(ID, id(match(M, T, Tab))).
  154. t_match7(Config) ->
  155. M = get_module(Config),
  156. Tab = M:new(),
  157. T = <<"a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z">>,
  158. W = <<"a/+/c/+/e/+/g/+/i/+/k/+/m/+/o/+/q/+/s/+/u/+/w/+/y/+/#">>,
  159. M:insert(W, t_match7, <<>>, Tab),
  160. ?assertEqual(W, topic(match(M, T, Tab))).
  161. t_match8(Config) ->
  162. M = get_module(Config),
  163. Tab = M:new(),
  164. Filters = [<<"+">>, <<"dev/global/sensor">>, <<"dev/+/sensor/#">>],
  165. IDs = [1, 2, 3],
  166. Keys = [{F, ID} || F <- Filters, ID <- IDs],
  167. lists:foreach(
  168. fun({F, ID}) ->
  169. M:insert(F, ID, <<>>, Tab)
  170. end,
  171. Keys
  172. ),
  173. Topic = <<"dev/global/sensor">>,
  174. Matches = lists:sort(matches(M, Topic, Tab)),
  175. ?assertEqual(
  176. [
  177. <<"dev/+/sensor/#">>,
  178. <<"dev/+/sensor/#">>,
  179. <<"dev/+/sensor/#">>,
  180. <<"dev/global/sensor">>,
  181. <<"dev/global/sensor">>,
  182. <<"dev/global/sensor">>
  183. ],
  184. [emqx_topic_index:get_topic(Match) || Match <- Matches]
  185. ).
  186. t_match_fast_forward(Config) ->
  187. M = get_module(Config),
  188. Tab = M:new(),
  189. M:insert(<<"a/b/1/2/3/4/5/6/7/8/9/#">>, id1, <<>>, Tab),
  190. M:insert(<<"z/y/x/+/+">>, id2, <<>>, Tab),
  191. M:insert(<<"a/b/c/+">>, id3, <<>>, Tab),
  192. ?assertEqual(id1, id(match(M, <<"a/b/1/2/3/4/5/6/7/8/9/0">>, Tab))),
  193. ?assertEqual([id1], [id(X) || X <- matches(M, <<"a/b/1/2/3/4/5/6/7/8/9/0">>, Tab)]).
  194. t_match_unique(Config) ->
  195. M = get_module(Config),
  196. Tab = M:new(),
  197. M:insert(<<"a/b/c">>, t_match_id1, <<>>, Tab),
  198. M:insert(<<"a/b/+">>, t_match_id1, <<>>, Tab),
  199. M:insert(<<"a/b/c/+">>, t_match_id2, <<>>, Tab),
  200. ?assertEqual(
  201. [t_match_id1, t_match_id1],
  202. [id(X) || X <- matches(M, <<"a/b/c">>, Tab, [])]
  203. ),
  204. ?assertEqual(
  205. [t_match_id1],
  206. [id(X) || X <- matches(M, <<"a/b/c">>, Tab, [unique])]
  207. ).
  208. t_match_wildcard_edge_cases(Config) ->
  209. M = get_module(Config),
  210. CommonTopics = [
  211. <<"a/b">>,
  212. <<"a/b/#">>,
  213. <<"a/b/#">>,
  214. <<"a/b/c">>,
  215. <<"a/b/+">>,
  216. <<"a/b/d">>,
  217. <<"a/+/+">>,
  218. <<"a/+/#">>
  219. ],
  220. Datasets =
  221. [
  222. %% Topics, TopicName, Results
  223. {CommonTopics, <<"a/b/c">>, [2, 3, 4, 5, 7, 8]},
  224. {CommonTopics, <<"a/b">>, [1, 2, 3, 8]},
  225. {[<<"+/b/c">>, <<"/">>], <<"a/b/c">>, [1]},
  226. {[<<"#">>, <<"/">>], <<"a">>, [1]},
  227. {[<<"/">>, <<"+">>], <<"a">>, [2]}
  228. ],
  229. F = fun({Topics, TopicName, Expected}) ->
  230. Tab = M:new(),
  231. _ = [M:insert(T, N, <<>>, Tab) || {N, T} <- lists:enumerate(Topics)],
  232. ?assertEqual(
  233. lists:last(Expected),
  234. id(M:match(TopicName, Tab)),
  235. #{"Base topics" => Topics, "Topic name" => TopicName}
  236. ),
  237. ?assertEqual(
  238. Expected,
  239. [id(X) || X <- matches(M, TopicName, Tab, [unique])],
  240. #{"Base topics" => Topics, "Topic name" => TopicName}
  241. )
  242. end,
  243. lists:foreach(F, Datasets).
  244. t_prop_edgecase(Config) ->
  245. M = get_module(Config),
  246. Tab = M:new(),
  247. Topic = <<"01/01">>,
  248. Filters = [
  249. {1, <<>>},
  250. {2, <<"+/01">>},
  251. {3, <<>>},
  252. {4, <<"+/+/01">>}
  253. ],
  254. _ = [M:insert(F, N, <<>>, Tab) || {N, F} <- Filters],
  255. ?assertMatch([2], [id(X) || X <- matches(M, Topic, Tab, [unique])]).
  256. t_prop_matches(Config) ->
  257. M = get_module(Config),
  258. ?assert(
  259. proper:quickcheck(
  260. topic_matches_prop(M),
  261. [{max_size, 100}, {numtests, 100}]
  262. )
  263. ),
  264. Statistics = [{C, account(C)} || C <- [filters, topics, matches, maxhits]],
  265. ct:pal("Statistics: ~p", [maps:from_list(Statistics)]).
  266. topic_matches_prop(M) ->
  267. ?FORALL(
  268. % Generate a longer list of topics and a shorter list of topic filter patterns.
  269. #{
  270. topics := TTopics,
  271. patterns := Pats
  272. },
  273. emqx_proper_types:fixedmap(#{
  274. % NOTE
  275. % Beware adding non-empty contraint, proper will have a hard time with `topic_t/1`
  276. % for some reason.
  277. topics => scaled(4, list(topic_t([1, 2, 3, 4]))),
  278. patterns => list(topic_filter_pattern_t())
  279. }),
  280. begin
  281. Tab = M:new(),
  282. Topics = [emqx_topic:join(T) || T <- TTopics],
  283. % Produce topic filters from generated topics and patterns.
  284. % Number of filters is equal to the number of patterns, most of the time.
  285. Filters = lists:enumerate(mk_filters(Pats, TTopics)),
  286. _ = [M:insert(F, N, <<>>, Tab) || {N, F} <- Filters],
  287. % Gather some basic statistics
  288. _ = account(filters, length(Filters)),
  289. _ = account(topics, NTopics = length(Topics)),
  290. _ = account(maxhits, NTopics * NTopics),
  291. % Verify that matching each topic against index returns the same results as
  292. % matching it against the list of filters one by one.
  293. lists:all(
  294. fun(Topic) ->
  295. Ids1 = [id(X) || X <- matches(M, Topic, Tab, [unique])],
  296. Ids2 = lists:filtermap(
  297. fun({N, F}) ->
  298. case emqx_topic:match(Topic, F) of
  299. true -> {true, N};
  300. false -> false
  301. end
  302. end,
  303. Filters
  304. ),
  305. % Account a number of matches to compute hitrate later
  306. _ = account(matches, length(Ids1)),
  307. case (Ids2 -- Ids1) ++ (Ids2 -- Ids1) of
  308. [] ->
  309. true;
  310. [_ | _] = _Differences ->
  311. ct:pal(
  312. "Topic name: ~p~n"
  313. "Index results: ~p~n"
  314. "Topic match results: ~p~n"
  315. "Filters: ~p~n",
  316. [Topic, Ids1, Ids2, Filters]
  317. ),
  318. false
  319. end
  320. end,
  321. Topics
  322. )
  323. end
  324. ).
  325. mk_filters([Pat | PRest], [Topic | TRest]) ->
  326. [emqx_topic:join(mk_topic_filter(Pat, Topic)) | mk_filters(PRest, TRest)];
  327. mk_filters(_, _) ->
  328. [].
  329. account(Counter, N) ->
  330. put({?MODULE, Counter}, account(Counter) + N).
  331. account(Counter) ->
  332. emqx_maybe:define(get({?MODULE, Counter}), 0).
  333. %%
  334. match(M, T, Tab) ->
  335. M:match(T, Tab).
  336. matches(M, T, Tab) ->
  337. lists:sort(M:matches(T, Tab, [])).
  338. matches(M, T, Tab, Opts) ->
  339. M:matches(T, Tab, Opts).
  340. id(Match) ->
  341. emqx_trie_search:get_id(Match).
  342. topic(Match) ->
  343. emqx_trie_search:get_topic(Match).
  344. %%
  345. topic_t(EntropyWeights) ->
  346. EWLast = lists:last(EntropyWeights),
  347. ?LET(L, scaled(1 / 4, list(EWLast)), begin
  348. EWs = lists:sublist(EntropyWeights ++ L, length(L)),
  349. ?SIZED(S, [oneof([topic_level_t(S * EW), topic_level_fixed_t()]) || EW <- EWs])
  350. end).
  351. topic_level_t(Entropy) ->
  352. S = floor(1 + math:log2(Entropy) / 4),
  353. ?LET(I, range(1, Entropy), iolist_to_binary(io_lib:format("~*.16.0B", [S, I]))).
  354. topic_level_fixed_t() ->
  355. oneof([
  356. <<"foo">>,
  357. <<"bar">>,
  358. <<"baz">>,
  359. <<"xyzzy">>
  360. ]).
  361. topic_filter_pattern_t() ->
  362. list(topic_level_pattern_t()).
  363. topic_level_pattern_t() ->
  364. frequency([
  365. {5, level},
  366. {2, '+'},
  367. {1, '#'}
  368. ]).
  369. mk_topic_filter([], _) ->
  370. [];
  371. mk_topic_filter(_, []) ->
  372. [];
  373. mk_topic_filter(['#' | _], _) ->
  374. ['#'];
  375. mk_topic_filter(['+' | Rest], [_ | Levels]) ->
  376. ['+' | mk_topic_filter(Rest, Levels)];
  377. mk_topic_filter([level | Rest], [L | Levels]) ->
  378. [L | mk_topic_filter(Rest, Levels)].