Просмотр исходного кода

Merge pull request #8068 from lafirest/feat/topic_rewrite_5

feat(rewrite): Support %u and %c placeholders in topic rewrite rules
JianBo He 3 лет назад
Родитель
Сommit
33774c8b16
2 измененных файлов с 68 добавлено и 43 удалено
  1. 32 12
      apps/emqx_modules/src/emqx_rewrite.erl
  2. 36 31
      apps/emqx_modules/test/emqx_rewrite_SUITE.erl

+ 32 - 12
apps/emqx_modules/src/emqx_rewrite.erl

@@ -23,7 +23,7 @@
 -ifdef(TEST).
 -export([
     compile/1,
-    match_and_rewrite/2
+    match_and_rewrite/3
 ]).
 -endif.
 
@@ -92,14 +92,17 @@ unregister_hook() ->
     emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
     emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
 
-rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
-    {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
+rewrite_subscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
+    Binds = fill_client_binds(ClientInfo),
+    {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
 
-rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
-    {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
+rewrite_unsubscribe(ClientInfo, _Properties, TopicFilters, Rules) ->
+    Binds = fill_client_binds(ClientInfo),
+    {ok, [{match_and_rewrite(Topic, Rules, Binds), Opts} || {Topic, Opts} <- TopicFilters]}.
 
 rewrite_publish(Message = #message{topic = Topic}, Rules) ->
-    {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
+    Binds = fill_client_binds(Message),
+    {ok, Message#message{topic = match_and_rewrite(Topic, Rules, Binds)}}.
 
 %%--------------------------------------------------------------------
 %% Telemetry
@@ -135,15 +138,15 @@ compile(Rules) ->
         Rules
     ).
 
-match_and_rewrite(Topic, []) ->
+match_and_rewrite(Topic, [], _) ->
     Topic;
-match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) ->
+match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules], Binds) ->
     case emqx_topic:match(Topic, Filter) of
-        true -> rewrite(Topic, MP, Dest);
-        false -> match_and_rewrite(Topic, Rules)
+        true -> rewrite(Topic, MP, Dest, Binds);
+        false -> match_and_rewrite(Topic, Rules, Binds)
     end.
 
-rewrite(Topic, MP, Dest) ->
+rewrite(Topic, MP, Dest, Binds) ->
     case re:run(Topic, MP, [{capture, all_but_first, list}]) of
         {match, Captured} ->
             Vars = lists:zip(
@@ -159,9 +162,26 @@ rewrite(Topic, MP, Dest) ->
                         re:replace(Acc, Var, Val, [global])
                     end,
                     Dest,
-                    Vars
+                    Binds ++ Vars
                 )
             );
         nomatch ->
             Topic
     end.
+
+fill_client_binds(#{clientid := ClientId, username := Username}) ->
+    filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]);
+fill_client_binds(#message{from = ClientId, headers = Headers}) ->
+    Username = maps:get(username, Headers, undefined),
+    filter_client_binds([{"\\${clientid}", ClientId}, {"\\${username}", Username}]).
+
+filter_client_binds(Binds) ->
+    lists:filter(
+        fun
+            ({_, undefined}) -> false;
+            ({_, <<"">>}) -> false;
+            ({_, ""}) -> false;
+            (_) -> true
+        end,
+        Binds
+    ).

+ 36 - 31
apps/emqx_modules/test/emqx_rewrite_SUITE.erl

@@ -30,12 +30,36 @@
             <<"re">> => <<"^x/y/(.+)$">>,
             <<"source_topic">> => <<"x/#">>
         },
+        #{
+            <<"action">> => <<"publish">>,
+            <<"dest_topic">> => <<"pub/${username}/$1">>,
+            <<"re">> => <<"^name/(.+)$">>,
+            <<"source_topic">> => <<"name/#">>
+        },
+        #{
+            <<"action">> => <<"publish">>,
+            <<"dest_topic">> => <<"pub/${clientid}/$1">>,
+            <<"re">> => <<"^c/(.+)$">>,
+            <<"source_topic">> => <<"c/#">>
+        },
         #{
             <<"action">> => <<"subscribe">>,
             <<"dest_topic">> => <<"y/z/$2">>,
             <<"re">> => <<"^y/(.+)/z/(.+)$">>,
             <<"source_topic">> => <<"y/+/z/#">>
         },
