Просмотр исходного кода

feat: implement message validation

Fixes https://emqx.atlassian.net/browse/EMQX-11980
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
f84a996671
34 измененных файлов с 2653 добавлено и 79 удалено
  1. 1 0
      apps/emqx/include/emqx_hooks.hrl
  2. 6 0
      apps/emqx/src/emqx_broker.erl
  3. 14 3
      apps/emqx/src/emqx_channel.erl
  4. 7 5
      apps/emqx/src/emqx_types.erl
  5. 4 0
      apps/emqx/test/emqx_cth_suite.erl
  6. 1 0
      apps/emqx_machine/priv/reboot_lists.eterm
  7. 94 0
      apps/emqx_message_validation/BSL.txt
  8. 29 0
      apps/emqx_message_validation/README.md
  9. 16 0
      apps/emqx_message_validation/rebar.config
  10. 14 0
      apps/emqx_message_validation/src/emqx_message_validation.app.src
  11. 380 0
      apps/emqx_message_validation/src/emqx_message_validation.erl
  12. 32 0
      apps/emqx_message_validation/src/emqx_message_validation_app.erl
  13. 385 0
      apps/emqx_message_validation/src/emqx_message_validation_http_api.erl
  14. 225 0
      apps/emqx_message_validation/src/emqx_message_validation_registry.erl
  15. 206 0
      apps/emqx_message_validation/src/emqx_message_validation_schema.erl
  16. 47 0
      apps/emqx_message_validation/src/emqx_message_validation_sup.erl
  17. 690 0
      apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl
  18. 194 0
      apps/emqx_message_validation/test/emqx_message_validation_tests.erl
  19. 0 6
      apps/emqx_rule_engine/src/emqx_rule_engine.erl
  20. 2 1
      apps/emqx_rule_engine/src/emqx_rule_engine_app.erl
  21. 2 1
      apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
  22. 20 8
      apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl
  23. 42 19
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  24. 61 27
      apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl
  25. 12 5
      apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl
  26. 13 1
      apps/emqx_utils/src/emqx_utils.erl
  27. 3 1
      apps/emqx_utils/src/emqx_utils_maps.erl
  28. 34 0
      apps/emqx_utils/test/emqx_utils_tests.erl
  29. 3 0
      changes/ce/feat-12711.en.md
  30. 2 1
      mix.exs
  31. 1 1
      rebar.config
  32. 1 0
      rebar.config.erl
  33. 24 0
      rel/i18n/emqx_message_validation_http_api.hocon
  34. 88 0
      rel/i18n/emqx_message_validation_schema.hocon

+ 1 - 0
apps/emqx/include/emqx_hooks.hrl

@@ -25,6 +25,7 @@
 -define(HP_AUTHN, 970).
 -define(HP_AUTHZ, 960).
 -define(HP_SYS_MSGS, 950).
+-define(HP_MSG_VALIDATION, 945).
 -define(HP_TOPIC_METRICS, 940).
 -define(HP_RETAINER, 930).
 -define(HP_AUTO_SUB, 920).

+ 6 - 0
apps/emqx/src/emqx_broker.erl

@@ -235,6 +235,12 @@ publish(Msg) when is_record(Msg, message) ->
     _ = emqx_trace:publish(Msg),
     emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
     case emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)) of
+        #message{headers = #{should_disconnect := true}, topic = Topic} ->
+            ?TRACE("MQTT", "msg_publish_not_allowed_disconnect", #{
+                message => emqx_message:to_log_map(Msg),
+                topic => Topic
+            }),
+            disconnect;
         #message{headers = #{allow_publish := false}, topic = Topic} ->
             ?TRACE("MQTT", "msg_publish_not_allowed", #{
                 message => emqx_message:to_log_map(Msg),

+ 14 - 3
apps/emqx/src/emqx_channel.erl

@@ -702,14 +702,21 @@ packet_to_message(Packet, #channel{
 
 do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
     Result = emqx_broker:publish(Msg),
-    NChannel = ensure_quota(Result, Channel),
-    {ok, NChannel};
+    case Result of
+        disconnect ->
+            handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
+        _ ->
+            NChannel = ensure_quota(Result, Channel),
+            {ok, NChannel}
+    end;
 do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
     PubRes = emqx_broker:publish(Msg),
     RC = puback_reason_code(PacketId, Msg, PubRes),
     case RC of
         undefined ->
             {ok, Channel};
+        disconnect ->
+            handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
         _Value ->
             do_finish_publish(PacketId, PubRes, RC, Channel)
     end;
@@ -719,6 +726,8 @@ do_publish(
     Channel = #channel{clientinfo = ClientInfo, session = Session}
 ) ->
     case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
+        {ok, disconnect, _NSession} ->
+            handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
         {ok, PubRes, NSession} ->
             RC = pubrec_reason_code(PubRes),
             NChannel0 = Channel#channel{session = NSession},
@@ -763,7 +772,9 @@ pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
 puback_reason_code(PacketId, Msg, [] = PubRes) ->
     emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
 puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
-    emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
+    emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS);
+puback_reason_code(_PacketId, _Msg, disconnect) ->
+    disconnect.
 
 -compile({inline, [after_message_acked/3]}).
 after_message_acked(ClientInfo, Msg, PubAckProps) ->

+ 7 - 5
apps/emqx/src/emqx_types.erl

@@ -258,11 +258,13 @@
 -type deliver() :: {deliver, topic(), message()}.
 -type delivery() :: #delivery{}.
 -type deliver_result() :: ok | {ok, non_neg_integer()} | {error, term()}.
--type publish_result() :: [
-    {node(), topic(), deliver_result()}
-    | {share, topic(), deliver_result()}
-    | persisted
-].
+-type publish_result() ::
+    [
+        {node(), topic(), deliver_result()}
+        | {share, topic(), deliver_result()}
+        | persisted
+    ]
+    | disconnect.
 -type route() :: #route{}.
 -type route_entry() :: {topic(), node()} | {topic, group()}.
 -type command() :: #command{}.

+ 4 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -380,6 +380,10 @@ default_appspec(emqx_dashboard, _SuiteOpts) ->
             true = emqx_dashboard_listener:is_ready(infinity)
         end
     };
+default_appspec(emqx_schema_registry, _SuiteOpts) ->
+    #{schema_mod => emqx_schema_registry_schema, config => #{}};
+default_appspec(emqx_message_validation, _SuiteOpts) ->
+    #{schema_mod => emqx_message_validation_schema, config => #{}};
 default_appspec(_, _) ->
     #{}.
 

+ 1 - 0
apps/emqx_machine/priv/reboot_lists.eterm

