Просмотр исходного кода

Merge pull request #13049 from thalesmg/sync-r57-m-20240514

sync `release-57` to `master`
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
202e145db7
40 измененных файлов с 352 добавлено и 257 удалено
  1. 2 2
      apps/emqx/src/emqx_hookpoints.erl
  2. 3 0
      apps/emqx/src/emqx_logger_jsonfmt.erl
  3. 1 1
      apps/emqx/src/emqx_persistent_message.erl
  4. 41 2
      apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl
  5. 2 2
      apps/emqx/test/emqx_cth_suite.erl
  6. 46 7
      apps/emqx/test/emqx_persistent_session_SUITE.erl
  7. 0 2
      apps/emqx/test/emqx_takeover_SUITE.erl
  8. 1 1
      apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl
  9. 4 4
      apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl
  10. 0 7
      apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl
  11. 2 2
      apps/emqx_enterprise/src/emqx_enterprise_schema.erl
  12. 1 1
      apps/emqx_machine/priv/reboot_lists.eterm
  13. 16 9
      apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl
  14. 15 0
      apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl
  15. 0 14
      apps/emqx_message_validation/src/emqx_message_validation.app.src
  16. 1 1
      apps/emqx_postgresql/rebar.config
  17. 3 3
      apps/emqx_prometheus/include/emqx_prometheus.hrl
  18. 7 7
      apps/emqx_prometheus/src/emqx_prometheus_api.erl
  19. 23 23
      apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl
  20. 15 15
      apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl
  21. 17 17
      apps/emqx_rule_engine/src/emqx_rule_events.erl
  22. 1 1
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  23. 0 0
      apps/emqx_schema_validation/BSL.txt
  24. 3 3
      apps/emqx_message_validation/README.md
  25. 0 0
      apps/emqx_schema_validation/rebar.config
  26. 14 0
      apps/emqx_schema_validation/src/emqx_schema_validation.app.src
  27. 20 20
      apps/emqx_message_validation/src/emqx_message_validation.erl
  28. 8 8
      apps/emqx_message_validation/src/emqx_message_validation_app.erl
  29. 55 55
      apps/emqx_message_validation/src/emqx_message_validation_http_api.erl
  30. 6 6
      apps/emqx_message_validation/src/emqx_message_validation_registry.erl
  31. 5 5
      apps/emqx_message_validation/src/emqx_message_validation_schema.erl
  32. 3 3
      apps/emqx_message_validation/src/emqx_message_validation_sup.erl
  33. 12 12
      apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl
  34. 15 15
      apps/emqx_message_validation/test/emqx_message_validation_tests.erl
  35. 1 0
      changes/ee/fix-13018.en.md
  36. 2 2
      mix.exs
  37. 1 1
      rebar.config.erl
  38. 4 4
      rel/i18n/emqx_prometheus_api.hocon
  39. 1 1
      rel/i18n/emqx_message_validation_http_api.hocon
  40. 1 1
      rel/i18n/emqx_message_validation_schema.hocon

+ 2 - 2
apps/emqx/src/emqx_hookpoints.erl

@@ -60,7 +60,7 @@
     'message.publish',
     'message.puback',
     'message.dropped',
-    'message.validation_failed',
+    'schema.validation_failed',
     'message.delivered',
     'message.acked',
     'delivery.dropped',
@@ -184,7 +184,7 @@ when
 -callback 'message.dropped'(emqx_types:message(), #{node => node()}, _Reason :: atom()) ->
     callback_result().
 
--callback 'message.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
+-callback 'schema.validation_failed'(emqx_types:message(), #{node => node()}, _Ctx :: map()) ->
     callback_result().
 
 -callback 'message.delivered'(emqx_types:clientinfo(), Msg) -> fold_callback_result(Msg) when

+ 3 - 0
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -34,6 +34,9 @@
 %% For CLI HTTP API outputs
 -export([best_effort_json/1, best_effort_json/2, best_effort_json_obj/1]).
 
+%% For emqx_trace_json_formatter
+-export([format_msg/3]).
+
 -ifdef(TEST).
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").

+ 1 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -110,7 +110,7 @@ persist(Msg) ->
     ).
 
 needs_persistence(Msg) ->
-    not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
+    not emqx_message:get_flag(dup, Msg).
 
 -spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
 store_message(Msg) ->

+ 41 - 2
apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl

@@ -23,6 +23,8 @@
 %% logger_formatter:config/0 is not exported.
 -type config() :: map().
 
+-define(DEFAULT_FORMATTER, fun logger:format_otp_report/1).
+
 %%%-----------------------------------------------------------------
 %%% Callback Function
 %%%-----------------------------------------------------------------
@@ -31,9 +33,10 @@
     LogEvent :: logger:log_event(),
     Config :: config().
 format(
-    LogMap0,
-    #{payload_encode := PEncode}
+    LogMap,
+    #{payload_encode := PEncode} = Config
 ) ->
+    LogMap0 = maybe_format_msg(LogMap, Config),
     LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0),
     %% We just make some basic transformations on the input LogMap and then do
     %% an external call to create the JSON text
@@ -46,6 +49,42 @@ format(
 %%% Helper Functions
 %%%-----------------------------------------------------------------
 
+maybe_format_msg(#{msg := Msg, meta := Meta} = LogMap, Config) ->
+    try do_maybe_format_msg(Msg, Meta, Config) of
+        Map when is_map(Map) ->
+            LogMap#{meta => maps:merge(Meta, Map), msg => maps:get(msg, Map, "no_message")};
+        Bin when is_binary(Bin) ->
+            LogMap#{msg => Bin}
+    catch
+        C:R:S ->
+            Meta#{
+                msg => "emqx_logger_jsonfmt_format_error",
+                fmt_raw_input => Msg,
+                fmt_error => C,
+                fmt_reason => R,
+                fmt_stacktrace => S,
+                more => #{
+                    original_log_entry => LogMap,
+                    config => Config
+                }
+            }
+    end.
+
+do_maybe_format_msg(String, _Meta, _Config) when is_list(String); is_binary(String) ->
+    unicode:characters_to_binary(String);
+do_maybe_format_msg(undefined, _Meta, _Config) ->
+    #{};
+do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) ->
+    case is_map(Report) andalso Cb =:= ?DEFAULT_FORMATTER of
+        true ->
+            %% reporting a map without a customised format function
+            Report;
+        false ->
+            emqx_logger_jsonfmt:format_msg(Msg, Meta, Config)
+    end;
+do_maybe_format_msg(Msg, Meta, Config) ->
+    emqx_logger_jsonfmt:format_msg(Msg, Meta, Config).
+
 prepare_log_map(LogMap, PEncode) ->
     NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
     maps:from_list(NewKeyValuePairs).

+ 2 - 2
apps/emqx/test/emqx_cth_suite.erl

@@ -383,8 +383,8 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
     };
 default_appspec(emqx_schema_registry, _SuiteOpts) ->
     #{schema_mod => emqx_schema_registry_schema, config => #{}};
-default_appspec(emqx_message_validation, _SuiteOpts) ->
-    #{schema_mod => emqx_message_validation_schema, config => #{}};
+default_appspec(emqx_schema_validation, _SuiteOpts) ->
+    #{schema_mod => emqx_schema_validation_schema, config => #{}};
 default_appspec(_, _) ->
     #{}.
 

+ 46 - 7
apps/emqx/test/emqx_persistent_session_SUITE.erl

@@ -27,6 +27,7 @@
 -compile(nowarn_export_all).
 
 -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message).
+-define(EMQX_CONFIG, "sys_topics.sys_heartbeat_interval = 1s\n").
 
 %%--------------------------------------------------------------------
 %% SUITE boilerplate
@@ -66,19 +67,20 @@ groups() ->
 
 init_per_group(persistence_disabled, Config) ->
     [
-        {emqx_config, "session_persistence { enable = false }"},
+        {emqx_config, ?EMQX_CONFIG ++ "session_persistence { enable = false }"},
         {persistence, false}
         | Config
     ];
 init_per_group(persistence_enabled, Config) ->
     [
         {emqx_config,
-            "session_persistence {\n"
-            "  enable = true\n"
-            "  last_alive_update_interval = 100ms\n"
-            "  renew_streams_interval = 100ms\n"
-            "  session_gc_interval = 2s\n"
-            "}"},
+            ?EMQX_CONFIG ++
+                "session_persistence {\n"
+                "  enable = true\n"
+                "  last_alive_update_interval = 100ms\n"
+                "  renew_streams_interval = 100ms\n"
+                "  session_gc_interval = 2s\n"
+                "}"},
         {persistence, ds}
         | Config
     ];