+        #{
+            <<"action">> => <<"subscribe">>,
+            <<"dest_topic">> => <<"sub/${username}/$1">>,
+            <<"re">> => <<"^name/(.+)$">>,
+            <<"source_topic">> => <<"name/#">>
+        },
+        #{
+            <<"action">> => <<"subscribe">>,
+            <<"dest_topic">> => <<"sub/${clientid}/$1">>,
+            <<"re">> => <<"^c/(.+)$">>,
+            <<"source_topic">> => <<"c/#">>
+        },
         #{
             <<"action">> => <<"all">>,
             <<"dest_topic">> => <<"all/x/$2">>,
@@ -69,11 +93,11 @@ end_per_testcase(_TestCase, _Config) ->
 
 t_subscribe_rewrite(_Config) ->
     {ok, Conn} = init(),
-    SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>],
-    SubDestTopics = [<<"y/z/b">>, <<"y/def">>],
+    SubOrigTopics = [<<"y/a/z/b">>, <<"y/def">>, <<"name/1">>, <<"c/1">>],
+    SubDestTopics = [<<"y/z/b">>, <<"y/def">>, <<"sub/u1/1">>, <<"sub/c1/1">>],
     {ok, _Props1, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- SubOrigTopics]),
     timer:sleep(150),
-    Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
+    Subscriptions = emqx_broker:subscriptions(<<"c1">>),
     ?assertEqual(SubDestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
     RecvTopics = [
         begin
@@ -86,14 +110,14 @@ t_subscribe_rewrite(_Config) ->
     ?assertEqual(SubDestTopics, RecvTopics),
     {ok, _, _} = emqtt:unsubscribe(Conn, SubOrigTopics),
     timer:sleep(100),
-    ?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
+    ?assertEqual([], emqx_broker:subscriptions(<<"c1">>)),
 
     terminate(Conn).
 
 t_publish_rewrite(_Config) ->
     {ok, Conn} = init(),
-    PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
-    PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>],
+    PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"name/1">>, <<"c/1">>],
+    PubDestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"pub/u1/1">>, <<"pub/c1/1">>],
     {ok, _Props2, _} = emqtt:subscribe(Conn, [{Topic, ?QOS_1} || Topic <- PubDestTopics]),
     RecvTopics = [
         begin
@@ -109,10 +133,10 @@ t_publish_rewrite(_Config) ->
 
 t_rewrite_rule(_Config) ->
     {PubRules, SubRules, []} = emqx_rewrite:compile(emqx:get_config([rewrite])),
-    ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
-    ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
-    ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
-    ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
+    ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules, [])),
+    ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules, [])),
+    ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules, [])),
+    ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules, [])).
 
 t_rewrite_re_error(_Config) ->
     Rules = [
@@ -134,26 +158,7 @@ t_rewrite_re_error(_Config) ->
 
 t_list(_Config) ->
     ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
-    Expect = [
-        #{
-            <<"action">> => <<"publish">>,
-            <<"dest_topic">> => <<"z/y/$1">>,
-            <<"re">> => <<"^x/y/(.+)$">>,
-            <<"source_topic">> => <<"x/#">>
-        },
-        #{
-            <<"action">> => <<"subscribe">>,
-            <<"dest_topic">> => <<"y/z/$2">>,
-            <<"re">> => <<"^y/(.+)/z/(.+)$">>,
-            <<"source_topic">> => <<"y/+/z/#">>
-        },
-        #{
-            <<"action">> => <<"all">>,
-            <<"dest_topic">> => <<"all/x/$2">>,
-            <<"re">> => <<"^all/(.+)/x/(.+)$">>,
-            <<"source_topic">> => <<"all/+/x/#">>
-        }
-    ],
+    Expect = maps:get(<<"rewrite">>, ?REWRITE),
     ?assertEqual(Expect, emqx_rewrite:list()),
     ok.
 
@@ -246,7 +251,7 @@ receive_publish(Timeout) ->
 init() ->
     ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
     ok = emqx_rewrite:enable(),
-    {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
+    {ok, C} = emqtt:start_link([{clientid, <<"c1">>}, {username, <<"u1">>}]),
     {ok, _} = emqtt:connect(C),
     {ok, C}.