Przeglądaj źródła

fix: rule trace formatting, republish and console stop after rendering

* Better rule trace formatting for many trace entries
* The republish and console actions now correctly support the
  stop-after-rendering functionality
Kjell Winblad 1 rok temu
rodzic
commit
ea7633c484

+ 8 - 1
apps/emqx/src/emqx_logger_jsonfmt.erl

@@ -229,6 +229,13 @@ best_effort_json_obj(Map, Config) ->
             do_format_msg("~p", [Map], Config)
     end.
 
+%% Encodes a map value for JSON output. The booleans `true` and `false` are
+%% returned unchanged (json/2 would turn them into binaries like any other
+%% atom); every other term is delegated to json/2.
+json_value(Bool, _Config) when is_boolean(Bool) ->
+    Bool;
+json_value(Other, Config) ->
+    json(Other, Config).
+
 json(A, _) when is_atom(A) -> atom_to_binary(A, utf8);
 json(I, _) when is_integer(I) -> I;
 json(F, _) when is_float(F) -> F;
@@ -317,7 +324,7 @@ json_kv(K0, V, Config) ->
     K = json_key(K0),
     case is_map(V) of
         true -> {K, best_effort_json_obj(V, Config)};
-        false -> {K, json(V, Config)}
+        false -> {K, json_value(V, Config)}
     end.
 
 json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8));

+ 34 - 11
apps/emqx/src/emqx_trace/emqx_trace.erl

@@ -31,7 +31,8 @@
     log/4,
     rendered_action_template/2,
     make_rendered_action_template_trace_context/1,