@@ -1334,6 +1336,43 @@ do_t_will_message(Config, Opts) ->
     ),
     ok.
 
+t_sys_message_delivery(Config) ->
+    ConnFun = ?config(conn_fun, Config),
+    SysTopicFilter = emqx_topic:join(["$SYS", "brokers", '+', "uptime"]),
+    SysTopic = emqx_topic:join(["$SYS", "brokers", atom_to_list(node()), "uptime"]),
+    ClientId = ?config(client_id, Config),
+
+    {ok, Client1} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 30}}
+        | Config
+    ]),
+    {ok, _} = emqtt:ConnFun(Client1),
+    {ok, _, [1]} = emqtt:subscribe(Client1, SysTopicFilter, [{qos, 1}, {rh, 2}]),
+    ?assertMatch(
+        [
+            #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime1},
+            #{topic := SysTopic, qos := 0, retain := false, payload := _Uptime2}
+        ],
+        receive_messages(2)
+    ),
+
+    ok = emqtt:disconnect(Client1),
+
+    {ok, Client2} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {properties, #{'Session-Expiry-Interval' => 30}},
+        {clean_start, false}
+        | Config
+    ]),
+    {ok, _} = emqtt:ConnFun(Client2),
+    ?assertMatch(
+        [#{topic := SysTopic, qos := 0, retain := false, payload := _Uptime3}],
+        receive_messages(1)
+    ).
+
 get_topicwise_order(Msgs) ->
     maps:groups_from_list(fun get_msgpub_topic/1, fun get_msgpub_payload/1, Msgs).
 

+ 0 - 2
apps/emqx/test/emqx_takeover_SUITE.erl

@@ -78,7 +78,6 @@ init_per_group(persistence_enabled = Group, Config) ->
         ],
         #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
     ),
-    emqx_logger:set_log_level(debug),
     [
         {apps, Apps},
         {persistence_enabled, true}
@@ -89,7 +88,6 @@ init_per_group(persistence_disabled = Group, Config) ->
         [{emqx, "session_persistence.enable = false"}],
         #{work_dir => emqx_cth_suite:work_dir(Group, Config)}
     ),
-    emqx_logger:set_log_level(debug),
     [
         {apps, Apps},
         {persistence_enabled, false}

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_aggreg_upload.erl

@@ -35,7 +35,7 @@
 %%-------------------------------------------------------------------------------------------------
 
 namespace() ->
-    "bridge_s3".
+    "bridge_s3_aggreg_upload".
 
 roots() ->
     [].

+ 4 - 4
apps/emqx_dashboard/src/emqx_dashboard_monitor_api.erl

@@ -189,9 +189,9 @@ swagger_desc(sent_bytes) ->
 swagger_desc(dropped) ->
     swagger_desc_format("Dropped messages ");
 swagger_desc(validation_succeeded) ->
-    swagger_desc_format("Message validations succeeded ");
+    swagger_desc_format("Schema validations succeeded ");
 swagger_desc(validation_failed) ->
-    swagger_desc_format("Message validations failed ");
+    swagger_desc_format("Schema validations failed ");
 swagger_desc(persisted) ->
     swagger_desc_format("Messages saved to the durable storage ");
 swagger_desc(subscriptions) ->
@@ -217,9 +217,9 @@ swagger_desc(sent_msg_rate) ->
 swagger_desc(dropped_msg_rate) ->
     swagger_desc_format("Dropped messages ", per);
 swagger_desc(validation_succeeded_rate) ->
-    swagger_desc_format("Message validations succeeded ", per);
+    swagger_desc_format("Schema validations succeeded ", per);
 swagger_desc(validation_failed_rate) ->
-    swagger_desc_format("Message validations failed ", per);
+    swagger_desc_format("Schema validations failed ", per);
 swagger_desc(persisted_rate) ->
     swagger_desc_format("Messages saved to the durable storage ", per);
 swagger_desc(retained_msg_count) ->

+ 0 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -339,13 +339,6 @@ get_streams(Shard, TopicFilter, StartTime) ->
             case generation_get(Shard, GenId) of
                 #{module := Mod, data := GenData} ->
                     Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
-                    ?tp(get_streams_get_gen_topic, #{
-                        gen_id => GenId,
-                        topic => TopicFilter,
-                        start_time => StartTime,
-                        streams => Streams,
-                        gen_data => GenData
-                    }),
                     [
                         {GenId, ?stream_v2(GenId, InnerStream)}
                      || InnerStream <- Streams

+ 2 - 2
apps/emqx_enterprise/src/emqx_enterprise_schema.erl

@@ -15,7 +15,7 @@
 -define(EE_SCHEMA_MODULES, [
     emqx_license_schema,
     emqx_schema_registry_schema,
-    emqx_message_validation_schema,
+    emqx_schema_validation_schema,
     emqx_ft_schema
 ]).
 
@@ -196,6 +196,6 @@ audit_log_conf() ->
 
 tr_prometheus_collectors(Conf) ->
     [
-        {'/prometheus/message_validation', emqx_prometheus_message_validation}
+        {'/prometheus/schema_validation', emqx_prometheus_schema_validation}
         | emqx_conf_schema:tr_prometheus_collectors(Conf)
     ].

+ 1 - 1
apps/emqx_machine/priv/reboot_lists.eterm

@@ -88,7 +88,7 @@
         [
             emqx_license,
             emqx_enterprise,
-            emqx_message_validation,
+            emqx_schema_validation,
             emqx_connector_aggregator,
             emqx_bridge_kafka,
             emqx_bridge_pulsar,

+ 16 - 9
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -1193,7 +1193,7 @@ t_mqueue_messages(Config) ->
     ClientId = atom_to_binary(?FUNCTION_NAME),
     Topic = <<"t/test_mqueue_msgs">>,
     Count = emqx_mgmt:default_row_limit(),
-    {ok, _Client} = client_with_mqueue(ClientId, Topic, Count),
+    ok = client_with_mqueue(ClientId, Topic, Count),
     Path = emqx_mgmt_api_test_util:api_path(["clients", ClientId, "mqueue_messages"]),
     ?assert(Count =< emqx:get_config([mqtt, max_mqueue_len])),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
@@ -1244,14 +1244,16 @@ client_with_mqueue(ClientId, Topic, Count) ->
     {ok, Client} = emqtt:start_link([
         {proto_ver, v5},
         {clientid, ClientId},
-        {clean_start, false},
+        {clean_start, true},
         {properties, #{'Session-Expiry-Interval' => 120}}
     ]),
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, Topic, 1),
+    ct:sleep(300),
     ok = emqtt:disconnect(Client),
+    ct:sleep(100),
     publish_msgs(Topic, Count),
-    {ok, Client}.
+    ok.
 
 client_with_inflight(ClientId, Topic, Count) ->
     {ok, Client} = emqtt:start_link([
@@ -1275,13 +1277,18 @@ publish_msgs(Topic, Count) ->
 
 test_messages(Path, Topic, Count, AuthHeader, PayloadEncoding, IsMqueue) ->
     Qs0 = io_lib:format("payload=~s", [PayloadEncoding]),
-    {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
-    #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
-    #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
 
-    ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
-    ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
-    ?assertEqual(length(Msgs), Count),
+    {Msgs, StartPos, Pos} = ?retry(500, 10, begin
+        {ok, MsgsResp} = emqx_mgmt_api_test_util:request_api(get, Path, Qs0, AuthHeader),
+        #{<<"meta">> := Meta, <<"data">> := Msgs} = emqx_utils_json:decode(MsgsResp),
+        #{<<"start">> := StartPos, <<"position">> := Pos} = Meta,
+
+        ?assertEqual(StartPos, msg_pos(hd(Msgs), IsMqueue)),
+        ?assertEqual(Pos, msg_pos(lists:last(Msgs), IsMqueue)),
+        ?assertEqual(length(Msgs), Count),
+
+        {Msgs, StartPos, Pos}
+    end),
 
     lists:foreach(
         fun({Seq, #{<<"payload">> := P} = M}) ->

+ 15 - 0
apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl

@@ -269,6 +269,8 @@ t_http_test_json_formatter(_Config) ->
         action_id =>
             <<"action:http:emqx_bridge_http_test_lib:connector:http:emqx_bridge_http_test_lib">>
     }),
+    %% We should handle report style logging
+    ?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}),
     ok = emqx_trace_handler_SUITE:filesync(Name, topic),
     {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")),
     {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")),
@@ -410,6 +412,19 @@ t_http_test_json_formatter(_Config) ->
         },
         NextFun()
     ),
+    ?assertMatch(
+        #{
+            <<"level">> := <<"error">>,
+            <<"meta">> :=
+                #{
+                    <<"msg">> := <<"recursive_republish_detected">>,
+                    <<"topic">> := <<"/x/y/z">>
+                },
+            <<"msg">> := <<"recursive_republish_detected">>,
+            <<"time">> := _
+        },
+        NextFun()
+    ),
     {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
     ?assertEqual(<<>>, Delete),
 

+ 0 - 14
apps/emqx_message_validation/src/emqx_message_validation.app.src

@@ -1,14 +0,0 @@
-{application, emqx_message_validation, [
-    {description, "EMQX Message Validation"},
-    {vsn, "0.1.0"},
-    {registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
-    {mod, {emqx_message_validation_app, []}},
-    {applications, [
-        kernel,
-        stdlib
-    ]},
-    {env, []},
-    {modules, []},
-
-    {links, []}
-]}.

+ 1 - 1
apps/emqx_postgresql/rebar.config

@@ -2,7 +2,7 @@
 
 {erl_opts, [debug_info]}.
 {deps, [
-    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.1"}}},
+    {epgsql, {git, "https://github.com/emqx/epgsql", {tag, "4.7.1.2"}}},
     {emqx_connector, {path, "../../apps/emqx_connector"}},
     {emqx_resource, {path, "../../apps/emqx_resource"}}
 ]}.

+ 3 - 3
apps/emqx_prometheus/include/emqx_prometheus.hrl

@@ -22,12 +22,12 @@
 -define(PROMETHEUS_AUTH_COLLECTOR, emqx_prometheus_auth).
 -define(PROMETHEUS_DATA_INTEGRATION_REGISTRY, '/prometheus/data_integration').
 -define(PROMETHEUS_DATA_INTEGRATION_COLLECTOR, emqx_prometheus_data_integration).
--define(PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, '/prometheus/message_validation').
--define(PROMETHEUS_MESSAGE_VALIDATION_COLLECTOR, emqx_prometheus_message_validation).
+-define(PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, '/prometheus/schema_validation').
+-define(PROMETHEUS_SCHEMA_VALIDATION_COLLECTOR, emqx_prometheus_schema_validation).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 -define(PROMETHEUS_EE_REGISTRIES, [
-    ?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY
+    ?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY
 ]).
 %% ELSE if(?EMQX_RELEASE_EDITION == ee).
 -else.

+ 7 - 7
apps/emqx_prometheus/src/emqx_prometheus_api.erl

@@ -49,7 +49,7 @@
     stats/2,
     auth/2,
     data_integration/2,
-    message_validation/2
+    schema_validation/2
 ]).
 
 -export([lookup_from_local_nodes/3]).
@@ -73,7 +73,7 @@ paths() ->
 
 -if(?EMQX_RELEASE_EDITION == ee).
 paths_ee() ->
-    ["/prometheus/message_validation"].
+    ["/prometheus/schema_validation"].
 %% ELSE if(?EMQX_RELEASE_EDITION == ee).
 -else.
 paths_ee() ->
@@ -139,12 +139,12 @@ schema("/prometheus/data_integration") ->
                     #{200 => prometheus_data_schema()}
             }
     };
