emqx_rewrite_SUITE.erl 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rewrite_SUITE).
  17. -compile(export_all).
  18. -compile(nowarn_export_all).
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("eunit/include/eunit.hrl").
  %% Raw rewrite configuration shared by the test cases; it is loaded through
  %% emqx_common_test_helpers:load_config/2 with emqx_modules_schema.
  %% The list holds three `publish' rules, three `subscribe' rules and one
  %% `all' rule. The order of the rules is deliberately left untouched.
  -define(REWRITE, #{
      <<"rewrite">> => [
          %% publish: x/y/<rest> -> z/y/<rest>
          #{
              <<"source_topic">> => <<"x/#">>,
              <<"re">> => <<"^x/y/(.+)$">>,
              <<"dest_topic">> => <<"z/y/$1">>,
              <<"action">> => <<"publish">>
          },
          %% publish: destination carries the ${username} placeholder
          #{
              <<"source_topic">> => <<"name/#">>,
              <<"re">> => <<"^name/(.+)$">>,
              <<"dest_topic">> => <<"pub/${username}/$1">>,
              <<"action">> => <<"publish">>
          },
          %% publish: destination carries the ${clientid} placeholder
          #{
              <<"source_topic">> => <<"c/#">>,
              <<"re">> => <<"^c/(.+)$">>,
              <<"dest_topic">> => <<"pub/${clientid}/$1">>,
              <<"action">> => <<"publish">>
          },
          %% subscribe: y/<a>/z/<b> -> y/z/<b>
          #{
              <<"source_topic">> => <<"y/+/z/#">>,
              <<"re">> => <<"^y/(.+)/z/(.+)$">>,
              <<"dest_topic">> => <<"y/z/$2">>,
              <<"action">> => <<"subscribe">>
          },
          %% subscribe: destination carries the ${username} placeholder
          #{
              <<"source_topic">> => <<"name/#">>,
              <<"re">> => <<"^name/(.+)$">>,
              <<"dest_topic">> => <<"sub/${username}/$1">>,
              <<"action">> => <<"subscribe">>
          },
          %% subscribe: destination carries the ${clientid} placeholder
          #{
              <<"source_topic">> => <<"c/#">>,
              <<"re">> => <<"^c/(.+)$">>,
              <<"dest_topic">> => <<"sub/${clientid}/$1">>,
              <<"action">> => <<"subscribe">>
          },
          %% all: all/<a>/x/<b> -> all/x/<b>
          #{
              <<"source_topic">> => <<"all/+/x/#">>,
              <<"re">> => <<"^all/(.+)/x/(.+)$">>,
              <<"dest_topic">> => <<"all/x/$2">>,
              <<"action">> => <<"all">>
          }
      ]
  }).
  %% @doc Common Test callback listing the test cases of this suite.
  %% The list is produced by emqx_common_test_helpers:all/1 for this module.
  all() ->
      emqx_common_test_helpers:all(?MODULE).
  %% @doc Suite-level setup: boots all modules, loads an empty
  %% emqx_modules_schema configuration and starts the applications under test.
  %% Returns the Common Test Config unchanged.
  init_per_suite(Config) ->
      AppsUnderTest = [emqx_conf, emqx_modules],
      emqx_common_test_helpers:boot_modules(all),
      %% Start from an empty modules config; individual cases load ?REWRITE.
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, #{}),
      emqx_common_test_helpers:start_apps(AppsUnderTest),
      Config.
  %% @doc Suite-level teardown: stops the applications started by
  %% init_per_suite/1.
  end_per_suite(_Config) ->
      AppsUnderTest = [emqx_conf, emqx_modules],
      emqx_common_test_helpers:stop_apps(AppsUnderTest).
  %% @doc Per-case setup.
  %% t_get_basic_usage_info asserts a rule count of zero first, so the rule
  %% list is emptied before it runs. Every other case gets Config as-is.
  init_per_testcase(t_get_basic_usage_info, Config) ->
      NoRules = [],
      ok = emqx_rewrite:update(NoRules),
      Config;
  init_per_testcase(_OtherCase, Config) ->
      Config.
  %% @doc Per-case teardown.
  %% Empties the rule list again after t_get_basic_usage_info, which installs
  %% its own rules. Nothing is done for any other case. Always returns ok.
  end_per_testcase(t_get_basic_usage_info, _Config) ->
      %% The successful match itself evaluates to ok.
      ok = emqx_rewrite:update([]);
  end_per_testcase(_OtherCase, _Config) ->
      ok.
  %% @doc Subscribing to topics covered by `subscribe' rules must register the
  %% rewritten topics for client c1, messages published to the rewritten
  %% topics must be delivered, and unsubscribing with the original topics must
  %% remove every subscription again.
  %%
  %% The body runs inside try ... after so that terminate/1 (client disconnect
  %% plus emqx_rewrite:disable/0) is executed even when a match or assertion
  %% fails; otherwise a failing run would leave the rewrite module enabled and
  %% the client connected for the following test cases.
  t_subscribe_rewrite(_Config) ->
      {ok, Conn} = init(),
      try
          %% <<"y/def">> is expected to be left unchanged.
          SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>],
          SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>],
          {ok, _Props1, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
          %% Give the broker time to register the subscriptions.
          timer:sleep(150),
          Subscriptions = emqx_broker:subscriptions(<<"c1">>),
          ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
          %% Publish on each rewritten topic and collect the topic of the
          %% message that comes back to this process.
          RecvTopics = [
              begin
                  ok = emqtt:publish(Conn, Topic, <<"payload">>),
                  {ok, #{topic := RecvTopic}} = receive_publish(100),
                  RecvTopic
              end
           || Topic <- SubDestTopics
          ],
          ?assertEqual(SubDestTopics, RecvTopics),
          {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics),
          timer:sleep(100),
          ?assertEqual([], emqx_broker:subscriptions(<<"c1">>))
      after
          terminate(Conn)
      end.
  %% @doc Publishing to topics covered by `publish' rules must deliver the
  %% messages on the rewritten topics.
  %%
  %% The client subscribes to the expected destination topics, publishes to
  %% the original topics and checks the topic of each message received.
  %%
  %% The body runs inside try ... after so that terminate/1 (client disconnect
  %% plus emqx_rewrite:disable/0) is executed even when a match or assertion
  %% fails; otherwise a failing run would leave the rewrite module enabled and
  %% the client connected for the following test cases.
  t_publish_rewrite(_Config) ->
      {ok, Conn} = init(),
      try
          %% <<"x/1/2">> is expected to be left unchanged.
          PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>],
          PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>],
          {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
          RecvTopics = [
              begin
                  ok = emqtt:publish(Conn, Topic, <<"payload">>),
                  {ok, #{topic := RecvTopic}} = receive_publish(100),
                  RecvTopic
              end
           || Topic <- PubOrigTopics
          ],
          ?assertEqual(PubDestTopics, RecvTopics),
          {ok, _, _} = emqtt:unsubscribe(Conn, PubDestTopics)
      after
          terminate(Conn)
      end.
  %% @doc Compiles the configured rules and checks emqx_rewrite:match_and_rewrite/3
  %% directly, for one matching and one non-matching topic per rule set.
  %%
  %% ?REWRITE is loaded here explicitly. init_per_suite/1 only loads an empty
  %% config, so without this the case would depend on another test case
  %% having loaded the rules earlier in the same run.
  t_rewrite_rule(_Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      %% The third element holds compile errors; none are expected.
      {PubRules, SubRules, []} = emqx_rewrite:compile(emqx:get_config([rewrite])),
      ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
      ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
      ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
      ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
  %% @doc A rule whose regular expression does not compile must be reported
  %% in the third element of emqx_rewrite:compile/1's result, with no publish
  %% or subscribe rules produced.
  t_rewrite_re_error(_Config) ->
      SourceTopic = "y/+/z/#",
      BrokenRe = "{^y/(.+)/z/(.+)$*",
      DestTopic = "\"y/z/$2",
      BrokenRule = #{
          action => subscribe,
          source_topic => SourceTopic,
          re => BrokenRe,
          dest_topic => DestTopic
      },
      %% Expected error entry: the rule's topics and regex plus the compile
      %% error reason and position.
      ExpectedError = {SourceTopic, BrokenRe, DestTopic, {"nothing to repeat", 16}},
      ?assertEqual({[], [], [ExpectedError]}, emqx_rewrite:compile([BrokenRule])),
      ok.
  %% @doc After loading ?REWRITE, emqx_rewrite:list/0 must return exactly the
  %% rule list stored under the <<"rewrite">> key of that config.
  t_list(_Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      #{<<"rewrite">> := ExpectedRules} = ?REWRITE,
      ?assertEqual(ExpectedRules, emqx_rewrite:list()),
      ok.
  %% @doc emqx_rewrite:update/1 must replace the rule list: after the update,
  %% emqx_rewrite:list/0 returns the new rules. The rules that were present
  %% before the update are put back at the end.
  t_update(_Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      PreviousRules = emqx_rewrite:list(),
      NewRule = #{
          <<"action">> => <<"publish">>,
          <<"dest_topic">> => <<"test1/$2">>,
          <<"re">> => <<"test/*">>,
          <<"source_topic">> => <<"test/#">>
      },
      NewRules = [NewRule],
      ok = emqx_rewrite:update(NewRules),
      ?assertEqual(NewRules, emqx_rewrite:list()),
      %% Restore the original rules; the successful match evaluates to ok.
      ok = emqx_rewrite:update(PreviousRules).
  %% @doc Updating to an empty rule list must leave no emqx_rewrite callback
  %% registered on the 'client.subscribe', 'client.unsubscribe' and
  %% 'message.publish' hook points.
  t_update_disable(_Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      ?assertEqual(ok, emqx_rewrite:update([])),
      timer:sleep(150),
      HookPoints = ['client.subscribe', 'client.unsubscribe', 'message.publish'],
      %% Look up all hook points first, then check them in the same order.
      CallbacksPerPoint = [emqx_hooks:lookup(Point) || Point <- HookPoints],
      %% Selects callbacks whose action tuple names the emqx_rewrite module.
      IsRewriteCallback = fun({_, {Mod, _, _}, _, _}) -> Mod =:= emqx_rewrite end,
      lists:foreach(
          fun(Callbacks) ->
              ?assertEqual([], lists:filter(IsRewriteCallback, Callbacks))
          end,
          CallbacksPerPoint
      ),
      ok.
  %% @doc Updating with a rule whose regular expression does not compile must
  %% throw a validation_error map pointing at "rewrite.1.re" and carrying the
  %% offending regex together with the compile error.
  t_update_re_failed(_Config) ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      BadRe = <<"*^test/*">>,
      BadRule = #{
          <<"action">> => <<"publish">>,
          <<"dest_topic">> => <<"test1/$2">>,
          <<"re">> => BadRe,
          <<"source_topic">> => <<"test/#">>
      },
      %% BadRe is already bound, so it acts as a literal inside the pattern.
      ?assertThrow(
          #{
              kind := validation_error,
              path := "rewrite.1.re",
              reason := #{
                  regexp := BadRe,
                  compile_error := {"nothing to repeat", 0}
              },
              value := BadRe
          },
          emqx_rewrite:update([BadRule])
      ),
      ok.
  %% @doc emqx_rewrite:get_basic_usage_info/0 must report the number of
  %% configured rules: zero at the start (the list is emptied in
  %% init_per_testcase/2) and two after installing two rules.
  t_get_basic_usage_info(_Config) ->
      ?assertEqual(#{topic_rewrite_rule_count => 0}, emqx_rewrite:get_basic_usage_info()),
      RewriteRules = [
          begin
              Suffix = integer_to_binary(Index),
              Dest = <<"rewrite/dest/", Suffix/binary>>,
              Source = <<"rewrite/source/", Suffix/binary>>,
              %% The destination topic doubles as the regex of the rule.
              #{
                  <<"action">> => all,
                  <<"dest_topic">> => Dest,
                  <<"re">> => Dest,
                  <<"source_topic">> => Source
              }
          end
       || Index <- lists:seq(1, 2)
      ],
      ok = emqx_rewrite:update(RewriteRules),
      ?assertEqual(#{topic_rewrite_rule_count => 2}, emqx_rewrite:get_basic_usage_info()),
      ok.
  221. %%--------------------------------------------------------------------
  222. %% Internal functions
  223. %%--------------------------------------------------------------------
  %% @doc Waits up to Timeout milliseconds for a {publish, Msg} message in the
  %% calling process's mailbox.
  %% Returns {ok, Msg} for the first such message, or {error, timeout} when
  %% none arrives in time. Other messages are left in the mailbox.
  receive_publish(Timeout) ->
      receive
          {publish, Msg} ->
              {ok, Msg}
      after Timeout ->
          {error, timeout}
      end.
  %% @doc Shared setup for the client-driven cases: loads ?REWRITE, enables
  %% emqx_rewrite and connects an emqtt client with client id <<"c1">> and
  %% username <<"u1">>.
  %% Returns {ok, Client}; the client is linked to the calling process.
  init() ->
      ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
      ok = emqx_rewrite:enable(),
      ClientOpts = [{clientid, <<"c1">>}, {username, <<"u1">>}],
      {ok, Client} = emqtt:start_link(ClientOpts),
      {ok, _} = emqtt:connect(Client),
      {ok, Client}.
  %% @doc Counterpart of init/0: disconnects the client and disables
  %% emqx_rewrite. Returns ok; crashes with badmatch if either step does not
  %% return ok.
  terminate(Client) ->
      ok = emqtt:disconnect(Client),
      ok = emqx_rewrite:disable(),
      ok.