-    rendered_action_template_with_ctx/2
+    rendered_action_template_with_ctx/2,
+    is_rule_trace_active/0
 ]).
 
 -export([
@@ -96,6 +97,16 @@ unsubscribe(Topic, SubOpts) ->
     ?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
 
+%% Entry point for tracing the rendered template of an action.
+%% Action IDs that are binaries starting with "action:" or maps holding the
+%% `mod` and `func` keys are forwarded to do_rendered_action_template/2; any
+%% other ID is ignored and `ok` is returned.
 rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
+    do_rendered_action_template(ActionID, RenderResult);
+rendered_action_template(#{mod := _, func := _} = ActionID, RenderResult) ->
+    do_rendered_action_template(ActionID, RenderResult);
+rendered_action_template(_ActionID, _RenderResult) ->
+    %% We do nothing if we don't get a valid Action ID. This can happen when
+    %% called from connectors that are used for actions as well as authz and
+    %% authn.
+    ok.
+
+do_rendered_action_template(ActionID, RenderResult) ->
     TraceResult = ?TRACE(
         "QUERY_RENDER",
         "action_template_rendered",
@@ -108,23 +119,25 @@ rendered_action_template(<<"action:", _/binary>> = ActionID, RenderResult) ->
         #{stop_action_after_render := true} ->
             %% We throw an unrecoverable error to stop action before the
             %% resource is called/modified
-            StopMsg = lists:flatten(
+            ActionIDStr =
+                case ActionID of
+                    Bin when is_binary(Bin) ->
+                        Bin;
+                    Term ->
+                        ActionIDFormatted = io_lib:format("~tw", [Term]),
+                        unicode:characters_to_binary(ActionIDFormatted)
+                end,
+            StopMsg =
                 io_lib:format(
                     "Action ~ts stopped after template rendering due to test setting.",
-                    [ActionID]
-                )
-            ),
+                    [ActionIDStr]
+                ),
             MsgBin = unicode:characters_to_binary(StopMsg),
             error(?EMQX_TRACE_STOP_ACTION(MsgBin));
         _ ->
             ok
     end,
-    TraceResult;
-rendered_action_template(_ActionID, _RenderResult) ->
-    %% We do nothing if we don't get a valid Action ID. This can happen when
-    %% called from connectors that are used for actions as well as authz and
-    %% authn.
-    ok.
+    TraceResult.
 
 %% The following two functions are used for connectors that don't do the
 %% rendering in the main process (the one that called on_*query). In this case
@@ -165,6 +178,16 @@ rendered_action_template_with_ctx(
         logger:set_process_metadata(OldMetaData)
     end.
 
+%% Tells whether the calling process is currently running under a rule trace.
+%% The answer is derived from the logger process metadata: a binary `rule_id`
+%% or a non-empty `rule_ids` map means a rule trace is active.
+is_rule_trace_active() ->
+    has_rule_trace_metadata(logger:get_process_metadata()).
+
+%% Classifies the process metadata; anything without usable rule information
+%% (including `undefined`, i.e. no metadata set) counts as inactive.
+has_rule_trace_metadata(#{rule_id := RuleId}) when is_binary(RuleId) ->
+    true;
+has_rule_trace_metadata(#{rule_ids := RuleIds}) when map_size(RuleIds) > 0 ->
+    true;
+has_rule_trace_metadata(_Other) ->
+    false.
+
+%% Convenience wrapper around log/4 that logs at the `debug` level.
 log(List, Msg, Meta) ->
     log(debug, List, Msg, Meta).
 

+ 41 - 5
apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl

@@ -57,12 +57,46 @@ prepare_key_value(K, {Formatter, V}, PEncode) when is_function(Formatter, 1) ->
         _:_ ->
             {K, V}
     end;
-prepare_key_value(K, {ok, Status, Headers, Body}, PEncode) when
-    is_integer(Status), is_list(Headers), is_binary(Body)
+prepare_key_value(K, {ok, {Formatter, V}}, PEncode) when is_function(Formatter, 1) ->
+    %% Unwrap
+    prepare_key_value(K, {Formatter, V}, PEncode);
+prepare_key_value(host, {I1, I2, I3, I4}, _PEncode) when
+    is_integer(I1),
+    is_integer(I2),
+    is_integer(I3),
+    is_integer(I4)
 ->
-    %% This is unlikely anything else then info about a HTTP request so we make
-    %% it more structured
-    prepare_key_value(K, #{status => Status, headers => Headers, body => Body}, PEncode);
+    %% We assume this is an IP address
+    {host,
+        unicode:characters_to_binary([
+            integer_to_binary(I1),
+            <<".">>,
+            integer_to_binary(I2),
+            <<".">>,
+            integer_to_binary(I3),
+            <<".">>,
+            integer_to_binary(I4)
+        ])};
+prepare_key_value(K, {ok, StatusCode, Headers}, PEncode) when
+    is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers)
+->
+    prepare_key_value(K, {ok, StatusCode, Headers, <<"">>}, PEncode);
+prepare_key_value(K, {ok, StatusCode, Headers, Body}, PEncode) when
+    is_integer(StatusCode), StatusCode >= 200, StatusCode < 300, is_list(Headers)
+->
+    %% We assume this is the response of an HTTP request
+    prepare_key_value(
+        K,
+        #{
+            result => ok,
+            response => #{
+                status => StatusCode,
+                headers => Headers,
+                body => Body
+            }
+        },
+        PEncode
+    );
 prepare_key_value(payload = K, V, PEncode) ->
     NewV =
         try
@@ -137,6 +171,8 @@ format_map_set_to_list(Map) ->
     ],
     lists:sort(Items).
 
+format_action_info(#{mod := _Mod, func := _Func} = FuncCall) ->
+    FuncCall;
 format_action_info(V) ->
     [<<"action">>, Type, Name | _] = binary:split(V, <<":">>, [global]),
     #{

+ 57 - 11
apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl

@@ -232,6 +232,7 @@ on_start(
         port => Port,
         connect_timeout => ConnectTimeout,
         base_path => BasePath,
+        scheme => Scheme,
         request => preprocess_request(maps:get(request, Config, undefined))
     },
     case start_pool(InstId, PoolOpts) of
@@ -359,7 +360,7 @@ on_query(InstId, {Method, Request, Timeout}, State) ->
 on_query(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
-    #{base_path := BasePath, host := Host} = State
+    #{base_path := BasePath, host := Host, scheme := Scheme, port := Port} = State
 ) ->
     ?TRACE(
         "QUERY",
@@ -373,7 +374,7 @@ on_query(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
     Worker = resolve_pool_worker(State, KeyOrNum),
     Result0 = ehttpc:request(
         Worker,
@@ -413,6 +414,24 @@ on_query(
             Result
     end.
 
+%% Wraps a successful HTTP result for lazy trace formatting.
+%% While a rule trace is active the connector is being driven by an action and
+%% the result is consumed by the trace only, so it is paired with
+%% trace_format_result/1, which is applied when the trace entry is rendered.
+%% Without an active rule trace the result is returned untouched.
+maybe_trace_format_result(Result) ->
+    case emqx_trace:is_rule_trace_active() of
+        false ->
+            Result;
+        true ->
+            {ok, {fun trace_format_result/1, Result}}
+    end.
+
+%% Turns an HTTP result tuple into a map for the trace log. A three-element
+%% result has no body; anything that is not an `{ok, ...}` result of either
+%% shape is passed through unchanged.
+trace_format_result(Result) ->
+    case Result of
+        {ok, Status, Headers} ->
+            #{status => Status, headers => Headers};
+        {ok, Status, Headers, Body} ->
+            #{status => Status, headers => Headers, body => Body};
+        _ ->
+            Result
+    end.
+
 %% BridgeV1 entrypoint
 on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
     case maps:get(request, State, undefined) of
@@ -469,7 +488,7 @@ on_query_async(
     InstId,
     {ActionId, KeyOrNum, Method, Request, Timeout},
     ReplyFunAndArgs,
-    #{base_path := BasePath, host := Host} = State
+    #{base_path := BasePath, host := Host, port := Port, scheme := Scheme} = State
 ) ->
     Worker = resolve_pool_worker(State, KeyOrNum),
     ?TRACE(
@@ -483,7 +502,7 @@ on_query_async(
         }
     ),
     NRequest = formalize_request(Method, BasePath, Request),
-    trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout),
+    trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout),
     MaxAttempts = maps:get(max_attempts, State, 3),
     Context = #{
         attempt => 1,
@@ -492,7 +511,8 @@ on_query_async(
         key_or_num => KeyOrNum,
         method => Method,
         request => NRequest,
-        timeout => Timeout
+        timeout => Timeout,
+        trace_metadata => logger:get_process_metadata()
     },
     ok = ehttpc:request_async(
         Worker,
@@ -503,17 +523,19 @@ on_query_async(
     ),
     {ok, Worker}.
 
-trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) ->
+trace_rendered_action_template(ActionId, Scheme, Host, Port, Method, NRequest, Timeout) ->
     case NRequest of
         {Path, Headers} ->
             emqx_trace:rendered_action_template(
                 ActionId,
                 #{
                     host => Host,
+                    port => Port,
                     path => Path,
                     method => Method,
                     headers => {fun emqx_utils_redact:redact_headers/1, Headers},
-                    timeout => Timeout
+                    timeout => Timeout,
+                    url => {fun render_url/1, {Scheme, Host, Port, Path}}
                 }
             );
         {Path, Headers, Body} ->
@@ -521,15 +543,33 @@ trace_rendered_action_template(ActionId, Host, Method, NRequest, Timeout) ->
                 ActionId,
                 #{
                     host => Host,
+                    port => Port,
                     path => Path,
                     method => Method,
                     headers => {fun emqx_utils_redact:redact_headers/1, Headers},
                     timeout => Timeout,
-                    body => {fun log_format_body/1, Body}
+                    body => {fun log_format_body/1, Body},
+                    url => {fun render_url/1, {Scheme, Host, Port, Path}}
                 }
             )
     end.
 
+%% Builds the full request URL as a binary from a
+%% `{Scheme, Host, Port, Path}` tuple. Only the `http` and `https` schemes are
+%% handled; the port is always included in the result.
+render_url({Scheme, Host, Port, Path}) ->
+    Prefix =
+        case Scheme of
+            https -> <<"https://">>;
+            http -> <<"http://">>
+        end,
+    PortBin = erlang:integer_to_binary(Port),
+    unicode:characters_to_binary([Prefix, Host, $:, PortBin, Path]).
+
+%% Trace formatter for the request body: converts it to a binary with
+%% unicode:characters_to_binary/1.
 log_format_body(Body) ->
     unicode:characters_to_binary(Body).
 
@@ -807,9 +847,15 @@ to_bin(Str) when is_list(Str) ->
 to_bin(Atom) when is_atom(Atom) ->
     atom_to_binary(Atom, utf8).
 
-reply_delegator(Context, ReplyFunAndArgs, Result0) ->
+%% Handles the result of an asynchronous request in a freshly spawned process.
+%% The trace metadata captured when the request was issued is installed first,
+%% because transform_result/1 consults the rule-trace state of the calling
+%% process for 2xx results. The metadata is removed again before
+%% maybe_retry/3 runs.
+reply_delegator(
+    #{trace_metadata := TraceMetadata} = Context,
+    ReplyFunAndArgs,
+    Result0
+) ->
     spawn(fun() ->
+        %% logger:get_process_metadata/0 yields `undefined` when the requesting
+        %% process had no metadata, and logger:set_process_metadata/1 only
+        %% accepts a map, so there is nothing to install in that case.
+        case TraceMetadata of
+            undefined -> ok;
+            _ -> logger:set_process_metadata(TraceMetadata)
+        end,
         Result = transform_result(Result0),
+        logger:unset_process_metadata(),
         maybe_retry(Result, Context, ReplyFunAndArgs)
     end).
 
@@ -831,9 +877,9 @@ transform_result(Result) ->
         {error, _Reason} ->
             Result;
         {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            Result;
+            maybe_trace_format_result(Result);
         {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
-            Result;
+            maybe_trace_format_result(Result);
         {ok, _TooManyRequests = StatusCode = 429, Headers} ->
             {error, {recoverable_error, #{status_code => StatusCode, headers => Headers}}};
         {ok, _ServiceUnavailable = StatusCode = 503, Headers} ->

+ 18 - 2
apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl

@@ -125,11 +125,27 @@ on_query(
                 redis_bridge_connector_send_done,
                 #{instance_id => InstId, cmd => Cmd, batch => false, mode => sync, result => Result}
             ),
-            Result;
+            maybe_trace_format_result(Result);
         Error ->
             Error
     end.
 
+%% Wraps a Redis result for lazy trace formatting.
+%% While a rule trace is active the connector is being driven by an action and
+%% the result is consumed by the trace only, so it is paired with
+%% trace_format_result/1, which is applied when the trace entry is rendered.
+%% Without an active rule trace the result is returned untouched.
+maybe_trace_format_result(Result) ->
+    case emqx_trace:is_rule_trace_active() of
+        false ->
+            Result;
+        true ->
+            {ok, {fun trace_format_result/1, Result}}
+    end.
+
+%% Turns an `{ok, Msg}` Redis result into a map for the trace log; any other
+%% term is passed through unchanged.
+trace_format_result(Result) ->
+    case Result of
+        {ok, Msg} -> #{result => ok, message => Msg};
+        _ -> Result
+    end.
+
 on_batch_query(
     InstId, BatchData, _State = #{channels := Channels, conn_st := RedisConnSt}
 ) ->
@@ -156,7 +172,7 @@ on_batch_query(
                     result => Result
                 }
             ),
-            Result;
+            maybe_trace_format_result(Result);
         Error ->
             Error
     end.

+ 1 - 1
apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl

@@ -320,7 +320,7 @@ run_simple_upload(
     emqx_trace:rendered_action_template(ChannelID, #{
         bucket => Bucket,
         key => Key,
-        content => Content
+        content => {fun unicode:characters_to_binary/1, Content}
     }),
     case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
         ok ->

+ 17 - 1
apps/emqx_postgresql/src/emqx_postgresql.erl

@@ -693,6 +693,11 @@ handle_result({error, Error}) ->
     TranslatedError = translate_to_log_context(Error),
     {error, {unrecoverable_error, export_error(TranslatedError)}};
 handle_result(Res) ->
+    maybe_trace_format_result(Res).
+
+%% Turns an `{ok, Count}` result with an integer count into a map for the
+%% trace log, reporting the count as `affected_rows`; any other term is
+%% passed through unchanged.
+trace_format_result({ok, Cnt}) when is_integer(Cnt) ->
+    #{result => ok, affected_rows => Cnt};
+trace_format_result(Res) ->
     Res.
 
 handle_batch_result([{ok, Count} | Rest], Acc) ->
@@ -701,7 +706,18 @@ handle_batch_result([{error, Error} | _Rest], _Acc) ->
     TranslatedError = translate_to_log_context(Error),
     {error, {unrecoverable_error, export_error(TranslatedError)}};
 handle_batch_result([], Acc) ->
-    {ok, Acc}.
+    maybe_trace_format_result({ok, Acc}).
+
+%% Wraps a query result for lazy trace formatting.
+%% While a rule trace is active the connector is being driven by an action and
+%% the result is consumed by the trace only, so it is paired with
+%% trace_format_result/1, which is applied when the trace entry is rendered.
+%% Without an active rule trace the result is returned untouched.
+maybe_trace_format_result(Result) ->
+    case emqx_trace:is_rule_trace_active() of
+        false ->
+            Result;
+        true ->
+            {ok, {fun trace_format_result/1, Result}}
+    end.
 
 translate_to_log_context({error, Reason}) ->
     translate_to_log_context(Reason);

+ 29 - 6
apps/emqx_rule_engine/src/emqx_rule_actions.erl

@@ -104,6 +104,20 @@ pre_process_action_args(_, Args) ->
 %%--------------------------------------------------------------------
 -spec console(map(), map(), map()) -> any().
 console(Selected, #{metadata := #{rule_id := RuleId}} = Envs, _Args) ->
+    case logger:get_process_metadata() of
+        #{action_id := ActionID} ->
+            emqx_trace:rendered_action_template(
+                ActionID,
+                #{
+                    selected => Selected,
+                    environment => Envs
+                }
+            );
+        _ ->
+            %% We may not have an action ID in the metadata if this is called
+            %% from a test case or similar
+            ok
+    end,
     ?ULOG(
         "[rule action] ~ts~n"
         "\tAction Data: ~p~n"
@@ -149,15 +163,24 @@ republish(
     PubProps0 = render_pub_props(UserPropertiesTemplate, Selected, Env),
     MQTTProps = render_mqtt_properties(MQTTPropertiesTemplate, Selected, Env),
     PubProps = maps:merge(PubProps0, MQTTProps),
+    TraceInfo = #{
+        flags => Flags,
+        topic => Topic,
+        payload => Payload,
+        pub_props => PubProps
+    },
+    case logger:get_process_metadata() of
+        #{action_id := ActionID} ->
+            emqx_trace:rendered_action_template(ActionID, TraceInfo);
+        _ ->
+            %% We may not have an action ID in the metadata if this is called
+            %% from a test case or similar
+            ok
+    end,
     ?TRACE(
         "RULE",
         "republish_message",
-        #{
-            flags => Flags,
-            topic => Topic,
-            payload => Payload,
-            pub_props => PubProps
-        }
+        TraceInfo
     ),
     safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps).
 

+ 21 - 1
apps/emqx_rule_engine/src/emqx_rule_runtime.erl

@@ -415,6 +415,15 @@ handle_action(RuleId, ActId, Selected, Envs) ->
                 rule_metrics, RuleId, 'actions.failed.out_of_service'
             ),
             trace_action(ActId, "out_of_service", #{}, warning);
+        error:?EMQX_TRACE_STOP_ACTION_MATCH = Reason ->
+            ?EMQX_TRACE_STOP_ACTION(Explanation) = Reason,
+            trace_action(
+                ActId,
+                "action_stopped_after_template_rendering",
+                #{reason => Explanation}
+            ),
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
+            emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
         Err:Reason:ST ->
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
             ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
@@ -475,7 +484,18 @@ do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) -
     trace_action(Action, "call_action_function"),
     %% the function can also throw 'out_of_service'
     Args = maps:get(args, Action, []),
-    Result = Mod:Func(Selected, Envs, Args),
+    PrevProcessMetadata =
+        case logger:get_process_metadata() of
+            undefined -> #{};
+            D -> D
+        end,
+    Result =
+        try
+            logger:update_process_metadata(#{action_id => Action}),
+            Mod:Func(Selected, Envs, Args)
+        after
+            logger:set_process_metadata(PrevProcessMetadata)
+        end,
     {_, IncCtx} = do_handle_action_get_trace_inc_metrics_context(RuleId, Action),
     inc_action_metrics(IncCtx, Result),
     Result.