Просмотр исходного кода

Merge pull request #12958 from kjellwinblad/kjell/fixup_trace

fix: rule trace formatting, republish and console stop after rendering
Kjell Winblad 1 год назад
Родитель
Сommit
004dc80fb2

+ 5 - 0
apps/emqx/include/emqx_trace.hrl

@@ -35,6 +35,11 @@
     end_at :: integer() | undefined | '_'
 }).
 
+-record(emqx_trace_format_func_data, {
+    function :: fun((any()) -> any()),
+    data :: any()
+}).
+
 -define(SHARD, ?COMMON_SHARD).
 -define(MAX_SIZE, 30).
 

+ 1 - 1
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -229,7 +229,7 @@ best_effort_json_obj(Map, Config) ->
             do_format_msg("~p", [Map], Config)
     end.
 
-json(A, _) when is_atom(A) -> atom_to_binary(A, utf8);
+json(A, _) when is_atom(A) -> A;
 json(I, _) when is_integer(I) -> I;
 json(F, _) when is_float(F) -> F;
 json(P, C) when is_pid(P) -> json(pid_to_list(P), C);

+ 42 - 12
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -31,7 +31,8 @@
     log/4,
     rendered_action_template/2,
     make_rendered_action_template_trace_context/1,
-    rendered_action_template_with_ctx/2
+    rendered_action_template_with_ctx/2,
+    is_rule_trace_active/0
 ]).
 
 -export([
@@ -96,6 +97,16 @@ unsubscribe(Topic, SubOpts) ->
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
 
 rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
+    do_rendered_action_template(ActionID, RenderResult);
+rendered_action_template(#{mod := _, func := _} = ActionID, RenderResult) ->
+    do_rendered_action_template(ActionID, RenderResult);
+rendered_action_template(_ActionID, _RenderResult) ->
+    %% We do nothing if we don't get a valid Action ID. This can happen when
+    %% called from connectors that are used for actions as well as authz and
+    %% authn.
+    ok.
+
+do_rendered_action_template(ActionID, RenderResult) ->
     TraceResult = ?TRACE(
         "QUERY_RENDER",
         "action_template_rendered",
@@ -108,23 +119,25 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
         #{stop_action_after_render := true} ->
             %% We throw an unrecoverable error to stop action before the
             %% resource is called/modified
-            StopMsg = lists:flatten(
+            ActionIDStr =
+                case ActionID of
+                    Bin when is_binary(Bin) ->
+                        Bin;
+                    Term ->
+                        ActionIDFormatted = io_lib:format("~tw", [Term]),
+                        unicode:characters_to_binary(ActionIDFormatted)
+                end,
+            StopMsg =
                 io_lib:format(
                     "Action ~ts stopped after template rendering due to test setting.",
-                    [ActionID]
-                )
-            ),
+                    [ActionIDStr]
+                ),
             MsgBin = unicode:characters_to_binary(StopMsg),
             error(?EMQX_TRACE_STOP_ACTION(MsgBin));
         _ ->
             ok
     end,
-    TraceResult;
-rendered_action_template(_ActionID, _RenderResult) ->
-    %% We do nothing if we don't get a valid Action ID. This can happen when
-    %% called from connectors that are used for actions as well as authz and
-    %% authn.
-    ok.
+    TraceResult.
 
 %% The following two functions are used for connectors that don't do the
 %% rendering in the main process (the one that called on_*query). In this case
@@ -165,6 +178,16 @@ rendered_action_template_with_ctx(
         logger:set_process_metadata(OldMetaData)
     end.
 
+is_rule_trace_active() ->
+    case logger:get_process_metadata() of
+        #{rule_id := RID} when is_binary(RID) ->
+            true;
+        #{rule_ids := RIDs} when map_size(RIDs) > 0 ->
+            true;
+        _ ->
+            false
+    end.
+
 log(List, Msg, Meta) ->
     log(debug, List, Msg, Meta).
 
@@ -382,7 +405,14 @@ code_change(_, State, _Extra) ->
     {ok, State}.
 
 insert_new_trace(Trace) ->
-    transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]).
+    case transaction(fun emqx_trace_dl:insert_new_trace/1, [Trace]) of
+        {error, _} = Error ->
+            Error;
+        Res ->
+            %% We call this to ensure the trace is active when we return
+            check(),
+            Res
+    end.
 
 update_trace(Traces) ->
     Now = now_second(),

+ 24 - 5
apps/emqx/src/emqx_trace/emqx_trace_formatter.erl

@@ -15,9 +15,11 @@
 %%--------------------------------------------------------------------
 -module(emqx_trace_formatter).
 -include("emqx_mqtt.hrl").
+-include("emqx_trace.hrl").
 
 -export([format/2]).
 -export([format_meta_map/1]).
+-export([evaluate_lazy_values/1]).
 
 %% logger_formatter:config/0 is not exported.
 -type config() :: map().
@@ -28,18 +30,35 @@
     LogEvent :: logger:log_event(),
     Config :: config().
 format(
-    #{level := debug, meta := Meta = #{trace_tag := Tag}, msg := Msg},
+    #{level := debug, meta := Meta0 = #{trace_tag := Tag}, msg := Msg},
     #{payload_encode := PEncode}
 ) ->
+    Meta1 = evaluate_lazy_values(Meta0),
     Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
-    ClientId = to_iolist(maps:get(clientid, Meta, "")),
-    Peername = maps:get(peername, Meta, ""),
-    MetaBin = format_meta(Meta, PEncode),
+    ClientId = to_iolist(maps:get(clientid, Meta1, "")),
+    Peername = maps:get(peername, Meta1, ""),
+    MetaBin = format_meta(Meta1, PEncode),
     Msg1 = to_iolist(Msg),
     Tag1 = to_iolist(Tag),
     [Time, " [", Tag1, "] ", ClientId, "@", Peername, " msg: ", Msg1, ", ", MetaBin, "\n"];
 format(Event, Config) ->
-    emqx_logger_textfmt:format(Event, Config).
+    emqx_logger_textfmt:format(evaluate_lazy_values(Event), Config).
+
+evaluate_lazy_values(Map) when is_map(Map) ->
+    maps:map(fun evaluate_lazy_values_kv/2, Map);
+evaluate_lazy_values(V) ->
+    V.
+
+evaluate_lazy_values_kv(_K, #emqx_trace_format_func_data{function = Formatter, data = V}) ->
+    try
+        NewV = Formatter(V),
+        evaluate_lazy_values(NewV)
+    catch
+        _:_ ->
+            V
+    end;
+evaluate_lazy_values_kv(_K, V) ->
+    evaluate_lazy_values(V).
 
 format_meta_map(Meta) ->
     Encode = emqx_trace_handler:payload_encode(),

+ 27 - 18
apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl

@@ -16,6 +16,7 @@
 -module(emqx_trace_json_formatter).
 
 -include("emqx_mqtt.hrl").
+-include("emqx_trace.hrl").
 
 -export([format/2]).
 
@@ -30,15 +31,16 @@
     LogEvent :: logger:log_event(),
     Config :: config().
 format(
-    LogMap,
+    LogMap0,
     #{payload_encode := PEncode}
 ) ->
+    LogMap1 = emqx_trace_formatter:evaluate_lazy_values(LogMap0),
     %% We just make some basic transformations on the input LogMap and then do
     %% an external call to create the JSON text
     Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
-    LogMap1 = LogMap#{time => Time},
-    LogMap2 = prepare_log_map(LogMap1, PEncode),
-    [emqx_logger_jsonfmt:best_effort_json(LogMap2, [force_utf8]), "\n"].
+    LogMap2 = LogMap1#{time => Time},
+    LogMap3 = prepare_log_map(LogMap2, PEncode),
+    [emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"].
 
 %%%-----------------------------------------------------------------
 %%% Helper Functions
@@ -48,21 +50,26 @@ prepare_log_map(LogMap, PEncode) ->
     NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
     maps:from_list(NewKeyValuePairs).
 
-prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) ->
-    %% A cusom formatter is provided with the value
-    try
-        NewV = Formatter(V),
-        prepare_key_value(K, NewV, PEncode)
-    catch
-        _:_ ->
-            {K, V}
-    end;
-prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when
-    is_integer(Status), is_list(Headers), is_binary(Body)
+prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when
+    is_integer(I1),
+    is_integer(I2),
+    is_integer(I3),
+    is_integer(I4)
+->
+    %% We assume this is an IP address
+    {host, unicode:characters_to_binary(inet:ntoa(IP))};
+prepare_key_value(host, {I1, I2, I3, I4, I5, I6, I7, I8} = IP, _PEncode) when
+    is_integer(I1),
+    is_integer(I2),
+    is_integer(I3),
+    is_integer(I4),
+    is_integer(I5),
+    is_integer(I6),
+    is_integer(I7),
+    is_integer(I8)
 ->
-    %% This is unlikely anything else then info about a HTTP request so we make
-    %% it more structured
-    prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode);
+    %% We assume this is an IP address
+    {host, unicode:characters_to_binary(inet:ntoa(IP))};
 prepare_key_value(payload = K, V, PEncode) ->
     NewV =
         try
@@ -137,6 +144,8 @@ format_map_set_to_list(Map) ->
     ],
     lists:sort(Items).
 
+format_action_info(#{mod := _Mod, func := _Func} = FuncCall) ->
+    FuncCall;
 format_action_info(V) ->
     [<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]),
     #{

+ 4 - 1
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -252,11 +252,14 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
 create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
     BridgeName = ?config(bridge_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
+    create_rule_and_action(BridgeId, RuleTopic, Opts).
+
+create_rule_and_action(Action, RuleTopic, Opts) ->
     SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
     Params = #{
         enable => true,
         sql => SQL,
-        actions => [BridgeId]
+        actions => [Action]
     },
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),

