Explorar o código

Merge pull request #12827 from kjellwinblad/kjell/emqx_rule_tracing/EMQX-11966

Add rule ID tracing support
Kjell Winblad hai 1 ano
pai
achega
2a2fadfbff
Modificáronse 31 ficheiros con 1764 adicións e 300 borrados
  1. 7 2
      apps/emqx/include/emqx_trace.hrl
  2. 28 1
      apps/emqx/src/emqx_trace/emqx_trace.erl
  3. 21 3
      apps/emqx/src/emqx_trace/emqx_trace_handler.erl
  4. 9 1
      apps/emqx_bridge/src/emqx_action_info.erl
  5. 27 0
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  6. 115 133
      apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl
  7. 161 0
      apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl
  8. 4 2
      apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl
  9. 9 1
      apps/emqx_connector/src/emqx_connector_info.erl
  10. 10 1
      apps/emqx_management/src/emqx_mgmt_api_trace.erl
  11. 4 1
      apps/emqx_management/src/emqx_mgmt_cli.erl
  12. 65 2
      apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl
  13. 142 39
      apps/emqx_resource/src/emqx_resource_buffer_worker.erl
  14. 27 28
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  15. 10 1
      apps/emqx_rule_engine/rebar.config
  16. 38 21
      apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
  17. 3 1
      apps/emqx_rule_engine/src/emqx_rule_engine.app.src
  18. 38 0
      apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
  19. 228 58
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl
  20. 65 1
      apps/emqx_rule_engine/src/emqx_rule_sqltester.erl
  21. 111 2
      apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl
  22. 378 0
      apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
  23. 101 0
      apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl
  24. 100 0
      apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl
  25. 43 0
      apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl
  26. 1 0
      changes/ce/feat-12827.en.md
  27. 1 1
      mix.exs
  28. 1 1
      rebar.config
  29. 5 0
      rel/i18n/emqx_mgmt_api_trace.hocon
  30. 6 0
      rel/i18n/emqx_rule_api_schema.hocon
  31. 6 0
      rel/i18n/emqx_rule_engine_api.hocon

+ 7 - 2
apps/emqx/include/emqx_trace.hrl

@@ -20,9 +20,14 @@
 
 -record(?TRACE, {
     name :: binary() | undefined | '_',
-    type :: clientid | topic | ip_address | undefined | '_',
+    type :: clientid | topic | ip_address | ruleid | undefined | '_',
     filter ::
-        emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_',
+        emqx_types:topic()
+        | emqx_types:clientid()
+        | emqx_trace:ip_address()
+        | emqx_trace:ruleid()
+        | undefined
+        | '_',
     enable = true :: boolean() | '_',
     payload_encode = text :: hex | text | hidden | '_',
     extra = #{} :: map() | '_',

+ 28 - 1
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -28,7 +28,8 @@
     subscribe/3,
     unsubscribe/2,
     log/3,
-    log/4
+    log/4,
+    rendered_action_template/2
 ]).
 
 -export([
@@ -66,6 +67,9 @@
 -export_type([ip_address/0]).
 -type ip_address() :: string().
 
+-export_type([ruleid/0]).
+-type ruleid() :: binary().
+
 publish(#message{topic = <<"$SYS/", _/binary>>}) ->
     ignore;
 publish(#message{from = From, topic = Topic, payload = Payload}) when
@@ -83,6 +87,26 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) ->
 unsubscribe(Topic, SubOpts) ->
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
 
+rendered_action_template(ActionID, RenderResult) ->
+    Msg = lists:flatten(io_lib:format("action_template_rendered(~ts)", [ActionID])),
+    TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult),
+    case logger:get_process_metadata() of
+        #{stop_action_after_render := true} ->
+            %% We throw an unrecoverable error to stop action before the
+            %% resource is called/modified
+            StopMsg = lists:flatten(
+                io_lib:format(
+                    "Action ~ts stopped after template rendering due to test setting.",
+                    [ActionID]
+                )
+            ),
+            MsgBin = unicode:characters_to_binary(StopMsg),
+            error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}});
+        _ ->
+            ok
+    end,
+    TraceResult.
+
 log(List, Msg, Meta) ->
     log(debug, List, Msg, Meta).
 
@@ -517,6 +541,9 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
         Error ->
             Error
     end;
