Просмотр исходного кода

Merge pull request #12916 from kjellwinblad/kjell/fix_traces/EMQX-12025

Improve log structure for HTTP action and add after render trace to other actions
Kjell Winblad 1 год назад
Родитель
Сommit
cad5d8bc31
32 измененных файлов с 387 добавлено и 110 удалено
  1. 6 0
      apps/emqx/include/emqx_trace.hrl
  2. 56 4
      apps/emqx/src/emqx_trace/emqx_trace.erl
  3. 15 0
      apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl
  4. 13 0
      apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl
  5. 5 4
      apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl
  6. 8 1
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl
  7. 21 7
      apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl
  8. 14 0
      apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl
  9. 8 6
      apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl
  10. 19 11
      apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl
  11. 13 8
      apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl
  12. 8 6
      apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl
  13. 6 0
      apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
  14. 5 0
      apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl
  15. 6 1
      apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl
  16. 9 4
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl
  17. 38 16
      apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl
  18. 6 2
      apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl
  19. 4 2
      apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl
  20. 11 2
      apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl
  21. 33 10
      apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl
  22. 10 1
      apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl
  23. 5 1
      apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl
  24. 7 1
      apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl
  25. 3 0
      apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl
  26. 2 0
      apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
  27. 16 4
      apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl
  28. 1 1
      apps/emqx_mysql/src/emqx_mysql.app.src
  29. 2 0
      apps/emqx_mysql/src/emqx_mysql.erl
  30. 11 3
      apps/emqx_oracle/src/emqx_oracle.erl
  31. 11 3
      apps/emqx_postgresql/src/emqx_postgresql.erl
  32. 15 12
      apps/emqx_rule_engine/src/emqx_rule_runtime.erl

+ 6 - 0
apps/emqx/include/emqx_trace.hrl

@@ -38,4 +38,10 @@
 -define(SHARD, ?COMMON_SHARD).
 -define(MAX_SIZE, 30).
 
+-define(EMQX_TRACE_STOP_ACTION(REASON),
+    {unrecoverable_error, {action_stopped_after_template_rendering, REASON}}
+).
+
+-define(EMQX_TRACE_STOP_ACTION_MATCH, ?EMQX_TRACE_STOP_ACTION(_)).
+
 -endif.

+ 56 - 4
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -29,7 +29,9 @@
     unsubscribe/2,
     log/3,
     log/4,
-    rendered_action_template/2
+    rendered_action_template/2,
+    make_rendered_action_template_trace_context/1,
+    rendered_action_template_with_ctx/2
 ]).
 
 -export([
@@ -70,6 +72,12 @@
 -export_type([ruleid/0]).
 -type ruleid() :: binary().
 
+-export_type([rendered_action_template_ctx/0]).
+-opaque rendered_action_template_ctx() :: #{
+    trace_ctx := map(),
+    action_id := any()
+}.
+
 publish(#message{topic = <<"$SYS/", _/binary>>}) ->
     ignore;
 publish(#message{from = From, topic = Topic, payload = Payload}) when
@@ -87,7 +95,7 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) ->
 unsubscribe(Topic, SubOpts) ->
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
 
-rendered_action_template(ActionID, RenderResult) ->
+rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
     TraceResult = ?TRACE(
         "QUERY_RENDER",
         "action_template_rendered",
@@ -107,11 +115,55 @@ rendered_action_template(ActionID, RenderResult) ->
                 )
             ),
             MsgBin = unicode:characters_to_binary(StopMsg),
-            error({unrecoverable_error, {action_stopped_after_template_rendering, MsgBin}});
+            error(?EMQX_TRACE_STOP_ACTION(MsgBin));
         _ ->
             ok
     end,
-    TraceResult.
+    TraceResult;
+rendered_action_template(_ActionID, _RenderResult) ->
+    %% We do nothing if we don't get a valid Action ID. This can happen when
+    %% called from connectors that are used for actions as well as authz and
+    %% authn.
+    ok.
+
+%% The following two functions are used for connectors that don't do the
+%% rendering in the main process (the one that called on_*query). In this case
+%% we need to pass the trace context to the sub process that do the rendering
+%% so that the result of the rendering can be traced correctly. It is also
+%% important to  ensure that the error that can be thrown from
+%% rendered_action_template_with_ctx is handled in the appropriate way in the
+%% sub process.
+-spec make_rendered_action_template_trace_context(any()) -> rendered_action_template_ctx().
+make_rendered_action_template_trace_context(ActionID) ->
+    MetaData =
+        case logger:get_process_metadata() of
+            undefined -> #{};
+            M -> M
+        end,
+    #{trace_ctx => MetaData, action_id => ActionID}.
+
+-spec rendered_action_template_with_ctx(rendered_action_template_ctx(), Result :: term()) -> term().
+rendered_action_template_with_ctx(
+    #{
+        trace_ctx := LogMetaData,
+        action_id := ActionID
+    },
+    RenderResult
+) ->
+    OldMetaData =
+        case logger:get_process_metadata() of
+            undefined -> #{};
+            M -> M
+        end,
+    try
+        logger:set_process_metadata(LogMetaData),
+        emqx_trace:rendered_action_template(
+            ActionID,
+            RenderResult
+        )
+    after
+        logger:set_process_metadata(OldMetaData)
+    end.
 
 log(List, Msg, Meta) ->
     log(debug, List, Msg, Meta).

+ 15 - 0
apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl

@@ -48,6 +48,21 @@ prepare_log_map(LogMap, PEncode) ->
     NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
     maps:from_list(NewKeyValuePairs).
 
+prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) ->
+    %% A cusom formatter is provided with the value
+    try
+        NewV = Formatter(V),
+        prepare_key_value(K, NewV, PEncode)
+    catch
+        _:_ ->
+            {K, V}
+    end;
+prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when
+    is_integer(Status), is_list(Headers), is_binary(Body)
+->
+    %% This is unlikely anything else then info about a HTTP request so we make
+    %% it more structured
+    prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode);
 prepare_key_value(payload = K, V, PEncode) ->
     NewV =
         try