-schema("/prometheus/message_validation") ->
+schema("/prometheus/schema_validation") ->
     #{
-        'operationId' => message_validation,
+        'operationId' => schema_validation,
         get =>
             #{
-                description => ?DESC(get_prom_message_validation),
+                description => ?DESC(get_prom_schema_validation),
                 tags => ?TAGS,
                 parameters => [ref(mode)],
                 security => security(),
@@ -223,8 +223,8 @@ auth(get, #{headers := Headers, query_string := Qs}) ->
 data_integration(get, #{headers := Headers, query_string := Qs}) ->
     collect(emqx_prometheus_data_integration, collect_opts(Headers, Qs)).
 
-message_validation(get, #{headers := Headers, query_string := Qs}) ->
-    collect(emqx_prometheus_message_validation, collect_opts(Headers, Qs)).
+schema_validation(get, #{headers := Headers, query_string := Qs}) ->
+    collect(emqx_prometheus_schema_validation, collect_opts(Headers, Qs)).
 
 %%--------------------------------------------------------------------
 %% Internal funcs

+ 23 - 23
apps/emqx_prometheus/src/emqx_prometheus_message_validation.erl

@@ -2,7 +2,7 @@
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
 
--module(emqx_prometheus_message_validation).
+-module(emqx_prometheus_schema_validation).
 
 -if(?EMQX_RELEASE_EDITION == ee).
 %% for bpapi
@@ -48,19 +48,19 @@
 -define(MG(K, MAP), maps:get(K, MAP)).
 -define(MG0(K, MAP), maps:get(K, MAP, 0)).
 
--define(metrics_data_key, message_validation_metrics_data).
+-define(metrics_data_key, schema_validation_metrics_data).
 
--define(key_enabled, emqx_message_validation_enable).
--define(key_matched, emqx_message_validation_matched).
--define(key_failed, emqx_message_validation_failed).
--define(key_succeeded, emqx_message_validation_succeeded).
+-define(key_enabled, emqx_schema_validation_enable).
+-define(key_matched, emqx_schema_validation_matched).
+-define(key_failed, emqx_schema_validation_failed).
+-define(key_succeeded, emqx_schema_validation_succeeded).
 
 %%--------------------------------------------------------------------
 %% `emqx_prometheus_cluster' API
 %%--------------------------------------------------------------------
 
 fetch_from_local_node(Mode) ->
-    Validations = emqx_message_validation:list(),
+    Validations = emqx_schema_validation:list(),
     {node(), #{
         ?metrics_data_key => to_validation_data(Mode, Validations)
     }}.
@@ -70,7 +70,7 @@ fetch_cluster_consistented_data() ->
 
 aggre_or_zip_init_acc() ->
     #{
-        ?metrics_data_key => maps:from_keys(message_validation_metric(names), [])
+        ?metrics_data_key => maps:from_keys(schema_validation_metric(names), [])
     }.
 
 logic_sum_metrics() ->
@@ -89,12 +89,12 @@ deregister_cleanup(_) -> ok.
 -spec collect_mf(_Registry, Callback) -> ok when
     _Registry :: prometheus_registry:registry(),
     Callback :: prometheus_collector:collect_mf_callback().
-collect_mf(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY, Callback) ->
+collect_mf(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY, Callback) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
 
-    %% Message Validation Metrics
+    %% Schema Validation Metrics
     RuleMetricDs = ?MG(?metrics_data_key, RawData),
-    ok = add_collect_family(Callback, message_validation_metrics_meta(), RuleMetricDs),
+    ok = add_collect_family(Callback, schema_validation_metrics_meta(), RuleMetricDs),
 
     ok;
 collect_mf(_, _) ->
@@ -104,10 +104,10 @@ collect_mf(_, _) ->
 collect(<<"json">>) ->
     RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
     #{
-        message_validations => collect_json_data(?MG(?metrics_data_key, RawData))
+        schema_validations => collect_json_data(?MG(?metrics_data_key, RawData))
     };
 collect(<<"prometheus">>) ->
-    prometheus_text_format:format(?PROMETHEUS_MESSAGE_VALIDATION_REGISTRY).
+    prometheus_text_format:format(?PROMETHEUS_SCHEMA_VALIDATION_REGISTRY).
 
 %%====================
 %% API Helpers
@@ -128,7 +128,7 @@ collect_metrics(Name, Metrics) ->
 %%--------------------------------------------------------------------
 
 %%========================================
-%% Message Validation Metrics
+%% Schema Validation Metrics
 %%========================================
 collect_mv(K = ?key_enabled, Data) -> gauge_metrics(?MG(K, Data));
 collect_mv(K = ?key_matched, Data) -> counter_metrics(?MG(K, Data));
@@ -140,10 +140,10 @@ collect_mv(K = ?key_succeeded, Data) -> counter_metrics(?MG(K, Data)).
 %%--------------------------------------------------------------------
 
 %%========================================
-%% Message Validation Metrics
+%% Schema Validation Metrics
 %%========================================
 
-message_validation_metrics_meta() ->
+schema_validation_metrics_meta() ->
     [
         {?key_enabled, gauge},
         {?key_matched, counter},
@@ -151,15 +151,15 @@ message_validation_metrics_meta() ->
         {?key_succeeded, counter}
     ].
 
-message_validation_metric(names) ->
-    emqx_prometheus_cluster:metric_names(message_validation_metrics_meta()).
+schema_validation_metric(names) ->
+    emqx_prometheus_cluster:metric_names(schema_validation_metrics_meta()).
 
 to_validation_data(Mode, Validations) ->
     lists:foldl(
         fun(#{name := Name} = Validation, Acc) ->
             merge_acc_with_validations(Mode, Name, get_validation_metrics(Validation), Acc)
         end,
-        maps:from_keys(message_validation_metric(names), []),
+        maps:from_keys(schema_validation_metric(names), []),
         Validations
     ).
 
@@ -176,7 +176,7 @@ validation_point(Mode, Name, V) ->
     {with_node_label(Mode, [{validation_name, Name}]), V}.
 
 get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
-    #{counters := Counters} = emqx_message_validation_registry:get_metrics(Name),
+    #{counters := Counters} = emqx_schema_validation_registry:get_metrics(Name),
     #{
         ?key_enabled => emqx_prometheus_cluster:boolean_to_number(Enabled),
         ?key_matched => ?MG0('matched', Counters),
@@ -192,9 +192,9 @@ get_validation_metrics(#{name := Name, enable := Enabled} = _Rule) ->
 %% merge / zip formatting funcs for type `application/json`
 
 collect_json_data(Data) ->
-    emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_message_validation_metrics/3).
+    emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_schema_validation_metrics/3).
 
-zip_json_message_validation_metrics(Key, Points, [] = _AccIn) ->
+zip_json_schema_validation_metrics(Key, Points, [] = _AccIn) ->
     lists:foldl(
         fun({Labels, Metric}, AccIn2) ->
             LabelsKVMap = maps:from_list(Labels),
@@ -204,7 +204,7 @@ zip_json_message_validation_metrics(Key, Points, [] = _AccIn) ->
         [],
         Points
     );
-zip_json_message_validation_metrics(Key, Points, AllResultsAcc) ->
+zip_json_schema_validation_metrics(Key, Points, AllResultsAcc) ->
     ThisKeyResult = lists:foldl(emqx_prometheus_cluster:point_to_map_fun(Key), [], Points),
     lists:zipwith(fun maps:merge/2, AllResultsAcc, ThisKeyResult).
 

+ 15 - 15
apps/emqx_prometheus/test/emqx_prometheus_data_SUITE.erl

@@ -82,7 +82,7 @@ all() ->
         {group, '/prometheus/stats'},
         {group, '/prometheus/auth'},
         {group, '/prometheus/data_integration'},
-        [{group, '/prometheus/message_validation'} || emqx_release:edition() == ee]
+        [{group, '/prometheus/schema_validation'} || emqx_release:edition() == ee]
     ]).
 
 groups() ->
@@ -100,7 +100,7 @@ groups() ->
         {'/prometheus/stats', ModeGroups},
         {'/prometheus/auth', ModeGroups},
         {'/prometheus/data_integration', ModeGroups},
-        {'/prometheus/message_validation', ModeGroups},
+        {'/prometheus/schema_validation', ModeGroups},
         {?PROM_DATA_MODE__NODE, AcceptGroups},
         {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, AcceptGroups},
         {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, AcceptGroups},
@@ -133,7 +133,7 @@ init_per_suite(Config) ->
             emqx_bridge_http,
             emqx_connector,
             [
-                {emqx_message_validation, #{config => message_validation_config()}}
+                {emqx_schema_validation, #{config => schema_validation_config()}}
              || emqx_release:edition() == ee
             ],
             {emqx_prometheus, emqx_prometheus_SUITE:legacy_conf_default()}
@@ -166,8 +166,8 @@ init_per_group('/prometheus/auth', Config) ->
     [{module, emqx_prometheus_auth} | Config];
 init_per_group('/prometheus/data_integration', Config) ->
     [{module, emqx_prometheus_data_integration} | Config];
-init_per_group('/prometheus/message_validation', Config) ->
-    [{module, emqx_prometheus_message_validation} | Config];
+init_per_group('/prometheus/schema_validation', Config) ->
+    [{module, emqx_prometheus_schema_validation} | Config];
 init_per_group(?PROM_DATA_MODE__NODE, Config) ->
     [{mode, ?PROM_DATA_MODE__NODE} | Config];
 init_per_group(?PROM_DATA_MODE__ALL_NODES_AGGREGATED, Config) ->
@@ -239,7 +239,7 @@ assert_data(_Module, {Code, Header, RawDataBinary}, #{type := <<"prometheus">>,
     assert_prom_data(DataL, Mode);
 assert_data(Module, {Code, JsonData}, #{type := <<"json">>, mode := Mode}) ->
     ?assertEqual(Code, 200),
-    ?assert(is_map(JsonData), true),
+    ?assertMatch(#{}, JsonData),
     assert_json_data(Module, JsonData, Mode).
 
 %%%%%%%%%%%%%%%%%%%%
@@ -355,8 +355,8 @@ metric_meta(<<"emqx_schema_registrys_count">>) -> ?meta(0, 0, 0);
 metric_meta(<<"emqx_rule_", _Tail/binary>>) -> ?meta(1, 1, 2);
 metric_meta(<<"emqx_action_", _Tail/binary>>) -> ?meta(1, 1, 2);
 metric_meta(<<"emqx_connector_", _Tail/binary>>) -> ?meta(1, 1, 2);
-%% `/prometheus/message_validation`
-metric_meta(<<"emqx_message_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
+%% `/prometheus/schema_validation`
+metric_meta(<<"emqx_schema_validation_", _Tail/binary>>) -> ?meta(1, 1, 2);
 %% normal emqx metrics
 metric_meta(<<"emqx_", _Tail/binary>>) -> ?meta(0, 0, 1);
 metric_meta(_) -> #{}.
@@ -821,16 +821,16 @@ assert_json_data__data_integration_overview(M, _) ->
     ).
 -endif.
 
-assert_json_data__message_validations(Ms, _) ->
+assert_json_data__schema_validations(Ms, _) ->
     lists:foreach(
         fun(M) ->
             ?assertMatch(
                 #{
                     validation_name := _,
-                    emqx_message_validation_enable := _,
-                    emqx_message_validation_matched := _,
-                    emqx_message_validation_failed := _,
-                    emqx_message_validation_succeeded := _
+                    emqx_schema_validation_enable := _,
+                    emqx_schema_validation_matched := _,
+                    emqx_schema_validation_failed := _,
+                    emqx_schema_validation_succeeded := _
                 },
                 M
             )
@@ -838,7 +838,7 @@ assert_json_data__message_validations(Ms, _) ->
         Ms
     ).
 
-message_validation_config() ->
+schema_validation_config() ->
     Validation = #{
         <<"enable">> => true,
         <<"name">> => <<"my_validation">>,
@@ -853,7 +853,7 @@ message_validation_config() ->
         ]
     },
     #{
-        <<"message_validation">> => #{
+        <<"schema_validation">> => #{
             <<"validations">> => [Validation]
         }
     }.

+ 17 - 17
apps/emqx_rule_engine/src/emqx_rule_events.erl

@@ -45,7 +45,7 @@
     on_session_unsubscribed/4,
     on_message_publish/2,
     on_message_dropped/4,
-    on_message_validation_failed/3,
+    on_schema_validation_failed/3,
     on_message_delivered/3,
     on_message_acked/3,
     on_delivery_dropped/4,
@@ -80,7 +80,7 @@ event_names() ->
         'message.delivered',
         'message.acked',
         'message.dropped',
-        'message.validation_failed',
+        'schema.validation_failed',
         'delivery.dropped'
     ].
 
@@ -96,7 +96,7 @@ event_topics_enum() ->
         '$events/message_delivered',
         '$events/message_acked',
         '$events/message_dropped',
-        '$events/message_validation_failed',
+        '$events/schema_validation_failed',
         '$events/delivery_dropped'
         % '$events/message_publish' % not possible to use in SELECT FROM
     ].
@@ -237,13 +237,13 @@ on_message_dropped(Message, _, Reason, Conf) ->
     end,
     {ok, Message}.
 
-on_message_validation_failed(Message, ValidationContext, Conf) ->
+on_schema_validation_failed(Message, ValidationContext, Conf) ->
     case ignore_sys_message(Message) of
         true ->
             ok;
         false ->
             apply_event(
-                'message.validation_failed',
+                'schema.validation_failed',
                 fun() -> eventmsg_validation_failed(Message, ValidationContext) end,
                 Conf
             )
@@ -550,7 +550,7 @@ eventmsg_validation_failed(
 ) ->
     #{name := ValidationName} = ValidationContext,
     with_basic_columns(
-        'message.validation_failed',
+        'schema.validation_failed',
         #{
             id => emqx_guid:to_hexstr(Id),
             validation => ValidationName,
@@ -730,16 +730,16 @@ event_info() ->
 
 -if(?EMQX_RELEASE_EDITION == ee).
 %% ELSE (?EMQX_RELEASE_EDITION == ee).
-event_info_message_validation_failed() ->
+event_info_schema_validation_failed() ->
     event_info_common(
-        'message.validation_failed',
-        {<<"message validation failed">>, <<"TODO"/utf8>>},
+        'schema.validation_failed',
+        {<<"schema validation failed">>, <<"TODO"/utf8>>},
         {<<"messages that do not pass configured validations">>, <<"TODO"/utf8>>},
-        <<"SELECT * FROM \"$events/message_validation_failed\" WHERE topic =~ 't/#'">>
+        <<"SELECT * FROM \"$events/schema_validation_failed\" WHERE topic =~ 't/#'">>
     ).
 ee_event_info() ->
     [
-        event_info_message_validation_failed()
+        event_info_schema_validation_failed()
     ].
 -else.
 %% END (?EMQX_RELEASE_EDITION == ee).
@@ -931,7 +931,7 @@ test_columns(Event) ->
     ee_test_columns(Event).
 
 -if(?EMQX_RELEASE_EDITION == ee).
-ee_test_columns('message.validation_failed') ->
+ee_test_columns('schema.validation_failed') ->
     [{<<"validation">>, <<"myvalidation">>}] ++
         test_columns('message.publish').
 %% ELSE (?EMQX_RELEASE_EDITION == ee).
@@ -980,9 +980,9 @@ columns_with_exam('message.dropped') ->
         {<<"timestamp">>, erlang:system_time(millisecond)},
         {<<"node">>, node()}
     ];
-columns_with_exam('message.validation_failed') ->
+columns_with_exam('schema.validation_failed') ->
     [
-        {<<"event">>, 'message.validation_failed'},
+        {<<"event">>, 'schema.validation_failed'},
         {<<"validation">>, <<"my_validation">>},
         {<<"id">>, emqx_guid:to_hexstr(emqx_guid:gen())},
         {<<"clientid">>, <<"c_emqx">>},
@@ -1200,7 +1200,7 @@ hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
 hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
 hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
 hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
-hook_fun('message.validation_failed') -> fun ?MODULE:on_message_validation_failed/3;
+hook_fun('schema.validation_failed') -> fun ?MODULE:on_schema_validation_failed/3;
 hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
 hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
 hook_fun(Event) -> error({invalid_event, Event}).
@@ -1231,7 +1231,7 @@ event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
 event_name(<<"$events/message_delivered">>) -> 'message.delivered';
 event_name(<<"$events/message_acked">>) -> 'message.acked';
 event_name(<<"$events/message_dropped">>) -> 'message.dropped';
-event_name(<<"$events/message_validation_failed">>) -> 'message.validation_failed';
+event_name(<<"$events/schema_validation_failed">>) -> 'schema.validation_failed';
 event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
 event_name(_) -> 'message.publish'.
 
@@ -1246,7 +1246,7 @@ event_topic('session.unsubscribed') -> <<"$events/session_unsubscribed">>;
 event_topic('message.delivered') -> <<"$events/message_delivered">>;
 event_topic('message.acked') -> <<"$events/message_acked">>;
 event_topic('message.dropped') -> <<"$events/message_dropped">>;
-event_topic('message.validation_failed') -> <<"$events/message_validation_failed">>;
+event_topic('schema.validation_failed') -> <<"$events/schema_validation_failed">>;
 event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
 event_topic('message.publish') -> <<"$events/message_publish">>.
 

+ 1 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -27,7 +27,7 @@
     inc_action_metrics/2
 ]).
 
-%% Internal exports used by message validation
+%% Internal exports used by schema validation
 -export([evaluate_select/3, clear_rule_payload/0]).
 
 -import(

apps/emqx_message_validation/BSL.txt → apps/emqx_schema_validation/BSL.txt


+ 3 - 3
apps/emqx_message_validation/README.md

@@ -1,4 +1,4 @@
-# EMQX Message Validation
+# EMQX Schema Validation
 
 This application encapsulates the functionality to validate incoming or internally
 triggered published payloads and take an action upon failure, which can be to just drop
@@ -7,7 +7,7 @@ the message without further processing, or to disconnect the offending client as
 # Documentation
 
 Refer to [Message
-Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html)
+Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/schema-validation.html)
 for more information about the semantics and checks available.
 
 # HTTP APIs
@@ -16,7 +16,7 @@ APIs are provided for validation management, which includes creating,
 updating, looking up, deleting, listing validations.
 
 Refer to [API Docs -
-Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation)
+Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Schema-Validation)
 for more detailed information.
 
 

apps/emqx_message_validation/rebar.config → apps/emqx_schema_validation/rebar.config


+ 14 - 0
apps/emqx_schema_validation/src/emqx_schema_validation.app.src

@@ -0,0 +1,14 @@
+{application, emqx_schema_validation, [
+    {description, "EMQX Schema Validation"},
+    {vsn, "0.1.0"},
+    {registered, [emqx_schema_validation_sup, emqx_schema_validation_registry]},
+    {mod, {emqx_schema_validation_app, []}},
+    {applications, [
+        kernel,
+        stdlib
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 20 - 20
apps/emqx_message_validation/src/emqx_message_validation.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation).
+-module(emqx_schema_validation).
 
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_utils/include/emqx_message.hrl").
@@ -47,8 +47,8 @@
 %% Type declarations
 %%------------------------------------------------------------------------------
 
--define(TRACE_TAG, "MESSAGE_VALIDATION").
--define(CONF_ROOT, message_validation).
+-define(TRACE_TAG, "SCHEMA_VALIDATION").
+-define(CONF_ROOT, schema_validation).
 -define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
 
 -type validation_name() :: binary().
@@ -72,7 +72,7 @@ load() ->
     Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
     lists:foreach(
         fun({Pos, Validation}) ->
-            ok = emqx_message_validation_registry:insert(Pos, Validation)
+            ok = emqx_schema_validation_registry:insert(Pos, Validation)
         end,
         lists:enumerate(Validations)
     ).
@@ -81,7 +81,7 @@ unload() ->
     Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
     lists:foreach(
         fun(Validation) ->
-            ok = emqx_message_validation_registry:delete(Validation)
+            ok = emqx_schema_validation_registry:delete(Validation)
         end,
         Validations
     ).
@@ -146,7 +146,7 @@ unregister_hooks() ->
 -spec on_message_publish(emqx_types:message()) ->
     {ok, emqx_types:message()} | {stop, emqx_types:message()}.
 on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
-    case emqx_message_validation_registry:matching_validations(Topic) of
+    case emqx_schema_validation_registry:matching_validations(Topic) of
         [] ->
             ok;
         Validations ->
@@ -184,19 +184,19 @@ pre_config_update(?VALIDATIONS_CONF_PATH, {reorder, Order}, OldValidations) ->
 
 post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
     {Pos, Validation} = fetch_with_index(New, Name),
-    ok = emqx_message_validation_registry:insert(Pos, Validation),
+    ok = emqx_schema_validation_registry:insert(Pos, Validation),
     ok;
 post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
     {_Pos, OldValidation} = fetch_with_index(Old, Name),
     {Pos, NewValidation} = fetch_with_index(New, Name),
-    ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation),
+    ok = emqx_schema_validation_registry:update(OldValidation, Pos, NewValidation),
     ok;
 post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
     {_Pos, Validation} = fetch_with_index(Old, Name),
-    ok = emqx_message_validation_registry:delete(Validation),
+    ok = emqx_schema_validation_registry:delete(Validation),
     ok;
 post_config_update(?VALIDATIONS_CONF_PATH, {reorder, _Order}, New, _Old, _AppEnvs) ->
-    ok = emqx_message_validation_registry:reindex_positions(New),
+    ok = emqx_schema_validation_registry:reindex_positions(New),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -395,26 +395,26 @@ run_validations(Validations, Message) ->
         emqx_rule_runtime:clear_rule_payload(),
         Fun = fun(Validation, Acc) ->
             #{name := Name} = Validation,
-            emqx_message_validation_registry:inc_matched(Name),
+            emqx_schema_validation_registry:inc_matched(Name),
             case run_validation(Validation, Message) of
                 ok ->
-                    emqx_message_validation_registry:inc_succeeded(Name),
+                    emqx_schema_validation_registry:inc_succeeded(Name),
                     {cont, Acc};
                 ignore ->
                     trace_failure(Validation, "validation_failed", #{
                         validation => Name,
                         action => ignore
                     }),
-                    emqx_message_validation_registry:inc_failed(Name),
-                    run_message_validation_failed_hook(Message, Validation),
+                    emqx_schema_validation_registry:inc_failed(Name),
+                    run_schema_validation_failed_hook(Message, Validation),
                     {cont, Acc};
                 FailureAction ->
                     trace_failure(Validation, "validation_failed", #{
                         validation => Name,
                         action => FailureAction
                     }),
-                    emqx_message_validation_registry:inc_failed(Name),
-                    run_message_validation_failed_hook(Message, Validation),
+                    emqx_schema_validation_registry:inc_failed(Name),
+                    run_schema_validation_failed_hook(Message, Validation),
                     {halt, FailureAction}
             end
         end,
@@ -457,17 +457,17 @@ trace_failure(#{log_failure := #{level := none}} = Validation, _Msg, _Meta) ->
         name := _Name,
         failure_action := _Action
     } = Validation,
-    ?tp(message_validation_failed, #{log_level => none, name => _Name, action => _Action}),
+    ?tp(schema_validation_failed, #{log_level => none, name => _Name, action => _Action}),
     ok;
 trace_failure(#{log_failure := #{level := Level}} = Validation, Msg, Meta) ->
     #{
         name := _Name,
         failure_action := _Action
     } = Validation,
-    ?tp(message_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
+    ?tp(schema_validation_failed, #{log_level => Level, name => _Name, action => _Action}),
     ?TRACE(Level, ?TRACE_TAG, Msg, Meta).
 
-run_message_validation_failed_hook(Message, Validation) ->
+run_schema_validation_failed_hook(Message, Validation) ->
     #{name := Name} = Validation,
     ValidationContext = #{name => Name},
-    emqx_hooks:run('message.validation_failed', [Message, ValidationContext]).
+    emqx_hooks:run('schema.validation_failed', [Message, ValidationContext]).

+ 8 - 8
apps/emqx_message_validation/src/emqx_message_validation_app.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_app).
+-module(emqx_schema_validation_app).
 
 -behaviour(application).
 
@@ -18,15 +18,15 @@
 
 -spec start(application:start_type(), term()) -> {ok, pid()}.
 start(_Type, _Args) ->
-    {ok, Sup} = emqx_message_validation_sup:start_link(),
-    ok = emqx_message_validation:add_handler(),
-    ok = emqx_message_validation:register_hooks(),
-    ok = emqx_message_validation:load(),
+    {ok, Sup} = emqx_schema_validation_sup:start_link(),
+    ok = emqx_schema_validation:add_handler(),
+    ok = emqx_schema_validation:register_hooks(),
+    ok = emqx_schema_validation:load(),
     {ok, Sup}.
 
 -spec stop(term()) -> ok.
 stop(_State) ->
-    ok = emqx_message_validation:unload(),
-    ok = emqx_message_validation:unregister_hooks(),
-    ok = emqx_message_validation:remove_handler(),
+    ok = emqx_schema_validation:unload(),
+    ok = emqx_schema_validation:unregister_hooks(),
+    ok = emqx_schema_validation:remove_handler(),
     ok.

+ 55 - 55
apps/emqx_message_validation/src/emqx_message_validation_http_api.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_http_api).
+-module(emqx_schema_validation_http_api).
 
 -behaviour(minirest_api).
 
@@ -21,43 +21,43 @@
 
 %% `minirest' handlers
 -export([
-    '/message_validations'/2,
-    '/message_validations/reorder'/2,
-    '/message_validations/validation/:name'/2,
-    '/message_validations/validation/:name/metrics'/2,
-    '/message_validations/validation/:name/metrics/reset'/2,
-    '/message_validations/validation/:name/enable/:enable'/2
+    '/schema_validations'/2,
+    '/schema_validations/reorder'/2,
+    '/schema_validations/validation/:name'/2,
+    '/schema_validations/validation/:name/metrics'/2,
+    '/schema_validations/validation/:name/metrics/reset'/2,
+    '/schema_validations/validation/:name/enable/:enable'/2
 ]).
 
 %%-------------------------------------------------------------------------------------------------
 %% Type definitions
 %%-------------------------------------------------------------------------------------------------
 
--define(TAGS, [<<"Message Validation">>]).
--define(METRIC_NAME, message_validation).
+-define(TAGS, [<<"Schema Validation">>]).
+-define(METRIC_NAME, schema_validation).
 
 %%-------------------------------------------------------------------------------------------------
 %% `minirest' and `minirest_trails' API
 %%-------------------------------------------------------------------------------------------------
 
-namespace() -> "message_validation_http_api".
+namespace() -> "schema_validation_http_api".
 
 api_spec() ->
     emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
 
 paths() ->
     [
-        "/message_validations",
-        "/message_validations/reorder",
-        "/message_validations/validation/:name",
-        "/message_validations/validation/:name/metrics",
-        "/message_validations/validation/:name/metrics/reset",
-        "/message_validations/validation/:name/enable/:enable"
+        "/schema_validations",
+        "/schema_validations/reorder",
+        "/schema_validations/validation/:name",
+        "/schema_validations/validation/:name/metrics",
+        "/schema_validations/validation/:name/metrics/reset",
+        "/schema_validations/validation/:name/enable/:enable"
     ].
 
-schema("/message_validations") ->
+schema("/schema_validations") ->
     #{
-        'operationId' => '/message_validations',
+        'operationId' => '/schema_validations',
         get => #{
             tags => ?TAGS,
             summary => <<"List validations">>,
@@ -67,7 +67,7 @@ schema("/message_validations") ->
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
                             array(
-                                emqx_message_validation_schema:api_schema(list)
+                                emqx_schema_validation_schema:api_schema(list)
                             ),
                             example_return_list()
                         )
@@ -78,14 +78,14 @@ schema("/message_validations") ->
             summary => <<"Append a new validation">>,
             description => ?DESC("append_validation"),
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_message_validation_schema:api_schema(post),
+                emqx_schema_validation_schema:api_schema(post),
                 example_input_create()
             ),
             responses =>
                 #{
                     201 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            emqx_message_validation_schema:api_schema(post),
+                            emqx_schema_validation_schema:api_schema(post),
                             example_return_create()
                         ),
                     400 => error_schema('ALREADY_EXISTS', "Validation already exists")
@@ -96,14 +96,14 @@ schema("/message_validations") ->
             summary => <<"Update a validation">>,
             description => ?DESC("update_validation"),
             'requestBody' => emqx_dashboard_swagger:schema_with_examples(
-                emqx_message_validation_schema:api_schema(put),
+                emqx_schema_validation_schema:api_schema(put),
                 example_input_update()
             ),
             responses =>
                 #{
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
-                            emqx_message_validation_schema:api_schema(put),
+                            emqx_schema_validation_schema:api_schema(put),
                             example_return_update()
                         ),
                     404 => error_schema('NOT_FOUND', "Validation not found"),