+ 6 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl

@@ -27,6 +27,8 @@
 -export([execute/2]).
 -endif.
 
+-include_lib("emqx/include/emqx_trace.hrl").
+
 %%%===================================================================
 %%% API
 %%%===================================================================
@@ -107,7 +109,10 @@ do_query(Table, Query0, Templates, TraceRenderedCTX) ->
         Query = apply_template(Query0, Templates),
         emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
             table => Table,
-            query => {fun trace_format_query/1, Query}
+            query => #emqx_trace_format_func_data{
+                function = fun trace_format_query/1,
+                data = Query
+            }
         }),
         execute(Query, Table)
     catch

+ 77 - 12
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -20,6 +20,7 @@
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 
 -behaviour(emqx_resource).
 
@@ -35,7 +36,8 @@
     on_add_channel/4,
     on_remove_channel/3,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 
 -export([reply_delegator/3]).
@@ -232,6 +234,7 @@ on_start(
         port => Port,
         connect_timeout => ConnectTimeout,
         base_path => BasePath,
+        scheme => Scheme,
         request => preprocess_request(maps:get(request, Config, undefined))
     },
     case start_pool(InstId, PoolOpts) of
@@ -359,7 +362,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
 on_query(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
-    #{base_path := BasePath, host := Host} = State
+    #{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State
 ) ->
     ?TRACE(
         "QUERY",
@@ -373,7 +376,7 @@ on_query(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
     Worker = resolve_pool_worker(State, KeyOrNum),
     Result0 = ehttpc:request(
         Worker,
@@ -469,7 +472,7 @@ on_query_async(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout},
     ReplyFunAndArgs,
-    #{base_path := BasePath, host := Host} = State
+    #{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State
 ) ->
     Worker = resolve_pool_worker(State, KeyOrNum),
     ?TRACE(
@@ -483,7 +486,7 @@ on_query_async(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
     MaxAttempts = maps:get(max_attempts, State, 3),
     Context = #{
         attempt => 1,
@@ -492,7 +495,8 @@ on_query_async(
         key_or_num => KeyOrNum,
         method => Method,
         request => NRequest,
-        timeout => Timeout
+        timeout => Timeout,
+        trace_metadata => logger:get_process_metadata()
     },
     ok = ehttpc:request_async(
         Worker,
@@ -503,17 +507,25 @@ on_query_async(
     ),
     {ok, Worker}.
 
-trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) ->
+trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout) ->
     case NRequest of
         {Path, Headers} ->
             emqx_trace:rendered_action_template(
                 ActionId,
                 #{
                     host => Host,
+                    port => Port,
                     path => Path,
                     method => Method,
-                    headers => {fun emqx_utils_redact:redact_headers/1, Headers},
-                    timeout => Timeout
+                    headers => #emqx_trace_format_func_data{
+                        function = fun emqx_utils_redact:redact_headers/1,
+                        data = Headers
+                    },
+                    timeout => Timeout,
+                    url => #emqx_trace_format_func_data{
+                        function = fun render_url/1,
+                        data = {Scheme, Host, Port, Path}
+                    }
                 }
             );
         {Path, Headers, Body} ->