+ 13 - 0
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -223,6 +223,11 @@ do_single_query(InstId, Request, Async, #{pool_name := PoolName} = State) ->
         }
     ),
     {PreparedKeyOrCQL1, Data} = proc_cql_params(Type, PreparedKeyOrCQL, Params, State),
+    emqx_trace:rendered_action_template(PreparedKeyOrCQL, #{
+        type => Type,
+        key_or_cql => PreparedKeyOrCQL1,
+        data => Data
+    }),
     Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrCQL1, Data),
     handle_result(Res).
 
@@ -261,6 +266,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) ->
             state => State
         }
     ),
+    ChannelID =
+        case Requests of
+            [{CID, _} | _] -> CID;
+            _ -> none
+        end,
+    emqx_trace:rendered_action_template(ChannelID, #{
+        cqls => CQLs
+    }),
     Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
     handle_result(Res).
 

+ 5 - 4
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -386,7 +386,7 @@ on_query(
     SimplifiedRequestType = query_type(RequestType),
     Templates = get_templates(RequestType, State),
     SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
-    ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL),
+    ClickhouseResult = execute_sql_in_clickhouse_server(RequestType, PoolName, SQL),
     transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
 
 get_templates(ChannId, State) ->
@@ -398,7 +398,7 @@ get_templates(ChannId, State) ->
     end.
 
 get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) ->
-    emqx_placeholder:proc_tmpl(PreparedSQL, Data);
+    emqx_placeholder:proc_tmpl(PreparedSQL, Data, #{return => full_binary});
 get_sql(_, _, SQL) ->
     SQL.
 
@@ -425,7 +425,7 @@ on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) ->
     %% Create batch insert SQL statement
     SQL = objects_to_sql(ObjectsToInsert, Templates),
     %% Do the actual query in the database
-    ResultFromClickhouse = execute_sql_in_clickhouse_server(PoolName, SQL),
+    ResultFromClickhouse = execute_sql_in_clickhouse_server(ChannId, PoolName, SQL),
     %% Transform the result to a better format
     transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL).
 
@@ -464,7 +464,8 @@ objects_to_sql(_, _) ->
 
 %% This function is used by on_query/3 and on_batch_query/3 to send a query to
 %% the database server and receive a result
-execute_sql_in_clickhouse_server(PoolName, SQL) ->
+execute_sql_in_clickhouse_server(Id, PoolName, SQL) ->
+    emqx_trace:rendered_action_template(Id, #{rendered_sql => SQL}),
     ecpool:pick_and_do(
         PoolName,
         {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},

+ 8 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -9,6 +9,7 @@
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 
@@ -246,12 +247,16 @@ do_query(
         table := Table,
         templates := Templates
     } = ChannelState,
+    TraceRenderedCTX =
+        emqx_trace:make_rendered_action_template_trace_context(ChannelId),
     Result =
         case ensuare_dynamo_keys(Query, ChannelState) of
             true ->
                 ecpool:pick_and_do(
                     PoolName,
-                    {emqx_bridge_dynamo_connector_client, query, [Table, QueryTuple, Templates]},
+                    {emqx_bridge_dynamo_connector_client, query, [
+                        Table, QueryTuple, Templates, TraceRenderedCTX
+                    ]},
                     no_handover
                 );
             _ ->
@@ -259,6 +264,8 @@ do_query(
         end,
 
     case Result of
+        {error, ?EMQX_TRACE_STOP_ACTION(_)} = Error ->
+            Error;
         {error, Reason} ->
             ?tp(
                 dynamo_connector_query_return,

+ 21 - 7
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector_client.erl

@@ -10,7 +10,7 @@
 -export([
     start_link/1,
     is_connected/2,
-    query/4
+    query/5
 ]).
 
 %% gen_server callbacks
@@ -40,8 +40,8 @@ is_connected(Pid, Timeout) ->
             {false, Error}
     end.
 
-query(Pid, Table, Query, Templates) ->
-    gen_server:call(Pid, {query, Table, Query, Templates}, infinity).
+query(Pid, Table, Query, Templates, TraceRenderedCTX) ->
+    gen_server:call(Pid, {query, Table, Query, Templates, TraceRenderedCTX}, infinity).
 
 %%--------------------------------------------------------------------
 %% @doc
@@ -77,14 +77,14 @@ handle_call(is_connected, _From, State) ->
                 {false, Error}
         end,
     {reply, IsConnected, State};
-handle_call({query, Table, Query, Templates}, _From, State) ->
-    Result = do_query(Table, Query, Templates),
+handle_call({query, Table, Query, Templates, TraceRenderedCTX}, _From, State) ->
+    Result = do_query(Table, Query, Templates, TraceRenderedCTX),
     {reply, Result, State};
 handle_call(_Request, _From, State) ->
     {reply, ok, State}.
 
 handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) ->
-    Result = do_query(Table, Query, Templates),
+    Result = do_query(Table, Query, Templates, {fun(_, _) -> ok end, none}),
     ReplyFun(Context, Result),
     {noreply, State};
 handle_cast(_Request, State) ->
@@ -102,15 +102,29 @@ code_change(_OldVsn, State, _Extra) ->
 %%%===================================================================
 %%% Internal functions
 %%%===================================================================
-do_query(Table, Query0, Templates) ->
+do_query(Table, Query0, Templates, TraceRenderedCTX) ->
     try
         Query = apply_template(Query0, Templates),
+        emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
+            table => Table,
+            query => {fun trace_format_query/1, Query}
+        }),
         execute(Query, Table)
     catch
+        error:{unrecoverable_error, Reason} ->
+            {error, {unrecoverable_error, Reason}};
         _Type:Reason ->
             {error, {unrecoverable_error, {invalid_request, Reason}}}
     end.
 
+trace_format_query({Type, Data}) ->
+    #{type => Type, data => Data};
+trace_format_query([_ | _] = Batch) ->
+    BatchData = [trace_format_query(Q) || Q <- Batch],
+    #{type => batch, data => BatchData};
+trace_format_query(Query) ->
+    Query.
+
 %% some simple query commands for authn/authz or test
 execute({insert_item, Msg}, Table) ->
     Item = convert_to_item(Msg),

+ 14 - 0
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -284,6 +284,13 @@ do_send_requests_sync(ConnectorState, Requests, InstanceId) ->
     Method = post,
     ReqOpts = #{request_ttl => RequestTTL},
     Request = {prepared_request, {Method, Path, Body}, ReqOpts},
+    emqx_trace:rendered_action_template(MessageTag, #{
+        method => Method,
+        path => Path,
+        body => Body,
+        options => ReqOpts,
+        is_async => false
+    }),
     Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client),
     QueryMode = sync,
     handle_result(Result, Request, QueryMode, InstanceId).