@@ -111,9 +111,9 @@ schema("/message_validations") ->
                 }
         }
     };
-schema("/message_validations/reorder") ->
+schema("/schema_validations/reorder") ->
     #{
-        'operationId' => '/message_validations/reorder',
+        'operationId' => '/schema_validations/reorder',
         post => #{
             tags => ?TAGS,
             summary => <<"Reorder all validations">>,
@@ -140,9 +140,9 @@ schema("/message_validations/reorder") ->
                 }
         }
     };
-schema("/message_validations/validation/:name") ->
+schema("/schema_validations/validation/:name") ->
     #{
-        'operationId' => '/message_validations/validation/:name',
+        'operationId' => '/schema_validations/validation/:name',
         get => #{
             tags => ?TAGS,
             summary => <<"Lookup a validation">>,
@@ -153,7 +153,7 @@ schema("/message_validations/validation/:name") ->
                     200 =>
                         emqx_dashboard_swagger:schema_with_examples(
                             array(
-                                emqx_message_validation_schema:api_schema(lookup)
+                                emqx_schema_validation_schema:api_schema(lookup)
                             ),
                             example_return_lookup()
                         ),
@@ -172,9 +172,9 @@ schema("/message_validations/validation/:name") ->
                 }
         }
     };
-schema("/message_validations/validation/:name/metrics") ->
+schema("/schema_validations/validation/:name/metrics") ->
     #{