+to_trace(#{type := ruleid, ruleid := Filter} = Trace, Rec) ->
+    Trace0 = maps:without([type, ruleid], Trace),
+    to_trace(Trace0, Rec#?TRACE{type = ruleid, filter = Filter});
 to_trace(#{type := Type}, _Rec) ->
     {error, io_lib:format("required ~s field", [Type])};
 to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) ->

+ 21 - 3
apps/emqx/src/emqx_trace/emqx_trace_handler.erl

@@ -33,6 +33,7 @@
 
 %% For logger handler filters callbacks
 -export([
+    filter_ruleid/2,
     filter_clientid/2,
     filter_topic/2,
     filter_ip_address/2
@@ -133,9 +134,23 @@ uninstall(HandlerId) ->
 running() ->
     lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
 
+-spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
+filter_ruleid(#{meta := Meta = #{rule_id := RuleId}} = Log, {MatchId, _Name}) ->
+    RuleIDs = maps:get(rule_ids, Meta, #{}),
+    IsMatch = (RuleId =:= MatchId) orelse maps:get(MatchId, RuleIDs, false),
+    filter_ret(IsMatch andalso is_trace(Meta), Log);
+filter_ruleid(#{meta := Meta = #{rule_ids := RuleIDs}} = Log, {MatchId, _Name}) ->
+    filter_ret(maps:get(MatchId, RuleIDs, false) andalso is_trace(Meta), Log);
+filter_ruleid(_Log, _ExpectId) ->
+    stop.
+
 -spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
 filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
-    filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
+    ClientIDs = maps:get(client_ids, Meta, #{}),
+    IsMatch = (ClientId =:= MatchId) orelse maps:get(MatchId, ClientIDs, false),
+    filter_ret(IsMatch andalso is_trace(Meta), Log);
+filter_clientid(#{meta := Meta = #{client_ids := ClientIDs}} = Log, {MatchId, _Name}) ->
+    filter_ret(maps:get(MatchId, ClientIDs, false) andalso is_trace(Meta), Log);
 filter_clientid(_Log, _ExpectId) ->
     stop.
 
@@ -164,7 +179,9 @@ filters(#{type := clientid, filter := Filter, name := Name}) ->
 filters(#{type := topic, filter := Filter, name := Name}) ->
     [{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
 filters(#{type := ip_address, filter := Filter, name := Name}) ->
-    [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
+    [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}];
+filters(#{type := ruleid, filter := Filter, name := Name}) ->
+    [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}].
 
 formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
     {emqx_trace_formatter, #{
@@ -184,7 +201,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc)
         [{Type, {FilterFun, {Filter, Name}}}] when
             Type =:= topic orelse
                 Type =:= clientid orelse
-                Type =:= ip_address
+                Type =:= ip_address orelse
+                Type =:= ruleid
         ->
             [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc];
         _ ->

+ 9 - 1
apps/emqx_bridge/src/emqx_action_info.erl

@@ -41,6 +41,9 @@
 ]).
 -export([clean_cache/0]).
 
+%% For tests
+-export([hard_coded_test_action_info_modules/0]).
+
 -callback bridge_v1_type_name() ->
     atom()
     | {
@@ -128,8 +131,13 @@ hard_coded_action_info_modules_common() ->
         emqx_bridge_mqtt_pubsub_action_info
     ].
 
+%% This exists so that it can be mocked for test cases
+hard_coded_test_action_info_modules() -> [].
+
 hard_coded_action_info_modules() ->
-    hard_coded_action_info_modules_common() ++ hard_coded_action_info_modules_ee().
+    hard_coded_action_info_modules_common() ++
+        hard_coded_action_info_modules_ee() ++
+        ?MODULE:hard_coded_test_action_info_modules().
 
 %% ====================================================================
 %% API

+ 27 - 0
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -371,6 +371,7 @@ on_query(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
+    trace_rendered_action_template(InstId, Method, NRequest, Timeout),
     Worker = resolve_pool_worker(State, KeyOrNum),
     Result0 = ehttpc:request(
         Worker,
@@ -480,6 +481,7 @@ on_query_async(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
+    trace_rendered_action_template(InstId, Method, NRequest, Timeout),
     MaxAttempts = maps:get(max_attempts, State, 3),
     Context = #{
         attempt => 1,
@@ -499,6 +501,31 @@ on_query_async(
     ),
     {ok, Worker}.
 
+trace_rendered_action_template(InstId, Method, NRequest, Timeout) ->
+    case NRequest of
+        {Path, Headers} ->
+            emqx_trace:rendered_action_template(
+                InstId,
+                #{
+                    path => Path,
+                    method => Method,
+                    headers => emqx_utils_redact:redact_headers(Headers),
+                    timeout => Timeout
+                }
+            );
+        {Path, Headers, Body} ->
+            emqx_trace:rendered_action_template(
+                InstId,
+                #{
+                    path => Path,
+                    method => Method,
+                    headers => emqx_utils_redact:redact_headers(Headers),
+                    timeout => Timeout,
+                    body => Body
+                }
+            )
+    end.
+
 resolve_pool_worker(State, undefined) ->
     resolve_pool_worker(State, self());
 resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->

+ 115 - 133
apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl

@@ -30,8 +30,8 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
--define(BRIDGE_TYPE, <<"webhook">>).
--define(BRIDGE_NAME, atom_to_binary(?MODULE)).
+-define(BRIDGE_TYPE, emqx_bridge_http_test_lib:bridge_type()).
+-define(BRIDGE_NAME, emqx_bridge_http_test_lib:bridge_name()).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -73,21 +73,10 @@ suite() ->
 
 init_per_testcase(t_bad_bridge_config, Config) ->
     Config;
-init_per_testcase(t_send_async_connection_timeout, Config) ->
-    HTTPPath = <<"/path">>,
-    ServerSSLOpts = false,
-    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
-        _Port = random, HTTPPath, ServerSSLOpts
-    ),
-    ResponseDelayMS = 500,
-    ok = emqx_bridge_http_connector_test_server:set_handler(
-        success_http_handler(#{response_delay => ResponseDelayMS})
-    ),
-    [
-        {http_server, #{port => HTTPPort, path => HTTPPath}},
-        {response_delay_ms, ResponseDelayMS}
-        | Config
-    ];
+init_per_testcase(Case, Config) when
+    Case =:= t_send_async_connection_timeout orelse Case =:= t_send_get_trace_messages
+->
+    emqx_bridge_http_test_lib:init_http_success_server(Config);
 init_per_testcase(t_path_not_found, Config) ->
     HTTPPath = <<"/nonexisting/path">>,
     ServerSSLOpts = false,
@@ -115,7 +104,9 @@ init_per_testcase(t_bridge_probes_header_atoms, Config) ->
     {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
         _Port = random, HTTPPath, ServerSSLOpts
     ),
-    ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
+    ok = emqx_bridge_http_connector_test_server:set_handler(
+        emqx_bridge_http_test_lib:success_http_handler()
+    ),
     [{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
 init_per_testcase(_TestCase, Config) ->
     Server = start_http_server(#{response_delay_ms => 0}),
@@ -126,7 +117,8 @@ end_per_testcase(TestCase, _Config) when
     TestCase =:= t_too_many_requests;
     TestCase =:= t_rule_action_expired;
     TestCase =:= t_bridge_probes_header_atoms;
-    TestCase =:= t_send_async_connection_timeout
+    TestCase =:= t_send_async_connection_timeout;
+    TestCase =:= t_send_get_trace_messages
 ->
     ok = emqx_bridge_http_connector_test_server:stop(),
     persistent_term:erase({?MODULE, times_called}),
@@ -250,115 +242,8 @@ get_metrics(Name) ->
     Type = <<"http">>,
     emqx_bridge:get_metrics(Type, Name).
 
-bridge_async_config(#{port := Port} = Config) ->
-    Type = maps:get(type, Config, ?BRIDGE_TYPE),
-    Name = maps:get(name, Config, ?BRIDGE_NAME),
-    Host = maps:get(host, Config, "localhost"),
-    Path = maps:get(path, Config, ""),
-    PoolSize = maps:get(pool_size, Config, 1),
-    QueryMode = maps:get(query_mode, Config, "async"),
-    ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
-    RequestTimeout = maps:get(request_timeout, Config, "10s"),
-    ResumeInterval = maps:get(resume_interval, Config, "1s"),
-    HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
-    ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
-    LocalTopic =
-        case maps:find(local_topic, Config) of
-            {ok, LT} ->
-                lists:flatten(["local_topic = \"", LT, "\""]);
-            error ->
-                ""
-        end,
-    ConfigString = io_lib:format(
-        "bridges.~s.~s {\n"
-        "  url = \"http://~s:~p~s\"\n"
-        "  connect_timeout = \"~p\"\n"
-        "  enable = true\n"
-        %% local_topic
-        "  ~s\n"
-        "  enable_pipelining = 100\n"
-        "  max_retries = 2\n"
-        "  method = \"post\"\n"
-        "  pool_size = ~p\n"
-        "  pool_type = \"random\"\n"
-        "  request_timeout = \"~s\"\n"
-        "  body = \"${id}\"\n"
-        "  resource_opts {\n"
-        "    inflight_window = 100\n"
-        "    health_check_interval = \"~s\"\n"
-        "    max_buffer_bytes = \"1GB\"\n"
-        "    query_mode = \"~s\"\n"
-        "    request_ttl = \"~p\"\n"
-        "    resume_interval = \"~s\"\n"
-        "    start_after_created = \"true\"\n"
-        "    start_timeout = \"5s\"\n"
-        "    worker_pool_size = \"1\"\n"
-        "  }\n"
-        "  ssl {\n"
-        "    enable = false\n"
-        "  }\n"
-        "}\n",
-        [
-            Type,
-            Name,
-            Host,
-            Port,
-            Path,
-            ConnectTimeout,
-            LocalTopic,
-            PoolSize,
-            RequestTimeout,
-            HealthCheckInterval,
-            QueryMode,
-            ResourceRequestTTL,
-            ResumeInterval
-        ]
-    ),
-    ct:pal(ConfigString),
-    parse_and_check(ConfigString, Type, Name).
-
-parse_and_check(ConfigString, BridgeType, Name) ->
-    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
-    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
-    #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
-    RetConfig.
-
 make_bridge(Config) ->
-    Type = ?BRIDGE_TYPE,
-    Name = ?BRIDGE_NAME,
-    BridgeConfig = bridge_async_config(Config#{
-        name => Name,
-        type => Type
-    }),
-    {ok, _} = emqx_bridge:create(
-        Type,
-        Name,
-        BridgeConfig
-    ),
-    emqx_bridge_resource:bridge_id(Type, Name).
-
-success_http_handler() ->
-    success_http_handler(#{response_delay => 0}).
-
-success_http_handler(Opts) ->
-    ResponseDelay = maps:get(response_delay, Opts, 0),
-    TestPid = self(),
-    fun(Req0, State) ->
-        {ok, Body, Req} = cowboy_req:read_body(Req0),
-        Headers = cowboy_req:headers(Req),
-        ct:pal("http request received: ~p", [
-            #{body => Body, headers => Headers, response_delay => ResponseDelay}
-        ]),
-        ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
-        TestPid ! {http, Headers, Body},
-        Rep = cowboy_req:reply(
-            200,
-            #{<<"content-type">> => <<"text/plain">>},
-            <<"hello">>,
-            Req
-        ),
-        {ok, Rep, State}
-    end.
+    emqx_bridge_http_test_lib:make_bridge(Config).
 
 not_found_http_handler() ->
     TestPid = self(),
@@ -452,6 +337,103 @@ t_send_async_connection_timeout(Config) ->
     receive_request_notifications(MessageIDs, ResponseDelayMS, []),
     ok.
 
+t_send_get_trace_messages(Config) ->
+    ResponseDelayMS = ?config(response_delay_ms, Config),
+    #{port := Port, path := Path} = ?config(http_server, Config),
+    BridgeID = make_bridge(#{
+        port => Port,
+        path => Path,
+        pool_size => 1,
+        query_mode => "async",
+        connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms",
+        request_timeout => "10s",
+        resume_interval => "200ms",
+        health_check_interval => "200ms",
+        resource_request_ttl => "infinity"
+    }),
+    RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
+    SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
+    {ok, #{<<"id">> := RuleId}} =
+        emqx_bridge_testlib:create_rule_and_action_http(
+            ?BRIDGE_TYPE,
+            RuleTopic,
+            Config,
+            #{sql => SQL}
+        ),
+    %% ===================================
+    %% Create trace for RuleId
+    %% ===================================
+    Now = erlang:system_time(second) - 10,
+    Start = Now,
+    End = Now + 60,
+    TraceName = atom_to_binary(?FUNCTION_NAME),
+    Trace = #{
+        name => TraceName,
+        type => ruleid,
+        ruleid => RuleId,
+        start_at => Start,
+        end_at => End
+    },
+    emqx_trace_SUITE:reload(),
+    ok = emqx_trace:clear(),
+    {ok, _} = emqx_trace:create(Trace),
+    %% ===================================
+
+    ResourceId = emqx_bridge_resource:resource_id(BridgeID),
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+    ),
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        ?assertEqual(<<>>, read_rule_trace_file(TraceName, Now))
+    ),
+    Msg = emqx_message:make(RuleTopic, <<"{\"id\": 1}">>),
+    emqx:publish(Msg),
+    ?retry(
+        _Interval = 500,
+        _NAttempts = 20,
+        ?assertMatch(
+            #{
+                counters := #{
+                    'matched' := 1,
+                    'actions.failed' := 0,
+                    'actions.failed.unknown' := 0,
+                    'actions.success' := 1,
+                    'actions.total' := 1
+                }
+            },
+            emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
+        )
+    ),
+
+    ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid),
+    {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, Now)),
+
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        begin
+            Bin = read_rule_trace_file(TraceName, Now),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
+        end
+    ),
+    emqx_trace:delete(TraceName),
+    ok.
+
+read_rule_trace_file(TraceName, From) ->
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid),
+    {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)),
+    Bin.
+
 t_async_free_retries(Config) ->
     #{port := Port} = ?config(http_server, Config),
     _BridgeID = make_bridge(#{
@@ -518,7 +500,7 @@ t_async_common_retries(Config) ->
     ok.
 
 t_bad_bridge_config(_Config) ->
-    BridgeConfig = bridge_async_config(#{port => 12345}),
+    BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{port => 12345}),
     ?assertMatch(
         {ok,
             {{_, 201, _}, _Headers, #{
@@ -540,7 +522,7 @@ t_bad_bridge_config(_Config) ->
 
 t_start_stop(Config) ->
     #{port := Port} = ?config(http_server, Config),
-    BridgeConfig = bridge_async_config(#{
+    BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
         type => ?BRIDGE_TYPE,
         name => ?BRIDGE_NAME,
         port => Port
@@ -554,7 +536,7 @@ t_path_not_found(Config) ->
         begin
             #{port := Port, path := Path} = ?config(http_server, Config),
             MQTTTopic = <<"t/webhook">>,
-            BridgeConfig = bridge_async_config(#{
+            BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
                 type => ?BRIDGE_TYPE,
                 name => ?BRIDGE_NAME,
                 local_topic => MQTTTopic,
@@ -593,7 +575,7 @@ t_too_many_requests(Config) ->
         begin
             #{port := Port, path := Path} = ?config(http_server, Config),
             MQTTTopic = <<"t/webhook">>,
-            BridgeConfig = bridge_async_config(#{
+            BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
                 type => ?BRIDGE_TYPE,
                 name => ?BRIDGE_NAME,
                 local_topic => MQTTTopic,
@@ -633,7 +615,7 @@ t_rule_action_expired(Config) ->
     ?check_trace(
         begin
             RuleTopic = <<"t/webhook/rule">>,
-            BridgeConfig = bridge_async_config(#{
+            BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
                 type => ?BRIDGE_TYPE,
                 name => ?BRIDGE_NAME,
                 host => "non.existent.host",
@@ -689,7 +671,7 @@ t_bridge_probes_header_atoms(Config) ->
     ?check_trace(
         begin
             LocalTopic = <<"t/local/topic">>,
-            BridgeConfig0 = bridge_async_config(#{
+            BridgeConfig0 = emqx_bridge_http_test_lib:bridge_async_config(#{
                 type => ?BRIDGE_TYPE,
                 name => ?BRIDGE_NAME,
                 port => Port,

+ 161 - 0
apps/emqx_bridge_http/test/emqx_bridge_http_test_lib.erl

@@ -0,0 +1,161 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_http_test_lib).
+
+-export([
+    bridge_type/0,
+    bridge_name/0,
+    make_bridge/1,
+    bridge_async_config/1,
+    init_http_success_server/1,
+    success_http_handler/0
+]).
+
+-define(BRIDGE_TYPE, bridge_type()).
+-define(BRIDGE_NAME, bridge_name()).
+
+bridge_type() ->
+    <<"webhook">>.
+
+bridge_name() ->
+    atom_to_binary(?MODULE).
+
+make_bridge(Config) ->
+    Type = ?BRIDGE_TYPE,
+    Name = ?BRIDGE_NAME,
+    BridgeConfig = bridge_async_config(Config#{
+        name => Name,
+        type => Type
+    }),
+    {ok, _} = emqx_bridge:create(
+        Type,
+        Name,
+        BridgeConfig
+    ),
+    emqx_bridge_resource:bridge_id(Type, Name).
+
+bridge_async_config(#{port := Port} = Config) ->
+    Type = maps:get(type, Config, ?BRIDGE_TYPE),
+    Name = maps:get(name, Config, ?BRIDGE_NAME),
+    Host = maps:get(host, Config, "localhost"),
+    Path = maps:get(path, Config, ""),
+    PoolSize = maps:get(pool_size, Config, 1),
+    QueryMode = maps:get(query_mode, Config, "async"),
+    ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
+    RequestTimeout = maps:get(request_timeout, Config, "10s"),
+    ResumeInterval = maps:get(resume_interval, Config, "1s"),
+    HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
+    ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
+    LocalTopic =
+        case maps:find(local_topic, Config) of
+            {ok, LT} ->
+                lists:flatten(["local_topic = \"", LT, "\""]);
+            error ->
+                ""
+        end,
+    ConfigString = io_lib:format(
+        "bridges.~s.~s {\n"
+        "  url = \"http://~s:~p~s\"\n"
+        "  connect_timeout = \"~p\"\n"
+        "  enable = true\n"
+        %% local_topic
+        "  ~s\n"
+        "  enable_pipelining = 100\n"
+        "  max_retries = 2\n"
+        "  method = \"post\"\n"
+        "  pool_size = ~p\n"
+        "  pool_type = \"random\"\n"
+        "  request_timeout = \"~s\"\n"
+        "  body = \"${id}\"\n"
+        "  resource_opts {\n"
+        "    inflight_window = 100\n"
+        "    health_check_interval = \"~s\"\n"
+        "    max_buffer_bytes = \"1GB\"\n"
+        "    query_mode = \"~s\"\n"
+        "    request_ttl = \"~p\"\n"
+        "    resume_interval = \"~s\"\n"
+        "    start_after_created = \"true\"\n"
+        "    start_timeout = \"5s\"\n"
+        "    worker_pool_size = \"1\"\n"
+        "  }\n"
+        "  ssl {\n"
+        "    enable = false\n"
+        "  }\n"
+        "}\n",
+        [
+            Type,
+            Name,
+            Host,
+            Port,
+            Path,
+            ConnectTimeout,
+            LocalTopic,
+            PoolSize,
+            RequestTimeout,
+            HealthCheckInterval,
+            QueryMode,
+            ResourceRequestTTL,
+            ResumeInterval
+        ]
+    ),
+    ct:pal(ConfigString),
+    parse_and_check(ConfigString, Type, Name).
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
+    RetConfig.
+
+success_http_handler() ->
+    success_http_handler(#{response_delay => 0}).
+
+success_http_handler(Opts) ->
+    ResponseDelay = maps:get(response_delay, Opts, 0),
+    TestPid = self(),
+    fun(Req0, State) ->
+        {ok, Body, Req} = cowboy_req:read_body(Req0),
+        Headers = cowboy_req:headers(Req),
+        ct:pal("http request received: ~p", [
+            #{body => Body, headers => Headers, response_delay => ResponseDelay}
+        ]),
+        ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
+        TestPid ! {http, Headers, Body},
+        Rep = cowboy_req:reply(
+            200,
+            #{<<"content-type">> => <<"text/plain">>},
+            <<"hello">>,
+            Req
+        ),
+        {ok, Rep, State}
+    end.
+
+init_http_success_server(Config) ->
+    HTTPPath = <<"/path">>,
+    ServerSSLOpts = false,
+    {ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
+        _Port = random, HTTPPath, ServerSSLOpts
+    ),
+    ResponseDelayMS = 500,
+    ok = emqx_bridge_http_connector_test_server:set_handler(
+        success_http_handler(#{response_delay => ResponseDelayMS})
+    ),
+    [
+        {http_server, #{port => HTTPPort, path => HTTPPath}},
+        {response_delay_ms, ResponseDelayMS},
+        {bridge_name, ?BRIDGE_NAME}
+        | Config
+    ].

+ 4 - 2
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_test_utils.erl

@@ -52,7 +52,7 @@ init_per_group(_Group, Config) ->
 common_init_per_group(Opts) ->
     emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
     ok = emqx_common_test_helpers:start_apps([
-        emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine
+        emqx_conf, emqx_bridge, emqx_bridge_rabbitmq, emqx_rule_engine, emqx_modules
     ]),
     ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
     {ok, _} = application:ensure_all_started(emqx_connector),
@@ -116,7 +116,9 @@ end_per_group(_Group, Config) ->
     } = get_channel_connection(Config),
     amqp_channel:call(Channel, #'queue.purge'{queue = rabbit_mq_queue()}),
     emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine]),
+    ok = emqx_common_test_helpers:stop_apps([
+        emqx_conf, emqx_bridge_rabbitmq, emqx_rule_engine, emqx_modules
+    ]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
     _ = application:stop(emqx_connector),
     _ = application:stop(emqx_bridge),

+ 9 - 1
apps/emqx_connector/src/emqx_connector_info.erl

@@ -31,6 +31,9 @@
 
 -export([clean_cache/0]).
 
+%% For tests
+-export([hard_coded_test_connector_info_modules/0]).
+
 %% The type name for the conncector
 -callback type_name() -> atom().
 
@@ -117,8 +120,13 @@ hard_coded_connector_info_modules_common() ->
         emqx_bridge_mqtt_pubsub_connector_info
     ].
 
+%% This exists so that it can be mocked for test cases
+hard_coded_test_connector_info_modules() -> [].
+
 hard_coded_connector_info_modules() ->
-    hard_coded_connector_info_modules_common() ++ hard_coded_connector_info_modules_ee().
+    hard_coded_connector_info_modules_common() ++
+        hard_coded_connector_info_modules_ee() ++
+        ?MODULE:hard_coded_test_connector_info_modules().
 
 %% --------------------------------------------------------------------
 %% Atom macros to avoid typos

+ 10 - 1
apps/emqx_management/src/emqx_mgmt_api_trace.erl

@@ -222,7 +222,7 @@ fields(trace) ->
             )},
         {type,
             hoconsc:mk(
-                hoconsc:enum([clientid, topic, ip_address]),
+                hoconsc:enum([clientid, topic, ip_address, ruleid]),
                 #{
                     description => ?DESC(filter_type),
                     required => true,
@@ -257,6 +257,15 @@ fields(trace) ->
                     example => <<"127.0.0.1">>
                 }
             )},
+        {ruleid,
+            hoconsc:mk(
+                binary(),
+                #{
+                    description => ?DESC(ruleid_field),
+                    required => false,
+                    example => <<"my_rule">>
+                }
+            )},
         {status,
             hoconsc:mk(
                 hoconsc:enum([running, stopped, waiting]),

+ 4 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -535,7 +535,9 @@ trace(_) ->
         {"trace stop  topic  <Topic> ", "Stop tracing for a topic on local node"},
         {"trace start ip_address  <IP>    <File> [<Level>] ",
             "Traces for a client ip on local node"},
-        {"trace stop  ip_address  <IP> ", "Stop tracing for a client ip on local node"}
+        {"trace stop  ip_address  <IP> ", "Stop tracing for a client ip on local node"},
+        {"trace start ruleid  <RuleID>    <File> [<Level>] ", "Traces for a rule ID on local node"},
+        {"trace stop  ruleid  <RuleID> ", "Stop tracing for a rule ID on local node"}
     ]).
 
 trace_on(Name, Type, Filter, Level, LogFile) ->
@@ -606,6 +608,7 @@ traces(_) ->
             "and will end after <Duration> seconds. The default value for <Duration> is "
             ?DEFAULT_TRACE_DURATION
             " seconds."},
+        {"traces start <Name> ruleid <RuleID> [<Duration>]", "Traces for a rule ID in cluster"},
         {"traces stop <Name>", "Stop trace in cluster"},
         {"traces delete <Name>", "Delete trace in cluster"}
     ]).

+ 65 - 2
apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl

@@ -122,6 +122,56 @@ t_http_test(_Config) ->
     unload(),
     ok.
 
+t_http_test_rule_trace(_Config) ->
+    emqx_trace:clear(),
+    load(),
+    %% create
+    Name = atom_to_binary(?FUNCTION_NAME),
+    Trace = [
+        {<<"name">>, Name},
+        {<<"type">>, <<"ruleid">>},
+        {<<"ruleid">>, Name}
+    ],
+
+    {ok, Create} = request_api(post, api_path("trace"), Trace),
+    ?assertMatch(#{<<"name">> := Name}, json(Create)),
+
+    {ok, List} = request_api(get, api_path("trace")),
+    [Data] = json(List),
+    ?assertEqual(Name, maps:get(<<"name">>, Data)),
+
+    %% update
+    {ok, Update} = request_api(put, api_path(iolist_to_binary(["trace/", Name, "/stop"])), #{}),
+    ?assertEqual(
+        #{
+            <<"enable">> => false,
+            <<"name">> => Name
+        },
+        json(Update)
+    ),
+    {ok, List1} = request_api(get, api_path("trace")),
+    [Data1] = json(List1),
+    Node = atom_to_binary(node()),
+    ?assertMatch(
+        #{
+            <<"status">> := <<"stopped">>,
+            <<"name">> := Name,
+            <<"log_size">> := #{Node := _},
+            <<"start_at">> := _,
+            <<"end_at">> := _,
+            <<"type">> := <<"ruleid">>,
+            <<"ruleid">> := Name
+        },
+        Data1
+    ),
+
+    %% delete
+    {ok, Delete} = request_api(delete, api_path(["trace/", Name])),
+    ?assertEqual(<<>>, Delete),
+
+    unload(),
+    ok.
+
 t_create_failed(_Config) ->
     load(),
     Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}],
@@ -252,13 +302,16 @@ t_log_file(_Config) ->
     ok.
 
 create_trace(Name, ClientId, Start) ->
+    create_trace(Name, clientid, ClientId, Start).
+
+create_trace(Name, Type, TypeValue, Start) ->
     ?check_trace(
         #{timetrap => 900},
         begin
             {ok, _} = emqx_trace:create([
                 {<<"name">>, Name},
-                {<<"type">>, clientid},
-                {<<"clientid">>, ClientId},
+                {<<"type">>, Type},
+                {atom_to_binary(Type), TypeValue},
                 {<<"start_at">>, Start}
             ]),
             ?block_until(#{?snk_kind := update_trace_done})
@@ -268,6 +321,16 @@ create_trace(Name, ClientId, Start) ->
         end
     ).
 
+create_rule_trace(RuleId) ->
+    Now = erlang:system_time(second),
+    emqx_mgmt_api_trace_SUITE:create_trace(atom_to_binary(?FUNCTION_NAME), ruleid, RuleId, Now - 2).
+
+t_create_rule_trace(_Config) ->
+    load(),
+    create_rule_trace(atom_to_binary(?FUNCTION_NAME)),
+    unload(),
+    ok.
+
 t_stream_log(_Config) ->
     emqx_trace:clear(),
     load(),

+ 142 - 39
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -64,8 +64,10 @@
 
 -define(COLLECT_REQ_LIMIT, 1000).
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
--define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
--define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
+-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
+    {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
+).
+-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)).
 -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
 -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
     {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
@@ -77,7 +79,10 @@
 -type id() :: binary().
 -type index() :: pos_integer().
 -type expire_at() :: infinity | integer().
--type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
+-type trace_context() :: map() | undefined.
+-type queue_query() :: ?QUERY(
+    reply_fun(), request(), HasBeenSent :: boolean(), expire_at(), TraceCtx :: trace_context()
+).
 -type request() :: term().
 -type request_from() :: undefined | gen_statem:from().
 -type timeout_ms() :: emqx_schema:timeout_duration_ms().
@@ -154,7 +159,10 @@ simple_sync_query(Id, Request, QueryOpts0) ->
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
-    Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
+    TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
+    Result = call_query(
+        force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
+    ),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
 
@@ -167,8 +175,9 @@ simple_async_query(Id, Request, QueryOpts0) ->
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
+    TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
     Result = call_query(
-        async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
+        async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
     ),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     Result.
@@ -439,10 +448,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
     Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
     {ShouldAck, PostFn, DeltaCounters} =
         case QueryOrBatch of
-            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
+            ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) ->
                 Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
                 reply_caller_defer_metrics(Id, Reply, QueryOpts);
-            [?QUERY(_, _, _, _) | _] = Batch ->
+            [?QUERY(_, _, _, _, _) | _] = Batch ->
                 batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
         end,
     Data1 = aggregate_counters(Data0, DeltaCounters),
@@ -501,11 +510,13 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
                     ReplyFun = maps:get(async_reply_fun, Opts, undefined),
                     HasBeenSent = false,
                     ExpireAt = maps:get(expire_at, Opts),
-                    ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
+                    TraceCtx = maps:get(trace_ctx, Opts, undefined),
+                    ?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt, TraceCtx);
                 (?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
                     HasBeenSent = false,
                     ExpireAt = maps:get(expire_at, Opts),
-                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
+                    TraceCtx = maps:get(trace_ctx, Opts, undefined),
+                    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx)
             end,
             Requests
         ),
@@ -515,7 +526,7 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
 
 reply_overflown([]) ->
     ok;
-reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
+reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) | More]) ->
     do_reply_caller(ReplyTo, {error, buffer_overflow}),
     reply_overflown(More).
 
@@ -572,7 +583,11 @@ flush(Data0) ->
             {keep_state, Data1};
         {_, false} ->
             ?tp(buffer_worker_flush_before_pop, #{}),
-            {Q1, QAckRef, Batch} = replayq:pop(Q0, #{count_limit => BatchSize}),
+            PopOpts = #{
+                count_limit => BatchSize,
+                stop_before => {fun stop_batching/2, initial_state}
+            },
+            {Q1, QAckRef, Batch} = replayq:pop(Q0, PopOpts),
             Data2 = Data1#{queue := Q1},
             ?tp(buffer_worker_flush_before_sieve_expired, #{}),
             Now = now_(),
@@ -608,6 +623,23 @@ flush(Data0) ->
             end
     end.
 
+stop_batching(Query, initial_state) ->
+    get_stop_flag(Query);
+stop_batching(Query, PrevStopFlag) ->
+    case get_stop_flag(Query) =:= PrevStopFlag of
+        true ->
+            PrevStopFlag;
+        false ->
+            %% We stop beceause we don't want a batch with mixed values for the
+            %% stop_action_after_render option
+            true
+    end.
+
+get_stop_flag(?QUERY(_, _, _, _, #{stop_action_after_render := true})) ->
+    stop_action_after_render;
+get_stop_flag(_) ->
+    no_stop_action_after_render.
+
 -spec do_flush(data(), #{
     is_batch := boolean(),
     batch := [queue_query()],
@@ -630,7 +662,7 @@ do_flush(
         inflight_tid := InflightTID
     } = Data0,
     %% unwrap when not batching (i.e., batch size == 1)
-    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
+    [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch,
     QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
     Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
     Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
@@ -824,14 +856,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
 
 expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
     lists:map(
-        fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) ->
+        fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) ->
             ?REPLY(FROM, SENT, Result)
         end,
         lists:zip(Batch, BatchResults)
     );
 expand_batch_reply(BatchResult, Batch) ->
     lists:map(
-        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
+        fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) ->
             ?REPLY(FROM, SENT, BatchResult)
         end,
         Batch
@@ -880,7 +912,7 @@ reply_dropped(_ReplyTo, _Result) ->
 -spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
 batch_reply_dropped(Batch, Result) ->
     lists:foreach(
-        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
+        fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt, _TraceCtx)) ->
             reply_dropped(ReplyTo, Result)
         end,
         Batch
@@ -1093,11 +1125,53 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
         {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
             {error, {unrecoverable_error, unhealthy_target}};
         {ok, _Group, Resource} ->
-            do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
+            set_rule_id_trace_meta_data(Query),
+            QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource),
+            %% do_call_query does not throw an exception as the call to the
+            %% resource is wrapped in a try catch expression so we will always
+            %% unset the trace meta data
+            unset_rule_id_trace_meta_data(),
+            QueryResult;
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
+set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
+    %% Get the rule ids from requests
+    RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
+    ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
+    StopAfterRenderVal =
+        case Requests of
+            %% We know that the batch is not mixed since we prevent this by
+            %% using a stop_after function in the replayq:pop call
+            [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] ->
+                true;
+            [?QUERY(_, _, _, _, _TraceCTX) | _] ->
+                false
+        end,
+    logger:update_process_metadata(#{
+        rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal
+    }),
+    ok;
+set_rule_id_trace_meta_data(Request) ->
+    set_rule_id_trace_meta_data([Request]),
+    ok.
+
+collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
+    Acc#{RuleId => true};
+collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
+collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
+    Acc#{ClientId => true};
+collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
+unset_rule_id_trace_meta_data() ->
+    logger:update_process_metadata(#{
+        rule_ids => #{}, client_ids => #{}, stop_action_after_render => false
+    }).
+
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
 extract_connector_id(Id) when is_binary(Id) ->
     case binary:split(Id, <<":">>, [global]) of
@@ -1208,7 +1282,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
 ).
 
 apply_query_fun(
-    sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts
+    sync,
+    Mod,
+    Id,
+    _Index,
+    _Ref,
+    ?QUERY(_, Request, _, _, _TraceCtx) = _Query,
+    ResSt,
+    Channels,
+    QueryOpts
 ) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
     maybe_reply_to(
@@ -1227,7 +1309,15 @@ apply_query_fun(
         QueryOpts
     );
 apply_query_fun(
-    async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts
+    async,
+    Mod,
+    Id,
+    Index,
+    Ref,
+    ?QUERY(_, Request, _, _, _TraceCtx) = Query,
+    ResSt,
+    Channels,
+    QueryOpts
 ) ->
     ?tp(call_query_async, #{
         id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
@@ -1268,7 +1358,7 @@ apply_query_fun(
     Id,
     _Index,
     _Ref,
-    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
+    [?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
     ResSt,
     Channels,
     QueryOpts
@@ -1276,7 +1366,9 @@ apply_query_fun(
     ?tp(call_batch_query, #{
         id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
     }),
-    Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
+    Requests = lists:map(
+        fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
+    ),
     maybe_reply_to(
         ?APPLY_RESOURCE(
             call_batch_query,
@@ -1298,7 +1390,7 @@ apply_query_fun(
     Id,
     Index,
     Ref,
-    [?QUERY(_, FirstRequest, _, _) | _] = Batch,
+    [?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
     ResSt,
     Channels,
     QueryOpts
@@ -1321,7 +1413,7 @@ apply_query_fun(
                 min_batch => minimize(Batch)
             },
             Requests = lists:map(
-                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
+                fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
             ),
             IsRetriable = false,
             AsyncWorkerMRef = undefined,
@@ -1367,7 +1459,7 @@ handle_async_reply1(
         inflight_tid := InflightTID,
         resource_id := Id,
         buffer_worker := BufferWorkerPid,
-        min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
+        min_query := ?QUERY(ReplyTo, _, _, ExpireAt, _TraceCtx) = _Query
     } = ReplyContext,
     Result
 ) ->
@@ -1399,7 +1491,7 @@ do_handle_async_reply(
         request_ref := Ref,
         buffer_worker := BufferWorkerPid,
         inflight_tid := InflightTID,
-        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
+        min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query
     },
     Result
 ) ->
@@ -1486,13 +1578,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) ->
     %% So we just take the original flag from the ReplyContext batch
     %% and put it back to the batch found in inflight table
     %% which must have already been set to `false`
-    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
+    [?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) | _] = Batch,
     {RealNotExpired0, RealExpired, Results} =
         sieve_expired_requests_with_results(RealBatch, Now, Results0),
     RealNotExpired =
         lists:map(
-            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
-                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
+            fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt, TraceCtx)) ->
+                ?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt, TraceCtx)
             end,
             RealNotExpired0
         ),
@@ -1678,7 +1770,10 @@ inflight_get_first_retriable(InflightTID, Now) ->
     case ets:select(InflightTID, MatchSpec, _Limit = 1) of
         '$end_of_table' ->
             none;
-        {[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
+        {
+            [{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)}],
+            _Continuation
+        } ->
             case is_expired(ExpireAt, Now) of
                 true ->
                     {expired, Ref, [Query]};
@@ -1714,7 +1809,7 @@ inflight_append(undefined, _InflightItem) ->
     ok;
 inflight_append(
     InflightTID,
-    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
+    ?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
 ) ->
     Batch = mark_as_sent(Batch0),
     InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
@@ -1726,7 +1821,10 @@ inflight_append(
 inflight_append(
     InflightTID,
     ?INFLIGHT_ITEM(
-        Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
+        Ref,
+        ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) = Query0,
+        IsRetriable,
+        AsyncWorkerMRef
     )
 ) ->
     Query = mark_as_sent(Query0),
@@ -1790,9 +1888,13 @@ ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
 ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
     {Count, Removed} =
         case ets:take(InflightTID, Ref) of
-            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
+            [?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
                 {1, true};
-            [?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
+            [
+                ?INFLIGHT_ITEM(
+                    Ref, [?QUERY(_, _, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef
+                )
+            ] ->
                 {length(Batch), true};
             [] ->
                 {0, false}
@@ -1942,9 +2044,9 @@ do_collect_requests(Acc, Count, Limit) ->
 
 mark_as_sent(Batch) when is_list(Batch) ->
     lists:map(fun mark_as_sent/1, Batch);
-mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
+mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt, TraceCtx)) ->
     HasBeenSent = true,
-    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
+    ?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx).
 
 is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
     true;
@@ -1967,7 +2069,7 @@ is_async_return(_) ->
 
 sieve_expired_requests(Batch, Now) ->
     lists:partition(
-        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
+        fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)) ->
             not is_expired(ExpireAt, Now)
         end,
         Batch
@@ -1978,7 +2080,7 @@ sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) -
     {RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} =
         lists:foldl(
             fun(
-                {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result},
+                {?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx) = Query, Result},
                 {NotExpAcc, ResAcc, ExpAcc}
             ) ->
                 case not is_expired(ExpireAt, Now) of
@@ -2026,15 +2128,16 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
     Opts#{expire_at => ExpireAt}.
 
 %% no need to keep the request for async reply handler
-minimize(?QUERY(_, _, _, _) = Q) ->
+minimize(?QUERY(_, _, _, _, _) = Q) ->
     do_minimize(Q);
 minimize(L) when is_list(L) ->
     lists:map(fun do_minimize/1, L).
 
 -ifdef(TEST).
-do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
+do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt, _TraceCtx) = Query) -> Query.
 -else.
-do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
+do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt, TraceCtx)) ->
+    ?QUERY(ReplyTo, [], Sent, ExpireAt, TraceCtx).
 -endif.
 
 %% To avoid message loss due to misconfigurations, we adjust

+ 27 - 28
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -277,10 +277,9 @@ t_batch_query_counter(_) ->
         fun(Result, Trace) ->
             ?assertMatch({ok, 0}, Result),
             QueryTrace = ?of_kind(call_batch_query, Trace),
-            ?assertMatch([#{batch := [{query, _, get_counter, _, _}]}], QueryTrace)
+            ?assertMatch([#{batch := [{query, _, get_counter, _, _, _}]}], QueryTrace)
         end
     ),
-
     NMsgs = 1_000,
     ?check_trace(
         ?TRACE_OPTS,
@@ -340,7 +339,7 @@ t_query_counter_async_query(_) ->
         fun(Trace) ->
             %% the callback_mode of 'emqx_connector_demo' is 'always_sync'.
             QueryTrace = ?of_kind(call_query, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
         end
     ),
     %% simple query ignores the query_mode and batching settings in the resource_worker
@@ -351,7 +350,7 @@ t_query_counter_async_query(_) ->
             ?assertMatch({ok, 1000}, Result),
             %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
             QueryTrace = ?of_kind(call_query, Trace),
-            ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
+            ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace)
         end
     ),
     #{counters := C} = emqx_resource:get_metrics(?ID),
@@ -397,7 +396,7 @@ t_query_counter_async_callback(_) ->
         end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
         end
     ),
 
@@ -408,7 +407,7 @@ t_query_counter_async_callback(_) ->
         fun(Result, Trace) ->
             ?assertMatch({ok, 1000}, Result),
             QueryTrace = ?of_kind(call_query, Trace),
-            ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
+            ?assertMatch([#{query := {query, _, get_counter, _, _, _}}], QueryTrace)
         end
     ),
     #{counters := C} = emqx_resource:get_metrics(?ID),
@@ -480,7 +479,7 @@ t_query_counter_async_inflight(_) ->
             ),
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
         end
     ),
     tap_metrics(?LINE),
@@ -537,7 +536,7 @@ t_query_counter_async_inflight(_) ->
         end,
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace),
+            ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _, _}} | _], QueryTrace),
             ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}),
             tap_metrics(?LINE),
             ok