@@ -312,6 +319,13 @@ do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) ->
     ReqOpts = #{request_ttl => RequestTTL},
     Request = {prepared_request, {Method, Path, Body}, ReqOpts},
     ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]},
+    emqx_trace:rendered_action_template(MessageTag, #{
+        method => Method,
+        path => Path,
+        body => Body,
+        options => ReqOpts,
+        is_async => true
+    }),
     emqx_bridge_gcp_pubsub_client:query_async(
         Request, ReplyFunAndArgs, Client
     ).

+ 8 - 6
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl

@@ -128,7 +128,7 @@ on_query(InstId, {Channel, Message}, State) ->
                 greptimedb_connector_send_query,
                 #{points => Points, batch => false, mode => sync}
             ),
-            do_query(InstId, Client, Points);
+            do_query(InstId, Channel, Client, Points);
         {error, ErrorPoints} ->
             ?tp(
                 greptimedb_connector_send_query_error,
@@ -152,7 +152,7 @@ on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) ->
                 greptimedb_connector_send_query,
                 #{points => Points, batch => true, mode => sync}
             ),
-            do_query(InstId, Client, Points);
+            do_query(InstId, Channel, Client, Points);
         {error, Reason} ->
             ?tp(
                 greptimedb_connector_send_query_error,
@@ -173,7 +173,7 @@ on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) ->
                 greptimedb_connector_send_query,
                 #{points => Points, batch => false, mode => async}
             ),
-            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
         {error, ErrorPoints} = Err ->
             ?tp(
                 greptimedb_connector_send_query_error,
@@ -195,7 +195,7 @@ on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, S
                 greptimedb_connector_send_query,
                 #{points => Points, batch => true, mode => async}
             ),
-            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
         {error, Reason} ->
             ?tp(
                 greptimedb_connector_send_query_error,
@@ -420,7 +420,8 @@ is_auth_key(_) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% Query
-do_query(InstId, Client, Points) ->
+do_query(InstId, Channel, Client, Points) ->
+    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
     case greptimedb:write_batch(Client, Points) of
         {ok, #{response := {affected_rows, #{value := Rows}}}} ->
             ?SLOG(debug, #{
@@ -452,12 +453,13 @@ do_query(InstId, Client, Points) ->
             end
     end.
 
-do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
+do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
     ?SLOG(info, #{
         msg => "greptimedb_write_point_async",
         connector => InstId,
         points => Points
     }),
+    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
     WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
     ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
 

+ 19 - 11
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -134,8 +134,11 @@ on_query(
     #{
         producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
     } = maps:get(ChannelID, Channels),
-    try to_record(PartitionKey, HRecordTemplate, Data) of
-        Record -> append_record(InstId, Producer, Record, false)
+    try
+        KeyAndRawRecord = to_key_and_raw_record(PartitionKey, HRecordTemplate, Data),
+        emqx_trace:rendered_action_template(ChannelID, #{record => KeyAndRawRecord}),
+        Record = to_record(KeyAndRawRecord),
+        append_record(InstId, Producer, Record, false)
     catch
         _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
     end.
@@ -148,8 +151,13 @@ on_batch_query(
     #{
         producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
     } = maps:get(ChannelID, Channels),
-    try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
-        Records -> append_record(InstId, Producer, Records, true)
+    try
+        KeyAndRawRecordList = to_multi_part_key_and_partition_key(
+            PartitionKey, HRecordTemplate, BatchList
+        ),
+        emqx_trace:rendered_action_template(ChannelID, #{records => KeyAndRawRecordList}),
+        Records = [to_record(Item) || Item <- KeyAndRawRecordList],
+        append_record(InstId, Producer, Records, true)
     catch
         _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
     end.
@@ -348,20 +356,20 @@ ensure_start_producer(ProducerName, ProducerOptions) ->
 produce_name(ActionId) ->
     list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)).
 
-to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
+to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
     PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
     RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data),
-    to_record(PartitionKey, RawRecord).
+    #{partition_key => PartitionKey, raw_record => RawRecord}.
 
-to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) ->
-    to_record(binary_to_list(PartitionKey), RawRecord);
-to_record(PartitionKey, RawRecord) ->
+to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) when is_binary(PartitionKey) ->
+    to_record(#{partition_key => binary_to_list(PartitionKey), raw_record => RawRecord});
+to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) ->
     hstreamdb:to_record(PartitionKey, raw, RawRecord).
 
-to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
+to_multi_part_key_and_partition_key(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
     lists:map(
         fun({_, Data}) ->
-            to_record(PartitionKeyTmpl, HRecordTmpl, Data)
+            to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data)
         end,
         BatchList
     ).