-        'operationId' => '/message_validations/validation/:name/metrics',
+        'operationId' => '/schema_validations/validation/:name/metrics',
         get => #{
             tags => ?TAGS,
             summary => <<"Get validation metrics">>,
@@ -191,9 +191,9 @@ schema("/message_validations/validation/:name/metrics") ->
                 }
         }
     };
-schema("/message_validations/validation/:name/metrics/reset") ->
+schema("/schema_validations/validation/:name/metrics/reset") ->
     #{
-        'operationId' => '/message_validations/validation/:name/metrics/reset',
+        'operationId' => '/schema_validations/validation/:name/metrics/reset',
         post => #{
             tags => ?TAGS,
             summary => <<"Reset validation metrics">>,
@@ -206,9 +206,9 @@ schema("/message_validations/validation/:name/metrics/reset") ->
                 }
         }
     };
-schema("/message_validations/validation/:name/enable/:enable") ->
+schema("/schema_validations/validation/:name/enable/:enable") ->
     #{
-        'operationId' => '/message_validations/validation/:name/enable/:enable',
+        'operationId' => '/schema_validations/validation/:name/enable/:enable',
         post => #{
             tags => ?TAGS,
             summary => <<"Enable or disable validation">>,
@@ -285,29 +285,29 @@ fields(node_metrics) ->
 %% `minirest' handlers
 %%-------------------------------------------------------------------------------------------------
 
-'/message_validations'(get, _Params) ->
-    ?OK(emqx_message_validation:list());
-'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
+'/schema_validations'(get, _Params) ->
+    ?OK(emqx_schema_validation:list());
+'/schema_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
     with_validation(
         Name,
         return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
         fun() ->
-            case emqx_message_validation:insert(Params) of
+            case emqx_schema_validation:insert(Params) of
                 {ok, _} ->
-                    {ok, Res} = emqx_message_validation:lookup(Name),
+                    {ok, Res} = emqx_schema_validation:lookup(Name),
                     {201, Res};
                 {error, Error} ->
                     ?BAD_REQUEST(Error)
             end
         end
     );
-'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
+'/schema_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
     with_validation(
         Name,
         fun() ->
-            case emqx_message_validation:update(Params) of
+            case emqx_schema_validation:update(Params) of
                 {ok, _} ->
-                    {ok, Res} = emqx_message_validation:lookup(Name),
+                    {ok, Res} = emqx_schema_validation:lookup(Name),
                     {200, Res};
                 {error, Error} ->
                     ?BAD_REQUEST(Error)
@@ -316,17 +316,17 @@ fields(node_metrics) ->
         not_found()
     ).
 
-'/message_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
+'/schema_validations/validation/:name'(get, #{bindings := #{name := Name}}) ->
     with_validation(
         Name,
         fun(Validation) -> ?OK(Validation) end,
         not_found()
     );
-'/message_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
+'/schema_validations/validation/:name'(delete, #{bindings := #{name := Name}}) ->
     with_validation(
         Name,
         fun() ->
-            case emqx_message_validation:delete(Name) of
+            case emqx_schema_validation:delete(Name) of
                 {ok, _} ->
                     ?NO_CONTENT;
                 {error, Error} ->
@@ -336,10 +336,10 @@ fields(node_metrics) ->
         not_found()
     ).
 
-'/message_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
+'/schema_validations/reorder'(post, #{body := #{<<"order">> := Order}}) ->
     do_reorder(Order).
 
-'/message_validations/validation/:name/enable/:enable'(post, #{
+'/schema_validations/validation/:name/enable/:enable'(post, #{
     bindings := #{name := Name, enable := Enable}
 }) ->
     with_validation(
@@ -348,7 +348,7 @@ fields(node_metrics) ->
         not_found()
     ).
 
-'/message_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
+'/schema_validations/validation/:name/metrics'(get, #{bindings := #{name := Name}}) ->
     with_validation(
         Name,
         fun() ->
@@ -371,7 +371,7 @@ fields(node_metrics) ->
         not_found()
     ).
 
-'/message_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
+'/schema_validations/validation/:name/metrics/reset'(post, #{bindings := #{name := Name}}) ->
     with_validation(
         Name,
         fun() ->
@@ -516,7 +516,7 @@ error_schema(Codes, Message, ExtraFields) when is_list(Codes) andalso is_binary(
     ExtraFields ++ emqx_dashboard_swagger:error_codes(Codes, Message).
 
 do_reorder(Order) ->
-    case emqx_message_validation:reorder(Order) of
+    case emqx_schema_validation:reorder(Order) of
         {ok, _} ->
             ?NO_CONTENT;
         {error,
@@ -538,7 +538,7 @@ do_reorder(Order) ->
 
 do_enable_disable(Validation, Enable) ->
     RawValidation = make_serializable(Validation),
-    case emqx_message_validation:update(RawValidation#{<<"enable">> => Enable}) of
+    case emqx_schema_validation:update(RawValidation#{<<"enable">> => Enable}) of
         {ok, _} ->
             ?NO_CONTENT;
         {error, Reason} ->
@@ -546,7 +546,7 @@ do_enable_disable(Validation, Enable) ->
     end.
 
 with_validation(Name, FoundFn, NotFoundFn) ->
-    case emqx_message_validation:lookup(Name) of
+    case emqx_schema_validation:lookup(Name) of
         {ok, Validation} ->
             {arity, Arity} = erlang:fun_info(FoundFn, arity),
             case Arity of
@@ -564,15 +564,15 @@ not_found() ->
     return(?NOT_FOUND(<<"Validation not found">>)).
 
 make_serializable(Validation) ->
-    Schema = emqx_message_validation_schema,
+    Schema = emqx_schema_validation_schema,
     RawConfig = #{
-        <<"message_validation">> => #{
+        <<"schema_validation">> => #{
             <<"validations">> =>
                 [emqx_utils_maps:binary_key_map(Validation)]
         }
     },
     #{
-        <<"message_validation">> := #{
+        <<"schema_validation">> := #{
             <<"validations">> :=
                 [Serialized]
         }

+ 6 - 6
apps/emqx_message_validation/src/emqx_message_validation_registry.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_registry).
+-module(emqx_schema_validation_registry).
 
 -behaviour(gen_server).
 
@@ -36,10 +36,10 @@
 %% Type declarations
 %%------------------------------------------------------------------------------
 
--define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index).
--define(VALIDATION_TAB, emqx_message_validation_tab).
+-define(VALIDATION_TOPIC_INDEX, emqx_schema_validation_index).
+-define(VALIDATION_TAB, emqx_schema_validation_tab).
 
--define(METRIC_NAME, message_validation).
+-define(METRIC_NAME, schema_validation).
 -define(METRICS, [
     'matched',
     'succeeded',
@@ -106,7 +106,7 @@ matching_validations(Topic) ->
 
 -spec metrics_worker_spec() -> supervisor:child_spec().
 metrics_worker_spec() ->
-    emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
+    emqx_metrics_worker:child_spec(schema_validation_metrics, ?METRIC_NAME).
 
 -spec get_metrics(validation_name()) -> emqx_metrics_worker:metrics().
 get_metrics(Name) ->
@@ -243,7 +243,7 @@ transform_validation(Validation = #{checks := Checks}) ->
     Validation#{checks := lists:map(fun transform_check/1, Checks)}.
 
 transform_check(#{type := sql, sql := SQL}) ->
-    {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+    {ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
     Check;
 transform_check(Check) ->
     Check.

+ 5 - 5
apps/emqx_message_validation/src/emqx_message_validation_schema.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_schema).
+-module(emqx_schema_validation_schema).
 
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -26,12 +26,12 @@
 %% `hocon_schema' API
 %%------------------------------------------------------------------------------
 