@@ -557,7 +556,7 @@ t_query_counter_async_inflight(_) ->
             ),
         fun(Trace) ->
             QueryTrace = ?of_kind(call_query_async, Trace),
-            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _}} | _], QueryTrace)
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}, _, _, _}} | _], QueryTrace)
         end
     ),
 
@@ -669,8 +668,8 @@ t_query_counter_async_inflight_batch(_) ->
              || Event = #{
                     ?snk_kind := call_batch_query_async,
                     batch := [
-                        {query, _, {inc_counter, 1}, _, _},
-                        {query, _, {inc_counter, 1}, _, _}
+                        {query, _, {inc_counter, 1}, _, _, _},
+                        {query, _, {inc_counter, 1}, _, _, _}
                     ]
                 } <-
                     Trace
@@ -754,7 +753,7 @@ t_query_counter_async_inflight_batch(_) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             ?assertMatch(
-                [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
+                [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _],
                 QueryTrace
             )
         end
@@ -779,7 +778,7 @@ t_query_counter_async_inflight_batch(_) ->
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query_async, Trace),
             ?assertMatch(
-                [#{batch := [{query, _, {inc_counter, _}, _, _} | _]} | _],
+                [#{batch := [{query, _, {inc_counter, _}, _, _, _} | _]} | _],
                 QueryTrace
             )
         end
@@ -2051,7 +2050,7 @@ do_t_expiration_before_sending(QueryMode) ->
         end,
         fun(Trace) ->
             ?assertMatch(
-                [#{batch := [{query, _, {inc_counter, 99}, _, _}]}],
+                [#{batch := [{query, _, {inc_counter, 99}, _, _, _}]}],
                 ?of_kind(buffer_worker_flush_all_expired, Trace)
             ),
             Metrics = tap_metrics(?LINE),
@@ -2167,7 +2166,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
                         #{
                             ?snk_kind := handle_async_reply,
                             action := ack,
-                            batch_or_query := [{query, _, {inc_counter, 99}, _, _}]
+                            batch_or_query := [{query, _, {inc_counter, 99}, _, _, _}]
                         },
                         10 * TimeoutMS
                     );
@@ -2189,8 +2188,8 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
             ?assertMatch(
                 [
                     #{
-                        expired := [{query, _, {inc_counter, 199}, _, _}],
-                        not_expired := [{query, _, {inc_counter, 99}, _, _}]
+                        expired := [{query, _, {inc_counter, 199}, _, _, _}],
+                        not_expired := [{query, _, {inc_counter, 99}, _, _, _}]
                     }
                 ],
                 ?of_kind(buffer_worker_flush_potentially_partial, Trace)
@@ -2303,7 +2302,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
                 #{?snk_kind := delay},
                 #{
                     ?snk_kind := handle_async_reply_enter,
-                    batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
+                    batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _]
                 }
             ),
 