@@ -88,6 +88,7 @@
         [
             emqx_license,
             emqx_enterprise,
+            emqx_message_validation,
             emqx_bridge_kafka,
             emqx_bridge_pulsar,
             emqx_bridge_gcp_pubsub,

+ 94 - 0
apps/emqx_message_validation/BSL.txt

@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor:             Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work:        EMQX Enterprise Edition
+                      The Licensed Work is (c) 2023
+                      Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+                      modify, and create derivative work for research
+                      or education.
+Change Date:          2028-01-26
+Change License:       Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+   or a license that is compatible with GPL Version 2.0 or a later version,
+   where “compatible” means that software provided under the Change License can
+   be included in a program with software provided under GPL Version 2.0 or a
+   later version. Licensor may specify additional Change Licenses without
+   limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+   impose any additional restriction on the right granted in this License, as
+   the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.

+ 29 - 0
apps/emqx_message_validation/README.md

@@ -0,0 +1,29 @@
+# EMQX Message Validation
+
+This application encapsulates the functionality to validate incoming or internally
+triggered published payloads and take an action upon failure, which can be to just drop
+the message without further processing, or to disconnect the offending client as well.
+
+# Documentation
+
+Refer to [Message
+Validation](https://docs.emqx.com/en/enterprise/latest/data-integration/message-validation.html)
+for more information about the semantics and checks available.
+
+# HTTP APIs
+
+APIs are provided for validation management, which includes creating,
+updating, looking up, deleting, listing validations.
+
+Refer to [API Docs -
+Bridges](https://docs.emqx.com/en/enterprise/latest/admin/api-docs.html#tag/Message-Validation)
+for more detailed information.
+
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

+ 16 - 0
apps/emqx_message_validation/rebar.config

@@ -0,0 +1,16 @@
+%% -*- mode: erlang -*-
+
+{erl_opts, [
+    warn_unused_vars,
+    warn_shadow_vars,
+    warn_unused_import,
+    warn_obsolete_guard,
+    warnings_as_errors,
+    debug_info
+]}.
+{deps, [
+    {emqx, {path, "../emqx"}},
+    {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_rule_engine, {path, "../emqx_rule_engine"}},
+    {emqx_schema_registry, {path, "../emqx_schema_registry"}}
+]}.

+ 14 - 0
apps/emqx_message_validation/src/emqx_message_validation.app.src

@@ -0,0 +1,14 @@
+{application, emqx_message_validation, [
+    {description, "EMQX Message Validation"},
+    {vsn, "0.1.0"},
+    {registered, [emqx_message_validation_sup, emqx_message_validation_registry]},
+    {mod, {emqx_message_validation_app, []}},
+    {applications, [
+        kernel,
+        stdlib
+    ]},
+    {env, []},
+    {modules, []},
+
+    {links, []}
+]}.

+ 380 - 0
apps/emqx_message_validation/src/emqx_message_validation.erl

@@ -0,0 +1,380 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation).
+
+-include_lib("emqx_utils/include/emqx_message.hrl").
+-include_lib("emqx/include/emqx_hooks.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% API
+-export([
+    add_handler/0,
+    remove_handler/0,
+
+    load/0,
+    unload/0,
+
+    list/0,
+    move/2,
+    lookup/1,
+    insert/1,
+    update/1,
+    delete/1
+]).
+
+%% `emqx_hooks' API
+-export([
+    register_hooks/0,
+    unregister_hooks/0,
+
+    on_message_publish/1
+]).
+
+%% `emqx_config_handler' API
+-export([pre_config_update/3, post_config_update/5]).
+
+%% Internal exports
+-export([parse_sql_check/1]).
+
+%% Internal functions; exported for tests
+-export([
+    evaluate_sql_check/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(TRACE_TAG, "MESSAGE_VALIDATION").
+-define(CONF_ROOT, message_validation).
+-define(VALIDATIONS_CONF_PATH, [?CONF_ROOT, validations]).
+
+-type validation_name() :: binary().
+-type validation() :: _TODO.
+-type position() :: front | rear | {'after', validation_name()} | {before, validation_name()}.
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec add_handler() -> ok.
+add_handler() ->
+    ok = emqx_config_handler:add_handler(?VALIDATIONS_CONF_PATH, ?MODULE),
+    ok.
+
+-spec remove_handler() -> ok.
+remove_handler() ->
+    ok = emqx_config_handler:remove_handler(?VALIDATIONS_CONF_PATH),
+    ok.
+
+load() ->
+    lists:foreach(fun insert/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
+
+unload() ->
+    lists:foreach(fun delete/1, emqx:get_config(?VALIDATIONS_CONF_PATH, [])).
+
+-spec list() -> [validation()].
+list() ->
+    emqx:get_config(?VALIDATIONS_CONF_PATH, []).
+
+-spec move(validation_name(), position()) ->
+    {ok, _} | {error, _}.
+move(Name, Position) ->
+    emqx:update_config(
+        ?VALIDATIONS_CONF_PATH,
+        {move, Name, Position},
+        #{override_to => cluster}
+    ).
+
+-spec lookup(validation_name()) -> {ok, validation()} | {error, not_found}.
+lookup(Name) ->
+    Validations = emqx:get_config(?VALIDATIONS_CONF_PATH, []),
+    do_lookup(Name, Validations).
+
+-spec insert(validation()) ->
+    {ok, _} | {error, _}.
+insert(Validation) ->
+    emqx:update_config(
+        ?VALIDATIONS_CONF_PATH,
+        {append, Validation},
+        #{override_to => cluster}
+    ).
+
+-spec update(validation()) ->
+    {ok, _} | {error, _}.
+update(Validation) ->
+    emqx:update_config(
+        ?VALIDATIONS_CONF_PATH,
+        {update, Validation},
+        #{override_to => cluster}
+    ).
+
+-spec delete(validation_name()) ->
+    {ok, _} | {error, _}.
+delete(Name) ->
+    emqx:update_config(
+        ?VALIDATIONS_CONF_PATH,
+        {delete, Name},
+        #{override_to => cluster}
+    ).
+
+%%------------------------------------------------------------------------------
+%% Hooks
+%%------------------------------------------------------------------------------
+
+-spec register_hooks() -> ok.
+register_hooks() ->
+    emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_MSG_VALIDATION).
+
+-spec unregister_hooks() -> ok.
+unregister_hooks() ->
+    emqx_hooks:del('message.publish', {?MODULE, on_message_publish}).
+
+-spec on_message_publish(emqx_types:message()) ->
+    {ok, emqx_types:message()} | {stop, emqx_types:message()}.
+on_message_publish(Message = #message{topic = Topic, headers = Headers}) ->
+    case emqx_message_validation_registry:matching_validations(Topic) of
+        [] ->
+            ok;
+        Validations ->
+            case run_validations(Validations, Message) of
+                ok ->
+                    {ok, Message};
+                drop ->
+                    {stop, Message#message{headers = Headers#{allow_publish => false}}};
+                disconnect ->
+                    {stop, Message#message{
+                        headers = Headers#{
+                            allow_publish => false,
+                            should_disconnect => true
+                        }
+                    }}
+            end
+    end.
+
+%%------------------------------------------------------------------------------
+%% `emqx_config_handler' API
+%%------------------------------------------------------------------------------
+
+pre_config_update(?VALIDATIONS_CONF_PATH, {append, Validation}, OldValidations) ->
+    Validations = OldValidations ++ [Validation],
+    {ok, Validations};
+pre_config_update(?VALIDATIONS_CONF_PATH, {update, Validation}, OldValidations) ->
+    replace(OldValidations, Validation);
+pre_config_update(?VALIDATIONS_CONF_PATH, {delete, Validation}, OldValidations) ->
+    delete(OldValidations, Validation);
+pre_config_update(?VALIDATIONS_CONF_PATH, {move, Name, Position}, OldValidations) ->
+    move(OldValidations, Name, Position).
+
+post_config_update(?VALIDATIONS_CONF_PATH, {append, #{<<"name">> := Name}}, New, _Old, _AppEnvs) ->
+    {Pos, Validation} = fetch_with_index(New, Name),
+    ok = emqx_message_validation_registry:insert(Pos, Validation),
+    ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {update, #{<<"name">> := Name}}, New, Old, _AppEnvs) ->
+    {_Pos, OldValidation} = fetch_with_index(Old, Name),
+    {Pos, NewValidation} = fetch_with_index(New, Name),
+    ok = emqx_message_validation_registry:update(OldValidation, Pos, NewValidation),
+    ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {delete, Name}, _New, Old, _AppEnvs) ->
+    {_Pos, Validation} = fetch_with_index(Old, Name),
+    ok = emqx_message_validation_registry:delete(Validation),
+    ok;
+post_config_update(?VALIDATIONS_CONF_PATH, {move, _Name, _Position}, New, _Old, _AppEnvs) ->
+    ok = emqx_message_validation_registry:reindex_positions(New),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+parse_sql_check(SQL) ->
+    case emqx_rule_sqlparser:parse(SQL, #{with_from => false}) of
+        {ok, Select} ->
+            case emqx_rule_sqlparser:select_is_foreach(Select) of
+                true ->
+                    {error, foreach_not_allowed};
+                false ->
+                    Check = #{
+                        type => sql,
+                        fields => emqx_rule_sqlparser:select_fields(Select),
+                        conditions => emqx_rule_sqlparser:select_where(Select)
+                    },
+                    {ok, Check}
+            end;
+        Error = {error, _} ->
+            Error
+    end.
+
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+evaluate_sql_check(Check, Message) ->
+    #{
+        fields := Fields,
+        conditions := Conditions
+    } = Check,
+    {Data, _} = emqx_rule_events:eventmsg_publish(Message),
+    try emqx_rule_runtime:evaluate_select(Fields, Data, Conditions) of
+        {ok, _} ->
+            true;
+        false ->
+            false
+    catch
+        throw:_Reason ->
+            %% TODO: log?
+            false;
+        _Class:_Error:_Stacktrace ->
+            %% TODO: log?
+            false
+    end.
+
+evaluate_schema_check(Check, #message{payload = Data}) ->
+    #{schema := SerdeName} = Check,
+    ExtraArgs =
+        case Check of
+            #{type := protobuf, message_name := MessageName} ->
+                [MessageName];
+            _ ->
+                []
+        end,
+    try
+        emqx_schema_registry_serde:handle_rule_function(schema_check, [SerdeName, Data | ExtraArgs])
+    catch
+        error:{serde_not_found, _} ->
+            false;
+        _Class:_Error:_Stacktrace ->
+            %% TODO: log?
+            false
+    end.
+
+replace(OldValidations, Validation = #{<<"name">> := Name}) ->
+    {Found, RevNewValidations} =
+        lists:foldl(
+            fun
+                (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
+                    {true, [Validation | Acc]};
+                (Val, {FoundIn, Acc}) ->
+                    {FoundIn, [Val | Acc]}
+            end,
+            {false, []},
+            OldValidations
+        ),
+    case Found of
+        true ->
+            {ok, lists:reverse(RevNewValidations)};
+        false ->
+            {error, not_found}
+    end.
+
+delete(OldValidations, Name) ->
+    {Found, RevNewValidations} =
+        lists:foldl(
+            fun
+                (#{<<"name">> := NameIn}, {_FoundIn, Acc}) when NameIn =:= Name ->
+                    {true, Acc};
+                (Val, {FoundIn, Acc}) ->
+                    {FoundIn, [Val | Acc]}
+            end,
+            {false, []},
+            OldValidations
+        ),
+    case Found of
+        true ->
+            {ok, lists:reverse(RevNewValidations)};
+        false ->
+            {error, not_found}
+    end.
+
+move(OldValidations, Name, front) ->
+    {Validation, Front, Rear} = take(Name, OldValidations),
+    {ok, [Validation | Front ++ Rear]};
+move(OldValidations, Name, rear) ->
+    {Validation, Front, Rear} = take(Name, OldValidations),
+    {ok, Front ++ Rear ++ [Validation]};
+move(OldValidations, Name, {'after', OtherName}) ->
+    {Validation, Front1, Rear1} = take(Name, OldValidations),
+    {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1),
+    {ok, Front2 ++ [OtherValidation, Validation] ++ Rear2};
+move(OldValidations, Name, {before, OtherName}) ->
+    {Validation, Front1, Rear1} = take(Name, OldValidations),
+    {OtherValidation, Front2, Rear2} = take(OtherName, Front1 ++ Rear1),
+    {ok, Front2 ++ [Validation, OtherValidation] ++ Rear2}.
+
+fetch_with_index([{Pos, #{name := Name} = Validation} | _Rest], Name) ->
+    {Pos, Validation};
+fetch_with_index([{_, _} | Rest], Name) ->
+    fetch_with_index(Rest, Name);
+fetch_with_index(Validations, Name) ->
+    fetch_with_index(lists:enumerate(Validations), Name).
+
+take(Name, Validations) ->
+    case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Validations) of
+        {_Front, []} ->
+            throw({validation_not_found, Name});
+        {Front, [Found | Rear]} ->
+            {Found, Front, Rear}
+    end.
+
+do_lookup(_Name, _Validations = []) ->
+    {error, not_found};
+do_lookup(Name, [#{name := Name} = Validation | _Rest]) ->
+    {ok, Validation};
+do_lookup(Name, [_ | Rest]) ->
+    do_lookup(Name, Rest).
+
+run_validations(Validations, Message) ->
+    try
+        emqx_rule_runtime:clear_rule_payload(),
+        Fun = fun(Validation, Acc) ->
+            #{
+                name := Name,
+                log_failure_at := FailureLogLevel
+            } = Validation,
+            case run_validation(Validation, Message) of
+                ok ->
+                    {cont, Acc};
+                FailureAction ->
+                    ?TRACE(
+                        FailureLogLevel,
+                        ?TRACE_TAG,
+                        "validation_failed",
+                        #{validation => Name, action => FailureAction}
+                    ),
+                    {halt, FailureAction}
+            end
+        end,
+        emqx_utils:foldl_while(Fun, _Passed = ok, Validations)
+    after
+        emqx_rule_runtime:clear_rule_payload()
+    end.
+
+run_validation(#{strategy := all_pass} = Validation, Message) ->
+    #{
+        checks := Checks,
+        failure_action := FailureAction
+    } = Validation,
+    Fun = fun(Check, Acc) ->
+        case run_check(Check, Message) of
+            true -> {cont, Acc};
+            false -> {halt, FailureAction}
+        end
+    end,
+    emqx_utils:foldl_while(Fun, _Passed = ok, Checks);
+run_validation(#{strategy := any_pass} = Validation, Message) ->
+    #{
+        checks := Checks,
+        failure_action := FailureAction
+    } = Validation,
+    case lists:any(fun(C) -> run_check(C, Message) end, Checks) of
+        true ->
+            ok;
+        false ->
+            FailureAction
+    end.
+
+run_check(#{type := sql} = Check, Message) ->
+    evaluate_sql_check(Check, Message);
+run_check(Check, Message) ->
+    evaluate_schema_check(Check, Message).

+ 32 - 0
apps/emqx_message_validation/src/emqx_message_validation_app.erl

@@ -0,0 +1,32 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_app).
+
+-behaviour(application).
+
+%% `application' API
+-export([start/2, stop/1]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `application' API
+%%------------------------------------------------------------------------------
+
+-spec start(application:start_type(), term()) -> {ok, pid()}.
+start(_Type, _Args) ->
+    {ok, Sup} = emqx_message_validation_sup:start_link(),
+    ok = emqx_message_validation:add_handler(),
+    ok = emqx_message_validation:register_hooks(),
+    ok = emqx_message_validation:load(),
+    {ok, Sup}.
+
+-spec stop(term()) -> ok.
+stop(_State) ->
+    ok = emqx_message_validation:unload(),
+    ok = emqx_message_validation:unregister_hooks(),
+    ok = emqx_message_validation:remove_handler(),
+    ok.

+ 385 - 0
apps/emqx_message_validation/src/emqx_message_validation_http_api.erl

@@ -0,0 +1,385 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_http_api).
+
+-behaviour(minirest_api).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
+
+%% `minirest' and `minirest_trails' API
+-export([
+    namespace/0,
+    api_spec/0,
+    fields/1,
+    paths/0,
+    schema/1
+]).
+
+%% `minirest' handlers
+-export([
+    '/message_validations'/2,
+    '/message_validations/:name'/2,
+    '/message_validations/:name/move'/2
+]).
+
+%%-------------------------------------------------------------------------------------------------
+%% Type definitions
+%%-------------------------------------------------------------------------------------------------
+
+-define(TAGS, [<<"Message Validation">>]).
+
+%%-------------------------------------------------------------------------------------------------
+%% `minirest' and `minirest_trails' API
+%%-------------------------------------------------------------------------------------------------
+
+namespace() -> "message_validation_http_api".
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/message_validations",
+        "/message_validations/:name",
+        "/message_validations/:name/move"
+    ].
+
+schema("/message_validations") ->
+    #{
+        'operationId' => '/message_validations',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"List validations">>,
+            description => ?DESC("list_validations"),
+            responses =>
+                #{
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            hoconsc:array(
+                                emqx_message_validation_schema:api_schema(list)
+                            ),
+                            #{
+                                sample =>
+                                    #{value => example_return_list()}
+                            }
+                        )
+                }
+        },
+        post => #{
+            tags => ?TAGS,
+            summary => <<"Append a new validation">>,
+            description => ?DESC("append_validation"),
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_message_validation_schema:api_schema(post),
+                example_input_create()
+            ),
+            responses =>
+                #{
+                    201 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_message_validation_schema:api_schema(post),
+                            example_return_create()
+                        ),
+                    400 => error_schema('ALREADY_EXISTS', "Validation already exists")
+                }
+        },
+        put => #{
+            tags => ?TAGS,
+            summary => <<"Update a validation">>,
+            description => ?DESC("update_validation"),
+            'requestBody' => emqx_dashboard_swagger:schema_with_examples(
+                emqx_message_validation_schema:api_schema(put),
+                example_input_update()
+            ),
+            responses =>
+                #{
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            emqx_message_validation_schema:api_schema(put),
+                            example_return_update()
+                        ),
+                    404 => error_schema('NOT_FOUND', "Validation not found"),
+                    400 => error_schema('BAD_REQUEST', "Bad params")
+                }
+        }
+    };
+schema("/message_validations/:name") ->
+    #{
+        'operationId' => '/message_validations/:name',
+        get => #{
+            tags => ?TAGS,
+            summary => <<"Lookup a validation">>,
+            description => ?DESC("lookup_validation"),
+            parameters => [param_path_name()],
+            responses =>
+                #{
+                    200 =>
+                        emqx_dashboard_swagger:schema_with_examples(
+                            hoconsc:array(
+                                emqx_message_validation_schema:api_schema(lookup)
+                            ),
+                            #{
+                                sample =>
+                                    #{value => example_return_lookup()}
+                            }
+                        ),
+                    404 => error_schema('NOT_FOUND', "Validation not found")
+                }
+        },
+        delete => #{
+            tags => ?TAGS,
+            summary => <<"Delete a validation">>,
+            description => ?DESC("delete_validation"),
+            parameters => [param_path_name()],
+            responses =>
+                #{
+                    204 => <<"Validation deleted">>,
+                    404 => error_schema('NOT_FOUND', "Validation not found")
+                }
+        }
+    };
+schema("/message_validations/:name/move") ->
+    #{
+        'operationId' => '/message_validations/:name/move',
+        post => #{
+            tags => ?TAGS,
+            summary => <<"Change the order of a validation">>,
+            description => ?DESC("move_validation"),
+            parameters => [param_path_name()],
+            'requestBody' =>
+                emqx_dashboard_swagger:schema_with_examples(
+                    hoconsc:union(fun position_union_member_selector/1),
+                    example_position()
+                ),
+            responses =>
+                #{
+                    204 => <<"No Content">>,
+                    400 => error_schema('BAD_REQUEST', <<"Bad request">>),
+                    404 => error_schema('NOT_FOUND', "Validation not found")
+                }
+        }
+    }.
+
+param_path_name() ->
+    {name,
+        mk(
+            binary(),
+            #{
+                in => path,
+                required => true,
+                example => <<"my_validation">>,
+                desc => ?DESC("param_path_name")
+            }
+        )}.
+
+fields(front) ->
+    [{position, mk(front, #{default => front, required => true, in => body})}];
+fields(rear) ->
+    [{position, mk(rear, #{default => rear, required => true, in => body})}];
+fields('after') ->
+    [
+        {position, mk('after', #{default => 'after', required => true, in => body})},
+        {validation, mk(binary(), #{required => true, in => body})}
+    ];
+fields(before) ->
+    [
+        {position, mk(before, #{default => before, required => true, in => body})},
+        {validation, mk(binary(), #{required => true, in => body})}
+    ].
+
+%%-------------------------------------------------------------------------------------------------
+%% `minirest' handlers
+%%-------------------------------------------------------------------------------------------------
+
+'/message_validations'(get, _Params) ->
+    ?OK(emqx_message_validation:list());
+'/message_validations'(post, #{body := Params = #{<<"name">> := Name}}) ->
+    with_validation(
+        Name,
+        return(?BAD_REQUEST('ALREADY_EXISTS', <<"Validation already exists">>)),
+        fun() ->
+            case emqx_message_validation:insert(Params) of
+                {ok, _} ->
+                    {ok, Res} = emqx_message_validation:lookup(Name),
+                    {201, Res};
+                {error, Error} ->
+                    ?BAD_REQUEST(Error)
+            end
+        end
+    );
+'/message_validations'(put, #{body := Params = #{<<"name">> := Name}}) ->
+    with_validation(
+        Name,
+        fun() ->
+            case emqx_message_validation:update(Params) of
+                {ok, _} ->
+                    {ok, Res} = emqx_message_validation:lookup(Name),
+                    {200, Res};
+                {error, Error} ->
+                    ?BAD_REQUEST(Error)
+            end
+        end,
+        not_found()
+    ).
+
+'/message_validations/:name'(get, #{bindings := #{name := Name}}) ->
+    with_validation(
+        Name,
+        fun(Validation) -> ?OK(Validation) end,
+        not_found()
+    );
+'/message_validations/:name'(delete, #{bindings := #{name := Name}}) ->
+    with_validation(
+        Name,
+        fun() ->
+            case emqx_message_validation:delete(Name) of
+                {ok, _} ->
+                    ?NO_CONTENT;
+                {error, Error} ->
+                    ?BAD_REQUEST(Error)
+            end
+        end,
+        not_found()
+    ).
+
+'/message_validations/:name/move'(post, #{bindings := #{name := Name}, body := Body}) ->
+    with_validation(
+        Name,
+        fun() ->
+            do_move(Name, parse_position(Body))
+        end,
+        not_found(Name)
+    ).
+
+%%-------------------------------------------------------------------------------------------------
+%% Internal fns
+%%-------------------------------------------------------------------------------------------------
+
+ref(Struct) -> hoconsc:ref(?MODULE, Struct).
+mk(Type, Opts) -> hoconsc:mk(Type, Opts).
+
+example_input_create() ->
+    %% TODO
+    #{}.
+
+example_input_update() ->
+    %% TODO
+    #{}.
+
+example_return_list() ->
+    %% TODO
+    [].
+
+example_return_create() ->
+    %% TODO
+    #{}.
+
+example_return_update() ->
+    %% TODO
+    #{}.
+
+example_return_lookup() ->
+    %% TODO
+    #{}.
+
+example_position() ->
+    %% TODO
+    #{}.
+
+error_schema(Code, Message) when is_atom(Code) ->
+    error_schema([Code], Message);
+error_schema(Codes, Message) when is_list(Message) ->
+    error_schema(Codes, list_to_binary(Message));
+error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
+    emqx_dashboard_swagger:error_codes(Codes, Message).
+
+position_union_member_selector(all_union_members) ->
+    position_refs();
+position_union_member_selector({value, V}) ->
+    position_refs(V).
+
+position_refs() ->
+    [].
+
+position_types() ->
+    [
+        front,
+        rear,
+        'after',
+        before
+    ].
+
+position_refs(#{<<"position">> := <<"front">>}) ->
+    [ref(front)];
+position_refs(#{<<"position">> := <<"rear">>}) ->
+    [ref(rear)];
+position_refs(#{<<"position">> := <<"after">>}) ->
+    [ref('after')];
+position_refs(#{<<"position">> := <<"before">>}) ->
+    [ref(before)];
+position_refs(_) ->
+    Expected = lists:join(" | ", [atom_to_list(T) || T <- position_types()]),
+    throw(#{
+        field_name => position,
+        expected => iolist_to_binary(Expected)
+    }).
+
+%% Schema is already checked, so we don't need to do further validation.
+parse_position(#{<<"position">> := <<"front">>}) ->
+    front;
+parse_position(#{<<"position">> := <<"rear">>}) ->
+    rear;
+parse_position(#{<<"position">> := <<"after">>, <<"validation">> := OtherValidationName}) ->
+    {'after', OtherValidationName};
+parse_position(#{<<"position">> := <<"before">>, <<"validation">> := OtherValidationName}) ->
+    {before, OtherValidationName}.
+
+do_move(ValidationName, {_, OtherValidationName} = Position) ->
+    with_validation(
+        OtherValidationName,
+        fun() ->
+            case emqx_message_validation:move(ValidationName, Position) of
+                {ok, _} ->
+                    ?NO_CONTENT;
+                {error, Error} ->
+                    ?BAD_REQUEST(Error)
+            end
+        end,
+        bad_request_not_found(OtherValidationName)
+    );
+do_move(ValidationName, Position) ->
+    case emqx_message_validation:move(ValidationName, Position) of
+        {ok, _} ->
+            ?NO_CONTENT;
+        {error, Error} ->
+            ?BAD_REQUEST(Error)
+    end.
+
+with_validation(Name, FoundFn, NotFoundFn) ->
+    case emqx_message_validation:lookup(Name) of
+        {ok, Validation} ->
+            {arity, Arity} = erlang:fun_info(FoundFn, arity),
+            case Arity of
+                1 -> FoundFn(Validation);
+                0 -> FoundFn()
+            end;
+        {error, not_found} ->
+            NotFoundFn()
+    end.
+
+return(Response) ->
+    fun() -> Response end.
+
+not_found() ->
+    return(?NOT_FOUND(<<"Validation not found">>)).
+
+not_found(Name) ->
+    return(?NOT_FOUND(<<"Validation not found: ", Name/binary>>)).
+
+%% After we found the base validation, but not the other one being referenced in a move.
+bad_request_not_found(Name) ->
+    return(?BAD_REQUEST(<<"Validation not found: ", Name/binary>>)).

+ 225 - 0
apps/emqx_message_validation/src/emqx_message_validation_registry.erl

@@ -0,0 +1,225 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_registry).
+
+-behaviour(gen_server).
+
+%% API
+-export([
+    lookup/1,
+    insert/2,
+    update/3,
+    delete/1,
+    reindex_positions/1,
+
+    matching_validations/1,
+
+    start_link/0,
+    metrics_worker_spec/0
+]).
+
+%% `gen_server' API
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+-define(VALIDATION_TOPIC_INDEX, emqx_message_validation_index).
+-define(VALIDATION_TAB, emqx_message_validation_tab).
+
+-define(METRIC_NAME, message_validation).
+-define(METRICS, [
+    'matched',
+    'passed',
+    'failed'
+]).
+-define(RATE_METRICS, ['matched']).
+
+-type validation_name() :: binary().
+-type validation() :: _TODO.
+-type position_index() :: pos_integer().
+
+-record(reindex_positions, {validations :: [validation()]}).
+-record(insert, {pos :: position_index(), validation :: validation()}).
+-record(update, {old :: validation(), pos :: position_index(), new :: validation()}).
+-record(delete, {validation :: validation()}).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+-spec start_link() -> gen_server:start_ret().
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+-spec lookup(validation_name()) ->
+    {ok, validation()} | {error, not_found}.
+lookup(Name) ->
+    case emqx_utils_ets:lookup_value(?VALIDATION_TAB, Name, undefined) of
+        undefined ->
+            {error, not_found};
+        Validation ->
+            {ok, Validation}
+    end.
+
+-spec reindex_positions([validation()]) -> ok.
+reindex_positions(Validations) ->
+    gen_server:call(?MODULE, #reindex_positions{validations = Validations}, infinity).
+
+-spec insert(position_index(), validation()) -> ok.
+insert(Pos, Validation) ->
+    gen_server:call(?MODULE, #insert{pos = Pos, validation = Validation}, infinity).
+
+-spec update(validation(), position_index(), validation()) -> ok.
+update(Old, Pos, New) ->
+    gen_server:call(?MODULE, #update{old = Old, pos = Pos, new = New}, infinity).
+
+-spec delete(validation()) -> ok.
+delete(Validation) ->
+    gen_server:call(?MODULE, #delete{validation = Validation}, infinity).
+
+%% @doc Returns a list of matching validation names, sorted by their configuration order.
+-spec matching_validations(emqx_types:topic()) -> [validation()].
+matching_validations(Topic) ->
+    Validations0 = [
+        {Pos, Validation}
+     || M <- emqx_topic_index:matches(Topic, ?VALIDATION_TOPIC_INDEX, [unique]),
+        [Pos] <- [emqx_topic_index:get_record(M, ?VALIDATION_TOPIC_INDEX)],
+        {ok, Validation} <- [
+            lookup(emqx_topic_index:get_id(M))
+        ]
+    ],
+    Validations1 = lists:sort(fun({Pos1, _V1}, {Pos2, _V2}) -> Pos1 =< Pos2 end, Validations0),
+    lists:map(fun({_Pos, V}) -> V end, Validations1).
+
+-spec metrics_worker_spec() -> supervisor:child_spec().
+metrics_worker_spec() ->
+    emqx_metrics_worker:child_spec(message_validation_metrics, ?METRIC_NAME).
+
+%%------------------------------------------------------------------------------
+%% `gen_server' API
+%%------------------------------------------------------------------------------
+
+init(_) ->
+    create_tables(),
+    State = #{},
+    {ok, State}.
+
+handle_call(#reindex_positions{validations = Validations}, _From, State) ->
+    do_reindex_positions(Validations),
+    {reply, ok, State};
+handle_call(#insert{pos = Pos, validation = Validation}, _From, State) ->
+    do_insert(Pos, Validation),
+    {reply, ok, State};
+handle_call(#update{old = OldValidation, pos = Pos, new = NewValidation}, _From, State) ->
+    ok = do_update(OldValidation, Pos, NewValidation),
+    {reply, ok, State};
+handle_call(#delete{validation = Validation}, _From, State) ->
+    do_delete(Validation),
+    {reply, ok, State};
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+create_tables() ->
+    _ = emqx_utils_ets:new(?VALIDATION_TOPIC_INDEX, [public, ordered_set, {read_concurrency, true}]),
+    _ = emqx_utils_ets:new(?VALIDATION_TAB, [public, ordered_set, {read_concurrency, true}]),
+    ok.
+
+do_reindex_positions(Validations) ->
+    lists:foreach(
+        fun({Pos, Validation}) ->
+            #{
+                name := Name,
+                topics := Topics
+            } = Validation,
+            do_insert_into_tab(Name, Validation, Pos),
+            update_topic_index(Name, Pos, Topics)
+        end,
+        lists:enumerate(Validations)
+    ).
+
+do_insert(Pos, Validation) ->
+    #{
+        name := Name,
+        topics := Topics
+    } = Validation,
+    maybe_create_metrics(Name),
+    do_insert_into_tab(Name, Validation, Pos),
+    update_topic_index(Name, Pos, Topics),
+    ok.
+
+do_update(OldValidation, Pos, NewValidation) ->
+    #{topics := OldTopics} = OldValidation,
+    #{
+        name := Name,
+        topics := NewTopics
+    } = NewValidation,
+    maybe_create_metrics(Name),
+    do_insert_into_tab(Name, NewValidation, Pos),
+    delete_topic_index(Name, OldTopics),
+    update_topic_index(Name, Pos, NewTopics),
+    ok.
+
+do_delete(Validation) ->
+    #{
+        name := Name,
+        topics := Topics
+    } = Validation,
+    ets:delete(?VALIDATION_TAB, Name),
+    delete_topic_index(Name, Topics),
+    drop_metrics(Name),
+    ok.
+
+do_insert_into_tab(Name, Validation0, Pos) ->
+    Validation = transform_validation(Validation0#{pos => Pos}),
+    ets:insert(?VALIDATION_TAB, {Name, Validation}),
+    ok.
+
+maybe_create_metrics(Name) ->
+    case emqx_metrics_worker:has_metrics(?METRIC_NAME, Name) of
+        true ->
+            ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, Name);
+        false ->
+            ok = emqx_metrics_worker:create_metrics(?METRIC_NAME, Name, ?METRICS, ?RATE_METRICS)
+    end.
+
+drop_metrics(Name) ->
+    ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, Name).
+
+update_topic_index(Name, Pos, Topics) ->
+    lists:foreach(
+        fun(Topic) ->
+            true = emqx_topic_index:insert(Topic, Name, Pos, ?VALIDATION_TOPIC_INDEX)
+        end,
+        Topics
+    ).
+
+delete_topic_index(Name, Topics) ->
+    lists:foreach(
+        fun(Topic) ->
+            true = emqx_topic_index:delete(Topic, Name, ?VALIDATION_TOPIC_INDEX)
+        end,
+        Topics
+    ).
+
+transform_validation(Validation = #{checks := Checks}) ->
+    Validation#{checks := lists:map(fun transform_check/1, Checks)}.
+
+transform_check(#{type := sql, sql := SQL}) ->
+    {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+    Check;
+transform_check(Check) ->
+    Check.

+ 206 - 0
apps/emqx_message_validation/src/emqx_message_validation_schema.erl

@@ -0,0 +1,206 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_schema).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%% `hocon_schema' API
+-export([
+    namespace/0,
+    roots/0,
+    fields/1
+]).
+
+%% `minirest_trails' API
+-export([
+    api_schema/1
+]).
+
+%%------------------------------------------------------------------------------
+%% Type declarations
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% `hocon_schema' API
+%%------------------------------------------------------------------------------
+
+namespace() -> message_validation.
+
+roots() ->
+    [{message_validation, mk(ref(message_validation), #{importance => ?IMPORTANCE_HIDDEN})}].
+
+fields(message_validation) ->
+    [
+        {validations,
+            mk(
+                hoconsc:array(ref(validation)),
+                #{
+                    default => [],
+                    desc => ?DESC("validations"),
+                    validator => fun validate_unique_names/1
+                }
+            )}
+    ];
+fields(validation) ->
+    [
+        {tags, emqx_schema:tags_schema()},
+        {description, emqx_schema:description_schema()},
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {name, mk(binary(), #{required => true, desc => ?DESC("name")})},
+        {topics,
+            mk(
+                hoconsc:union([binary(), hoconsc:array(binary())]),
+                #{
+                    desc => ?DESC("topics"),
+                    converter => fun ensure_array/2,
+                    required => true
+                }
+            )},
+        {strategy,
+            mk(
+                hoconsc:enum([any_pass, all_pass]),
+                #{desc => ?DESC("strategy"), required => true}
+            )},
+        {failure_action,
+            mk(
+                hoconsc:enum([drop, disconnect]),
+                #{desc => ?DESC("failure_action"), required => true}
+            )},
+        {log_failure_at,
+            mk(
+                hoconsc:enum([error, warning, notice, info, debug]),
+                #{desc => ?DESC("log_failure_at"), default => info}
+            )},
+        {checks,
+            mk(
+                hoconsc:array(
+                    hoconsc:union(fun checks_union_member_selector/1)
+                ),
+                #{
+                    required => true,
+                    desc => ?DESC("checks"),
+                    validator => fun
+                        ([]) ->
+                            {error, "at least one check must be defined"};
+                        (_) ->
+                            ok
+                    end
+                }
+            )}
+    ];
+fields(check_sql) ->
+    [
+        {type, mk(sql, #{default => sql, desc => ?DESC("check_sql_type")})},
+        {sql,
+            mk(binary(), #{
+                required => true,
+                desc => ?DESC("check_sql_type"),
+                validator => fun validate_sql/1
+            })}
+    ];
+fields(check_json) ->
+    [
+        {type, mk(json, #{default => json, desc => ?DESC("check_json_type")})},
+        {schema, mk(binary(), #{required => true, desc => ?DESC("check_json_type")})}
+    ];
+fields(check_protobuf) ->
+    [
+        {type, mk(protobuf, #{default => protobuf, desc => ?DESC("check_protobuf_type")})},
+        {schema, mk(binary(), #{required => true, desc => ?DESC("check_protobuf_schema")})},
+        {message_name,
+            mk(binary(), #{required => true, desc => ?DESC("check_protobuf_message_name")})}
+    ];
+fields(check_avro) ->
+    [
+        {type, mk(avro, #{default => avro, desc => ?DESC("check_avro_type")})},
+        {schema, mk(binary(), #{required => true, desc => ?DESC("check_avro_schema")})}
+    ].
+
+checks_union_member_selector(all_union_members) ->
+    checks_refs();
+checks_union_member_selector({value, V}) ->
+    checks_refs(V).
+
+checks_refs() ->
+    [ref(CheckType) || CheckType <- check_types()].
+
+check_types() ->
+    [
+        check_sql,
+        check_json,
+        check_avro,
+        check_protobuf
+    ].
+
+checks_refs(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
+    checks_refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});
+checks_refs(#{<<"type">> := <<"sql">>}) ->
+    [ref(check_sql)];
+checks_refs(#{<<"type">> := <<"json">>}) ->
+    [ref(check_json)];
+checks_refs(#{<<"type">> := <<"avro">>}) ->
+    [ref(check_avro)];
+checks_refs(#{<<"type">> := <<"protobuf">>}) ->
+    [ref(check_protobuf)];
+checks_refs(_Value) ->
+    Expected = lists:join(
+        " | ",
+        [
+            Name
+         || T <- check_types(),
+            "check_" ++ Name <- [atom_to_list(T)]
+        ]
+    ),
+    throw(#{
+        field_name => type,
+        expected => iolist_to_binary(Expected)
+    }).
+
+%%------------------------------------------------------------------------------
+%% `minirest_trails' API
+%%------------------------------------------------------------------------------
+
+api_schema(list) ->
+    hoconsc:array(ref(validation));
+api_schema(lookup) ->
+    ref(validation);
+api_schema(post) ->
+    ref(validation);
+api_schema(put) ->
+    ref(validation).
+
+%%------------------------------------------------------------------------------
+%% Internal exports
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+mk(Type, Meta) -> hoconsc:mk(Type, Meta).
+ref(Name) -> hoconsc:ref(?MODULE, Name).
+
+ensure_array(undefined, _) -> undefined;
+ensure_array(L, _) when is_list(L) -> L;
+ensure_array(B, _) -> [B].
+
+validate_sql(SQL) ->
+    case emqx_message_validation:parse_sql_check(SQL) of
+        {ok, _Parsed} ->
+            ok;
+        Error = {error, _} ->
+            Error
+    end.
+
+validate_unique_names(Validations0) ->
+    Validations = emqx_utils_maps:binary_key_map(Validations0),
+    do_validate_unique_names(Validations, #{}).
+
+do_validate_unique_names(_Validations = [], _Acc) ->
+    ok;
+do_validate_unique_names([#{<<"name">> := Name} | _Rest], Acc) when is_map_key(Name, Acc) ->
+    {error, <<"duplicated name: ", Name/binary>>};
+do_validate_unique_names([#{<<"name">> := Name} | Rest], Acc) ->
+    do_validate_unique_names(Rest, Acc#{Name => true}).

+ 47 - 0
apps/emqx_message_validation/src/emqx_message_validation_sup.erl

@@ -0,0 +1,47 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% `supervisor' API
+-export([init/1]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%%------------------------------------------------------------------------------
+%% `supervisor' API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    Registry = worker_spec(emqx_message_validation_registry),
+    Metrics = emqx_message_validation_registry:metrics_worker_spec(),
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    ChildSpecs = [Metrics, Registry],
+    {ok, {SupFlags, ChildSpecs}}.
+
+%%------------------------------------------------------------------------------
+%% Internal fns
+%%------------------------------------------------------------------------------
+
+worker_spec(Mod) ->
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
+        restart => permanent,
+        shutdown => 5_000,
+        type => worker
+    }.

+ 690 - 0
apps/emqx_message_validation/test/emqx_message_validation_http_api_SUITE.erl

@@ -0,0 +1,690 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_http_api_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/asserts.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
+    Apps = emqx_cth_suite:start(
+        lists:flatten(
+            [
+                emqx,
+                emqx_conf,
+                emqx_rule_engine,
+                emqx_message_validation,
+                emqx_management,
+                emqx_mgmt_api_test_util:emqx_dashboard(),
+                emqx_schema_registry
+            ]
+        ),
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    clear_all_validations(),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+-define(assertIndexOrder(EXPECTED, TOPIC), assert_index_order(EXPECTED, TOPIC, #{line => ?LINE})).
+
+clear_all_validations() ->
+    lists:foreach(
+        fun(#{name := Name}) ->
+            {ok, _} = emqx_message_validation:delete(Name)
+        end,
+        emqx_message_validation:list()
+    ).
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+request(Method, Path, Params) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
+validation(Name, Checks) ->
+    validation(Name, Checks, _Overrides = #{}).
+
+validation(Name, Checks, Overrides) ->
+    Default = #{
+        <<"tags">> => [<<"some">>, <<"tags">>],
+        <<"description">> => <<"my validation">>,
+        <<"enable">> => true,
+        <<"name">> => Name,
+        <<"topics">> => <<"t/+">>,
+        <<"strategy">> => <<"all_pass">>,
+        <<"failure_action">> => <<"drop">>,
+        <<"log_failure_at">> => <<"warning">>,
+        <<"checks">> => Checks
+    },
+    emqx_utils_maps:deep_merge(Default, Overrides).
+
+sql_check() ->
+    sql_check(<<"select * where true">>).
+
+sql_check(SQL) ->
+    #{
+        <<"type">> => <<"sql">>,
+        <<"sql">> => SQL
+    }.
+
+schema_check(Type, SerdeName) ->
+    schema_check(Type, SerdeName, _Overrides = #{}).
+
+schema_check(Type, SerdeName, Overrides) ->
+    emqx_utils_maps:deep_merge(
+        #{
+            <<"type">> => emqx_utils_conv:bin(Type),
+            <<"schema">> => SerdeName
+        },
+        Overrides
+    ).
+
+api_root() -> "message_validations".
+
+simplify_result(Res) ->
+    case Res of
+        {error, {{_, Status, _}, _, Body}} ->
+            {Status, Body};
+        {ok, {{_, Status, _}, _, Body}} ->
+            {Status, Body}
+    end.
+
+list() ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list result:\n  ~p", [Res]),
+    simplify_result(Res).
+
+lookup(Name) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
+    Res = request(get, Path, _Params = []),
+    ct:pal("lookup ~s result:\n  ~p", [Name, Res]),
+    simplify_result(Res).
+
+insert(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+    Res = request(post, Path, Params),
+    ct:pal("insert result:\n  ~p", [Res]),
+    simplify_result(Res).
+
+update(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root()]),
+    Res = request(put, Path, Params),
+    ct:pal("update result:\n  ~p", [Res]),
+    simplify_result(Res).
+
+delete(Name) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root(), Name]),
+    Res = request(delete, Path, _Params = []),
+    ct:pal("delete result:\n  ~p", [Res]),
+    simplify_result(Res).
+
+move(Name, Pos) ->
+    Path = emqx_mgmt_api_test_util:api_path([api_root(), Name, "move"]),
+    Res = request(post, Path, Pos),
+    ct:pal("move result:\n  ~p", [Res]),
+    simplify_result(Res).
+
+connect(ClientId) ->
+    connect(ClientId, _IsPersistent = false).
+
+connect(ClientId, IsPersistent) ->
+    Properties = emqx_utils_maps:put_if(#{}, 'Session-Expiry-Interval', 30, IsPersistent),
+    {ok, Client} = emqtt:start_link([
+        {clean_start, true},
+        {clientid, ClientId},
+        {properties, Properties},
+        {proto_ver, v5}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    on_exit(fun() -> catch emqtt:stop(Client) end),
+    Client.
+
+publish(Client, Topic, Payload) ->
+    publish(Client, Topic, Payload, _QoS = 0).
+
+publish(Client, Topic, {raw, Payload}, QoS) ->
+    case emqtt:publish(Client, Topic, Payload, QoS) of
+        ok -> ok;
+        {ok, _} -> ok;
+        Err -> Err
+    end;
+publish(Client, Topic, Payload, QoS) ->
+    case emqtt:publish(Client, Topic, emqx_utils_json:encode(Payload), QoS) of
+        ok -> ok;
+        {ok, _} -> ok;
+        Err -> Err
+    end.
+
+json_valid_payloads() ->
+    [
+        #{i => 10, s => <<"s">>},
+        #{i => 10}
+    ].
+
+json_invalid_payloads() ->
+    [
+        #{i => <<"wrong type">>},
+        #{x => <<"unknown property">>}
+    ].
+
+json_create_serde(SerdeName) ->
+    Source = #{
+        type => object,
+        properties => #{
+            i => #{type => integer},
+            s => #{type => string}
+        },
+        required => [<<"i">>],
+        additionalProperties => false
+    },
+    Schema = #{type => json, source => emqx_utils_json:encode(Source)},
+    ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+    on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+    ok.
+
+avro_valid_payloads(SerdeName) ->
+    lists:map(
+        fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload) end,
+        [
+            #{i => 10, s => <<"s">>},
+            #{i => 10}
+        ]
+    ).
+
+avro_invalid_payloads() ->
+    [
+        emqx_utils_json:encode(#{i => 10, s => <<"s">>}),
+        <<"">>
+    ].
+
+avro_create_serde(SerdeName) ->
+    Source = #{
+        type => record,
+        name => <<"test">>,
+        namespace => <<"emqx.com">>,
+        fields => [
+            #{name => <<"i">>, type => <<"int">>},
+            #{name => <<"s">>, type => [<<"null">>, <<"string">>], default => <<"null">>}
+        ]
+    },
+    Schema = #{type => avro, source => emqx_utils_json:encode(Source)},
+    ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+    on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+    ok.
+
+protobuf_valid_payloads(SerdeName, MessageName) ->
+    lists:map(
+        fun(Payload) -> emqx_schema_registry_serde:encode(SerdeName, Payload, [MessageName]) end,
+        [
+            #{<<"name">> => <<"some name">>, <<"id">> => 10, <<"email">> => <<"emqx@emqx.io">>},
+            #{<<"name">> => <<"some name">>, <<"id">> => 10}
+        ]
+    ).
+
+protobuf_invalid_payloads() ->
+    [
+        emqx_utils_json:encode(#{name => <<"a">>, id => 10, email => <<"email">>}),
+        <<"not protobuf">>
+    ].
+
+protobuf_create_serde(SerdeName) ->
+    Source =
+        <<
+            "message Person {\n"
+            "     required string name = 1;\n"
+            "     required int32 id = 2;\n"
+            "     optional string email = 3;\n"
+            "  }\n"
+            "message UnionValue {\n"
+            "    oneof u {\n"
+            "        int32  a = 1;\n"
+            "        string b = 2;\n"
+            "    }\n"
+            "}"
+        >>,
+    Schema = #{type => protobuf, source => Source},
+    ok = emqx_schema_registry:add_schema(SerdeName, Schema),
+    on_exit(fun() -> ok = emqx_schema_registry:delete_schema(SerdeName) end),
+    ok.
+
+%% Checks that the internal order in the registry/index matches expectation.
+assert_index_order(ExpectedOrder, Topic, Comment) ->
+    ?assertEqual(
+        ExpectedOrder,
+        [
+            N
+         || #{name := N} <- emqx_message_validation_registry:matching_validations(Topic)
+        ],
+        Comment
+    ).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+%% Smoke test where we have a single check and `all_pass' strategy.
+t_smoke_test(_Config) ->
+    Name1 = <<"foo">>,
+    Check1 = sql_check(<<"select payload.value as x where x > 15">>),
+    Validation1 = validation(Name1, [Check1]),
+    {201, _} = insert(Validation1),
+
+    lists:foreach(
+        fun({QoS, IsPersistent}) ->
+            ct:pal("qos = ~b, is persistent = ~p", [QoS, IsPersistent]),
+            C = connect(<<"c1">>, IsPersistent),
+            {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+            {200, _} = update(Validation1),
+
+            ok = publish(C, <<"t/1">>, #{value => 20}, QoS),
+            ?assertReceive({publish, _}),
+            ok = publish(C, <<"t/1">>, #{value => 10}, QoS),
+            ?assertNotReceive({publish, _}),
+            ok = publish(C, <<"t/1/a">>, #{value => 10}, QoS),
+            ?assertReceive({publish, _}),
+
+            %% test `disconnect' failure action
+            Validation2 = validation(Name1, [Check1], #{<<"failure_action">> => <<"disconnect">>}),
+            {200, _} = update(Validation2),
+
+            unlink(C),
+            PubRes = publish(C, <<"t/1">>, #{value => 0}, QoS),
+            case QoS =:= 0 of
+                true ->
+                    ?assertMatch(ok, PubRes);
+                false ->
+                    ?assertMatch(
+                        {error, {disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}},
+                        PubRes
+                    )
+            end,
+            ?assertNotReceive({publish, _}),
+            ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
+
+            ok
+        end,
+        [
+            {QoS, IsPersistent}
+         || IsPersistent <- [false, true],
+            QoS <- [0, 1, 2]
+        ]
+    ),
+
+    ok.
+
+t_crud(_Config) ->
+    ?assertMatch({200, []}, list()),
+
+    Name1 = <<"foo">>,
+    Validation1 = validation(Name1, [sql_check()]),
+
+    ?assertMatch({201, #{<<"name">> := Name1}}, insert(Validation1)),
+    ?assertMatch({200, #{<<"name">> := Name1}}, lookup(Name1)),
+    ?assertMatch({200, [#{<<"name">> := Name1}]}, list()),
+    %% Duplicated name
+    ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, insert(Validation1)),
+
+    Name2 = <<"bar">>,
+    Validation2 = validation(Name2, [sql_check()]),
+    %% Not found
+    ?assertMatch({404, _}, update(Validation2)),
+    ?assertMatch({201, _}, insert(Validation2)),
+    ?assertMatch(
+        {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
+        list()
+    ),
+    ?assertMatch({200, #{<<"name">> := Name2}}, lookup(Name2)),
+    Validation1b = validation(Name1, [
+        sql_check(<<"select * where true">>),
+        sql_check(<<"select * where false">>)
+    ]),
+    ?assertMatch({200, _}, update(Validation1b)),
+    ?assertMatch({200, #{<<"checks">> := [_, _]}}, lookup(Name1)),
+    %% order is unchanged
+    ?assertMatch(
+        {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]},
+        list()
+    ),
+
+    ?assertMatch({204, _}, delete(Name1)),
+    ?assertMatch({404, _}, lookup(Name1)),
+    ?assertMatch({200, [#{<<"name">> := Name2}]}, list()),
+    ?assertMatch({404, _}, update(Validation1)),
+
+    ok.
+
+t_move(_Config) ->
+    lists:foreach(
+        fun(Pos) ->
+            ?assertMatch({404, _}, move(<<"nonexistent_validation">>, Pos))
+        end,
+        [
+            #{<<"position">> => <<"front">>},
+            #{<<"position">> => <<"rear">>},
+            #{<<"position">> => <<"after">>, <<"validation">> => <<"also_non_existent">>},
+            #{<<"position">> => <<"before">>, <<"validation">> => <<"also_non_existent">>}
+        ]
+    ),
+
+    Topic = <<"t">>,
+
+    Name1 = <<"foo">>,
+    Validation1 = validation(Name1, [sql_check()], #{<<"topics">> => Topic}),
+    {201, _} = insert(Validation1),
+
+    %% bogus positions
+    lists:foreach(
+        fun(Pos) ->
+            ?assertMatch(
+                {400, #{<<"message">> := #{<<"kind">> := <<"validation_error">>}}},
+                move(Name1, Pos)
+            )
+        end,
+        [
+            #{<<"position">> => <<"foo">>},
+            #{<<"position">> => <<"bar">>, <<"validation">> => Name1}
+        ]
+    ),
+
+    lists:foreach(
+        fun(Pos) ->
+            ?assertMatch({204, _}, move(Name1, Pos)),
+            ?assertMatch({200, [#{<<"name">> := Name1}]}, list())
+        end,
+        [
+            #{<<"position">> => <<"front">>},
+            #{<<"position">> => <<"rear">>}
+        ]
+    ),
+
+    lists:foreach(
+        fun(Pos) ->
+            ?assertMatch({400, _}, move(Name1, Pos))
+        end,
+        [
+            #{<<"position">> => <<"after">>, <<"validation">> => <<"nonexistent">>},
+            #{<<"position">> => <<"before">>, <<"validation">> => <<"nonexistent">>}
+        ]
+    ),
+
+    Name2 = <<"bar">>,
+    Validation2 = validation(Name2, [sql_check()], #{<<"topics">> => Topic}),
+    {201, _} = insert(Validation2),
+    ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()),
+    ?assertIndexOrder([Name1, Name2], Topic),
+
+    ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"rear">>})),
+    ?assertMatch({200, [#{<<"name">> := Name2}, #{<<"name">> := Name1}]}, list()),
+    ?assertIndexOrder([Name2, Name1], Topic),
+
+    ?assertMatch({204, _}, move(Name1, #{<<"position">> => <<"front">>})),
+    ?assertMatch({200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}]}, list()),
+    ?assertIndexOrder([Name1, Name2], Topic),
+
+    Name3 = <<"baz">>,
+    Validation3 = validation(Name3, [sql_check()], #{<<"topics">> => Topic}),
+    {201, _} = insert(Validation3),
+    ?assertMatch(
+        {200, [#{<<"name">> := Name1}, #{<<"name">> := Name2}, #{<<"name">> := Name3}]},
+        list()
+    ),
+    ?assertIndexOrder([Name1, Name2, Name3], Topic),
+
+    ?assertMatch(
+        {204, _}, move(Name3, #{<<"position">> => <<"before">>, <<"validation">> => Name2})
+    ),
+    ?assertMatch(
+        {200, [#{<<"name">> := Name1}, #{<<"name">> := Name3}, #{<<"name">> := Name2}]},
+        list()
+    ),
+    ?assertIndexOrder([Name1, Name3, Name2], Topic),
+
+    ?assertMatch(
+        {204, _}, move(Name1, #{<<"position">> => <<"after">>, <<"validation">> => Name2})
+    ),
+    ?assertMatch(
+        {200, [#{<<"name">> := Name3}, #{<<"name">> := Name2}, #{<<"name">> := Name1}]},
+        list()
+    ),
+    ?assertIndexOrder([Name3, Name2, Name1], Topic),
+
+    ok.
+
+%% Check the `all_pass' strategy
+t_all_pass(_Config) ->
+    Name1 = <<"foo">>,
+    Check1 = sql_check(<<"select payload.x as x where x > 5">>),
+    Check2 = sql_check(<<"select payload.x as x where x > 10">>),
+    Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"all_pass">>}),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+    ok = publish(C, <<"t/1">>, #{x => 0}),
+    ?assertNotReceive({publish, _}),
+    ok = publish(C, <<"t/1">>, #{x => 7}),
+    ?assertNotReceive({publish, _}),
+    ok = publish(C, <<"t/1">>, #{x => 11}),
+    ?assertReceive({publish, _}),
+
+    ok.
+
+%% Check the `any_pass' strategy
+t_any_pass(_Config) ->
+    Name1 = <<"foo">>,
+    Check1 = sql_check(<<"select payload.x as x where x > 5">>),
+    Check2 = sql_check(<<"select payload.x as x where x > 10">>),
+    Validation1 = validation(Name1, [Check1, Check2], #{<<"strategy">> => <<"any_pass">>}),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    ok = publish(C, <<"t/1">>, #{x => 11}),
+    ?assertReceive({publish, _}),
+    ok = publish(C, <<"t/1">>, #{x => 7}),
+    ?assertReceive({publish, _}),
+    ok = publish(C, <<"t/1">>, #{x => 0}),
+    ?assertNotReceive({publish, _}),
+
+    ok.
+
+%% Checks that multiple validations are run in order.
+t_multiple_validations(_Config) ->
+    Name1 = <<"foo">>,
+    Check1 = sql_check(<<"select payload.x as x, payload.y as y where x > 10 or y > 0">>),
+    Validation1 = validation(Name1, [Check1], #{<<"failure_action">> => <<"drop">>}),
+    {201, _} = insert(Validation1),
+
+    Name2 = <<"bar">>,
+    Check2 = sql_check(<<"select payload.x as x where x > 5">>),
+    Validation2 = validation(Name2, [Check2], #{<<"failure_action">> => <<"disconnect">>}),
+    {201, _} = insert(Validation2),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    ok = publish(C, <<"t/1">>, #{x => 11, y => 1}),
+    ?assertReceive({publish, _}),
+    ok = publish(C, <<"t/1">>, #{x => 7, y => 0}),
+    ?assertNotReceive({publish, _}),
+    ?assertNotReceive({disconnected, _, _}),
+    unlink(C),
+    ok = publish(C, <<"t/1">>, #{x => 0, y => 1}),
+    ?assertNotReceive({publish, _}),
+    ?assertReceive({disconnected, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, _}),
+
+    ok.
+
+t_schema_check_non_existent_serde(_Config) ->
+    SerdeName = <<"idontexist">>,
+    Name1 = <<"foo">>,
+    Check1 = schema_check(json, SerdeName),
+    Validation1 = validation(Name1, [Check1]),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    ok = publish(C, <<"t/1">>, #{i => 10, s => <<"s">>}),
+    ?assertNotReceive({publish, _}),
+
+    ok.
+
+t_schema_check_json(_Config) ->
+    SerdeName = <<"myserde">>,
+    json_create_serde(SerdeName),
+
+    Name1 = <<"foo">>,
+    Check1 = schema_check(json, SerdeName),
+    Validation1 = validation(Name1, [Check1]),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, Payload),
+            ?assertReceive({publish, _})
+        end,
+        json_valid_payloads()
+    ),
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, Payload),
+            ?assertNotReceive({publish, _})
+        end,
+        json_invalid_payloads()
+    ),
+
+    ok.
+
+t_schema_check_avro(_Config) ->
+    SerdeName = <<"myserde">>,
+    avro_create_serde(SerdeName),
+
+    Name1 = <<"foo">>,
+    Check1 = schema_check(avro, SerdeName),
+    Validation1 = validation(Name1, [Check1]),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertReceive({publish, _})
+        end,
+        avro_valid_payloads(SerdeName)
+    ),
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertNotReceive({publish, _})
+        end,
+        avro_invalid_payloads()
+    ),
+
+    ok.
+
+t_schema_check_protobuf(_Config) ->
+    SerdeName = <<"myserde">>,
+    MessageName = <<"Person">>,
+    protobuf_create_serde(SerdeName),
+
+    Name1 = <<"foo">>,
+    Check1 = schema_check(protobuf, SerdeName, #{<<"message_name">> => MessageName}),
+    Validation1 = validation(Name1, [Check1]),
+    {201, _} = insert(Validation1),
+
+    C = connect(<<"c1">>),
+    {ok, _, [_]} = emqtt:subscribe(C, <<"t/#">>),
+
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertReceive({publish, _})
+        end,
+        protobuf_valid_payloads(SerdeName, MessageName)
+    ),
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertNotReceive({publish, _})
+        end,
+        protobuf_invalid_payloads()
+    ),
+
+    %% Bad config: unknown message name
+    Check2 = schema_check(protobuf, SerdeName, #{<<"message_name">> => <<"idontexist">>}),
+    Validation2 = validation(Name1, [Check2]),
+    {200, _} = update(Validation2),
+
+    lists:foreach(
+        fun(Payload) ->
+            ok = publish(C, <<"t/1">>, {raw, Payload}),
+            ?assertNotReceive({publish, _})
+        end,
+        protobuf_valid_payloads(SerdeName, MessageName)
+    ),
+
+    ok.

+ 194 - 0
apps/emqx_message_validation/test/emqx_message_validation_tests.erl

@@ -0,0 +1,194 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_message_validation_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(VALIDATIONS_PATH, "message_validation.validations").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+parse_and_check(InnerConfigs) ->
+    RootBin = <<"message_validation">>,
+    InnerBin = <<"validations">>,
+    RawConf = #{RootBin => #{InnerBin => InnerConfigs}},
+    #{RootBin := #{InnerBin := Checked}} = hocon_tconf:check_plain(
+        emqx_message_validation_schema,
+        RawConf,
+        #{
+            required => false,
+            atom_key => false,
+            make_serializable => false
+        }
+    ),
+    Checked.
+
+validation(Name, Checks) ->
+    validation(Name, Checks, _Overrides = #{}).
+
+validation(Name, Checks, Overrides) ->
+    Default = #{
+        <<"tags">> => [<<"some">>, <<"tags">>],
+        <<"description">> => <<"my validation">>,
+        <<"enable">> => true,
+        <<"name">> => Name,
+        <<"topics">> => <<"t/+">>,
+        <<"strategy">> => <<"all_pass">>,
+        <<"failure_action">> => <<"drop">>,
+        <<"log_failure_at">> => <<"warning">>,
+        <<"checks">> => Checks
+    },
+    emqx_utils_maps:deep_merge(Default, Overrides).
+
+sql_check() ->
+    sql_check(<<"select * where true">>).
+
+sql_check(SQL) ->
+    #{
+        <<"type">> => <<"sql">>,
+        <<"sql">> => SQL
+    }.
+
+eval_sql(Message, SQL) ->
+    {ok, Check} = emqx_message_validation:parse_sql_check(SQL),
+    emqx_message_validation:evaluate_sql_check(Check, Message).
+
+message() ->
+    message(_Opts = #{}).
+
+message(Opts) ->
+    Defaults = #{
+        id => emqx_guid:gen(),
+        qos => 0,
+        from => emqx_guid:to_hexstr(emqx_guid:gen()),
+        flags => #{retain => false},
+        headers => #{
+            proto_ver => v5,
+            properties => #{'User-Property' => [{<<"a">>, <<"b">>}]}
+        },
+        topic => <<"t/t">>,
+        payload => emqx_utils_json:encode(#{value => 10}),
+        timestamp => 1710272561615,
+        extra => []
+    },
+    emqx_message:from_map(emqx_utils_maps:deep_merge(Defaults, Opts)).
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+schema_test_() ->
+    [
+        {"topics is always a list 1",
+            ?_assertMatch(
+                [#{<<"topics">> := [<<"t/1">>]}],
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [sql_check()],
+                        #{<<"topics">> => <<"t/1">>}
+                    )
+                ])
+            )},
+        {"topics is always a list 2",
+            ?_assertMatch(
+                [#{<<"topics">> := [<<"t/1">>]}],
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [sql_check()],
+                        #{<<"topics">> => [<<"t/1">>]}
+                    )
+                ])
+            )},
+        {"foreach expression is not allowed",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := foreach_not_allowed,
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [sql_check(<<"foreach foo as f where true">>)]
+                    )
+                ])
+            )},
+        {"from clause is not allowed",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := non_empty_from_clause,
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        [sql_check(<<"select * from t">>)]
+                    )
+                ])
+            )},
+        {"names are unique",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := <<"duplicated name:", _/binary>>,
+                        path := ?VALIDATIONS_PATH,
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    validation(<<"foo">>, [sql_check()]),
+                    validation(<<"foo">>, [sql_check()])
+                ])
+            )},
+        {"checks must be non-empty",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        reason := "at least one check must be defined",
+                        kind := validation_error
+                    }
+                ]},
+                parse_and_check([
+                    validation(
+                        <<"foo">>,
+                        []
+                    )
+                ])
+            )},
+        {"bogus check type",
+            ?_assertThrow(
+                {_Schema, [
+                    #{
+                        expected := <<"sql", _/binary>>,
+                        kind := validation_error,
+                        field_name := type
+                    }
+                ]},
+                parse_and_check([validation(<<"foo">>, [#{<<"type">> => <<"foo">>}])])
+            )}
+    ].
+
+check_test_() ->
+    [
+        {"denied by payload 1",
+            ?_assertNot(eval_sql(message(), <<"select * where payload.value > 15">>))},
+        {"denied by payload 2",
+            ?_assertNot(eval_sql(message(), <<"select payload.value as x where x > 15">>))},
+        {"allowed by payload 1",
+            ?_assert(eval_sql(message(), <<"select * where payload.value > 5">>))},
+        {"allowed by payload 2",
+            ?_assert(eval_sql(message(), <<"select payload.value as x where x > 5">>))},
+        {"always passes 1", ?_assert(eval_sql(message(), <<"select * where true">>))},
+        {"always passes 2", ?_assert(eval_sql(message(), <<"select * where 1 = 1">>))},
+        {"never passes 1", ?_assertNot(eval_sql(message(), <<"select * where false">>))},
+        {"never passes 2", ?_assertNot(eval_sql(message(), <<"select * where 1 = 2">>))},
+        {"never passes 3", ?_assertNot(eval_sql(message(), <<"select * where true and false">>))}
+    ].

+ 0 - 6
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -51,12 +51,6 @@
     get_rules_ordered_by_ts/0
 ]).
 
-%% exported for cluster_call
--export([
-    do_delete_rule/1,
-    do_insert_rule/1
-]).
-
 -export([
     load_hooks_for_rule/1,
     unload_hooks_for_rule/1,

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_app.erl

@@ -41,4 +41,5 @@ stop(_State) ->
     RulePath = [RuleEngine | _] = ?KEY_PATH,
     emqx_conf:remove_handler(RulePath ++ ['?']),
     emqx_conf:remove_handler([RuleEngine]),
-    ok = emqx_rule_events:unload().
+    ok = emqx_rule_events:unload(),
+    ok.

+ 2 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl

@@ -38,7 +38,8 @@ namespace() -> rule_engine.
 tags() ->
     [<<"Rule Engine">>].
 
-roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
+roots() ->
+    [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
 
 fields("rule_engine") ->
     rule_engine_settings() ++

+ 20 - 8
apps/emqx_rule_engine/src/emqx_rule_engine_sup.erl

@@ -28,13 +28,25 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Registry = #{
-        id => emqx_rule_engine,
-        start => {emqx_rule_engine, start_link, []},
+    RuleEngineRegistry = worker_spec(emqx_rule_engine),
+    RuleEngineMetrics = emqx_metrics_worker:child_spec(rule_metrics),
+    SupFlags = #{
+        strategy => one_for_one,
+        intensity => 10,
+        period => 10
+    },
+    Children = [
+        RuleEngineRegistry,
+        RuleEngineMetrics
+    ],
+    {ok, {SupFlags, Children}}.
+
+worker_spec(Mod) ->
+    #{
+        id => Mod,
+        start => {Mod, start_link, []},
         restart => permanent,
-        shutdown => 5000,
+        shutdown => 5_000,
         type => worker,
-        modules => [emqx_rule_engine]
-    },
-    Metrics = emqx_metrics_worker:child_spec(rule_metrics),
-    {ok, {{one_for_one, 10, 10}, [Registry, Metrics]}}.
+        modules => [Mod]
+    }.

+ 42 - 19
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -26,6 +26,9 @@
     inc_action_metrics/2
 ]).
 
+%% Internal exports used by message validation
+-export([evaluate_select/3, clear_rule_payload/0]).
+
 -import(
     emqx_rule_maps,
     [
@@ -129,27 +132,16 @@ do_apply_rule(
     Columns,
     Envs
 ) ->
-    {Selected, Collection} = ?RAISE(
-        select_and_collect(Fields, Columns),
-        {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
-    ),
-    ColumnsAndSelected = maps:merge(Columns, Selected),
-    case
-        ?RAISE(
-            match_conditions(Conditions, ColumnsAndSelected),
-            {match_conditions_error, {EXCLASS, EXCPTION, ST}}
-        )
-    of
-        true ->
-            Collection2 = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
-            case Collection2 of
+    case evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) of
+        {ok, ColumnsAndSelected, FinalCollection} ->
+            case FinalCollection of
                 [] ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
                 _ ->
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
             end,
             NewEnvs = maps:merge(ColumnsAndSelected, Envs),
-            {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- Collection2]};
+            {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
         false ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
@@ -165,6 +157,16 @@ do_apply_rule(
     Columns,
     Envs
 ) ->
+    case evaluate_select(Fields, Columns, Conditions) of
+        {ok, Selected} ->
+            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
+            {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
+        false ->
+            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
+            {error, nomatch}
+    end.
+
+evaluate_select(Fields, Columns, Conditions) ->
     Selected = ?RAISE(
         select_and_transform(Fields, Columns),
         {select_and_transform_error, {EXCLASS, EXCPTION, ST}}
@@ -176,11 +178,28 @@ do_apply_rule(
         )
     of
         true ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
-            {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
+            {ok, Selected};
         false ->
-            ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
-            {error, nomatch}
+            false
+    end.
+
+evaluate_foreach(Fields, Columns, Conditions, InCase, DoEach) ->
+    {Selected, Collection} = ?RAISE(
+        select_and_collect(Fields, Columns),
+        {select_and_collect_error, {EXCLASS, EXCPTION, ST}}
+    ),
+    ColumnsAndSelected = maps:merge(Columns, Selected),
+    case
+        ?RAISE(
+            match_conditions(Conditions, ColumnsAndSelected),
+            {match_conditions_error, {EXCLASS, EXCPTION, ST}}
+        )
+    of
+        true ->
+            FinalCollection = filter_collection(ColumnsAndSelected, InCase, DoEach, Collection),
+            {ok, ColumnsAndSelected, FinalCollection};
+        false ->
+            false
     end.
 
 clear_rule_payload() ->
@@ -281,6 +300,10 @@ match_conditions({'fun', {_, Name}, Args}, Data) ->
     apply_func(Name, [eval(Arg, Data) || Arg <- Args], Data);
 match_conditions({Op, L, R}, Data) when ?is_comp(Op) ->
     compare(Op, eval(L, Data), eval(R, Data));
+match_conditions({const, true}, _Data) ->
+    true;
+match_conditions({const, false}, _Data) ->
+    false;
 match_conditions({}, _Data) ->
     true.
 

+ 61 - 27
apps/emqx_rule_engine/src/emqx_rule_sqlparser.erl

@@ -18,7 +18,7 @@
 
 -include("rule_engine.hrl").
 
--export([parse/1]).
+-export([parse/1, parse/2]).
 
 -export([
     select_fields/1,
@@ -62,37 +62,29 @@
 
 -type field() :: const() | variable() | {as, field(), alias()} | sql_func().
 
+-type parse_opts() :: #{
+    %% Whether `from' clause should be mandatory.
+    %% Default: `true'.
+    with_from => boolean()
+}.
+
 -export_type([select/0]).
 
 %% Parse one select statement.
 -spec parse(string() | binary()) -> {ok, select()} | {error, term()}.
 parse(Sql) ->
-    try
-        case rulesql:parsetree(Sql) of
-            {ok, {select, Clauses}} ->
-                {ok, #select{
-                    is_foreach = false,
-                    fields = get_value(fields, Clauses),
-                    doeach = [],
-                    incase = {},
-                    from = get_value(from, Clauses),
-                    where = get_value(where, Clauses)
-                }};
-            {ok, {foreach, Clauses}} ->
-                {ok, #select{
-                    is_foreach = true,
-                    fields = get_value(fields, Clauses),
-                    doeach = get_value(do, Clauses, []),
-                    incase = get_value(incase, Clauses, {}),
-                    from = get_value(from, Clauses),
-                    where = get_value(where, Clauses)
-                }};
-            Error ->
-                {error, Error}
-        end
-    catch
-        _Error:Reason:StackTrace ->
-            {error, {Reason, StackTrace}}
+    parse(Sql, _Opts = #{}).
+
+-spec parse(string() | binary(), parse_opts()) -> {ok, select()} | {error, term()}.
+parse(Sql, Opts) ->
+    WithFrom = maps:get(with_from, Opts, true),
+    case do_parse(Sql) of
+        {ok, Parsed} when WithFrom ->
+            ensure_non_empty_from(Parsed);
+        {ok, Parsed} ->
+            ensure_empty_from(Parsed);
+        Error = {error, _} ->
+            Error
     end.
 
 -spec select_fields(select()) -> list(field()).
@@ -118,3 +110,45 @@ select_from(#select{from = From}) ->
 -spec select_where(select()) -> tuple().
 select_where(#select{where = Where}) ->
     Where.
+
+-spec do_parse(string() | binary()) -> {ok, select()} | {error, term()}.
+do_parse(Sql) ->
+    try
+        case rulesql:parsetree(Sql) of
+            {ok, {select, Clauses}} ->
+                Parsed = #select{
+                    is_foreach = false,
+                    fields = get_value(fields, Clauses),
+                    doeach = [],
+                    incase = {},
+                    from = get_value(from, Clauses),
+                    where = get_value(where, Clauses)
+                },
+                {ok, Parsed};
+            {ok, {foreach, Clauses}} ->
+                Parsed = #select{
+                    is_foreach = true,
+                    fields = get_value(fields, Clauses),
+                    doeach = get_value(do, Clauses, []),
+                    incase = get_value(incase, Clauses, {}),
+                    from = get_value(from, Clauses),
+                    where = get_value(where, Clauses)
+                },
+                {ok, Parsed};
+            Error ->
+                {error, Error}
+        end
+    catch
+        _Error:Reason:StackTrace ->
+            {error, {Reason, StackTrace}}
+    end.
+
+ensure_non_empty_from(#select{from = []}) ->
+    {error, empty_from_clause};
+ensure_non_empty_from(Parsed) ->
+    {ok, Parsed}.
+
+ensure_empty_from(#select{from = [_ | _]}) ->
+    {error, non_empty_from_clause};
+ensure_empty_from(Parsed) ->
+    {ok, Parsed}.

+ 12 - 5
apps/emqx_rule_engine/test/emqx_rule_funcs_SUITE.erl

@@ -27,12 +27,19 @@
 %%-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{on_output, fun ct:print/2}]))).
 
 init_per_suite(Config) ->
-    application:load(emqx_conf),
-    ConfigConf = <<"rule_engine {jq_function_default_timeout=10s}">>,
-    ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ConfigConf),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            {emqx_rule_engine, "rule_engine {jq_function_default_timeout=10s}"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
     ok.
 
 eventmsg_publish(Msg) ->

+ 13 - 1
apps/emqx_utils/src/emqx_utils.erl

@@ -66,7 +66,8 @@
     tcp_keepalive_opts/4,
     format/1,
     call_first_defined/1,
-    ntoa/1
+    ntoa/1,
+    foldl_while/3
 ]).
 
 -export([
@@ -176,6 +177,17 @@ pipeline([Fun | More], Input, State) ->
         {error, Reason, NState} -> {error, Reason, NState}
     end.
 
+-spec foldl_while(fun((X, Acc) -> {cont | halt, Acc}), Acc, [X]) -> Acc.
+foldl_while(_Fun, Acc, []) ->
+    Acc;
+foldl_while(Fun, Acc, [X | Xs]) ->
+    case Fun(X, Acc) of
+        {cont, NewAcc} ->
+            foldl_while(Fun, NewAcc, Xs);
+        {halt, NewAcc} ->
+            NewAcc
+    end.
+
 -compile({inline, [apply_fun/3]}).
 apply_fun(Fun, Input, State) ->
     case erlang:fun_info(Fun, arity) of

+ 3 - 1
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -158,7 +158,9 @@ deep_convert(Val, _, _Args) ->
 unsafe_atom_key_map(Map) ->
     convert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
 
--spec binary_key_map(map()) -> map().
+-spec binary_key_map
+    (map()) -> map();
+    (list()) -> list().
 binary_key_map(Map) ->
     deep_convert(
         Map,

+ 34 - 0
apps/emqx_utils/test/emqx_utils_tests.erl

@@ -28,3 +28,37 @@ is_redacted_test_() ->
         ?_assert(emqx_utils:is_redacted(password, fun() -> <<"******">> end)),
         ?_assert(emqx_utils:is_redacted(password, emqx_secret:wrap(<<"******">>)))
     ].
+
+foldl_while_test_() ->
+    [
+        ?_assertEqual(
+            [3, 2, 1],
+            emqx_utils:foldl_while(fun(X, Acc) -> {cont, [X | Acc]} end, [], [1, 2, 3])
+        ),
+        ?_assertEqual(
+            [1],
+            emqx_utils:foldl_while(
+                fun
+                    (X, Acc) when X == 2 ->
+                        {halt, Acc};
+                    (X, Acc) ->
+                        {cont, [X | Acc]}
+                end,
+                [],
+                [1, 2, 3]
+            )
+        ),
+        ?_assertEqual(
+            finished,
+            emqx_utils:foldl_while(
+                fun
+                    (X, _Acc) when X == 3 ->
+                        {halt, finished};
+                    (X, Acc) ->
+                        {cont, [X | Acc]}
+                end,
+                [],
+                [1, 2, 3]
+            )
+        )
+    ].

+ 3 - 0
changes/ce/feat-12711.en.md

@@ -0,0 +1,3 @@
+Implemented message validation feature.
+
+With this feature, once validations are configured for certain topic filters, the configured checks are run against published messages and, if they are not accepted by a validation, the message is dropped and the client may be disconnected, depending on the configuration.

+ 2 - 1
mix.exs

@@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do
       # maybe forbid to fetch quicer
       {:emqtt,
        github: "emqx/emqtt", tag: "1.10.1", override: true, system_env: maybe_no_quic_env()},
-      {:rulesql, github: "emqx/rulesql", tag: "0.1.8"},
+      {:rulesql, github: "emqx/rulesql", tag: "0.2.0"},
       {:observer_cli, "1.7.1"},
       {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
       {:telemetry, "1.1.0"},
@@ -184,6 +184,7 @@ defmodule EMQXUmbrella.MixProject do
       :emqx_s3,
       :emqx_bridge_s3,
       :emqx_schema_registry,
+      :emqx_message_validation,
       :emqx_enterprise,
       :emqx_bridge_kinesis,
       :emqx_bridge_azure_event_hub,

+ 1 - 1
rebar.config

@@ -91,7 +91,7 @@
     {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}},
-    {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.8"}}},
+    {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},
     % NOTE: depends on recon 2.5.x
     {observer_cli, "1.7.1"},
     {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}},

+ 1 - 0
rebar.config.erl

@@ -115,6 +115,7 @@ is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
 is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
 is_community_umbrella_app("apps/emqx_gateway_jt808") -> false;
 is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
+is_community_umbrella_app("apps/emqx_message_validation") -> false;
 is_community_umbrella_app(_) -> true.
 
 %% BUILD_WITHOUT_JQ

+ 24 - 0
rel/i18n/emqx_message_validation_http_api.hocon

@@ -0,0 +1,24 @@
+emqx_message_validation_http_api {
+
+  list_validations.desc:
+  """List validations"""
+
+  lookup_validation.desc:
+  """Lookup a validation"""
+
+  update_validation.desc:
+  """Update a validation"""
+
+  delete_validation.desc:
+  """Delete a validation"""
+
+  append_validation.desc:
+  """Append a new validation to the list of validations"""
+
+  move_validation.desc:
+  """Change the order of a validation in the list of validations"""
+
+  param_path_name.desc:
+  """Validation name"""
+
+}

+ 88 - 0
rel/i18n/emqx_message_validation_schema.hocon

@@ -0,0 +1,88 @@
+emqx_message_validation_schema {
+
+  check_avro_type.desc:
+  """Avro schema check"""
+  check_avro_type.label:
+  """Avro schema check"""
+
+  check_avro_schema.desc:
+  """Schema name to use during check."""
+  check_avro_schema.label:
+  """Schema name"""
+
+  check_json_type.desc:
+  """JSON schema check"""
+  check_json_type.label:
+  """JSON schema check"""
+
+  check_json_schema.desc:
+  """Schema name to use during check."""
+  check_json_schema.label:
+  """Schema name"""
+
+  check_protobuf_type.desc:
+  """Protobuf schema check"""
+  check_protobuf_type.label:
+  """Protobuf schema check"""
+
+  check_protobuf_schema.desc:
+  """Schema name to use during check."""
+  check_protobuf_schema.label:
+  """Schema name"""
+
+  check_protobuf_message_name.desc:
+  """Message name to use during check."""
+  check_protobuf_message_name.label:
+  """Message name"""
+
+  check_sql_type.desc:
+  """Use rule-engine's SQL to validate the message. SQL here is the same as in rule-engine,
+  just with the different that the `FROM` clause must be omitted.
+  A SQL statement which yields any value is considered successfully validated, otherwise failed.
+  For example <code>SELECT payload.foo + payload.bar as sum WHERE sum > 0</code>
+  validates that the sum of field `foo` and `bar` is a positive value."""
+  check_sql_type.label:
+  """SQL schema check"""
+
+  check_sql_schema.desc:
+  """Schema name to use during check."""
+  check_sql_schema.label:
+  """Schema name"""
+
+  topics.desc:
+  """A single topic filter or list of topic filters that this validation should validate."""
+  topics.label:
+  """Topic filter(s)"""
+
+  name.desc:
+  """The name for this validation.  Must be unique among all validations."""
+  name.desc:
+  """Name"""
+
+  strategy.desc:
+  """How the validation should consider the checks to be successful.
+
+  <code>all_pass</code>: All checks will be evaluated and must pass.
+  <code>any_pass</code>: Any passing check will suffice.  Stops at the first success."""
+  strategy.desc:
+  """Strategy"""
+
+  failure_action.desc:
+  """How to proceed if the validation fails.
+
+  <code>drop</code>: The offending message is simply dropped without further processing.
+  <code>disconnect</code>: The message is not published, and the publishing client is disconnected."""
+  failure_action.label:
+  """Failure action"""
+
+  log_failure_at.desc:
+  """Log level at which failures will be logged."""
+  log_failure_at.label:
+  """Failure log level"""
+
+  checks.desc:
+  """Checks that will be performed during validation.  They are evaluated in the same order as defined."""
+  checks.label:
+  """Checks"""
+
+}