-namespace() -> message_validation.
+namespace() -> schema_validation.
 
 roots() ->
-    [{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
+    [{schema_validation, mk(ref(schema_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
 
-fields(message_validation) ->
+fields(schema_validation) ->
     [
         {validations,
             mk(
@@ -199,7 +199,7 @@ ensure_array(L, _) when is_list(L) -> L;
 ensure_array(B, _) -> [B].
 
 validate_sql(SQL) ->
-    case emqx_message_validation:parse_sql_check(SQL) of
+    case emqx_schema_validation:parse_sql_check(SQL) of
         {ok, _Parsed} ->
             ok;
         Error = {error, _} ->

+ 3 - 3
apps/emqx_message_validation/src/emqx_message_validation_sup.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_sup).
+-module(emqx_schema_validation_sup).
 
 -behaviour(supervisor).
 
@@ -23,8 +23,8 @@ start_link() ->
 %%------------------------------------------------------------------------------
 
 init([]) ->
-    Registry = worker_spec(emqx_message_validation_registry),
-    Metrics = emqx_message_validation_registry:metrics_worker_spec(),
+    Registry = worker_spec(emqx_schema_validation_registry),
+    Metrics = emqx_schema_validation_registry:metrics_worker_spec(),
     SupFlags = #{
         strategy => one_for_one,
         intensity => 10,

+ 12 - 12
apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl

@@ -1,7 +1,7 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_http_api_SUITE).
+-module(emqx_schema_validation_http_api_SUITE).
 
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -31,7 +31,7 @@ init_per_suite(Config) ->
                 emqx,
                 emqx_conf,
                 emqx_rule_engine,
-                emqx_message_validation,
+                emqx_schema_validation,
                 emqx_management,
                 emqx_mgmt_api_test_util:emqx_dashboard(),
                 emqx_schema_registry
@@ -66,9 +66,9 @@ end_per_testcase(_TestCase, _Config) ->
 clear_all_validations() ->
     lists:foreach(
         fun(#{name := Name}) ->
-            {ok, _} = emqx_message_validation:delete(Name)
+            {ok, _} = emqx_schema_validation:delete(Name)
         end,
-        emqx_message_validation:list()
+        emqx_schema_validation:list()
     ).
 
 reset_all_global_metrics() ->
@@ -146,7 +146,7 @@ schema_check(Type, SerdeName, Overrides) ->
         Overrides
     ).
 
-api_root() -> "message_validations".
+api_root() -> "schema_validations".
 
 simplify_result(Res) ->
     case Res of
@@ -358,7 +358,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
         ExpectedOrder,
         [
             N
-         || #{name := N} <- emqx_message_validation_registry:matching_validations(Topic)
+         || #{name := N} <- emqx_schema_validation_registry:matching_validations(Topic)
         ],
         Comment
     ).
@@ -366,7 +366,7 @@ assert_index_order(ExpectedOrder, Topic, Comment) ->
 create_failure_tracing_rule() ->
     Params = #{
         enable => true,
-        sql => <<"select * from \"$events/message_validation_failed\" ">>,
+        sql => <<"select * from \"$events/schema_validation_failed\" ">>,
         actions => [make_trace_fn_action()]
     },
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
@@ -689,7 +689,7 @@ t_log_failure_none(_Config) ->
             ok
         end,
         fun(Trace) ->
-            ?assertMatch([#{log_level := none}], ?of_kind(message_validation_failed, Trace)),
+            ?assertMatch([#{log_level := none}], ?of_kind(schema_validation_failed, Trace)),
             ok
         end
     ),
@@ -719,12 +719,12 @@ t_action_ignore(_Config) ->
             ok
         end,
         fun(Trace) ->
-            ?assertMatch([#{action := ignore}], ?of_kind(message_validation_failed, Trace)),
+            ?assertMatch([#{action := ignore}], ?of_kind(schema_validation_failed, Trace)),
             ok
         end
     ),
     ?assertMatch(
-        [{_, #{data := #{validation := Name1, event := 'message.validation_failed'}}}],
+        [{_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}}],
         get_traced_failures_from_rule_engine()
     ),
     ok.
@@ -1093,8 +1093,8 @@ t_multiple_validations(_Config) ->
 
     ?assertMatch(
         [
-            {_, #{data := #{validation := Name1, event := 'message.validation_failed'}}},
-            {_, #{data := #{validation := Name2, event := 'message.validation_failed'}}}
+            {_, #{data := #{validation := Name1, event := 'schema.validation_failed'}}},
+            {_, #{data := #{validation := Name2, event := 'schema.validation_failed'}}}
         ],
         get_traced_failures_from_rule_engine()
     ),

+ 15 - 15
apps/emqx_message_validation/test/emqx_message_validation_tests.erl

@@ -1,22 +1,22 @@
 %%--------------------------------------------------------------------
 %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%--------------------------------------------------------------------
--module(emqx_message_validation_tests).
+-module(emqx_schema_validation_tests).
 
 -include_lib("eunit/include/eunit.hrl").
 
--define(VALIDATIONS_PATH, "message_validation.validations").
+-define(VALIDATIONS_PATH, "schema_validation.validations").
 
 %%------------------------------------------------------------------------------
 %% Helper fns
 %%------------------------------------------------------------------------------
 
 parse_and_check(InnerConfigs) ->
-    RootBin = <<"message_validation">>,
+    RootBin = <<"schema_validation">>,
     InnerBin = <<"validations">>,
     RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
     #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
-        emqx_message_validation_schema,
+        emqx_schema_validation_schema,
         RawConf,
         #{
             required => false,
@@ -65,9 +65,9 @@ schema_check(Type, SerdeName, Overrides) ->
     ).
 
 eval_sql(Message, SQL) ->
-    {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+    {ok, Check} = emqx_schema_validation:parse_sql_check(SQL),
     Validation = #{log_failure => #{level => warning}, name => <<"validation">>},
-    emqx_message_validation:evaluate_sql_check(Check, Validation, Message).
+    emqx_schema_validation:evaluate_sql_check(Check, Validation, Message).
 
 message() ->
     message(_Opts = #{}).
@@ -196,7 +196,7 @@ invalid_names_test_() ->
                 {_Schema, [
                     #{
                         kind := validation_error,
-                        path := "message_validation.validations.1.name"
+                        path := "schema_validation.validations.1.name"
                     }
                 ]},
                 parse_and_check([validation(InvalidName, [sql_check()])])
@@ -239,7 +239,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated topics: t/1">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.topics"
+                        path := "schema_validation.validations.1.topics"
                     }
                 ]},
                 parse_and_check([
@@ -256,7 +256,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated topics: t/1">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.topics"
+                        path := "schema_validation.validations.1.topics"
                     }
                 ]},
                 parse_and_check([
@@ -273,7 +273,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated topics: t/1, t/2">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.topics"
+                        path := "schema_validation.validations.1.topics"
                     }
                 ]},
                 parse_and_check([
@@ -320,7 +320,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated schema checks: json:a">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.checks"
+                        path := "schema_validation.validations.1.checks"
                     }
                 ]},
                 parse_and_check([
@@ -336,7 +336,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated schema checks: json:a">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.checks"
+                        path := "schema_validation.validations.1.checks"
                     }
                 ]},
                 parse_and_check([
@@ -353,7 +353,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated schema checks: json:a">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.checks"
+                        path := "schema_validation.validations.1.checks"
                     }
                 ]},
                 parse_and_check([
@@ -370,7 +370,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated schema checks: json:a">>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.checks"
+                        path := "schema_validation.validations.1.checks"
                     }
                 ]},
                 parse_and_check([
@@ -387,7 +387,7 @@ duplicated_check_test_() ->
                     #{
                         reason := <<"duplicated schema checks: ", _/binary>>,
                         kind := validation_error,
-                        path := "message_validation.validations.1.checks"
+                        path := "schema_validation.validations.1.checks"
                     }
                 ]},
                 parse_and_check([

+ 1 - 0
changes/ee/fix-13018.en.md

@@ -0,0 +1 @@
+Reduced log spamming when connection goes down in a Postgres/Timescale/Matrix connector.

+ 2 - 2
mix.exs

@@ -79,7 +79,7 @@ defmodule EMQXUmbrella.MixProject do
       # in conflict by ehttpc and emqtt
       {:gun, github: "emqx/gun", tag: "1.3.11", override: true},
       # in conflict by emqx_connector and system_monitor
-      {:epgsql, github: "emqx/epgsql", tag: "4.7.1.1", override: true},
+      {:epgsql, github: "emqx/epgsql", tag: "4.7.1.2", override: true},
       # in conflict by emqx and observer_cli
       {:recon, github: "ferd/recon", tag: "2.5.1", override: true},
       {:jsx, github: "talentdeficit/jsx", tag: "v3.1.0", override: true},
@@ -189,7 +189,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_s3,
       :emqx_bridge_s3,
       :emqx_schema_registry,
-      :emqx_message_validation,
+      :emqx_schema_validation,
       :emqx_enterprise,
       :emqx_bridge_kinesis,
       :emqx_bridge_azure_event_hub,

+ 1 - 1
rebar.config.erl

@@ -116,7 +116,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
 is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
 is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
 is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
-is_community_umbrella_app("apps/emqx_message_validation") -> false;
+is_community_umbrella_app("apps/emqx_schema_validation") -> false;
 is_community_umbrella_app("apps/emqx_eviction_agent") -> false;
 is_community_umbrella_app("apps/emqx_node_rebalance") -> false;
 is_community_umbrella_app(_) -> true.

+ 4 - 4
rel/i18n/emqx_prometheus_api.hocon

@@ -25,9 +25,9 @@ get_prom_data_integration_data.desc:
 get_prom_data_integration_data.label:
 """Prometheus Metrics for Data Integration"""
 
-get_prom_message_validation.desc:
-"""Get Prometheus Metrics for Message Validation"""
-get_prom_message_validation.label:
-"""Prometheus Metrics for Message Validation"""
+get_prom_schema_validation.desc:
+"""Get Prometheus Metrics for Schema Validation"""
+get_prom_schema_validation.label:
+"""Prometheus Metrics for Schema Validation"""
 
 }

+ 1 - 1
rel/i18n/emqx_message_validation_http_api.hocon

@@ -1,4 +1,4 @@
-emqx_message_validation_http_api {
+emqx_schema_validation_http_api {
 
   list_validations.desc:
   """List validations"""

+ 1 - 1
rel/i18n/emqx_message_validation_schema.hocon

@@ -1,4 +1,4 @@
-emqx_message_validation_schema {
+emqx_schema_validation_schema {
 
   check_avro_type.desc:
   """Avro schema check"""