@@ -2346,8 +2345,8 @@ do_t_expiration_async_after_reply(IsBatch) ->
                         [
                             #{
                                 expired := [
-                                    {query, _, {inc_counter, 199}, _, _},
-                                    {query, _, {inc_counter, 299}, _, _}
+                                    {query, _, {inc_counter, 199}, _, _, _},
+                                    {query, _, {inc_counter, 299}, _, _, _}
                                 ]
                             }
                         ],
@@ -2365,8 +2364,8 @@ do_t_expiration_async_after_reply(IsBatch) ->
                 single ->
                     ?assertMatch(
                         [
-                            #{expired := [{query, _, {inc_counter, 199}, _, _}]},
-                            #{expired := [{query, _, {inc_counter, 299}, _, _}]}
+                            #{expired := [{query, _, {inc_counter, 199}, _, _, _}]},
+                            #{expired := [{query, _, {inc_counter, 299}, _, _, _}]}
                         ],
                         ?of_kind(handle_async_reply_expired, Trace)
                     )
@@ -2417,7 +2416,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
                 #{?snk_kind := delay},
                 #{
                     ?snk_kind := handle_async_reply_enter,
-                    batch_or_query := [{query, _, {inc_counter, 199}, _, _} | _]
+                    batch_or_query := [{query, _, {inc_counter, 199}, _, _, _} | _]
                 }
             ),
 