@@ -521,15 +533,42 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) ->
                 ActionId,
                 #{
                     host => Host,
+                    port => Port,
                     path => Path,
                     method => Method,
-                    headers => {fun emqx_utils_redact:redact_headers/1, Headers},
+                    headers => #emqx_trace_format_func_data{
+                        function = fun emqx_utils_redact:redact_headers/1,
+                        data = Headers
+                    },
                     timeout => Timeout,
-                    body => {fun log_format_body/1, Body}
+                    body => #emqx_trace_format_func_data{
+                        function = fun log_format_body/1,
+                        data = Body
+                    },
+                    url => #emqx_trace_format_func_data{
+                        function = fun render_url/1,
+                        data = {Scheme, Host, Port, Path}
+                    }
                 }
             )
     end.
 
+render_url({Scheme, Host, Port, Path}) ->
+    SchemeStr =
+        case Scheme of
+            http ->
+                <<"http://">>;
+            https ->
+                <<"https://">>
+        end,
+    unicode:characters_to_binary([
+        SchemeStr,
+        Host,
+        <<":">>,
+        erlang:integer_to_binary(Port),
+        Path
+    ]).
+
 log_format_body(Body) ->
     unicode:characters_to_binary(Body).
 
@@ -605,6 +644,26 @@ on_get_channel_status(
     %% XXX: Reuse the connector status
     on_get_status(InstId, State).
 
+on_format_query_result({ok, Status, Headers, Body}) ->
+    #{
+        result => ok,
+        response => #{
+            status => Status,
+            headers => Headers,
+            body => Body
+        }
+    };
+on_format_query_result({ok, Status, Headers}) ->
+    #{
+        result => ok,
+        response => #{
+            status => Status,
+            headers => Headers
+        }
+    };
+on_format_query_result(Result) ->
+    Result.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -807,9 +866,15 @@ to_bin(Str) when is_list(Str) ->
 to_bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
 
