emqx_rewrite.erl 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_rewrite).
  17. -include_lib("emqx/include/emqx.hrl").
  18. -include_lib("emqx/include/logger.hrl").
  19. -include_lib("emqx/include/emqx_mqtt.hrl").
  20. -include_lib("emqx/include/emqx_hooks.hrl").
  21. -ifdef(TEST).
  22. -export([
  23. compile/1,
  24. match_and_rewrite/3
  25. ]).
  26. -endif.
  27. %% APIs
  28. -export([
  29. rewrite_subscribe/4,
  30. rewrite_unsubscribe/4,
  31. rewrite_publish/2
  32. ]).
  33. -export([
  34. enable/0,
  35. disable/0
  36. ]).
  37. -export([
  38. list/0,
  39. update/1,
  40. post_config_update/5
  41. ]).
  42. %% exported for `emqx_telemetry'
  43. -export([get_basic_usage_info/0]).
  44. %%--------------------------------------------------------------------
  45. %% Load/Unload
  46. %%--------------------------------------------------------------------
  %% Turns the rewrite feature on: registers this module as the config handler
  %% for the `[rewrite]' key path, then installs the hooks for the rules read
  %% from that path (`[]' is passed to emqx_conf:get/2 as the fallback).
  %% Returns whatever register_hook/1 returns.
  enable() ->
      emqx_conf:add_handler([rewrite], ?MODULE),
      register_hook(emqx_conf:get([rewrite], [])).
  %% Turns the rewrite feature off: drops the `[rewrite]' config handler and
  %% removes the three rewrite hooks. Always returns `ok'; the results of the
  %% individual removal calls are deliberately discarded.
  disable() ->
      _ = emqx_conf:remove_handler([rewrite]),
      _ = unregister_hook(),
      ok.
  %% Returns the raw configuration stored under the binary key `<<"rewrite">>',
  %% passing `[]' to emqx_conf:get_raw/2 as the fallback value.
  list() ->
      RawKeyPath = [<<"rewrite">>],
      emqx_conf:get_raw(RawKeyPath, []).
  %% Replaces the rewrite rules stored under `[rewrite]' with `Rules0', using
  %% the `override_to => cluster' update option.
  %% Returns `ok'; any reply other than `{ok, _}' from emqx_conf:update/3
  %% crashes the caller with a badmatch.
  update(Rules0) ->
      UpdateOpts = #{override_to => cluster},
      {ok, _} = emqx_conf:update([rewrite], Rules0, UpdateOpts),
      ok.
  %% Config-handler callback (this module is registered as the handler for
  %% `[rewrite]' in enable/0). Only the third argument, the new rule list, is
  %% used: the hooks are re-registered from it and the result of
  %% register_hook/1 is returned.
  post_config_update(_KeyPath, _Config, NewRules, _OldConf, _AppEnvs) ->
      register_hook(NewRules).
  %% Installs the rewrite hooks for `Rules'.
  %%
  %% An empty rule list removes the hooks instead. Otherwise the rules are
  %% compiled and the subscribe/unsubscribe hooks receive the subscribe rules
  %% while the publish hook receives the publish rules; each hook is put with
  %% ?HP_REWRITE as its third argument (presumably the hook priority).
  %%
  %% Hooks are installed even when some rules failed to compile. In that case
  %% the failures are logged and `{error, ErrRules}' is returned; otherwise the
  %% result is `ok'.
  register_hook([]) ->
      unregister_hook();
  register_hook(Rules) ->
      {PubRules, SubRules, ErrRules} = compile(Rules),
      Hooks = [
          {'client.subscribe', rewrite_subscribe, SubRules},
          {'client.unsubscribe', rewrite_unsubscribe, SubRules},
          {'message.publish', rewrite_publish, PubRules}
      ],
      lists:foreach(
          fun({HookPoint, Callback, HookRules}) ->
              emqx_hooks:put(HookPoint, {?MODULE, Callback, [HookRules]}, ?HP_REWRITE)
          end,
          Hooks
      ),
      case ErrRules =:= [] of
          true ->
              ok;
          false ->
              ?SLOG(error, #{rewrite_rule_re_complie_failed => ErrRules}),
              {error, ErrRules}
      end.
  %% Removes the subscribe, unsubscribe and publish rewrite callbacks, in that
  %% order, and returns the result of the last removal ('message.publish').
  unregister_hook() ->
      Callbacks = [
          {'client.subscribe', rewrite_subscribe},
          {'client.unsubscribe', rewrite_unsubscribe},
          {'message.publish', rewrite_publish}
      ],
      Results = [
          emqx_hooks:del(HookPoint, {?MODULE, Callback})
       || {HookPoint, Callback} <- Callbacks
      ],
      lists:last(Results).
  %% 'client.subscribe' hook callback.
  %% Rewrites the topic of every `{Topic, Opts}' pair in `TopicFilters' with
  %% the subscribe rules, leaving `Opts' untouched. `${clientid}' and
  %% `${username}' placeholders are filled from `ClientInfo'.
  %% Returns `{ok, RewrittenTopicFilters}'.
  rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
      Binds = fill_client_binds(ClientInfo),
      Rewritten = [
          {match_and_rewrite(Filter, Rules, Binds), SubOpts}
       || {Filter, SubOpts} <- TopicFilters
      ],
      {ok, Rewritten}.
  %% 'client.unsubscribe' hook callback.
  %% Applies the same subscribe rules as rewrite_subscribe/4 so that an
  %% unsubscribe request is rewritten to the topic the subscription was
  %% rewritten to. Each `{Topic, Opts}' pair keeps its `Opts'.
  %% Returns `{ok, RewrittenTopicFilters}'.
  rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
      ClientBinds = fill_client_binds(ClientInfo),
      Rewrite = fun(Filter) -> match_and_rewrite(Filter, Rules, ClientBinds) end,
      {ok, [{Rewrite(Filter), UnsubOpts} || {Filter, UnsubOpts} <- TopicFilters]}.
  %% 'message.publish' hook callback.
  %% Rewrites the message topic with the publish rules; placeholders are
  %% filled from the message itself (`from' and the `username' header).
  %% Returns `{ok, Message}' with only the `topic' field replaced.
  rewrite_publish(#message{topic = OrigTopic} = Msg, Rules) ->
      MsgBinds = fill_client_binds(Msg),
      NewTopic = match_and_rewrite(OrigTopic, Rules, MsgBinds),
      {ok, Msg#message{topic = NewTopic}}.
  89. %%--------------------------------------------------------------------
  90. %% Telemetry
  91. %%--------------------------------------------------------------------
  %% Telemetry helper: reports how many rewrite rules are present in the raw
  %% configuration returned by list/0.
  -spec get_basic_usage_info() -> #{topic_rewrite_rule_count => non_neg_integer()}.
  get_basic_usage_info() ->
      #{topic_rewrite_rule_count => length(list())}.
  96. %%--------------------------------------------------------------------
  97. %% Internal functions
  98. %%--------------------------------------------------------------------
  %% Compiles rewrite rule maps into `{PublishRules, SubscribeRules, Errors}'.
  %%
  %% Each rule must carry `source_topic', `re', `dest_topic' and `action'
  %% (`publish' | `subscribe' | `all'). A rule whose regex compiles becomes a
  %% `{SourceTopic, CompiledRe, DestTopic}' tuple in the publish list, the
  %% subscribe list, or both for `all'. A rule whose regex does not compile
  %% becomes a `{SourceTopic, Re, DestTopic, ErrSpec}' tuple in `Errors'.
  %%
  %% Every failing rule is reported, not just the last one encountered.
  %%
  %% All three lists are built by prepending, so they come out in the reverse
  %% of the input order. match_and_rewrite/3 stops at the first matching rule,
  %% which means a later rule in the input takes precedence over an earlier one.
  compile(Rules) ->
      lists:foldl(
          fun(Rule, {Publish, Subscribe, Error}) ->
              #{source_topic := Topic, re := Re, dest_topic := Dest, action := Action} = Rule,
              case re:compile(Re) of
                  {ok, MP} ->
                      Compiled = {Topic, MP, Dest},
                      case Action of
                          publish ->
                              {[Compiled | Publish], Subscribe, Error};
                          subscribe ->
                              {Publish, [Compiled | Subscribe], Error};
                          all ->
                              {[Compiled | Publish], [Compiled | Subscribe], Error}
                      end;
                  {error, ErrSpec} ->
                      %% Accumulate rather than overwrite, so earlier failures
                      %% are not lost.
                      {Publish, Subscribe, [{Topic, Re, Dest, ErrSpec} | Error]}
              end
          end,
          {[], [], []},
          Rules
      ).
  %% Rewrites `Topic' with the first rule whose source topic filter matches it.
  %%
  %% Rules are tried in list order. Once a filter matches, that rule alone
  %% decides the result: if its regex does not match, rewrite/4 returns the
  %% topic unchanged and no further rules are tried. With no matching rule the
  %% topic is returned as is.
  match_and_rewrite(Topic, [{Filter, MP, Dest} | Rest], Binds) ->
      FilterMatches = emqx_topic:match(Topic, Filter),
      case FilterMatches of
          false -> match_and_rewrite(Topic, Rest, Binds);
          true -> rewrite(Topic, MP, Dest, Binds)
      end;
  match_and_rewrite(Topic, [], _Binds) ->
      Topic.
  %% Builds the rewritten topic for a rule whose filter already matched.
  %%
  %% `MP' is run against `Topic'. On a match, the destination template `Dest'
  %% has every placeholder replaced: first the client binds in `Binds' (a list
  %% of `{RegexPattern, Value}'), then `$N' for each capture group. The result
  %% is returned as a binary. When the regex does not match, `Topic' is
  %% returned unchanged.
  %%
  %% Substituted values are inserted literally: `&' and `\' are escaped so that
  %% re:replace/4 does not treat them as replacement metacharacters.
  rewrite(Topic, MP, Dest, Binds) ->
      case re:run(Topic, MP, [{capture, all_but_first, list}]) of
          {match, Captured} ->
              iolist_to_binary(
                  lists:foldl(
                      fun({Var, Val}, Acc) ->
                          re:replace(Acc, Var, escape_replacement(Val), [global])
                      end,
                      Dest,
                      Binds ++ capture_vars(Captured)
                  )
              );
          nomatch ->
              Topic
      end.

  %% Pairs each captured group with its `$N' placeholder pattern, highest
  %% index first, so that replacing `$1' cannot clobber the prefix of `$10',
  %% `$11', and so on.
  capture_vars(Captured) ->
      Count = length(Captured),
      lists:zip(
          ["\\$" ++ integer_to_list(I) || I <- lists:seq(Count, 1, -1)],
          lists:reverse(Captured)
      ).

  %% Escapes `\' and `&' in a value so it is inserted verbatim when used as
  %% the replacement argument of re:replace/4.
  escape_replacement(Val) ->
      re:replace(Val, "[\\\\&]", "\\\\&", [global, {return, binary}]).
  %% Builds the `${clientid}' / `${username}' placeholder binds.
  %%
  %% Accepts either a client-info map (must contain `clientid' and `username')
  %% or a #message{} record, where the client id is taken from `from' and the
  %% username from the `username' header (`undefined' when absent).
  %% Binds whose value is undefined or empty are dropped.
  fill_client_binds(#{clientid := ClientId, username := Username}) ->
      client_binds(ClientId, Username);
  fill_client_binds(#message{from = ClientId, headers = Headers}) ->
      client_binds(ClientId, maps:get(username, Headers, undefined)).

  %% Pairs each placeholder regex with its value and filters out unusable ones.
  client_binds(ClientId, Username) ->
      filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]).
  %% Drops binds that have nothing to substitute: a value of `undefined', an
  %% empty binary or an empty string. Everything else is kept, in order.
  filter_client_binds(Binds) ->
      [Bind || Bind <- Binds, not is_blank_bind(Bind)].

  %% True for a `{Pattern, Value}' pair whose value is undefined or empty.
  is_blank_bind({_, undefined}) -> true;
  is_blank_bind({_, <<"">>}) -> true;
  is_blank_bind({_, ""}) -> true;
  is_blank_bind(_) -> false.