@@ -2451,8 +2450,8 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
                 [
                     #{
                         expired := [
-                            {query, _, {inc_counter, 199}, _, _},
-                            {query, _, {inc_counter, 299}, _, _}
+                            {query, _, {inc_counter, 199}, _, _, _},
+                            {query, _, {inc_counter, 299}, _, _, _}
                         ]
                     }
                 ],
@@ -2578,7 +2577,7 @@ do_t_expiration_retry() ->
         end,
         fun(Trace) ->
             ?assertMatch(
-                [#{expired := [{query, _, {inc_counter, 1}, _, _}]}],
+                [#{expired := [{query, _, {inc_counter, 1}, _, _, _}]}],
                 ?of_kind(buffer_worker_retry_expired, Trace)
             ),
             Metrics = tap_metrics(?LINE),
@@ -2655,8 +2654,8 @@ t_expiration_retry_batch_multiple_times(_Config) ->
         fun(Trace) ->
             ?assertMatch(
                 [
-                    #{expired := [{query, _, {inc_counter, 1}, _, _}]},
-                    #{expired := [{query, _, {inc_counter, 2}, _, _}]}
+                    #{expired := [{query, _, {inc_counter, 1}, _, _, _}]},
+                    #{expired := [{query, _, {inc_counter, 2}, _, _, _}]}
                 ],
                 ?of_kind(buffer_worker_retry_expired, Trace)
             ),

+ 10 - 1
apps/emqx_rule_engine/rebar.config

@@ -2,7 +2,16 @@
 
 {deps, [
     {emqx, {path, "../emqx"}},
-    {emqx_utils, {path, "../emqx_utils"}}
+    {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_modules, {path, "../emqx_modules"}}
+]}.
+
+{profiles, [
+    {test, [
+        {deps, [
+            {emqx_bridge_http, {path, "../emqx_bridge_http"}}
+        ]}
+    ]}
 ]}.
 
 {erl_opts, [

+ 38 - 21
apps/emqx_rule_engine/src/emqx_rule_api_schema.erl

@@ -26,7 +26,7 @@
 
 -export([namespace/0, roots/0, fields/1]).
 
--type tag() :: rule_creation | rule_test | rule_engine.
+-type tag() :: rule_creation | rule_test | rule_engine | rule_apply_test.
 
 -spec check_params(map(), tag()) -> {ok, map()} | {error, term()}.
 check_params(Params, Tag) ->
@@ -54,7 +54,8 @@ roots() ->
         {"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
         {"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
         {"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
-        {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
+        {"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})},
+        {"rule_apply_test", sc(ref("rule_apply_test"), #{desc => ?DESC("root_apply_rule_test")})}
     ].
 
 fields("rule_engine") ->
@@ -101,28 +102,21 @@ fields("rule_events") ->
     ];
 fields("rule_test") ->
     [
-        {"context",
+        rule_input_message_context(),
+        {"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})}
+    ];
+fields("rule_apply_test") ->
+    [
+        rule_input_message_context(),
+        {"stop_action_after_template_rendering",
             sc(
-                hoconsc:union([
-                    ref("ctx_pub"),
-                    ref("ctx_sub"),
-                    ref("ctx_unsub"),
-                    ref("ctx_delivered"),
-                    ref("ctx_acked"),
-                    ref("ctx_dropped"),
-                    ref("ctx_connected"),
-                    ref("ctx_disconnected"),
-                    ref("ctx_connack"),
-                    ref("ctx_check_authz_complete"),
-                    ref("ctx_bridge_mqtt"),
-                    ref("ctx_delivery_dropped")
-                ]),
+                typerefl:boolean(),
                 #{
-                    desc => ?DESC("test_context"),
-                    default => #{}
+                    desc =>
+                        ?DESC("stop_action_after_template_render"),
+                    default => true
                 }
-            )},
-        {"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})}
+            )}
     ];
 fields("metrics") ->
     [
@@ -315,6 +309,29 @@ fields("ctx_delivery_dropped") ->
         | msg_event_common_fields()
     ].
 
+rule_input_message_context() ->
+    {"context",
+        sc(
+            hoconsc:union([
+                ref("ctx_pub"),
+                ref("ctx_sub"),
+                ref("ctx_unsub"),
+                ref("ctx_delivered"),
+                ref("ctx_acked"),
+                ref("ctx_dropped"),
+                ref("ctx_connected"),
+                ref("ctx_disconnected"),
+                ref("ctx_connack"),
+                ref("ctx_check_authz_complete"),
+                ref("ctx_bridge_mqtt"),
+                ref("ctx_delivery_dropped")
+            ]),
+            #{
+                desc => ?DESC("test_context"),
+                default => #{}
+            }
+        )}.
+
 qos() ->
     {"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}.
 

+ 3 - 1
apps/emqx_rule_engine/src/emqx_rule_engine.app.src

@@ -17,7 +17,9 @@
         %% rule_engine should wait for bridge connector start,
         %% it's will check action/connector ref's exist.
         emqx_bridge,
-        emqx_connector
+        emqx_connector,
+        %% Needed to start the tracing functionality
+        emqx_modules
     ]},
     {mod, {emqx_rule_engine_app, []}},
     {env, []},

+ 38 - 0
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -37,6 +37,7 @@
     '/rule_test'/2,
     '/rules'/2,
     '/rules/:id'/2,
+    '/rules/:id/test'/2,
     '/rules/:id/metrics'/2,
     '/rules/:id/metrics/reset'/2
 ]).
@@ -145,6 +146,7 @@ paths() ->
         "/rule_test",
         "/rules",
         "/rules/:id",
+        "/rules/:id/test",
         "/rules/:id/metrics",
         "/rules/:id/metrics/reset"
     ].
@@ -161,6 +163,9 @@ rule_creation_schema() ->
 rule_test_schema() ->
     ref(emqx_rule_api_schema, "rule_test").
 
+rule_apply_test_schema() ->
+    ref(emqx_rule_api_schema, "rule_apply_test").
+
 rule_info_schema() ->
     ref(emqx_rule_api_schema, "rule_info").
 
@@ -258,6 +263,21 @@ schema("/rules/:id") ->
             }
         }
     };
+schema("/rules/:id/test") ->
+    #{
+        'operationId' => '/rules/:id/test',
+        post => #{
+            tags => [<<"rules">>],
+            description => ?DESC("api11"),
+            summary => <<"Apply a rule for testing">>,
+            'requestBody' => rule_apply_test_schema(),
+            responses => #{
+                400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
+                412 => error_schema('NOT_MATCH', "SQL Not Match"),
+                200 => <<"Rule Applied">>
+            }
+        }
+    };
 schema("/rules/:id/metrics") ->
     #{
         'operationId' => '/rules/:id/metrics',
@@ -392,6 +412,24 @@ param_path_id() ->
         end
     ).
 
+'/rules/:id/test'(post, #{body := Params, bindings := #{id := RuleId}}) ->
+    ?CHECK_PARAMS(
+        Params,
+        rule_apply_test,
+        begin
+            case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of
+                {ok, Result} ->
+                    {200, Result};
+                {error, {parse_error, Reason}} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
+                {error, nomatch} ->
+                    {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+            end
+        end
+    ).
+
 '/rules/:id'(get, #{bindings := #{id := Id}}) ->
     case emqx_rule_engine:get_rule(Id) of
         {ok, Rule} ->

+ 228 - 58
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -69,6 +69,14 @@ apply_rule_discard_result(Rule, Columns, Envs) ->
     ok.
 
 apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
+    set_process_trace_metadata(RuleID, Columns),
+    trace_rule_sql(
+        "rule_activated",
+        #{
+            input => Columns, environment => Envs
+        },
+        debug
+    ),
     ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
     clear_rule_payload(),
     try
@@ -77,48 +85,80 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
         %% ignore the errors if select or match failed
         _:Reason = {select_and_transform_error, Error} ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
-            ?SLOG(warning, #{
-                msg => "SELECT_clause_exception",
-                rule_id => RuleID,
-                reason => Error
-            }),
+            trace_rule_sql(
+                "SELECT_clause_exception",
+                #{
+                    reason => Error
+                },
+                warning
+            ),
             {error, Reason};
         _:Reason = {match_conditions_error, Error} ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