-reply_delegator(Context, ReplyFunAndArgs, Result0) ->
+reply_delegator(
+    #{trace_metadata := TraceMetadata} = Context,
+    ReplyFunAndArgs,
+    Result0
+) ->
     spawn(fun() ->
+        logger:set_process_metadata(TraceMetadata),
         Result = transform_result(Result0),
+        logger:unset_process_metadata(),
         maybe_retry(Result, Context, ReplyFunAndArgs)
     end).
 

+ 7 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -20,7 +20,8 @@
     on_query/3,
     on_batch_query/3,
     on_get_status/2,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 
 %% -------------------------------------------------------------------------------------------------
@@ -161,6 +162,11 @@ on_batch_query(
             Error
     end.
 
+on_format_query_result({ok, Msg}) ->
+    #{result => ok, message => Msg};
+on_format_query_result(Res) ->
+    Res.
+
 %% -------------------------------------------------------------------------------------------------
 %% private helpers
 %% -------------------------------------------------------------------------------------------------

+ 5 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -7,6 +7,7 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include("emqx_bridge_s3.hrl").
 
 -behaviour(emqx_resource).
@@ -320,7 +321,10 @@ run_simple_upload(
     emqx_trace:rendered_action_template(ChannelID, #{
         bucket => Bucket,
         key => Key,
-        content => Content
+        content => #emqx_trace_format_func_data{
+            function = fun unicode:characters_to_binary/1,
+            data = Content
+        }
     }),
     case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
         ok ->

+ 116 - 45
apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl

@@ -290,54 +290,125 @@ t_http_test_json_formatter(_Config) ->
         end
      || JSONEntry <- LogEntries
     ],