+ 13 - 8
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -359,7 +359,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
 on_query(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
-    #{base_path := BasePath} = State
+    #{base_path := BasePath, host := Host} = State
 ) ->
     ?TRACE(
         "QUERY",
@@ -373,7 +373,7 @@ on_query(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
     Worker = resolve_pool_worker(State, KeyOrNum),
     Result0 = ehttpc:request(
         Worker,
@@ -469,7 +469,7 @@ on_query_async(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout},
     ReplyFunAndArgs,
-    #{base_path := BasePath} = State
+    #{base_path := BasePath, host := Host} = State
 ) ->
     Worker = resolve_pool_worker(State, KeyOrNum),
     ?TRACE(
@@ -483,7 +483,7 @@ on_query_async(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
     MaxAttempts = maps:get(max_attempts, State, 3),
     Context = #{
         attempt => 1,
@@ -503,15 +503,16 @@ on_query_async(
     ),
     {ok, Worker}.
 
-trace_rendered_action_template(ActionId, Method, NRequest, Timeout) ->
+trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) ->
     case NRequest of
         {Path, Headers} ->
             emqx_trace:rendered_action_template(
                 ActionId,
                 #{
+                    host => Host,
                     path => Path,
                     method => Method,
-                    headers => emqx_utils_redact:redact_headers(Headers),
+                    headers => {fun emqx_utils_redact:redact_headers/1, Headers},
                     timeout => Timeout
                 }
             );
@@ -519,15 +520,19 @@ trace_rendered_action_template(ActionId, Method, NRequest, Timeout) ->
             emqx_trace:rendered_action_template(
                 ActionId,
                 #{
+                    host => Host,
                     path => Path,
                     method => Method,
-                    headers => emqx_utils_redact:redact_headers(Headers),
+                    headers => {fun emqx_utils_redact:redact_headers/1, Headers},
                     timeout => Timeout,
-                    body => Body
+                    body => {fun log_format_body/1, Body}
                 }
             )
     end.
 
+log_format_body(Body) ->
+    unicode:characters_to_binary(Body).
+
 resolve_pool_worker(State, undefined) ->
     resolve_pool_worker(State, self());
 resolve_pool_worker(#{pool_name := PoolName} = State, Key) ->

+ 8 - 6
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -130,7 +130,7 @@ on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) ->
                 influxdb_connector_send_query,
                 #{points => Points, batch => false, mode => sync}
             ),
-            do_query(InstId, Client, Points);
+            do_query(InstId, Channel, Client, Points);
         {error, ErrorPoints} ->
             ?tp(
                 influxdb_connector_send_query_error,
@@ -152,7 +152,7 @@ on_batch_query(InstId, BatchData, #{channels := ChannelConf}) ->
                 influxdb_connector_send_query,
                 #{points => Points, batch => true, mode => sync}
             ),
-            do_query(InstId, Client, Points);
+            do_query(InstId, Channel, Client, Points);
         {error, Reason} ->
             ?tp(
                 influxdb_connector_send_query_error,
@@ -175,7 +175,7 @@ on_query_async(
                 influxdb_connector_send_query,
                 #{points => Points, batch => false, mode => async}
             ),
-            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
         {error, ErrorPoints} = Err ->
             ?tp(
                 influxdb_connector_send_query_error,
@@ -200,7 +200,7 @@ on_batch_query_async(
                 influxdb_connector_send_query,
                 #{points => Points, batch => true, mode => async}
             ),
-            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+            do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
         {error, Reason} ->
             ?tp(
                 influxdb_connector_send_query_error,
@@ -496,7 +496,8 @@ is_auth_key(_) ->
 
 %% -------------------------------------------------------------------------------------------------
 %% Query
-do_query(InstId, Client, Points) ->
+do_query(InstId, Channel, Client, Points) ->
+    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
     case influxdb:write(Client, Points) of
         ok ->
             ?SLOG(debug, #{
@@ -527,12 +528,13 @@ do_query(InstId, Client, Points) ->
             end
     end.
 
-do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
+do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
     ?SLOG(info, #{
         msg => "influxdb_write_point_async",
         connector => InstId,
         points => Points
     }),
+    emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
     WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
     {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs).
 

+ 6 - 0
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -319,6 +319,9 @@ on_query(
             emqx_bridge_kafka_impl_producer_sync_query,
             #{headers_config => KafkaHeaders, instance_id => InstId}
         ),
+        emqx_trace:rendered_action_template(MessageTag, #{
+            message => KafkaMessage, send_type => sync
+        }),
         do_send_msg(sync, KafkaMessage, Producers, SyncTimeout)
     catch
         throw:{bad_kafka_header, _} = Error ->
@@ -376,6 +379,9 @@ on_query_async(
             emqx_bridge_kafka_impl_producer_async_query,
             #{headers_config => KafkaHeaders, instance_id => InstId}
         ),
+        emqx_trace:rendered_action_template(MessageTag, #{
+            message => KafkaMessage, send_type => async
+        }),
         do_send_msg(async, KafkaMessage, Producers, AsyncReplyFn)
     catch
         error:{invalid_partition_count, _Count, _Partitioner} ->

+ 5 - 0
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -261,6 +261,11 @@ do_send_requests_sync(
         stream_name := StreamName
     } = maps:get(ChannelId, InstalledChannels),
     Records = render_records(Requests, Templates),
+    StructuredRecords = [
+        #{data => Data, partition_key => PartitionKey}
+     || {Data, PartitionKey} <- Records
+    ],
+    emqx_trace:rendered_action_template(ChannelId, StructuredRecords),
     Result = ecpool:pick_and_do(
         PoolName,
         {emqx_bridge_kinesis_connector_client, query, [Records, StreamName]},

+ 6 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -66,10 +66,15 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat
         payload_template := PayloadTemplate,
         collection_template := CollectionTemplate
     } = ChannelState0 = maps:get(Channel, Channels),
+    Collection = emqx_placeholder:proc_tmpl(CollectionTemplate, Message0),
     ChannelState = ChannelState0#{
-        collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
+        collection => Collection
     },
     Message = render_message(PayloadTemplate, Message0),
+    emqx_trace:rendered_action_template(Channel, #{
+        collection => Collection,
+        data => Message
+    }),
     Res = emqx_mongodb:on_query(
         InstanceId,
         {Channel, Message},

+ 9 - 4
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl

@@ -264,7 +264,7 @@ on_query(
     ),
     Channels = maps:get(installed_channels, State),
     ChannelConfig = maps:get(ChannelId, Channels),
-    handle_send_result(with_egress_client(PoolName, send, [Msg, ChannelConfig]));
+    handle_send_result(with_egress_client(ChannelId, PoolName, send, [Msg, ChannelConfig]));
 on_query(ResourceId, {_ChannelId, Msg}, #{}) ->
     ?SLOG(error, #{
         msg => "forwarding_unavailable",
@@ -283,7 +283,7 @@ on_query_async(
     Callback = {fun on_async_result/2, [CallbackIn]},
     Channels = maps:get(installed_channels, State),
     ChannelConfig = maps:get(ChannelId, Channels),
-    Result = with_egress_client(PoolName, send_async, [Msg, Callback, ChannelConfig]),
+    Result = with_egress_client(ChannelId, PoolName, send_async, [Msg, Callback, ChannelConfig]),
     case Result of
         ok ->
             ok;
@@ -300,8 +300,11 @@ on_query_async(ResourceId, {_ChannelId, Msg}, _Callback, #{}) ->
         reason => "Egress is not configured"
     }).
 
-with_egress_client(ResourceId, Fun, Args) ->
-    ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover).
+with_egress_client(ActionID, ResourceId, Fun, Args) ->
+    TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ActionID),
+    ecpool:pick_and_do(
+        ResourceId, {emqx_bridge_mqtt_egress, Fun, [TraceRenderedCTX | Args]}, no_handover
+    ).
 
 on_async_result(Callback, Result) ->
     apply_callback_function(Callback, handle_send_result(Result)).
@@ -337,6 +340,8 @@ classify_error({shutdown, _} = Reason) ->
     {recoverable_error, Reason};
 classify_error(shutdown = Reason) ->
     {recoverable_error, Reason};
+classify_error({unrecoverable_error, _Reason} = Error) ->
+    Error;
 classify_error(Reason) ->
     {unrecoverable_error, Reason}.
 

+ 38 - 16
apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl

@@ -22,8 +22,8 @@
 
 -export([
     config/1,
-    send/3,
-    send_async/4
+    send/4,
+    send_async/5
 ]).
 
 -type message() :: emqx_types:message() | map().
@@ -42,25 +42,40 @@
 config(#{remote := RC = #{}} = Conf) ->
     Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}.
 
--spec send(pid(), message(), egress()) -> ok.
-send(Pid, MsgIn, Egress) ->
-    emqtt:publish(Pid, export_msg(MsgIn, Egress)).
+-spec send(pid(), emqx_trace:rendered_action_template_ctx(), message(), egress()) ->
+    ok | {error, {unrecoverable_error, term()}}.
+send(Pid, TraceRenderedCTX, MsgIn, Egress) ->
+    try
+        emqtt:publish(Pid, export_msg(MsgIn, Egress, TraceRenderedCTX))
+    catch
+        error:{unrecoverable_error, Reason} ->
+            {error, {unrecoverable_error, Reason}}
+    end.
 
--spec send_async(pid(), message(), callback(), egress()) ->
-    ok | {ok, pid()}.
-send_async(Pid, MsgIn, Callback, Egress) ->
-    ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback),
-    {ok, Pid}.
+-spec send_async(pid(), emqx_trace:rendered_action_template_ctx(), message(), callback(), egress()) ->
+    {ok, pid()} | {error, {unrecoverable_error, term()}}.
+send_async(Pid, TraceRenderedCTX, MsgIn, Callback, Egress) ->
+    try
+        ok = emqtt:publish_async(
+            Pid, export_msg(MsgIn, Egress, TraceRenderedCTX), _Timeout = infinity, Callback
+        ),
+        {ok, Pid}
+    catch
+        error:{unrecoverable_error, Reason} ->
+            {error, {unrecoverable_error, Reason}}
+    end.
 