-            ?SLOG(warning, #{
-                msg => "WHERE_clause_exception",
-                rule_id => RuleID,
-                reason => Error
-            }),
+            trace_rule_sql(
+                "WHERE_clause_exception",
+                #{
+                    reason => Error
+                },
+                warning
+            ),
             {error, Reason};
         _:Reason = {select_and_collect_error, Error} ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
-            ?SLOG(warning, #{
-                msg => "FOREACH_clause_exception",
-                rule_id => RuleID,
-                reason => Error
-            }),
+            trace_rule_sql(
+                "FOREACH_clause_exception",
+                #{
+                    reason => Error
+                },
+                warning
+            ),
             {error, Reason};
         _:Reason = {match_incase_error, Error} ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
-            ?SLOG(warning, #{
-                msg => "INCASE_clause_exception",
-                rule_id => RuleID,
-                reason => Error
-            }),
+            trace_rule_sql(
+                "INCASE_clause_exception",
+                #{
+                    reason => Error
+                },
+                warning
+            ),
             {error, Reason};
         Class:Error:StkTrace ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
-            ?SLOG(error, #{
-                msg => "apply_rule_failed",
-                rule_id => RuleID,
-                exception => Class,
-                reason => Error,
-                stacktrace => StkTrace
-            }),
+            trace_rule_sql(
+                "apply_rule_failed",
+                #{
+                    exception => Class,
+                    reason => Error,
+                    stacktrace => StkTrace
+                },
+                warning
+            ),
             {error, {Error, StkTrace}}
+    after
+        reset_process_trace_metadata(Columns)
     end.
 
+set_process_trace_metadata(RuleID, #{clientid := ClientID}) ->
+    logger:update_process_metadata(#{
+        rule_id => RuleID,
+        clientid => ClientID
+    });
+set_process_trace_metadata(RuleID, _) ->
+    logger:update_process_metadata(#{
+        rule_id => RuleID
+    }).
+
+reset_process_trace_metadata(#{clientid := _ClientID}) ->
+    Meta = logger:get_process_metadata(),
+    Meta1 = maps:remove(clientid, Meta),
+    Meta2 = maps:remove(rule_id, Meta1),
+    logger:set_process_metadata(Meta2);
+reset_process_trace_metadata(_) ->
+    Meta = logger:get_process_metadata(),
+    Meta1 = maps:remove(rule_id, Meta),
+    logger:set_process_metadata(Meta1).
+
 do_apply_rule(
     #{
         id := RuleId,
@@ -136,13 +176,18 @@ do_apply_rule(
         {ok, ColumnsAndSelected, FinalCollection} ->
             case FinalCollection of
                 [] ->
+                    trace_rule_sql("FOREACH_yielded_no_result"),
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
                 _ ->
+                    trace_rule_sql(
+                        "FOREACH_yielded_result", #{result => FinalCollection}, debug
+                    ),
                     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
             end,
             NewEnvs = maps:merge(ColumnsAndSelected, Envs),
             {ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
         false ->
+            trace_rule_sql("FOREACH_yielded_no_result_no_match"),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
     end;
@@ -159,9 +204,11 @@ do_apply_rule(
 ) ->
     case evaluate_select(Fields, Columns, Conditions) of
         {ok, Selected} ->
+            trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
             {ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
         false ->
+            trace_rule_sql("SELECT_yielded_no_result_no_match"),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
             {error, nomatch}
     end.
@@ -345,37 +392,42 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
 
 handle_action(RuleId, ActId, Selected, Envs) ->
     ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
+    trace_action(ActId, "activating_action"),
     try
-        do_handle_action(RuleId, ActId, Selected, Envs)
+        Result = do_handle_action(RuleId, ActId, Selected, Envs),
+        trace_action(ActId, "action_activated", #{result => Result}),
+        Result
     catch
         throw:out_of_service ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             ok = emqx_metrics_worker:inc(
                 rule_metrics, RuleId, 'actions.failed.out_of_service'
             ),
-            ?SLOG(warning, #{msg => "out_of_service", action => ActId});
+            trace_action(ActId, "out_of_service", #{}, warning);
         Err:Reason:ST ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
-            ?SLOG(error, #{
-                msg => "action_failed",
-                action => ActId,
-                exception => Err,
-                reason => Reason,
-                stacktrace => ST
-            })
+            trace_action(
+                ActId,
+                "action_failed",
+                #{
+                    exception => Err,
+                    reason => Reason,
+                    stacktrace => ST
+                },
+                error
+            )
     end.
 
 -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
-do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
-    ?TRACE(
-        "BRIDGE",
-        "bridge_action",
-        #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
-    ),
-    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
+do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) ->
+    trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
+    {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
+    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}},
     case
-        emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
+        emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{
+            reply_to => ReplyTo, trace_ctx => TraceCtx
+        })
     of
         {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
             throw(out_of_service);
@@ -386,22 +438,19 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env
     end;
 do_handle_action(
     RuleId,
-    {bridge_v2, BridgeType, BridgeName},
+    {bridge_v2, BridgeType, BridgeName} = Action,
     Selected,
     _Envs
 ) ->
-    ?TRACE(
-        "BRIDGE",
-        "bridge_action",
-        #{bridge_id => {bridge_v2, BridgeType, BridgeName}}
-    ),
-    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
+    trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
+    {TraceCtx, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
+    ReplyTo = {fun ?MODULE:inc_action_metrics/2, [IncCtx], #{reply_dropped => true}},
     case
         emqx_bridge_v2:send_message(
             BridgeType,
             BridgeName,
             Selected,
-            #{reply_to => ReplyTo}
+            #{reply_to => ReplyTo, trace_ctx => TraceCtx}
         )
     of
         {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
@@ -412,12 +461,72 @@ do_handle_action(
             Result
     end;
 do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
+    trace_action(Action, "call_action_function"),
     %% the function can also throw 'out_of_service'
     Args = maps:get(args, Action, []),
     Result = Mod:Func(Selected, Envs, Args),
-    inc_action_metrics(RuleId, Result),
+    {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
+    inc_action_metrics(IncCtx, Result),
+    trace_action(Action, "call_action_function_result", #{result => Result}, debug),
     Result.
 
+do_handle_action_get_trace_inc_metrics_context(RuleID, Action) ->
+    case {emqx_trace:list(), logger:get_process_metadata()} of
+        {[], #{stop_action_after_render := true}} ->
+            %% Even if there is no trace we still need to pass
+            %% stop_action_after_render in the trace meta data so that the
+            %% action will be stopped.
+            {
+                #{
+                    stop_action_after_render => true
+                },
+                #{
+                    rule_id => RuleID,
+                    action_id => Action
+                }
+            };
+        {[], _} ->
+            %% As a performance/memory optimization, we don't create any trace
+            %% context if there are no trace patterns.
+            {undefined, #{
+                rule_id => RuleID,
+                action_id => Action
+            }};
+        {_List, TraceMeta} ->
+            Ctx = do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta),
+            {maps:remove(action_id, Ctx), Ctx}
+    end.
+
+do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) ->
+    StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false),
+    case TraceMeta of
+        #{
+            rule_id := RuleID,
+            clientid := ClientID
+        } ->
+            #{
+                rule_id => RuleID,
+                clientid => ClientID,
+                action_id => Action,
+                stop_action_after_render => StopAfterRender
+            };
+        #{
+            rule_id := RuleID
+        } ->
+            #{
+                rule_id => RuleID,
+                action_id => Action,
+                stop_action_after_render => StopAfterRender
+            }
+    end.
+
+action_info({bridge, BridgeType, BridgeName, _ResId}) ->
+    #{type => BridgeType, name => BridgeName};
+action_info({bridge_v2, BridgeType, BridgeName}) ->
+    #{type => BridgeType, name => BridgeName};
+action_info(FuncInfoMap) ->
+    FuncInfoMap.
+
 eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) ->
     case Context of
         [Columns] ->
@@ -596,21 +705,46 @@ nested_put(Alias, Val, Columns0) ->
     Columns = ensure_decoded_payload(Alias, Columns0),
     emqx_rule_maps:nested_put(Alias, Val, Columns).
 
-inc_action_metrics(RuleId, Result) ->
-    _ = do_inc_action_metrics(RuleId, Result),
+inc_action_metrics(TraceCtx, Result) ->
+    _ = do_inc_action_metrics(TraceCtx, Result),
     Result.
 
-do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
+do_inc_action_metrics(
+    #{rule_id := RuleId, action_id := ActId} = TraceContext,
+    {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason}
+) ->
+    TraceContext1 = maps:remove(action_id, TraceContext),
+    trace_action(
+        ActId,
+        "action_stopped_after_template_rendering",
+        maps:merge(#{reason => Explanation}, TraceContext1)
+    ),
+    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
+    emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
+do_inc_action_metrics(
+    #{rule_id := RuleId, action_id := ActId} = TraceContext,
+    {error, {recoverable_error, _}}
+) ->
+    TraceContext1 = maps:remove(action_id, TraceContext),
+    trace_action(ActId, "out_of_service", TraceContext1),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
-do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
+do_inc_action_metrics(
+    #{rule_id := RuleId, action_id := ActId} = TraceContext,
+    {error, {unrecoverable_error, _} = Reason}
+) ->
+    TraceContext1 = maps:remove(action_id, TraceContext),
+    trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext1)),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
-do_inc_action_metrics(RuleId, R) ->
+do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) ->
+    TraceContext1 = maps:remove(action_id, TraceContext),
     case is_ok_result(R) of
         false ->
+            trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext1)),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
         true ->
+            trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext1)),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
     end.
 
@@ -658,3 +792,39 @@ parse_function_name(Module, Name) when is_binary(Name) ->
     end;
 parse_function_name(_Module, Name) when is_atom(Name) ->
     Name.
+
+trace_action(ActId, Message) ->
+    trace_action_bridge("ACTION", ActId, Message).
+
+trace_action(ActId, Message, Extra) ->
+    trace_action_bridge("ACTION", ActId, Message, Extra, debug).
+
+trace_action(ActId, Message, Extra, Level) ->
+    trace_action_bridge("ACTION", ActId, Message, Extra, Level).
+
+trace_action_bridge(Tag, ActId, Message) ->
+    trace_action_bridge(Tag, ActId, Message, #{}, debug).
+
+trace_action_bridge(Tag, ActId, Message, Extra, Level) ->
+    ?TRACE(
+        Level,
+        Tag,
+        Message,
+        maps:merge(
+            #{
+                action_info => action_info(ActId)
+            },
+            Extra
+        )
+    ).
+
+trace_rule_sql(Message) ->
+    trace_rule_sql(Message, #{}, debug).
+
+trace_rule_sql(Message, Extra, Level) ->
+    ?TRACE(
+        Level,
+        "RULE_SQL_EXEC",
+        Message,
+        Extra
+    ).

+ 65 - 1
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -20,9 +20,73 @@
     test/1,
     get_selected_data/3,
     %% Some SQL functions return different results in the test environment
-    is_test_runtime_env/0
+    is_test_runtime_env/0,
+    apply_rule/2
 ]).
 