+    ListIterFun =
+        fun
+            ListIterFunRec([]) ->
+                ok;
+            ListIterFunRec([Item | Rest]) ->
+                receive
+                    From ->
+                        From ! {list_iter_item, Item}
+                end,
+                ListIterFunRec(Rest)
+        end,
+    ListIter = spawn_link(fun() -> ListIterFun(DecodedLogEntries) end),
+    NextFun =
+        fun() ->
+            ListIter ! self(),
+            receive
+                {list_iter_item, Item} ->
+                    Item
+            end
+        end,
     ?assertMatch(
-        [
-            #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}},
-            #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}},
-            #{
-                <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>}
-            },
-            #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}},
-            #{<<"meta">> := #{<<"str">> := <<"str">>}},
-            #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}},
-            #{<<"meta">> := <<_/binary>>},
-            #{<<"meta">> := #{<<"integer">> := 42}},
-            #{<<"meta">> := #{<<"float">> := 1.2}},
-            #{<<"meta">> := <<_/binary>>},
-            #{<<"meta">> := <<_/binary>>},
-            #{<<"meta">> := <<_/binary>>},
-            #{<<"meta">> := #{<<"sub">> := #{}}},
-            #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}},
-            #{<<"meta">> := #{<<"true">> := <<"true">>, <<"false">> := <<"false">>}},
-            #{
-                <<"meta">> := #{
-                    <<"list">> := #{
-                        <<"key">> := <<"value">>,
-                        <<"key2">> := <<"value2">>
-                    }
-                }
-            },
-            #{
-                <<"meta">> := #{
-                    <<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>]
-                }
-            },
-            #{
-                <<"meta">> := #{
-                    <<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>]
+        #{<<"meta">> := #{<<"payload">> := <<"log_this_message">>}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"payload">> := <<"\nlog\nthis\nmessage">>}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{<<"payload">> := <<"\\\nlog\n_\\n_this\nmessage\\">>}
+        },
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"payload">> := <<"\"log_this_message\"">>}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"str">> := <<"str">>}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"term">> := <<"{notjson}">>}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := <<_/binary>>},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"integer">> := 42}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"float">> := 1.2}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := <<_/binary>>},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := <<_/binary>>},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := <<_/binary>>},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"sub">> := #{}}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"sub">> := #{<<"key">> := <<"value">>}}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{<<"meta">> := #{<<"true">> := true, <<"false">> := false}},
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"list">> := #{
+                    <<"key">> := <<"value">>,
+                    <<"key2">> := <<"value2">>
                 }
-            },
-            #{
-                <<"meta">> := #{
-                    <<"action_info">> := #{
-                        <<"type">> := <<"http">>,
-                        <<"name">> := <<"emqx_bridge_http_test_lib">>
-                    }
+            }
+        },
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"client_ids">> := [<<"a">>, <<"b">>, <<"c">>]
+            }
+        },
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"rule_ids">> := [<<"a">>, <<"b">>, <<"c">>]
+            }
+        },
+        NextFun()
+    ),
+    ?assertMatch(
+        #{
+            <<"meta">> := #{
+                <<"action_info">> := #{
+                    <<"type">> := <<"http">>,
+                    <<"name">> := <<"emqx_bridge_http_test_lib">>
                 }
             }
-            | _
-        ],
-        DecodedLogEntries
+        },
+        NextFun()
     ),
     {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))),
     ?assertEqual(<<>>, Delete),
@@ -495,7 +566,7 @@ create_trace(Name, Type, TypeValue, Start) ->
             ?block_until(#{?snk_kind := update_trace_done})
         end,
         fun(Trace) ->
-            ?assertMatch([#{}], ?of_kind(update_trace_done, Trace))
+            ?assertMatch([#{} | _], ?of_kind(update_trace_done, Trace))
         end
     ).
 

+ 7 - 1
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -38,7 +38,8 @@
     on_add_channel/4,
     on_remove_channel/3,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 
 -export([connect/1]).
@@ -695,6 +696,11 @@ handle_result({error, Error}) ->
 handle_result(Res) ->
     Res.
 
+on_format_query_result({ok, Cnt}) when is_integer(Cnt) ->
+    #{result => ok, affected_rows => Cnt};
+on_format_query_result(Res) ->
+    Res.
+
 handle_batch_result([{ok, Count} | Rest], Acc) ->
     handle_batch_result(Rest, Acc + Count);
 handle_batch_result([{error, Error} | _Rest], _Acc) ->

+ 21 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -86,7 +86,9 @@
     forget_allocated_resources/1,
     deallocate_resource/2,
     %% Get channel config from resource
-    call_get_channel_config/3
+    call_get_channel_config/3,
+    % Call the format query result function
+    call_format_query_result/2
 ]).
 
 %% Direct calls to the callback module
@@ -154,7 +156,8 @@
     on_add_channel/4,
     on_remove_channel/3,
     on_get_channels/1,
-    query_mode/1
+    query_mode/1,
+    on_format_query_result/1
 ]).
 
 %% when calling emqx_resource:start/1
@@ -230,6 +233,14 @@
     ResId :: term()
 ) -> [term()].
 
