Parcourir la source

fix(message validation): validate duplicated topics

Fixes https://emqx.atlassian.net/browse/EMQX-12254
Thales Macedo Garitezi il y a 1 an
Parent
commit
ffedce014f

+ 21 - 0
apps/emqx_message_validation/src/emqx_message_validation_schema.erl

@@ -65,6 +65,7 @@ fields(validation) ->
                 #{
                     desc => ?DESC("topics"),
                     converter => fun ensure_array/2,
+                    validator => fun validate_unique_topics/1,
                     required => true
                 }
             )},
@@ -269,3 +270,23 @@ do_validate_unique_schema_checks(
     end;
 do_validate_unique_schema_checks([_Check | Rest], Seen, Duplicated) ->
     do_validate_unique_schema_checks(Rest, Seen, Duplicated).
+
+validate_unique_topics(Topics) ->
+    Grouped = maps:groups_from_list(
+        fun(T) -> T end,
+        Topics
+    ),
+    DuplicatedMap = maps:filter(
+        fun(_T, Ts) -> length(Ts) > 1 end,
+        Grouped
+    ),
+    case maps:keys(DuplicatedMap) of
+        [] ->
+            ok;
+        Duplicated ->
+            Msg = iolist_to_binary([
+                <<"duplicated topics: ">>,
+                lists:join(", ", Duplicated)
+            ]),
+            {error, Msg}
+    end.

+ 59 - 0
apps/emqx_message_validation/test/emqx_message_validation_tests.erl

@@ -232,6 +232,65 @@ check_test_() ->
 
 duplicated_check_test_() ->
     [
+        {"duplicated topics 1",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"duplicated topics: t/1">>,
+                        kind := validation_error,
+                        path := "message_validation.validations.1.topics"
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [schema_check(json, <<"a">>)],
+                        #{<<"topics">> => [<<"t/1">>, <<"t/1">>]}
+                    )
+                ])
+            )},
+        {"duplicated topics 2",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"duplicated topics: t/1">>,
+                        kind := validation_error,
+                        path := "message_validation.validations.1.topics"
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [schema_check(json, <<"a">>)],
+                        #{<<"topics">> => [<<"t/1">>, <<"t/#">>, <<"t/1">>]}
+                    )
+                ])
+            )},
+        {"duplicated topics 3",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"duplicated topics: t/1, t/2">>,
+                        kind := validation_error,
+                        path := "message_validation.validations.1.topics"
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [schema_check(json, <<"a">>)],
+                        #{
+                            <<"topics">> => [
+                                <<"t/1">>,
+                                <<"t/#">>,
+                                <<"t/1">>,
+                                <<"t/2">>,
+                                <<"t/2">>
+                            ]
+                        }
+                    )
+                ])
+            )},
         {"duplicated sql checks are not checked",
             ?_assertMatch(
                 [#{<<"checks">> := [_, _]}],

+ 1 - 0
changes/ee/fix-12950.en.md

@@ -0,0 +1 @@
+Added a validation to prevent duplicated topics when configuring a message validation.