+apply_rule(
+    RuleId,
+    #{
+        context := Context,
+        stop_action_after_template_rendering := StopAfterRender
+    }
+) ->
+    {ok, Rule} = emqx_rule_engine:get_rule(RuleId),
+    InTopic = get_in_topic(Context),
+    EventTopics = maps:get(from, Rule, []),
+    case lists:all(fun is_publish_topic/1, EventTopics) of
+        true ->
+            %% test if the topic matches the topic filters in the rule
+            case emqx_topic:match_any(InTopic, EventTopics) of
+                true ->
+                    do_apply_matched_rule(
+                        Rule,
+                        Context,
+                        StopAfterRender
+                    );
+                false ->
+                    {error, nomatch}
+            end;
+        false ->
+            case lists:member(InTopic, EventTopics) of
+                true ->
+                    %% the rule is for both publish and events, test it directly
+                    do_apply_matched_rule(Rule, Context, StopAfterRender);
+                false ->
+                    {error, nomatch}
+            end
+    end.
+
+do_apply_matched_rule(Rule, Context, StopAfterRender) ->
+    update_process_trace_metadata(StopAfterRender),
+    ApplyRuleRes = emqx_rule_runtime:apply_rule(
+        Rule,
+        Context,
+        apply_rule_environment()
+    ),
+    reset_trace_process_metadata(StopAfterRender),
+    ApplyRuleRes.
+
+update_process_trace_metadata(true = _StopAfterRender) ->
+    logger:update_process_metadata(#{
+        stop_action_after_render => true
+    });
+update_process_trace_metadata(false = _StopAfterRender) ->
+    ok.
+
+reset_trace_process_metadata(true = _StopAfterRender) ->
+    Meta = logger:get_process_metadata(),
+    NewMeta = maps:remove(stop_action_after_render, Meta),
+    logger:set_process_metadata(NewMeta);
+reset_trace_process_metadata(false = _StopAfterRender) ->
+    ok.
+
+%% At the time of writing the environment passed to the apply rule function is
+%% not used at all for normal actions. When it is used for custom functions it
+%% is first merged with the context so there does not seem to be any need to
+%% set this to anything else then the empty map.
+apply_rule_environment() -> #{}.
+
 -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}.
 test(#{sql := Sql, context := Context}) ->
     case emqx_rule_sqlparser:parse(Sql) of

+ 111 - 2
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -43,7 +43,8 @@ all() ->
         {group, metrics},
         {group, metrics_simple},
         {group, metrics_fail},
-        {group, metrics_fail_simple}
+        {group, metrics_fail_simple},
+        {group, tracing}
     ].
 
 suite() ->
@@ -142,6 +143,9 @@ groups() ->
         {metrics_fail_simple, [], [
             t_rule_metrics_sync_fail,
             t_rule_metrics_async_fail
+        ]},
+        {tracing, [], [
+            t_trace_rule_id
         ]}
     ].
 
@@ -160,7 +164,7 @@ init_per_suite(Config) ->
     Config.
 
 end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
+    emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine, emqx_auth, emqx_bridge]),
     ok.
 
 set_special_configs(emqx_auth) ->
@@ -3632,6 +3636,111 @@ create_bridge(Type, Name, Config) ->
     {ok, _Bridge} = emqx_bridge:create(Type, Name, Config),
     emqx_bridge_resource:bridge_id(Type, Name).
 
+create_rule(Name, SQL) ->
+    Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL),
+    {ok, _} = emqx_rule_engine:create_rule(Rule).
+
+emqtt_client_config() ->
+    [
+        {host, "localhost"},
+        {clientid, <<"client">>},
+        {username, <<"testuser">>},
+        {password, <<"pass">>}
+    ].
+
+filesync(Name, Type) ->
+    ct:sleep(50),
+    filesync(Name, Type, 5).
+
+%% sometime the handler process is not started yet.
+filesync(Name, Type, 0) ->
+    ct:fail("Handler process not started ~p ~p", [Name, Type]);
+filesync(Name0, Type, Retry) ->
+    Name =
+        case is_binary(Name0) of
+            true -> Name0;
+            false -> list_to_binary(Name0)
+        end,
+    try
+        Handler = binary_to_atom(<<"trace_", (atom_to_binary(Type))/binary, "_", Name/binary>>),
+        ok = logger_disk_log_h:filesync(Handler)
+    catch
+        E:R ->
+            ct:pal("Filesync error:~p ~p~n", [{Name, Type, Retry}, {E, R}]),
+            ct:sleep(100),
+            filesync(Name, Type, Retry - 1)
+    end.
+
+t_trace_rule_id(_Config) ->
+    %% Start MQTT Client
+    emqx_trace_SUITE:reload(),
+    {ok, T} = emqtt:start_link(emqtt_client_config()),
+    emqtt:connect(T),
+    %% Create rules
+    create_rule(
+        <<"test_rule_id_1">>,
+        <<"select 1 as rule_number from \"rule_1_topic\"">>
+    ),
+    create_rule(
+        <<"test_rule_id_2">>,
+        <<"select 2 as rule_number from \"rule_2_topic\"">>
+    ),
+    %% Start tracing
+    ok = emqx_trace_handler:install(
+        "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log"
+    ),
+    ok = emqx_trace_handler:install(
+        "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log"
+    ),
+    emqx_trace:check(),
+    ok = filesync("CLI-RULE-1", ruleid),
+    ok = filesync("CLI-RULE-2", ruleid),
+
+    %% Verify the tracing file exits
+    ?assert(filelib:is_regular("tmp/rule_trace_1.log")),
+    ?assert(filelib:is_regular("tmp/rule_trace_2.log")),
+
+    %% Get current traces
+    ?assertMatch(
+        [
+            #{
+                type := ruleid,
+                filter := <<"test_rule_id_1">>,
+                level := debug,
+                dst := "tmp/rule_trace_1.log",
+                name := <<"CLI-RULE-1">>
+            },
+            #{
+                type := ruleid,
+                filter := <<"test_rule_id_2">>,
+                name := <<"CLI-RULE-2">>,
+                level := debug,
+                dst := "tmp/rule_trace_2.log"
+            }
+        ],
+        emqx_trace_handler:running()
+    ),
+
+    %% Trigger rule
+    emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>),
+    ?retry(
+        100,
+        5,
+        begin
+            ok = filesync("CLI-RULE-1", ruleid),
+            {ok, Bin} = file:read_file("tmp/rule_trace_1.log"),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>]))
+        end
+    ),
+    ok = filesync("CLI-RULE-2", ruleid),
+    ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0),
+
+    %% Stop tracing
+    ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>),
+    ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>),
+    ?assertEqual([], emqx_trace_handler:running()),
+    emqtt:disconnect(T).
+
 %%------------------------------------------------------------------------------
 %% Internal helpers
 %%------------------------------------------------------------------------------