-export_msg(Msg, #{remote := Remote}) ->
-    to_remote_msg(Msg, Remote).
+export_msg(Msg, #{remote := Remote}, TraceRenderedCTX) ->
+    to_remote_msg(Msg, Remote, TraceRenderedCTX).
 
--spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) ->
+-spec to_remote_msg(
+    message(), emqx_bridge_mqtt_msg:msgvars(), emqx_trace:rendered_action_template_ctx()
+) ->
     remote_message().
-to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
+to_remote_msg(#message{flags = Flags} = Msg, Vars, TraceRenderedCTX) ->
     {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
-    to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars);
-to_remote_msg(Msg = #{}, Remote) ->
+    to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars, TraceRenderedCTX);
+to_remote_msg(Msg = #{}, Remote, TraceRenderedCTX) ->
     #{
         topic := Topic,
         payload := Payload,
@@ -68,6 +83,13 @@ to_remote_msg(Msg = #{}, Remote) ->
         retain := Retain
     } = emqx_bridge_mqtt_msg:render(Msg, Remote),
     PubProps = maps:get(pub_props, Msg, #{}),
+    emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
+        qos => QoS,
+        retain => Retain,
+        topic => Topic,
+        props => PubProps,
+        payload => Payload
+    }),
     #mqtt_msg{
         qos = QoS,
         retain = Retain,

+ 6 - 2
apps/emqx_bridge_mysql/src/emqx_bridge_mysql_connector.erl

@@ -104,10 +104,12 @@ on_query(
     #{channels := Channels, connector_state := ConnectorState}
 ) when is_binary(Channel) ->
     ChannelConfig = maps:get(Channel, Channels),
+    MergedState0 = maps:merge(ConnectorState, ChannelConfig),
+    MergedState1 = MergedState0#{channel_id => Channel},
     Result = emqx_mysql:on_query(
         InstanceId,
         Request,
-        maps:merge(ConnectorState, ChannelConfig)
+        MergedState1
     ),
     ?tp(mysql_connector_on_query_return, #{instance_id => InstanceId, result => Result}),
     Result;
@@ -121,10 +123,12 @@ on_batch_query(
 ) when is_binary(element(1, Req)) ->
     Channel = element(1, Req),
     ChannelConfig = maps:get(Channel, Channels),
+    MergedState0 = maps:merge(ConnectorState, ChannelConfig),
+    MergedState1 = MergedState0#{channel_id => Channel},
     Result = emqx_mysql:on_batch_query(
         InstanceId,
         BatchRequest,
-        maps:merge(ConnectorState, ChannelConfig)
+        MergedState1
     ),
     ?tp(mysql_connector_on_batch_query_return, #{instance_id => InstanceId, result => Result}),
     Result;

+ 4 - 2
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -167,9 +167,10 @@ on_batch_query(
     BatchReq,
     #{channels := Channels} = State
 ) ->
+    [{ChannelId, _} | _] = BatchReq,
     case try_render_messages(BatchReq, Channels) of
         {ok, Datas} ->
-            do_query(InstanceId, Datas, State);
+            do_query(InstanceId, ChannelId, Datas, State);
         Error ->
             Error
     end.
@@ -222,12 +223,13 @@ on_get_channel_status(InstanceId, ChannelId, #{channels := Channels} = State) ->
 %% Helper fns
 %%========================================================================================
 
-do_query(InstanceId, Query, #{pool_name := PoolName} = State) ->
+do_query(InstanceId, ChannelID, Query, #{pool_name := PoolName} = State) ->
     ?TRACE(
         "QUERY",
         "opents_connector_received",
         #{connector => InstanceId, query => Query, state => State}
     ),
+    emqx_trace:rendered_action_template(ChannelID, #{query => Query}),
 
     ?tp(opents_bridge_on_query, #{instance_id => InstanceId}),
 

+ 11 - 2
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -196,6 +196,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
             {error, channel_not_found};
         {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} ->
             PulsarMessage = render_message(Message, MessageTmpl),
+            emqx_trace:rendered_action_template(ChannelId, #{
+                message => PulsarMessage,
+                sync_timeout => SyncTimeout,
+                is_async => false
+            }),
             try
                 pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
             catch
@@ -217,12 +222,16 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
             ?tp_span(
                 pulsar_producer_on_query_async,
                 #{instance_id => _InstanceId, message => Message},
-                on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn)
+                on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
             )
     end.
 
-on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) ->
+on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
     PulsarMessage = render_message(Message, MessageTmpl),
+    emqx_trace:rendered_action_template(ChannelId, #{
+        message => PulsarMessage,
+        is_async => true
+    }),
     pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
 
 %%-------------------------------------------------------------------------------------

+ 33 - 10
apps/emqx_bridge_rabbitmq/src/emqx_bridge_rabbitmq_connector.erl

@@ -9,6 +9,7 @@
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -41,7 +42,7 @@
 -export([connect/1]).
 
 %% Internal callbacks
--export([publish_messages/4]).
+-export([publish_messages/5]).
 
 namespace() -> "rabbitmq".
 
@@ -214,9 +215,10 @@ on_query(ResourceID, {ChannelId, Data} = MsgReq, State) ->
     #{channels := Channels} = State,
     case maps:find(ChannelId, Channels) of
         {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
+            TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
             Res = ecpool:pick_and_do(
                 ResourceID,
-                {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq]]},
+                {?MODULE, publish_messages, [RabbitMQ, ProcParam, [MsgReq], TraceRenderedCTX]},
                 no_handover
             ),
             handle_result(Res);
@@ -234,9 +236,10 @@ on_batch_query(ResourceID, [{ChannelId, _Data} | _] = Batch, State) ->
     #{channels := Channels} = State,
     case maps:find(ChannelId, Channels) of
         {ok, #{param := ProcParam, rabbitmq := RabbitMQ}} ->
+            TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
             Res = ecpool:pick_and_do(
                 ResourceID,
-                {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch]},
+                {?MODULE, publish_messages, [RabbitMQ, ProcParam, Batch, TraceRenderedCTX]},
                 no_handover
             ),
             handle_result(Res);
@@ -255,7 +258,8 @@ publish_messages(
         wait_for_publish_confirmations := WaitForPublishConfirmations,
         publish_confirmation_timeout := PublishConfirmationTimeout
     },
-    Messages
+    Messages,
+    TraceRenderedCTX
 ) ->
     try
         publish_messages(
@@ -267,15 +271,18 @@ publish_messages(
             PayloadTmpl,
             Messages,
             WaitForPublishConfirmations,
-            PublishConfirmationTimeout
+            PublishConfirmationTimeout,
+            TraceRenderedCTX
         )
     catch
+        error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
+            {error, Reason};
         %% if send a message to a non-existent exchange, RabbitMQ client will crash
         %% {shutdown,{server_initiated_close,404,<<"NOT_FOUND - no exchange 'xyz' in vhost '/'">>}
         %% so we catch and return {recoverable_error, Reason} to increase metrics
         _Type:Reason ->
             Msg = iolist_to_binary(io_lib:format("RabbitMQ: publish_failed: ~p", [Reason])),
-            erlang:error({recoverable_error, Msg})
+            {error, {recoverable_error, Msg}}
     end.
 
 publish_messages(
@@ -287,7 +294,8 @@ publish_messages(
     PayloadTmpl,
     Messages,
     WaitForPublishConfirmations,
-    PublishConfirmationTimeout
+    PublishConfirmationTimeout,
+    TraceRenderedCTX
 ) ->
     case maps:find(Conn, RabbitMQ) of
         {ok, Channel} ->
@@ -299,18 +307,33 @@ publish_messages(
                 exchange = Exchange,
                 routing_key = RoutingKey
             },
+            FormattedMsgs = [
+                format_data(PayloadTmpl, M)
+             || {_, M} <- Messages
+            ],
+            emqx_trace:rendered_action_template_with_ctx(TraceRenderedCTX, #{
+                messages => FormattedMsgs,
+                properties => #{
+                    headers => [],
+                    delivery_mode => DeliveryMode
+                },
+                method => #{
+                    exchange => Exchange,
+                    routing_key => RoutingKey
+                }
+            }),
             lists:foreach(
-                fun({_, MsgRaw}) ->
+                fun(Msg) ->
                     amqp_channel:cast(
                         Channel,
                         Method,
                         #amqp_msg{
-                            payload = format_data(PayloadTmpl, MsgRaw),
+                            payload = Msg,
                             props = MessageProperties
                         }
                     )
                 end,
-                Messages
+                FormattedMsgs
             ),
             case WaitForPublishConfirmations of
                 true ->

+ 10 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -107,7 +107,7 @@ on_query(InstId, {cmd, Cmd}, #{conn_st := RedisConnSt}) ->
     Result;
 on_query(
     InstId,
-    {_MessageTag, _Data} = Msg,
+    {MessageTag, _Data} = Msg,
     #{channels := Channels, conn_st := RedisConnSt}
 ) ->
     case try_render_message([Msg], Channels) of
@@ -116,6 +116,10 @@ on_query(
                 redis_bridge_connector_cmd,
                 #{cmd => Cmd, batch => false, mode => sync}
             ),
+            emqx_trace:rendered_action_template(
+                MessageTag,
+                #{command => Cmd, batch => false, mode => sync}
+            ),
             Result = query(InstId, {cmd, Cmd}, RedisConnSt),
             ?tp(
                 redis_bridge_connector_send_done,
@@ -135,6 +139,11 @@ on_batch_query(
                 redis_bridge_connector_send,
                 #{batch_data => BatchData, batch => true, mode => sync}
             ),
+            [{ChannelID, _} | _] = BatchData,
+            emqx_trace:rendered_action_template(
+                ChannelID,
+                #{commands => Cmds, batch => ture, mode => sync}
+            ),
             Result = query(InstId, {cmds, Cmds}, RedisConnSt),
             ?tp(
                 redis_bridge_connector_send_done,

+ 5 - 1
apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl

@@ -264,7 +264,11 @@ do_query(
 
     TopicKey = get_topic_key(Query, TopicTks),
     Data = apply_template(Query, Templates, DispatchStrategy),
-
+    emqx_trace:rendered_action_template(ChannelId, #{
+        topic_key => TopicKey,
+        data => Data,
+        request_timeout => RequestTimeout
+    }),
     Result = safe_do_produce(
         ChannelId, InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout
     ),

+ 7 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -168,13 +168,14 @@ init_channel_state(#{parameters := Parameters}) ->
 on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
     case maps:get(Tag, Channels, undefined) of
         ChannelState = #{} ->
-            run_simple_upload(InstId, Data, ChannelState, Config);
+            run_simple_upload(InstId, Tag, Data, ChannelState, Config);
         undefined ->
             {error, {unrecoverable_error, {invalid_message_tag, Tag}}}
     end.
 
 run_simple_upload(
     InstId,
+    ChannelID,
     Data,
     #{
         bucket := BucketTemplate,
@@ -188,6 +189,11 @@ run_simple_upload(
     Client = emqx_s3_client:create(Bucket, Config),
     Key = render_key(KeyTemplate, Data),
     Content = render_content(ContentTemplate, Data),
+    emqx_trace:rendered_action_template(ChannelID, #{
+        bucket => Bucket,
+        key => Key,
+        content => Content
+    }),
     case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
         ok ->
             ?tp(s3_bridge_connector_upload_ok, #{

+ 3 - 0
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -413,6 +413,9 @@ do_query(
     %% only insert sql statement for single query and batch query
     case apply_template(QueryTuple, Templates) of
         {?ACTION_SEND_MESSAGE, SQL} ->
+            emqx_trace:rendered_action_template(ChannelId, #{
+                sql => SQL
+            }),
             Result = ecpool:pick_and_do(
                 PoolName,
                 {?MODULE, worker_do_insert, [SQL, State]},

+ 2 - 0
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl

@@ -273,6 +273,8 @@ do_query(
     Result =
         case try_render_message(Query, Channels) of
             {ok, Msg} ->
+                [{ChannelID, _} | _] = Query,
+                emqx_trace:rendered_action_template(ChannelID, #{message => Msg}),
                 ecpool:pick_and_do(
                     PoolName,
                     {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]},

+ 16 - 4
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -10,6 +10,7 @@
 
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
@@ -32,7 +33,7 @@
 
 -export([connector_examples/1]).
 
--export([connect/1, do_get_status/1, execute/3, do_batch_insert/4]).
+-export([connect/1, do_get_status/1, execute/3, do_batch_insert/5]).
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
@@ -186,6 +187,7 @@ on_query(InstanceId, {ChannelId, Data}, #{channels := Channels} = State) ->
     case maps:find(ChannelId, Channels) of
         {ok, #{insert := Tokens, opts := Opts}} ->
             Query = emqx_placeholder:proc_tmpl(Tokens, Data),
+            emqx_trace:rendered_action_template(ChannelId, #{query => Query}),
             do_query_job(InstanceId, {?MODULE, execute, [Query, Opts]}, State);
         _ ->
             {error, {unrecoverable_error, {invalid_channel_id, InstanceId}}}
@@ -199,9 +201,10 @@ on_batch_query(
 ) ->
     case maps:find(ChannelId, Channels) of
         {ok, #{batch := Tokens, opts := Opts}} ->
+            TraceRenderedCTX = emqx_trace:make_rendered_action_template_trace_context(ChannelId),
             do_query_job(
                 InstanceId,
-                {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts]},
+                {?MODULE, do_batch_insert, [Tokens, BatchReq, Opts, TraceRenderedCTX]},
                 State
             );
         _ ->
@@ -338,9 +341,18 @@ do_query_job(InstanceId, Job, #{pool_name := PoolName} = State) ->
 execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).
 
-do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
+do_batch_insert(Conn, Tokens, BatchReqs, Opts, TraceRenderedCTX) ->
     SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
-    execute(Conn, SQL, Opts).
+    try
+        emqx_trace:rendered_action_template_with_ctx(
+            TraceRenderedCTX,
+            #{query => SQL}
+        ),
+        execute(Conn, SQL, Opts)
+    catch
+        error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
+            {error, Reason}
+    end.
 
 aggregate_query(BatchTks, BatchReqs, Acc) ->
     lists:foldl(

+ 1 - 1
apps/emqx_mysql/src/emqx_mysql.app.src

@@ -1,6 +1,6 @@
 {application, emqx_mysql, [
     {description, "EMQX MySQL Database Connector"},
-    {vsn, "0.1.8"},
+    {vsn, "0.1.9"},
     {registered, []},
     {applications, [
         kernel,

+ 2 - 0
apps/emqx_mysql/src/emqx_mysql.erl

@@ -498,6 +498,8 @@ on_sql_query(
 ) ->
     LogMeta = #{connector => InstId, sql => SQLOrKey, state => State},
     ?TRACE("QUERY", "mysql_connector_received", LogMeta),
+    ChannelID = maps:get(channel_id, State, no_channel),
+    emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}),
     Worker = ecpool:get_client(PoolName),
     case ecpool_worker:client(Worker) of
         {ok, Conn} ->

+ 11 - 3
apps/emqx_oracle/src/emqx_oracle.erl

@@ -210,7 +210,7 @@ on_query(
     }),
     Type = query,
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
-    Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
+    Res = on_sql_query(InstId, TypeOrKey, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
     handle_result(Res).
 
 on_batch_query(
@@ -244,7 +244,9 @@ on_batch_query(
                     Datas2 = [emqx_placeholder:proc_sql(TokenList, Data) || Data <- Datas],
                     St = maps:get(BinKey, Sts),
                     case
-                        on_sql_query(InstId, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2)
+                        on_sql_query(
+                            InstId, BinKey, PoolName, execute_batch, ?SYNC_QUERY_MODE, St, Datas2
+                        )
                     of
                         {ok, Results} ->
                             handle_batch_result(Results, 0);
@@ -281,7 +283,13 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{
             end
     end.
 
-on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
+on_sql_query(InstId, ChannelID, PoolName, Type, ApplyMode, NameOrSQL, Data) ->
+    emqx_trace:rendered_action_template(ChannelID, #{
+        type => Type,
+        apply_mode => ApplyMode,
+        name_or_sql => NameOrSQL,
+        data => Data
+    }),
     case ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, ApplyMode) of
         {error, Reason} = Result ->
             ?tp(

+ 11 - 3
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -304,7 +304,7 @@ on_query(
     }),
     Type = pgsql_query_type(TypeOrKey),
     {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
-    Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
+    Res = on_sql_query(TypeOrKey, InstId, PoolName, Type, NameOrSQL2, Data),
     ?tp(postgres_bridge_connector_on_query_return, #{instance_id => InstId, result => Res}),
     handle_result(Res).
 
@@ -337,7 +337,7 @@ on_batch_query(
         {_Statement, RowTemplate} ->
             PrepStatement = get_prepared_statement(BinKey, State),
             Rows = [render_prepare_sql_row(RowTemplate, Data) || {_Key, Data} <- BatchReq],
-            case on_sql_query(InstId, PoolName, execute_batch, PrepStatement, Rows) of
+            case on_sql_query(Key, InstId, PoolName, execute_batch, PrepStatement, Rows) of
                 {error, _Error} = Result ->
                     handle_result(Result);
                 {_Column, Results} ->
@@ -386,7 +386,15 @@ get_prepared_statement(Key, #{prepares := PrepStatements}) ->
     BinKey = to_bin(Key),
     maps:get(BinKey, PrepStatements).
 
-on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
+on_sql_query(Key, InstId, PoolName, Type, NameOrSQL, Data) ->
+    emqx_trace:rendered_action_template(
+        Key,
+        #{
+            statement_type => Type,
+            statement_or_name => NameOrSQL,
+            data => Data
+        }
+    ),
     try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
         {error, Reason} ->
             ?tp(

+ 15 - 12
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -18,6 +18,7 @@
 
 -include("rule_engine.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("emqx/include/emqx_trace.hrl").
 -include_lib("emqx_resource/include/emqx_resource_errors.hrl").
 
 -export([
@@ -141,22 +142,24 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
 
 set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
     logger:update_process_metadata(#{
-        clientid => ClientID
-    }),
-    set_process_trace_metadata(RuleID, maps:remove(clientid, Columns));
+        clientid => ClientID,
+        rule_id => RuleID,
+        rule_trigger_time => rule_trigger_time(Columns)
+    });
 set_process_trace_metadata(RuleID, Columns) ->
-    EventTimestamp =
-        case Columns of
-            #{timestamp := Timestamp} ->
-                Timestamp;
-            _ ->
-                erlang:system_time(millisecond)
-        end,
     logger:update_process_metadata(#{
         rule_id => RuleID,
-        rule_trigger_time => EventTimestamp
+        rule_trigger_time => rule_trigger_time(Columns)
     }).
 
+rule_trigger_time(Columns) ->
+    case Columns of
+        #{timestamp := Timestamp} ->
+            Timestamp;
+        _ ->
+            erlang:system_time(millisecond)
+    end.
+
 reset_process_trace_metadata(#{clientid := _ClientID}) ->
     Meta = logger:get_process_metadata(),
     Meta1 = maps:remove(clientid, Meta),
@@ -722,7 +725,7 @@ inc_action_metrics(TraceCtx, Result) ->
 
 do_inc_action_metrics(
     #{rule_id := RuleId, action_id := ActId} = TraceContext,
-    {error, {unrecoverable_error, {action_stopped_after_template_rendering, Explanation}} = _Reason}
+    {error, ?EMQX_TRACE_STOP_ACTION(Explanation) = _Reason}
 ) ->
     TraceContext1 = maps:remove(action_id, TraceContext),
     trace_action(