+%% When given the result of a on_*query call this function should return a
+%% version of the result that is suitable for JSON trace logging. This
+%% typically means converting Erlang tuples to maps with appropriate names for
+%% the values in the tuple.
+-callback on_format_query_result(
+    QueryResult :: term()
+) -> term().
+
 -define(SAFE_CALL(EXPR),
     (fun() ->
         try
@@ -551,6 +562,14 @@ call_get_channel_config(ResId, ChannelId, Mod) ->
                 <<"on_get_channels callback function not available for resource id", ResId/binary>>}
     end.
 
+call_format_query_result(Mod, Result) ->
+    case erlang:function_exported(Mod, on_format_query_result, 1) of
+        true ->
+            Mod:on_format_query_result(Result);
+        false ->
+            Result
+    end.
+
 -spec call_stop(resource_id(), module(), resource_state()) -> term().
 call_stop(ResId, Mod, ResourceState) ->
     ?SAFE_CALL(begin

+ 29 - 6
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -104,6 +104,20 @@ pre_process_action_args(_, Args) ->
 %%--------------------------------------------------------------------
 -spec console(map(), map(), map()) -> any().
 console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
+    case logger:get_process_metadata() of
+        #{action_id := ActionID} ->
+            emqx_trace:rendered_action_template(
+                ActionID,
+                #{
+                    selected => Selected,
+                    environment => Envs
+                }
+            );
+        _ ->
+            %% We may not have an action ID in the metadata if this is called
+            %% from a test case or similar
+            ok
+    end,
     ?ULOG(
         "[rule action] ~ts~n"
         "\tAction Data: ~p~n"
@@ -149,15 +163,24 @@ republish(
     PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env),
     MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
     PubProps = maps:merge(PubProps0, MQTTProps),
+    TraceInfo = #{
+        flags => Flags,
+        topic => Topic,
+        payload => Payload,
+        pub_props => PubProps
+    },
+    case logger:get_process_metadata() of
+        #{action_id := ActionID} ->
+            emqx_trace:rendered_action_template(ActionID, TraceInfo);
+        _ ->
+            %% We may not have an action ID in the metadata if this is called
+            %% from a test case or similar
+            ok
+    end,
     ?TRACE(
         "RULE",
         "republish_message",
-        #{
-            flags => Flags,
-            topic => Topic,
-            payload => Payload,
-            pub_props => PubProps
-        }
+        TraceInfo
     ),
     safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
 

+ 4 - 1
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -274,6 +274,7 @@ schema("/rules/:id/test") ->
             responses => #{
                 400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
                 412 => error_schema('NOT_MATCH', "SQL Not Match"),
+                404 => error_schema('RULE_NOT_FOUND', "The rule could not be found"),
                 200 => <<"Rule Applied">>
             }
         }
@@ -419,11 +420,13 @@ param_path_id() ->
         begin
             case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of
                 {ok, Result} ->