+ 378 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -0,0 +1,378 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_api_rule_apply_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    application:load(emqx_conf),
+    AppsToStart = [
+        emqx,
+        emqx_conf,
+        emqx_connector,
+        emqx_bridge,
+        emqx_bridge_http,
+        emqx_rule_engine
+    ],
+    %% I don't know why we need to stop the apps and then start them but if we
+    %% don't do this and other suites run before this suite the test cases will
+    %% fail as it seems like the connector silently refuses to start.
+    ok = emqx_cth_suite:stop(AppsToStart),
+    Apps = emqx_cth_suite:start(
+        AppsToStart,
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    emqx_mgmt_api_test_util:init_suite(),
+    [{apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(_Case, Config) ->
+    emqx_bridge_http_test_lib:init_http_success_server(Config).
+
+end_per_testcase(_TestCase, _Config) ->
+    ok = emqx_bridge_http_connector_test_server:stop(),
+    emqx_bridge_v2_testlib:delete_all_bridges(),
+    emqx_bridge_v2_testlib:delete_all_connectors(),
+    emqx_common_test_helpers:call_janitor(),
+    ok.
+
+t_basic_apply_rule_trace_ruleid(Config) ->
+    basic_apply_rule_test_helper(Config, ruleid, false).
+
+t_basic_apply_rule_trace_clientid(Config) ->
+    basic_apply_rule_test_helper(Config, clientid, false).
+
+t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
+    basic_apply_rule_test_helper(Config, ruleid, true).
+
+basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
+    HTTPServerConfig = ?config(http_server, Config),
+    emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig),
+    #{status := connected} = emqx_bridge_v2:health_check(
+        http, emqx_bridge_http_test_lib:bridge_name()
+    ),
+    %% Create Rule
+    RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
+    SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
+    {ok, #{<<"id">> := RuleId}} =
+        emqx_bridge_testlib:create_rule_and_action_http(
+            http,
+            RuleTopic,
+            Config,
+            #{sql => SQL}
+        ),
+    ClientId = <<"c_emqx">>,
+    %% ===================================
+    %% Create trace for RuleId
+    %% ===================================
+    TraceName = atom_to_binary(?FUNCTION_NAME),
+    TraceValue =
+        case TraceType of
+            ruleid ->
+                RuleId;
+            clientid ->
+                ClientId
+        end,
+    create_trace(TraceName, TraceType, TraceValue),
+    %% ===================================
+    Context = #{
+        clientid => ClientId,
+        event_type => message_publish,
+        payload => <<"{\"msg\": \"hello\"}">>,
+        qos => 1,
+        topic => RuleTopic,
+        username => <<"u_emqx">>
+    },
+    Params = #{
+        <<"context">> => Context,
+        <<"stop_action_after_template_rendering">> => StopAfterRender
+    },
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
+    Now = erlang:system_time(second) - 10,
+    {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)),
+    ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)),
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        begin
+            Bin = read_rule_trace_file(TraceName, TraceType, Now),
+            io:format("THELOG:~n~s", [Bin]),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
+        end
+    ),
+    case StopAfterRender of
+        true ->
+            ?retry(
+                _Interval0 = 200,
+                _NAttempts0 = 20,
+                begin
+                    Bin = read_rule_trace_file(TraceName, TraceType, Now),
+                    io:format("THELOG2:~n~s", [Bin]),
+                    ?assertNotEqual(
+                        nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>])
+                    )
+                end
+            );
+        false ->
+            ?retry(
+                _Interval0 = 200,
+                _NAttempts0 = 20,
+                begin
+                    Bin = read_rule_trace_file(TraceName, TraceType, Now),
+                    io:format("THELOG3:~n~s", [Bin]),
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>]))
+                end
+            )
+    end,
+    emqx_trace:delete(TraceName),
+    ok.
+
+create_trace(TraceName, TraceType, TraceValue) ->
+    Now = erlang:system_time(second) - 10,
+    Start = Now,
+    End = Now + 60,
+    Trace = #{
+        name => TraceName,
+        type => TraceType,
+        TraceType => TraceValue,
+        start_at => Start,
+        end_at => End
+    },
+    emqx_trace_SUITE:reload(),
+    ok = emqx_trace:clear(),
+    {ok, _} = emqx_trace:create(Trace).
+
+t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
+    MeckOpts = [passthrough, no_link, no_history, non_strict],
+    catch meck:new(emqx_connector_info, MeckOpts),
+    meck:expect(
+        emqx_connector_info,
+        hard_coded_test_connector_info_modules,
+        0,
+        [emqx_rule_engine_test_connector_info]
+    ),
+    emqx_connector_info:clean_cache(),
+    catch meck:new(emqx_action_info, MeckOpts),
+    meck:expect(
+        emqx_action_info,
+        hard_coded_test_action_info_modules,
+        0,
+        [emqx_rule_engine_test_action_info]
+    ),
+    emqx_action_info:clean_cache(),
+    {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
+    Name = atom_to_binary(?FUNCTION_NAME),
+    ActionConf =
+        #{
+            <<"connector">> => Name,
+            <<"parameters">> =>
+                #{
+                    <<"values">> =>
+                        #{
+                            <<"send_to_pid">> => emqx_utils:bin_to_hexstr(
+                                term_to_binary(self()), upper
+                            )
+                        }
+                },
+            <<"resource_opts">> => #{
+                <<"batch_size">> => 1000,
+                <<"batch_time">> => 500
+            }
+        },
+    {ok, _} = emqx_bridge_v2:create(
+        rule_engine_test,
+        ?FUNCTION_NAME,
+        ActionConf
+    ),
+    SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>,
+    {ok, RuleID} = create_rule_with_action(
+        rule_engine_test,
+        ?FUNCTION_NAME,
+        SQL
+    ),
+    create_trace(Name, ruleid, RuleID),
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(Name, ruleid),
+    Now = erlang:system_time(second) - 10,
+    %% Stop
+    ParmsStopAfterRender = apply_rule_parms(true, Name),
+    ParmsNoStopAfterRender = apply_rule_parms(false, Name),
+    %% Check that batching is working
+    Count = 200,
+    CountMsgFun =
+        fun
+            CountMsgFunRec(0 = _CurCount, GotBatchWithAtLeastTwo) ->
+                GotBatchWithAtLeastTwo;
+            CountMsgFunRec(CurCount, GotBatchWithAtLeastTwo) ->
+                receive
+                    List ->
+                        Len = length(List),
+                        CountMsgFunRec(CurCount - Len, GotBatchWithAtLeastTwo orelse (Len > 1))
+                end
+        end,
+    lists:foreach(
+        fun(_) ->
+            {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender)
+        end,
+        lists:seq(1, Count)
+    ),
+    %% We should get the messages and at least one batch with more than 1
+    true = CountMsgFun(Count, false),
+    %% We should check that we don't get any mixed batch
+    CheckBatchesFun =
+        fun
+            CheckBatchesFunRec(0 = _CurCount) ->
+                ok;
+            CheckBatchesFunRec(CurCount) ->
+                receive
+                    [{_, #{<<"stop_after_render">> := StopValue}} | _] = List ->
+                        [
+                            ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg)
+                         || {_, Msg} <- List
+                        ],
+                        Len = length(List),
+                        CheckBatchesFunRec(CurCount - Len)
+                end
+        end,
+    lists:foreach(
+        fun(_) ->
+            case rand:normal() < 0 of
+                true ->
+                    {ok, _} = call_apply_rule_api(RuleID, ParmsStopAfterRender);
+                false ->
+                    {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender)
+            end
+        end,
+        lists:seq(1, Count)
+    ),
+    CheckBatchesFun(Count),
+    %% Just check that the log file is created as expected
+    ?retry(
+        _Interval0 = 200,
+        _NAttempts0 = 20,
+        begin
+            Bin = read_rule_trace_file(Name, ruleid, Now),
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])),
+            ?assertNotEqual(
+                nomatch, binary:match(Bin, [<<"action_stopped_after_template_rendering">>])
+            )
+        end
+    ),
+    %% Cleanup
+    ok = emqx_trace:delete(Name),
+    ok = emqx_rule_engine:delete_rule(RuleID),
+    ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME),
+    ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME),
+    [_, _] = meck:unload(),
+    ok.
+
+apply_rule_parms(StopAfterRender, Name) ->
+    Payload = #{<<"is_stop_after_render">> => StopAfterRender},
+    Context = #{
+        clientid => Name,
+        event_type => message_publish,
+        payload => emqx_utils_json:encode(Payload),
+        qos => 1,
+        topic => Name,
+        username => <<"u_emqx">>
+    },
+    #{
+        <<"context">> => Context,
+        <<"stop_action_after_template_rendering">> => StopAfterRender
+    }.
+
+create_rule_with_action(ActionType, ActionName, SQL) ->
+    BridgeId = emqx_bridge_resource:bridge_id(ActionType, ActionName),
+    Params = #{
+        enable => true,
+        sql => SQL,
+        actions => [BridgeId]
+    },
+    Path = emqx_mgmt_api_test_util:api_path(["rules"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    ct:pal("rule action params: ~p", [Params]),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res0} ->
+            #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            {ok, RuleId};
+        Error ->
+            Error
+    end.
+
+%% Helper Functions
+
+call_apply_rule_api(RuleId, Params) ->
+    Method = post,
+    Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),
+    Res = request(Method, Path, Params),
+    Res.
+
+request(Method, Path, Params) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
+        {ok, {Status, Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {ok, {Status, Headers, Body}};
+        {error, {Status, Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {error, {Status, Headers, Body}};
+        Error ->
+            Error
+    end.
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.
+
+read_rule_trace_file(TraceName, TraceType, From) ->
+    emqx_trace:check(),
+    ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
+    {ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)),
+    Bin.

+ 101 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_action_info.erl

@@ -0,0 +1,101 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_test_action_info).
+
+-behaviour(emqx_action_info).
+
+-export([
+    bridge_v1_type_name/0,
+    action_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+-define(CONNECTOR_TYPE, rule_engine_test).
+-define(ACTION_TYPE, ?CONNECTOR_TYPE).
+
+bridge_v1_type_name() -> ?ACTION_TYPE.
+
+action_type_name() -> ?ACTION_TYPE.
+
+connector_type_name() -> ?ACTION_TYPE.
+
+schema_module() -> emqx_rule_engine_test_action_info.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+
+namespace() -> "bridge_test_action_info".
+
+roots() -> [].
+
+fields(Field) when
+    Field == "get_connector";
+    Field == "put_connector";
+    Field == "post_connector"
+->
+    Fields =
+        fields(connector_fields) ++
+            emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
+    emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
+fields(Field) when
+    Field == "get_bridge_v2";
+    Field == "post_bridge_v2";
+    Field == "put_bridge_v2"
+->
+    emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rule_engine_test_action));
+fields(action) ->
+    {?ACTION_TYPE,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(?MODULE, rule_engine_test_action)),
+            #{
+                desc => <<"Test Action Config">>,
+                required => false
+            }
+        )};
+fields(rule_engine_test_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        hoconsc:mk(
+            hoconsc:ref(?MODULE, action_parameters),
+            #{
+                required => true,
+                desc => undefined
+            }
+        )
+    );
+fields(action_parameters) ->
+    [
+        {values,
+            hoconsc:mk(
+                typerefl:map(),
+                #{desc => undefined, default => #{}}
+            )}
+    ];
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        fields(connector_fields) ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
+fields("config") ->
+    emqx_resource_schema:fields("resource_opts") ++
+        fields(connector_fields);
+fields(connector_fields) ->
+    [
+        {values,
+            hoconsc:mk(
+                typerefl:map(),
+                #{desc => undefined, default => #{}}
+            )}
+    ].
+desc(_) ->
+    undefined.

+ 100 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl

@@ -0,0 +1,100 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_rule_engine_test_connector).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(emqx_resource).
+
+%% callbacks of behaviour emqx_resource
+-export([
+    callback_mode/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_batch_query/3,
+    on_get_status/2,
+    on_add_channel/4,
+    on_remove_channel/3,
+    on_get_channels/1,
+    on_get_channel_status/3
+]).
+
+%% ===================================================================
+callback_mode() -> always_sync.
+
+on_start(
+    _InstId,
+    _Config
+) ->
+    {ok, #{installed_channels => #{}}}.
+
+on_stop(_InstId, _State) ->
+    ok.
+
+on_add_channel(
+    _InstId,
+    #{
+        installed_channels := InstalledChannels
+    } = OldState,
+    ChannelId,
+    ChannelConfig
+) ->
+    NewInstalledChannels = maps:put(ChannelId, ChannelConfig, InstalledChannels),
+    NewState = OldState#{installed_channels => NewInstalledChannels},
+    {ok, NewState}.
+
+on_remove_channel(
+    _InstId,
+    OldState,
+    _ChannelId
+) ->
+    {ok, OldState}.
+
+on_get_channel_status(
+    _ResId,
+    _ChannelId,
+    _State
+) ->
+    connected.
+
+on_get_channels(ResId) ->
+    emqx_bridge_v2:get_channels_for_connector(ResId).
+
+on_query(
+    _InstId,
+    _Query,
+    _State
+) ->
+    ok.
+
+on_batch_query(
+    _InstId,
+    [{ChannelId, _Req} | _] = Msg,
+    #{installed_channels := Channels} = _State
+) ->
+    #{parameters := #{values := #{send_to_pid := PidBin}}} = maps:get(ChannelId, Channels),
+    Pid = binary_to_term(emqx_utils:hexstr_to_bin(PidBin)),
+    Pid ! Msg,
+    emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}),
+    ok.
+
+on_get_status(_InstId, _State) ->
+    connected.

+ 43 - 0
apps/emqx_rule_engine/test/emqx_rule_engine_test_connector_info.erl

@@ -0,0 +1,43 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_rule_engine_test_connector_info).
+
+-behaviour(emqx_connector_info).
+
+-export([
+    type_name/0,
+    bridge_types/0,
+    resource_callback_module/0,
+    config_schema/0,
+    schema_module/0,
+    api_schema/1
+]).
+
+type_name() ->
+    rule_engine_test.
+
+bridge_types() ->
+    [rule_engine_test].
+
+resource_callback_module() ->
+    emqx_rule_engine_test_connector.
+
+config_schema() ->
+    {rule_engine_test,
+        hoconsc:mk(
+            hoconsc:map(name, hoconsc:ref(emqx_rule_engine_test_action_info, "config_connector")),
+            #{
+                desc => <<"Test Connector Config">>,
+                required => false
+            }
+        )}.
+
+schema_module() ->
+    emqx_rule_engine_test_action_info.
+
+api_schema(Method) ->
+    emqx_connector_schema:api_ref(
+        ?MODULE, <<"rule_engine_test">>, Method ++ "_connector"
+    ).

+ 1 - 0
changes/ce/feat-12827.en.md

@@ -0,0 +1 @@
+It is now possible to trace rules with a new Rule ID trace filter as well as with the Client ID filter. For testing purposes it is now also possible to use a new HTTP API endpoint (rules/:id/test) to artificially apply a rule and optionally stop its actions after they have been rendered.

+ 1 - 1
mix.exs

@@ -60,7 +60,7 @@ defmodule EMQXUmbrella.MixProject do
       {:grpc, github: "emqx/grpc-erl", tag: "0.6.12", override: true},
       {:minirest, github: "emqx/minirest", tag: "1.4.0", override: true},
       {:ecpool, github: "emqx/ecpool", tag: "0.5.7", override: true},
-      {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true},
+      {:replayq, github: "emqx/replayq", tag: "0.3.8", override: true},
       {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
       # maybe forbid to fetch quicer
       {:emqtt,

+ 1 - 1
rebar.config

@@ -88,7 +88,7 @@
     {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.12"}}},
     {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.4.0"}}},
     {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.7"}}},
-    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}},
+    {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.8"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.10.1"}}},
     {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.2.0"}}},

+ 5 - 0
rel/i18n/emqx_mgmt_api_trace.hocon

@@ -80,6 +80,11 @@ client_ip_addess.desc:
 client_ip_addess.label:
 """Client IP Address"""
 
+ruleid.desc:
+"""Specify the Rule ID if the trace type is 'ruleid'."""
+ruleid.label:
+"""Rule ID"""
+
 trace_status.desc:
 """trace status"""
 trace_status.label:

+ 6 - 0
rel/i18n/emqx_rule_api_schema.hocon

@@ -66,6 +66,12 @@ test_context.desc:
 test_context.label:
 """Event Conetxt"""
 
+stop_action_after_template_render.desc:
+"""Set this to true if the action should be stopped after its template has been rendered (default is true)."""
+
+stop_action_after_template_render.label:
+"""Stop Action After Template Rendering"""
+
 node_node.desc:
 """The node name"""
 

+ 6 - 0
rel/i18n/emqx_rule_engine_api.hocon

@@ -90,4 +90,10 @@ api9.desc:
 api9.label:
 """Get configuration"""
 
+api11.desc:
+"""Apply a rule with the given message and environment"""
+
+api11.label:
+"""Apply Rule"""
+
 }