Просмотр исходного кода

Merge pull request #13728 from thalesmg/20240829-m-cluster-link-duplicate-filters

feat(cluster link): check for duplicated topic filters
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
de1c4e540d

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.3.4"},
+    {vsn, "0.3.5"},
     {registered, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, ehttpc]},
     {applications, [kernel, stdlib, emqx_resource, ehttpc]},
     {env, [
     {env, [

+ 1 - 1
apps/emqx_cluster_link/src/emqx_cluster_link.app.src

@@ -2,7 +2,7 @@
 {application, emqx_cluster_link, [
 {application, emqx_cluster_link, [
     {description, "EMQX Cluster Linking"},
     {description, "EMQX Cluster Linking"},
     % strict semver, bump manually!
     % strict semver, bump manually!
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {modules, []},
     {modules, []},
     {registered, []},
     {registered, []},
     {applications, [
     {applications, [

+ 19 - 2
apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl

@@ -137,7 +137,7 @@ link_name(#{name := Name}) -> Name;
 link_name(#{<<"name">> := Name}) -> Name.
 link_name(#{<<"name">> := Name}) -> Name.
 
 
 topics_validator(Topics) ->
 topics_validator(Topics) ->
-    Errors = lists:foldl(
+    Errors0 = lists:foldl(
         fun(T, ErrAcc) ->
         fun(T, ErrAcc) ->
             try
             try
                 _ = emqx_topic:validate(T),
                 _ = emqx_topic:validate(T),
@@ -150,7 +150,8 @@ topics_validator(Topics) ->
         [],
         [],
         Topics
         Topics
     ),
     ),
-    check_errors(Errors, invalid_topics, topics).
+    Errors = validate_duplicate_topic_filters(Topics),
+    check_errors(Errors0 ++ Errors, invalid_topics, topics).
 
 
 validate_sys_link_topic(T, ErrAcc) ->
 validate_sys_link_topic(T, ErrAcc) ->
     case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of
     case emqx_topic:match(T, ?TOPIC_PREFIX_WILDCARD) of
@@ -160,6 +161,22 @@ validate_sys_link_topic(T, ErrAcc) ->
             ErrAcc
             ErrAcc
     end.
     end.
 
 
+validate_duplicate_topic_filters(TopicFilters) ->
+    {Duplicated, _} =
+        lists:foldl(
+            fun(T, {Acc, Seen}) ->
+                case sets:is_element(T, Seen) of
+                    true ->
+                        {[{T, duplicate_topic_filter} | Acc], Seen};
+                    false ->
+                        {Acc, sets:add_element(T, Seen)}
+                end
+            end,
+            {[], sets:new([{version, 2}])},
+            TopicFilters
+        ),
+    Duplicated.
+
 check_errors([] = _Errors, _Reason, _ValuesField) ->
 check_errors([] = _Errors, _Reason, _ValuesField) ->
     ok;
     ok;
 check_errors(Errors, Reason, ValuesField) ->
 check_errors(Errors, Reason, ValuesField) ->

+ 23 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl

@@ -779,3 +779,26 @@ t_update_password(_Config) ->
         []
         []
     ),
     ),
     ok.
     ok.
+
+%% Checks that we forbid duplicate topic filters.
+t_duplicate_topic_filters(_Config) ->
+    ?check_trace(
+        begin
+            Name = atom_to_binary(?FUNCTION_NAME),
+            Params1 = link_params(#{<<"topics">> => [<<"t">>, <<"t">>]}),
+            ?assertMatch(
+                {400, #{
+                    <<"message">> := #{
+                        <<"reason">> := #{
+                            <<"reason">> := <<"invalid_topics">>,
+                            <<"topics">> := #{<<"t">> := <<"duplicate_topic_filter">>}
+                        }
+                    }
+                }},
+                create_link(Name, Params1)
+            ),
+            ok
+        end,
+        []
+    ),
+    ok.

+ 64 - 0
apps/emqx_cluster_link/test/emqx_cluster_link_tests.erl

@@ -0,0 +1,64 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_cluster_link_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+bin(X) -> emqx_utils_conv:bin(X).
+
+parse_and_check(InnerConfigs) ->
+    RootBin = <<"links">>,
+    RawConf = #{RootBin => InnerConfigs},
+    #{RootBin := Checked} = hocon_tconf:check_plain(
+        #{roots => [{links, emqx_cluster_link_schema:links_schema(#{})}]},
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => false
+        }
+    ),
+    Checked.
+
+link_params(Name) ->
+    link_params(Name, _Overrides = #{}).
+
+link_params(Name, Overrides) ->
+    Default = #{
+        <<"name">> => Name,
+        <<"clientid">> => <<"linkclientid">>,
+        <<"password">> => <<"my secret password">>,
+        <<"pool_size">> => 1,
+        <<"server">> => <<"emqxcl_2.nohost:31883">>,
+        <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>]
+    },
+    emqx_utils_maps:deep_merge(Default, Overrides).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+schema_test_() ->
+    [
+        {"topic filters must be unique",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := #{
+                            reason := invalid_topics,
+                            topics := [{<<"t">>, duplicate_topic_filter}]
+                        },
+                        value := [_, _],
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    link_params(<<"l1">>, #{<<"topics">> => [<<"t">>, <<"t">>]})
+                ])
+            )}
+    ].