-                    {200, Result};
+                    {200, emqx_logger_jsonfmt:best_effort_json_obj(Result)};
                 {error, {parse_error, Reason}} ->
                     {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
                 {error, nomatch} ->
                     {412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
+                {error, rule_not_found} ->
+                    {404, #{code => 'RULE_NOT_FOUND', message => <<"The rule could not be found">>}};
                 {error, Reason} ->
                     {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
             end

+ 52 - 4
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -415,6 +415,15 @@ handle_action(RuleId, ActId, Selected, Envs) ->
                 rule_metrics, RuleId, 'actions.failed.out_of_service'
             ),
             trace_action(ActId, "out_of_service", #{}, warning);
+        error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
+            ?EMQX_TRACE_STOP_ACTION(Explanation) = Reason,
+            trace_action(
+                ActId,
+                "action_stopped_after_template_rendering",
+                #{reason => Explanation}
+            ),
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
         Err:Reason:ST ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
@@ -475,7 +484,18 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -
     trace_action(Action, "call_action_function"),
     %% the function can also throw 'out_of_service'
     Args = maps:get(args, Action, []),
-    Result = Mod:Func(Selected, Envs, Args),
+    PrevProcessMetadata =
+        case logger:get_process_metadata() of
+            undefined -> #{};
+            D -> D
+        end,
+    Result =
+        try
+            logger:update_process_metadata(#{action_id => Action}),
+            Mod:Func(Selected, Envs, Args)
+        after
+            logger:set_process_metadata(PrevProcessMetadata)
+        end,
     {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
     inc_action_metrics(IncCtx, Result),
     Result.
@@ -747,21 +767,49 @@ do_inc_action_metrics(
     {error, {unrecoverable_error, _} = Reason}
 ) ->
     TraceContext1 = maps:remove(action_id, TraceContext),
-    trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext1)),
+    FormatterRes = #emqx_trace_format_func_data{
+        function = fun trace_formatted_result/1,
+        data = {ActId, Reason}
+    },
+    trace_action(ActId, "action_failed", maps:merge(#{reason => FormatterRes}, TraceContext1)),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
     emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
 do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) ->
     TraceContext1 = maps:remove(action_id, TraceContext),
+    FormatterRes = #emqx_trace_format_func_data{
+        function = fun trace_formatted_result/1,
+        data = {ActId, R}
+    },
     case is_ok_result(R) of
         false ->
-            trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext1)),
+            trace_action(
+                ActId,
+                "action_failed",
+                maps:merge(#{reason => FormatterRes}, TraceContext1)
+            ),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
         true ->
-            trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext1)),
+            trace_action(
+                ActId,
+                "action_success",
+                maps:merge(#{result => FormatterRes}, TraceContext1)
+            ),
             emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
     end.
 
+trace_formatted_result({{bridge_v2, Type, _Name}, R}) ->
+    ConnectorType = emqx_action_info:action_type_to_connector_type(Type),
+    ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
+    emqx_resource:call_format_query_result(ResourceModule, R);
+trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) ->
+    BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
+    ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type),
+    ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
+    emqx_resource:call_format_query_result(ResourceModule, R);
+trace_formatted_result({_, R}) ->
+    R.
+
 is_ok_result(ok) ->
     true;
 is_ok_result({async_return, R}) ->

+ 11 - 1
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -26,12 +26,22 @@
 
 apply_rule(
     RuleId,
+    Parameters
+) ->
+    case emqx_rule_engine:get_rule(RuleId) of
+        {ok, Rule} ->
+            do_apply_rule(Rule, Parameters);
+        not_found ->
+            {error, rule_not_found}
+    end.
+
+do_apply_rule(
+    Rule,
     #{
         context := Context,
         stop_action_after_template_rendering := StopAfterRender
     }
 ) ->
-    {ok, Rule} = emqx_rule_engine:get_rule(RuleId),
     InTopic = get_in_topic(Context),
     EventTopics = maps:get(from, Rule, []),
     case lists:all(fun is_publish_topic/1, EventTopics) of

+ 116 - 17
apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl

@@ -26,7 +26,24 @@
 -define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        emqx_common_test_helpers:all(?MODULE),
+        {group, republish},
+        {group, console_print}
+    ].
+
+groups() ->
+    [
+        {republish, [], basic_tests()},
+        {console_print, [], basic_tests()}
+    ].
+
+basic_tests() ->
+    [
+        t_basic_apply_rule_trace_ruleid,
+        t_basic_apply_rule_trace_clientid,
+        t_basic_apply_rule_trace_ruleid_stop_after_render
+    ].
 
 init_per_suite(Config) ->
     application:load(emqx_conf),
@@ -50,6 +67,12 @@ init_per_suite(Config) ->
     emqx_mgmt_api_test_util:init_suite(),
     [{apps, Apps} | Config].
 
+init_per_group(GroupName, Config) ->
+    [{group_name, GroupName} | Config].
+
+end_per_group(_GroupName, Config) ->
+    Config.
+
 end_per_suite(Config) ->
     Apps = ?config(apps, Config),
     emqx_mgmt_api_test_util:end_suite(),
@@ -67,28 +90,58 @@ end_per_testcase(_TestCase, _Config) ->
     ok.
 
 t_basic_apply_rule_trace_ruleid(Config) ->
-    basic_apply_rule_test_helper(Config, ruleid, false).
+    basic_apply_rule_test_helper(get_action(Config), ruleid, false).
 
 t_basic_apply_rule_trace_clientid(Config) ->
-    basic_apply_rule_test_helper(Config, clientid, false).
+    basic_apply_rule_test_helper(get_action(Config), clientid, false).
 
 t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
-    basic_apply_rule_test_helper(Config, ruleid, true).
+    basic_apply_rule_test_helper(get_action(Config), ruleid, true).
 
-basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
+get_action(Config) ->
+    case ?config(group_name, Config) of
+        republish ->
+            republish_action();
+        console_print ->
+            console_print_action();
+        _ ->
+            make_http_bridge(Config)
+    end.
+
+make_http_bridge(Config) ->
     HTTPServerConfig = ?config(http_server, Config),
     emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig),
     #{status := connected} = emqx_bridge_v2:health_check(
         http, emqx_bridge_http_test_lib:bridge_name()
     ),
+    BridgeName = ?config(bridge_name, Config),
+    emqx_bridge_resource:bridge_id(http, BridgeName).
+
+republish_action() ->
+    #{
+        <<"args">> =>
+            #{
+                <<"mqtt_properties">> => #{},
+                <<"payload">> => <<"MY PL">>,
+                <<"qos">> => 0,
+                <<"retain">> => false,
+                <<"topic">> => <<"rule_apply_test_SUITE">>,
+                <<"user_properties">> => <<>>
+            },
+        <<"function">> => <<"republish">>
+    }.
+
+console_print_action() ->
+    #{<<"function">> => <<"console">>}.
+
+basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
     %% Create Rule
     RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
     SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
     {ok, #{<<"id">> := RuleId}} =
-        emqx_bridge_testlib:create_rule_and_action_http(
-            http,
+        emqx_bridge_testlib:create_rule_and_action(
+            Action,
             RuleTopic,
-            Config,
             #{sql => SQL}
         ),
     ClientId = <<"c_emqx">>,
@@ -117,10 +170,7 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
         <<"context">> => Context,
         <<"stop_action_after_template_rendering">> => StopAfterRender
     },
-    emqx_trace:check(),
-    ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
     Now = erlang:system_time(second) - 10,
-    {ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)),
     ?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)),
     ?retry(
         _Interval0 = 200,
@@ -130,9 +180,14 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
             io:format("THELOG:~n~s", [Bin]),
             ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
             ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
-            ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
-            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
-            ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
+            case Action of
+                A when is_binary(A) ->
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]));
+                _ ->
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"call_action_function">>]))
+            end,
+            ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>]))
         end
     ),
     case StopAfterRender of
@@ -155,7 +210,8 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
                 begin
                     Bin = read_rule_trace_file(TraceName, TraceType, Now),
                     io:format("THELOG3:~n~s", [Bin]),
-                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>]))
+                    ?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>])),
+                    do_final_log_check(Action, Bin)
                 end
             )
     end,
@@ -176,6 +232,51 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
     emqx_trace:delete(TraceName),
     ok.
 
+do_final_log_check(Action, Bin0) when is_binary(Action) ->
+    %% The last line in the Bin should be the action_success entry
+    Bin1 = string:trim(Bin0),
+    LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
+    LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
+    %% Check that lazy formatting of the action result works correctly
+    ?assertMatch(
+        #{
+            <<"level">> := <<"debug">>,
+            <<"meta">> :=
+                #{
+                    <<"action_info">> :=
+                        #{
+                            <<"name">> := <<"emqx_bridge_http_test_lib">>,
+                            <<"type">> := <<"http">>
+                        },
+                    <<"clientid">> := <<"c_emqx">>,
+                    <<"result">> :=
+                        #{
+                            <<"response">> :=
+                                #{
+                                    <<"body">> := <<"hello">>,
+                                    <<"headers">> :=
+                                        #{
+                                            <<"content-type">> := <<"text/plain">>,
+                                            <<"date">> := _,
+                                            <<"server">> := _
+                                        },
+                                    <<"status">> := 200
+                                },
+                            <<"result">> := <<"ok">>
+                        },
+                    <<"rule_id">> := _,
+                    <<"rule_trigger_time">> := _,
+                    <<"stop_action_after_render">> := false,
+                    <<"trace_tag">> := <<"ACTION">>
+                },
+            <<"msg">> := <<"action_success">>,
+            <<"time">> := _
+        },
+        LastEntryJSON
+    );
+do_final_log_check(_, _) ->
+    ok.
+
 create_trace(TraceName, TraceType, TraceValue) ->
     Now = erlang:system_time(second) - 10,
     Start = Now,
@@ -239,8 +340,6 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
         SQL
     ),
     create_trace(Name, ruleid, RuleID),
-    emqx_trace:check(),
-    ok = emqx_trace_handler_SUITE:filesync(Name, ruleid),
     Now = erlang:system_time(second) - 10,
     %% Stop
     ParmsStopAfterRender = apply_rule_parms(true, Name),