Просмотр исходного кода

fix: enrich otel end2end trace attrs

JimMoen 1 год назад
Родитель
Сommit
c22636d776

+ 29 - 32
apps/emqx/src/emqx_broker.erl

@@ -340,14 +340,19 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 %% Route
 %%--------------------------------------------------------------------
 
-route(Routes, Delivery, PersistRes) ->
+route(Routes, Delivery = #delivery{message = Msg}, PersistRes) ->
+    TraceRouteAttrs = #{
+        'route.from' => node(),
+        'route.matched_result' => emqx_utils_json:encode([
+            #{node => Node, route => TF}
+         || {TF, Node} <- Routes
+        ])
+    },
     emqx_external_trace:msg_route(
         Delivery,
-        #{
-            'message.route.msg_from_node' => node(),
-            'message.route.msgid' => emqx_guid:to_hexstr(emqx_message:id(Delivery#delivery.message))
-        },
+        TraceRouteAttrs,
         fun(DeliveryWithTrace) ->
+            add_route_attrs(Msg),
             do_route(Routes, DeliveryWithTrace, PersistRes)
         end
     ).
@@ -357,7 +362,10 @@ route(Routes, Delivery, PersistRes) ->
 do_route([], #delivery{message = Msg}, _PersistRes = []) ->
     ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = inc_dropped_cnt(Msg),
-    ok = add_route_attrs(Msg),
+    ?ext_trace_add_attrs(#{
+        'route.dropped.node' => node(),
+        'route.dropped.reason' => no_subscribers
+    }),
     [];
 do_route([], _Delivery, PersistRes = [_ | _]) ->
     PersistRes;
@@ -399,17 +407,15 @@ aggre([], true, Acc) ->
 do_forward_external(Delivery, RouteRes) ->
     emqx_external_broker:forward(Delivery) ++ RouteRes.
 
-forward(Node, To, Delivery, RpcMode) ->
+forward(Node, To, Delivery = #delivery{message = Msg}, RpcMode) ->
+    ForwardAttrs = #{
+        'forward.from' => node(),
+        'forward.to' => Node
+    },
+    MsgAttrs = emqx_external_trace:msg_attrs(Msg),
     emqx_external_trace:msg_forward(
         Delivery,
-        #{
-            'message.forward.to_topic' => To,
-            'message.forward.mode' => RpcMode,
-            'message.forward.node.from' => node(),
-            'message.forward.node.to' => Node
-            %% TODO:
-            %% 'message.forward.sender' => pid_to_clientid(Delivery#delivery.sender)
-        },
+        maps:merge(ForwardAttrs, MsgAttrs),
         fun(DeliveryWithTrace) ->
             do_forward(Node, To, DeliveryWithTrace, RpcMode)
         end
@@ -444,22 +450,19 @@ do_forward(Node, To, Delivery, sync) ->
 %% Handle message forwarding form remote nodes by
 %% `emqx_broker_proto_v1:forward/3` or
 %% `emqx_broker_proto_v1:forward_async/3`
-dispatch(Topic, Delivery) ->
+dispatch(Topic, Delivery = #delivery{message = Msg}) ->
     emqx_external_trace:msg_handle_forward(
         Delivery,
-        #{},
+        emqx_external_trace:msg_attrs(Msg),
         fun(DeliveryWithTrace) ->
             dispatch_local(Topic, DeliveryWithTrace)
         end
     ).
 
-dispatch_local(Topic, Delivery) ->
+dispatch_local(Topic, Delivery = #delivery{message = Msg}) ->
     emqx_external_trace:msg_dispatch(
         Delivery,
-        #{
-            'message.dispatch.from' => pid_to_binary(Delivery#delivery.sender),
-            'message.dispatch.to_topic' => Topic
-        },
+        emqx_external_trace:msg_attrs(Msg),
         fun(DeliveryWithTrace) ->
             do_dispatch(Topic, DeliveryWithTrace)
         end
@@ -491,14 +494,12 @@ inc_dropped_cnt(Msg) ->
 
 -compile({inline, [add_route_attrs/1]}).
 add_route_attrs(Msg) ->
+    %% TODO? : maybe a switch for sys messages
     case emqx_message:is_sys(Msg) of
         true ->
             ok;
         false ->
-            ?ext_trace_add_attrs(#{
-                'message.route.dropped.node' => node(),
-                'message.route.dropped.reason' => no_subscribers
-            }),
+            ?ext_trace_add_attrs(emqx_external_trace:msg_attrs(Msg)),
             ok
     end.
 
@@ -745,7 +746,8 @@ do_dispatch2(SubPid, Topic, Msg) when is_pid(SubPid) ->
         true ->
             SubPid ! {deliver, Topic, Msg},
             ?ext_trace_add_attrs(#{
-                'message.dispatch.to_subscriber' => pid_to_binary(SubPid)
+                'dispatch.to_clientid' => maps:get(subid, get_subopts(SubPid, Topic), undefined),
+                'dispatch.to_topic' => Topic
             }),
             1;
         false ->
@@ -807,8 +809,3 @@ regular_sync_route(add, Topic) ->
     emqx_router:do_add_route(Topic, node());
 regular_sync_route(delete, Topic) ->
     emqx_router:do_delete_route(Topic, node()).
-
-pid_to_binary(Pid) when is_pid(Pid) ->
-    iolist_to_binary(pid_to_list(Pid));
-pid_to_binary(_) ->
-    <<>>.

+ 108 - 111
apps/emqx/src/emqx_channel.erl

@@ -363,7 +363,7 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
 handle_in(?PACKET(?CONNECT) = Packet, Channel) ->
     emqx_external_trace:client_connect(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        connect_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->
             process_connect(PacketWithTrace, Channel)
         end
@@ -426,8 +426,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
         ok ->
             emqx_external_trace:client_publish(
                 Packet,
-                %% More info can be added in future, but for now only clientid is used
-                init_trace_attrs(Packet, Channel),
+                basic_trace_attrs(Channel),
                 fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
             );
         {error, ReasonCode} ->
@@ -439,8 +438,7 @@ handle_in(
 ) ->
     emqx_external_trace:client_puback(
         Packet,
-        %% More info can be added in future, but for now only clientid is used
-        init_trace_attrs(Packet, Channel),
+        basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_puback(PacketWithTrace, Channel) end
     );
 handle_in(
@@ -449,8 +447,7 @@ handle_in(
 ) ->
     emqx_external_trace:client_pubrec(
         Packet,
-        %% More info can be added in future, but for now only clientid is used
-        init_trace_attrs(Packet, Channel),
+        basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubrec(PacketWithTrace, Channel) end
     );
 handle_in(
@@ -459,8 +456,7 @@ handle_in(
 ) ->
     emqx_external_trace:client_pubrel(
         Packet,
-        %% More info can be added in future, but for now only clientid is used
-        init_trace_attrs(Packet, Channel),
+        basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubrel(PacketWithTrace, Channel) end
     );
 handle_in(
@@ -469,14 +465,15 @@ handle_in(
 ) ->
     emqx_external_trace:client_pubcomp(
         Packet,
-        %% More info can be added in future, but for now only clientid is used
-        init_trace_attrs(Packet, Channel),
+        basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubcomp(PacketWithTrace, Channel) end
     );
 handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
     emqx_external_trace:client_subscribe(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        (basic_trace_attrs(Channel))#{
+            'client.subscribe.topics' => serialize_topic_filters(Packet)
+        },
         fun(PacketWithTrace) -> process_subscribe(PacketWithTrace, Channel) end
     );
 handle_in(
@@ -485,7 +482,9 @@ handle_in(
 ) ->
     emqx_external_trace:client_unsubscribe(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        (basic_trace_attrs(Channel))#{
+            'client.unsubscribe.topics' => serialize_topic_filters(Packet)
+        },
         fun(PacketWithTrace) -> process_unsubscribe(PacketWithTrace, Channel) end
     );
 handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
@@ -493,12 +492,16 @@ handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
     NChannel = Channel#channel{keepalive = NKeepalive},
     {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
 handle_in(
-    ?DISCONNECT_PACKET() = Packet,
+    ?PACKET(?DISCONNECT, PktVar) = Packet,
     Channel
 ) ->
     emqx_external_trace:client_disconnect(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        (basic_trace_attrs(Channel))#{
+            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
+            'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
+            'client.disconnect.reason_code' => emqx_packet:info(reason_code, PktVar)
+        },
         fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
     );
 handle_in(?AUTH_PACKET(), Channel) ->
@@ -598,6 +601,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
     of
         {ok, NPacket, NChannel} ->
             Msg = packet_to_message(NPacket, NChannel),
+            ?ext_trace_add_attrs(emqx_external_trace:msg_attrs(Msg)),
             do_publish(PacketId, Msg, NChannel);
         {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
             ?SLOG_THROTTLE(
@@ -1098,7 +1102,7 @@ handle_deliver(
 handle_deliver(Delivers, Channel) ->
     Delivers1 = emqx_external_trace:msg_deliver(
         Delivers,
-        init_trace_attrs(?DELIVER_TRACE_ATTRS, Channel)
+        basic_trace_attrs(Channel)
     ),
     do_handle_deliver(Delivers1, Channel).
 
@@ -1244,28 +1248,28 @@ handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok,
         start_outgoing_trace(
             ?PUBACK_PACKET(PacketId, ReasonCode),
-            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+            basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
     {ok,
         start_outgoing_trace(
             ?PUBREC_PACKET(PacketId, ReasonCode),
-            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+            basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
     {ok,
         start_outgoing_trace(
             ?PUBREL_PACKET(PacketId, ReasonCode),
-            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+            basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
     {ok,
         start_outgoing_trace(
             ?PUBCOMP_PACKET(PacketId, ReasonCode),
-            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+            basic_trace_attrs(Channel)
         ),
         Channel};
 handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
@@ -1720,66 +1724,57 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
     emqx_olp:backoff(Zone),
     ok.
 
-%% Typically it's the init trace attrs
-%% And it's designed for indexing the trace events
-%% It is designed to be used to index all related traces
-%% that have __different TraceIDs__(OF COURSE DIFFERENT).
-%% TODO?: implement as init trace attrs callback
-%% More info can be added in future, but for now only clientid is used
-
-init_trace_attrs(
-    ?PACKET(?CONNECT, PktVar),
+%% Client Channel info not be available before `process_connect/2`
+%% The initial attrs should be extracted from packet and update them during `process_connect/2`
+connect_trace_attrs(
+    ?PACKET(?CONNECT, #mqtt_packet_connect{
+        proto_name = ProtoName,
+        proto_ver = ProtoVer,
+        is_bridge = IsBridge,
+        clean_start = CleanStart,
+        will_flag = WillFlag,
+        will_qos = WillQos,
+        will_retain = WillRetain,
+        keepalive = KeepAlive,
+        properties = Properties,
+        clientid = ClientId,
+        will_props = WillProps,
+        will_topic = WillTopic,
+        will_payload = _,
+        username = Username,
+        password = _
+    }),
     Channel
 ) ->
-    %% TODO: more attrs
     #{
-        clientid => emqx_packet:info(clientid, PktVar),
-        username => emqx_packet:info(username, PktVar),
-        sockname => info(sockname, Channel),
-        peername => info(peername, Channel)
-    };
-init_trace_attrs(
-    ?PACKET(?DISCONNECT, PktVar),
-    Channel
-) ->
+        'client.clientid' => ClientId,
+        'client.username' => Username,
+        'client.proto_name' => ProtoName,
+        'client.proto_ver' => ProtoVer,
+        'client.is_bridge' => IsBridge,
+        'client.clean_start' => CleanStart,
+        'client.will_flag' => WillFlag,
+        'client.will_qos' => WillQos,
+        'client.will_retain' => WillRetain,
+        'client.keepalive' => KeepAlive,
+        'client.conn_props' => emqx_utils_json:encode(Properties),
+        'client.will_props' => emqx_utils_json:encode(WillProps),
+        'client.will_topic' => WillTopic,
+        'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
+        'client.peername' => emqx_utils:ntoa(info(peername, Channel))
+    }.
+
+basic_trace_attrs(Channel) ->
     #{
-        clientid => info(clientid, Channel),
-        peername => info(peername, Channel),
-        sockname => info(sockname, Channel),
-        reason_code => emqx_packet:info(reason_code, PktVar)
-    };
-init_trace_attrs(
-    ?PACKET(?PUBLISH, _PktVar),
-    Channel
-) ->
-    maps:from_list(info([clientid], Channel));
-init_trace_attrs(
-    ?PACKET(?PUBACK, _PktVar),
-    Channel
-) ->
-    maps:from_list(info([clientid], Channel));
-init_trace_attrs(
-    ?PACKET(?SUBSCRIBE, _PktVar),
-    Channel
-) ->
-    maps:from_list(info([clientid], Channel));
-init_trace_attrs(
-    ?PACKET(?UNSUBSCRIBE, _PktVar),
-    Channel
-) ->
-    maps:from_list(info([clientid], Channel));
-init_trace_attrs(
-    ?DELIVER_TRACE_ATTRS,
-    Channel
-) ->
-    maps:from_list(info([clientid], Channel));
-init_trace_attrs(
-    ?OUTGOING_TRACE_ATTRS,
-    Channel
-) ->
-    maps:from_list(info([clientid, username], Channel));
-init_trace_attrs(_, _) ->
-    #{}.
+        'client.clientid' => info(clientid, Channel),
+        'client.username' => info(username, Channel)
+    }.
+
+serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
+    TFs = [SubOpts#{topic => Name} || {Name, SubOpts} <- emqx_packet:info(topic_filters, PktVar)],
+    emqx_utils_json:encode(TFs);
+serialize_topic_filters(?PACKET(?UNSUBSCRIBE, PktVar)) ->
+    emqx_utils_json:encode(emqx_packet:info(topic_filters, PktVar)).
 
 %%--------------------------------------------------------------------
 %% Enrich MQTT Connect Info
@@ -1912,7 +1907,9 @@ maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId}) when
 ->
     {ok, ClientInfo};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
-    {ok, ClientInfo#{clientid => emqx_utils:rand_id(?RAND_CLIENTID_BYTES)}};
+    RandClientId = emqx_utils:rand_id(?RAND_CLIENTID_BYTES),
+    ?ext_trace_add_attrs(#{'client.clientid' => RandClientId}),
+    {ok, ClientInfo#{clientid => RandClientId}};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = ClientId}, ClientInfo) ->
     {ok, ClientInfo#{clientid => ClientId}}.
 
@@ -2061,11 +2058,16 @@ authenticate(?PACKET(?AUTH) = Packet, Channel) ->
     %% TODO: extended authentication sub-span
     process_authenticate(Packet, Channel);
 authenticate(Packet, Channel) ->
+    %% Authenticate by CONNECT Packet
     emqx_external_trace:client_authn(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        #{
+            'client.clientid' => info(clientid, Channel),
+            'client.username' => info(username, Channel)
+        },
         fun(PacketWithTrace) ->
             Res = process_authenticate(PacketWithTrace, Channel),
+            %% TODO: which authenticator is used
             ?ext_trace_add_attrs(authn_attrs(Res)),
             Res
         end
@@ -2339,11 +2341,9 @@ authz_action(#message{qos = QoS}) ->
 check_pub_authz(Packet, Channel) ->
     emqx_external_trace:client_authz(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        (basic_trace_attrs(Channel))#{'authz.action_type' => publish},
         fun(PacketWithTrace) ->
-            Res = do_check_pub_authz(PacketWithTrace, Channel),
-            ?ext_trace_add_attrs(pub_authz_attrs(Res)),
-            Res
+            _Res = do_check_pub_authz(PacketWithTrace, Channel)
         end
     ).
 
@@ -2355,18 +2355,21 @@ do_check_pub_authz(
 ) ->
     Action = authz_action(Packet),
     case emqx_access_control:authorize(ClientInfo, Action, Topic) of
-        allow -> ok;
-        deny -> {error, ?RC_NOT_AUTHORIZED}
+        allow ->
+            ?ext_trace_add_attrs(#{
+                'authz.publish.topic' => Topic,
+                'authz.publish.result' => allow
+            }),
+            ok;
+        deny ->
+            ?ext_trace_add_attrs(#{
+                'authz.publish.topic' => Topic,
+                'authz.publish.result' => deny,
+                'authz.reason_code' => ?RC_NOT_AUTHORIZED
+            }),
+            {error, ?RC_NOT_AUTHORIZED}
     end.
 
-pub_authz_attrs(ok) ->
-    #{'client.publish.authz.result' => success};
-pub_authz_attrs({error, RC}) ->
-    #{
-        'client.publish.authz.result' => failure,
-        'client.publish.authz.reason' => emqx_reason_codes:name(RC)
-    }.
-
 %%--------------------------------------------------------------------
 %% Check Pub Caps
 
@@ -2397,11 +2400,9 @@ check_subscribe(SubPkt, _Channel) ->
 check_sub_authzs(Packet, Channel) ->
     emqx_external_trace:client_authz(
         Packet,
-        init_trace_attrs(Packet, Channel),
+        (basic_trace_attrs(Channel))#{'authz.action_type' => subscribe},
         fun(PacketWithTrace) ->
-            Res = do_check_sub_authzs(PacketWithTrace, Channel),
-            ?ext_trace_add_attrs(sub_authz_attrs(Res)),
-            Res
+            _Res = do_check_sub_authzs(PacketWithTrace, Channel)
         end
     ).
 
@@ -2419,8 +2420,16 @@ do_check_sub_authzs(
     DenyAction = emqx:get_config([authorization, deny_action], ignore),
     case DenyAction =:= disconnect andalso HasAuthzDeny of
         true ->
+            ?ext_trace_add_attrs(#{
+                'authz.deny_action' => disconnect,
+                'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
+            }),
             {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
         false ->
+            ?ext_trace_add_attrs(#{
+                'authz.deny_action' => ignore,
+                'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
+            }),
             {ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}
     end.
 
@@ -2430,7 +2439,7 @@ do_check_sub_authzs2(TopicFilters, ClientInfo) ->
 do_check_sub_authzs2(_ClientInfo, [], Acc) ->
     lists:reverse(Acc);
 do_check_sub_authzs2(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc) ->
-    %% subsclibe authz check only cares the real topic filter when shared-sub
+    %% subscribe authz check only cares the real topic filter when shared-sub
     %% e.g. only check <<"t/#">> for <<"$share/g/t/#">>
     Action = authz_action(TopicFilter),
     case
@@ -2450,21 +2459,9 @@ do_check_sub_authzs2(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc)
             do_check_sub_authzs2(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
     end.
 
-sub_authz_attrs({error, {disconnect, RC}, _Channel}) ->
-    #{
-        'client.subscribe.authz.results' => denied,
-        'client.subscribe.authz.deny_action' => disconnect,
-        'client.subscribe.authz.reason_code' => emqx_reason_codes:name(RC)
-    };
-sub_authz_attrs({ok, ?SUBSCRIBE_PACKET(_PacketId, _SubProps, CheckResult), _Channel}) ->
-    #{
-        'client.subscribe.authz.deny_action' => ignore,
-        'client.subscribe.authz.result' => result(CheckResult)
-    }.
-
-result(CheckResult) ->
+trace_authz_result_attrs(CheckResult) ->
     emqx_utils_json:encode([
-        #{topic_filter => TopicFilter, reason_code => RC}
+        #{topic => TopicFilter, reason_code => RC}
      || {{TopicFilter, _SubOpts}, RC} <- CheckResult
     ]).
 

+ 25 - 1
apps/emqx/src/emqx_external_trace.erl

@@ -16,6 +16,7 @@
 -module(emqx_external_trace).
 
 -include("emqx_external_trace.hrl").
+-include_lib("emqx_utils/include/emqx_message.hrl").
 
 %% --------------------------------------------------------------------
 %% Trace in Rich mode callbacks
@@ -174,7 +175,8 @@ when
 
 -export([
     add_span_attrs/1,
-    add_span_event/2
+    add_span_event/2,
+    msg_attrs/1
 ]).
 
 -export([
@@ -196,6 +198,15 @@ when
     end
 ).
 
+-define(with_provider_attrs(IfRegistered),
+    case persistent_term:get(?PROVIDER, undefined) of
+        undefined ->
+            #{};
+        _Provider ->
+            IfRegistered
+    end
+).
+
 %%--------------------------------------------------------------------
 %% provider API
 %%--------------------------------------------------------------------
@@ -416,6 +427,19 @@ add_span_event(EventName, AttrsOrMeta) ->
     _ = catch ?with_provider(?FUNCTION_NAME(EventName, AttrsOrMeta), ok),
     ok.
 
+msg_attrs(Msg = #message{}) ->
+    ?with_provider_attrs(#{
+        'message.msgid' => emqx_guid:to_hexstr(Msg#message.id),
+        'message.qos' => Msg#message.qos,
+        'message.from' => Msg#message.from,
+        'message.topic' => Msg#message.topic,
+        'message.retain' => maps:get(retain, Msg#message.flags, false),
+        'message.pub_props' => emqx_utils_json:encode(
+            maps:get(properties, Msg#message.headers, #{})
+        ),
+        'message.payload_size' => size(Msg#message.payload)
+    }).
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 24 - 119
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -139,9 +139,7 @@ client_connect(Packet, Attrs, ProcessFun) ->
         RootCtx,
         ?current_tracer,
         ?CLIENT_CONNECT_SPAN_NAME,
-        #{
-            attributes => gen_attrs(Packet, Attrs)
-        }
+        #{attributes => Attrs}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     _ = otel_ctx:attach(Ctx),
@@ -168,9 +166,7 @@ client_disconnect(Packet, Attrs, ProcessFun) ->
         RootCtx,
         ?current_tracer,
         ?CLIENT_DISCONNECT_SPAN_NAME,
-        #{
-            attributes => gen_attrs(Packet, Attrs)
-        }
+        #{attributes => Attrs}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     _ = otel_ctx:attach(Ctx),
@@ -197,9 +193,7 @@ client_subscribe(Packet, Attrs, ProcessFun) ->
         RootCtx,
         ?current_tracer,
         ?CLIENT_SUBSCRIBE_SPAN_NAME,
-        #{
-            attributes => gen_attrs(Packet, Attrs)
-        }
+        #{attributes => Attrs}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     _ = otel_ctx:attach(Ctx),
@@ -226,9 +220,7 @@ client_unsubscribe(Packet, Attrs, ProcessFun) ->
         RootCtx,
         ?current_tracer,
         ?CLIENT_UNSUBSCRIBE_SPAN_NAME,
-        #{
-            attributes => gen_attrs(Packet, Attrs)
-        }
+        #{attributes => Attrs}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     _ = otel_ctx:attach(Ctx),
@@ -249,16 +241,12 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-client_authn(Packet, _Attrs, ProcessFun) ->
+client_authn(Packet, Attrs, ProcessFun) ->
     ?with_span(
         ?CLIENT_AUTHN_SPAN_NAME,
-        #{attributes => #{}},
+        #{attributes => Attrs},
         fun(_SpanCtx) ->
             ProcessFun(Packet)
-        %% TODO: add more attributes about: which authenticator resulted:
-        %% ignore|anonymous|ok|error
-        %% case ProcessFun(Packet) of
-        %%     xx -> xx,
         end
     ).
 
@@ -272,10 +260,10 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-client_authz(Packet, _Attrs, ProcessFun) ->
+client_authz(Packet, Attrs, ProcessFun) ->
     ?with_span(
         ?CLIENT_AUTHZ_SPAN_NAME,
-        #{attributes => #{}},
+        #{attributes => Attrs},
         fun(_SpanCtx) ->
             ProcessFun(Packet)
         %% TODO: add more attributes about: which authorizer resulted:
@@ -302,7 +290,7 @@ client_publish(Packet, Attrs, ProcessFun) ->
         RootCtx,
         ?current_tracer,
         ?CLIENT_PUBLISH_SPAN_NAME,
-        #{attributes => gen_attrs(Packet, Attrs)}
+        #{attributes => Attrs}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     %% Otel attach for next spans (client_authz, msg_route... etc )
@@ -406,7 +394,7 @@ msg_route(Delivery, Attrs, Fun) ->
         false ->
             ?with_span(
                 ?MSG_ROUTE_SPAN_NAME,
-                #{attributes => gen_attrs(Delivery, Attrs)},
+                #{attributes => Attrs},
                 fun(_SpanCtx) ->
                     Fun(put_ctx(otel_ctx:get_current(), Delivery))
                 end
@@ -455,7 +443,7 @@ msg_forward(Delivery, Attrs, Fun) ->
         false ->
             ?with_span(
                 ?MSG_FORWARD_SPAN_NAME,
-                #{attributes => gen_attrs(Delivery, Attrs)},
+                #{attributes => Attrs},
                 fun(_SpanCtx) ->
                     Fun(put_ctx(otel_ctx:get_current(), Delivery))
                 end
@@ -480,7 +468,7 @@ msg_handle_forward(Delivery, Attrs, Fun) ->
             otel_ctx:attach(get_ctx(Delivery)),
             ?with_span(
                 ?MSG_HANDLE_FORWARD_SPAN_NAME,
-                #{attributes => gen_attrs(Delivery, Attrs)},
+                #{attributes => Attrs},
                 fun(_SpanCtx) ->
                     Fun(put_ctx(otel_ctx:get_current(), Delivery))
                 end
@@ -509,7 +497,7 @@ msg_deliver(Delivers, Attrs) ->
                         Ctx,
                         ?current_tracer,
                         ?MSG_DELIVER_SPAN_NAME,
-                        #{attributes => gen_attrs(Msg, Attrs)}
+                        #{attributes => maps:merge(Attrs, emqx_external_trace:msg_attrs(Msg))}
                     ),
                     NCtx = otel_tracer:set_current_span(Ctx, SpanCtx),
                     NMsg = put_ctx(NCtx, Msg),
@@ -566,7 +554,7 @@ start_outgoing_trace(Packet, Attrs, ParentCtx) ->
         ParentCtx,
         ?current_tracer,
         outgoing_span_name(Packet),
-        #{attributes => gen_attrs(Packet, Attrs)}
+        #{attributes => Attrs}
     ),
     NCtx = otel_tracer:set_current_span(ParentCtx, SpanCtx),
     _NPacketWithCtx = put_ctx(NCtx, Packet).
@@ -646,18 +634,19 @@ start_pending_trace(PendingType, PacketId, ParentCtx) ->
     _ = attach_internal_ctx(PendingCtxKey, NCtx),
     ok.
 
-client_incoming(?PACKET(PendingType, PktVar) = Packet, _Attrs, ProcessFun) ->
+client_incoming(?PACKET(PendingType, PktVar) = Packet, Attrs, ProcessFun) ->
     try
         ProcessFun(Packet)
     after
         end_pending_client_packet(
-            internal_extra_key(PendingType, PktVar)
+            internal_extra_key(PendingType, PktVar), Attrs
         )
     end.
 
-end_pending_client_packet(PendingCtxKey) ->
+end_pending_client_packet(PendingCtxKey, Attrs) ->
     case detach_internal_ctx(PendingCtxKey) of
         Ctx when is_map(Ctx) ->
+            ok = add_span_attrs(Attrs, Ctx),
             _ = end_span(Ctx),
             earse_internal_ctx(PendingCtxKey),
             ok;
@@ -752,6 +741,12 @@ add_span_attrs(Attrs) ->
     true = ?set_attributes(Attrs),
     ok.
 
+add_span_attrs(Attrs, Ctx) ->
+    CurrentSpanCtx = otel_tracer:current_span_ctx(Ctx),
+    otel_span:set_attributes(CurrentSpanCtx, Attrs),
+    ok.
+
+%% TODO: remove me
 -spec add_span_event(EventName, Attrs) -> ok when
     EventName :: event_name(),
     Attrs :: attrs() | attrs_meta().
@@ -778,96 +773,6 @@ ignore_delivery(#delivery{message = #message{topic = Topic}}) ->
 is_sys(<<"$SYS/", _/binary>> = _Topic) -> true;
 is_sys(_Topic) -> false.
 
-%% TODO: refactor to use raw `Attrs` or `AttrsMeta`
-%% FIXME:
-%% function_clause for DISCONNECT packet
-%% XXX:
-%% emqx_packet:info/2 as utils
-
-%% gen_attrs(Packet, Attrs) ->
-gen_attrs(
-    ?PACKET(?CONNECT, PktVar),
-    #{
-        clientid := ClientId,
-        username := Username
-    } = _InitAttrs
-) ->
-    #{
-        'client.connect.clientid' => ClientId,
-        'client.connect.username' => Username,
-        'client.connect.proto_name' => emqx_packet:info(proto_name, PktVar),
-        'client.connect.proto_ver' => emqx_packet:info(proto_ver, PktVar)
-    };
-gen_attrs(?PACKET(?DISCONNECT, PktVar), InitAttrs) ->
-    #{
-        'client.disconnect.clientid' => maps:get(clientid, InitAttrs, undefined),
-        'client.disconnect.username' => maps:get(username, InitAttrs, undefined),
-        'client.disconnect.peername' => maps:get(peername, InitAttrs, undefined),
-        'client.disconnect.sockname' => maps:get(sockname, InitAttrs, undefined),
-        'client.disconnect.reason_code' => emqx_packet:info(reason_code, PktVar)
-    };
-gen_attrs(
-    ?PACKET(?SUBSCRIBE) = Packet,
-    #{clientid := ClientId} = _Attrs
-) ->
-    #{
-        'client.subscribe.clientid' => ClientId,
-        'client.subscribe.topic_filters' => serialize_topic_filters(Packet)
-    };
-gen_attrs(
-    ?PACKET(?UNSUBSCRIBE) = Packet,
-    #{clientid := ClientId} = _Attrs
-) ->
-    #{
-        'client.unsubscribe.clientid' => ClientId,
-        'client.unsubscribe.topic_filters' => serialize_topic_filters(Packet)
-    };
-gen_attrs(
-    ?PACKET(?PUBLISH, PktVar) =
-        #mqtt_packet{
-            header = #mqtt_packet_header{qos = QoS}
-        },
-    Attrs
-) ->
-    #{
-        'message.publish.to_topic' => emqx_packet:info(topic_name, PktVar),
-        %% msgid havn't generated here
-        'message.publish.qos' => QoS,
-        'message.publish.from' => maps:get(clientid, Attrs, undefined)
-    };
-gen_attrs(?PACKET(?PUBACK, PktVar), Attrs) ->
-    #{
-        'emqx.puback.reason_code' => emqx_packet:info(reason_code, PktVar),
-        'emqx.puback.to_clientid' => maps:get(clientid, Attrs, undefined),
-        'emqx.puback.to_username' => maps:get(username, Attrs, undefined)
-    };
-gen_attrs(#message{} = Msg, Attrs) ->
-    #{
-        %% XXX: maybe use `to_topic_filter` as the subscribed
-        'message.deliver.to_topic' => emqx_message:topic(Msg),
-        'message.deliver.from' => emqx_message:from(Msg),
-        %% Use HEX msgid as rule engine style
-        'message.deliver.msgid' => emqx_guid:to_hexstr(emqx_message:id(Msg)),
-        'message.deliver.qos' => emqx_message:qos(Msg),
-        'message.deliver.to' => maps:get(clientid, Attrs, undefined)
-    };
-gen_attrs(#delivery{}, Attrs) ->
-    Attrs;
-gen_attrs(_, Attrs) when is_map(Attrs) ->
-    Attrs;
-gen_attrs(_, InvalidAttrs) ->
-    ?SLOG(warning, #{
-        msg => "invalid_attributes",
-        attrs => InvalidAttrs
-    }),
-    #{}.
-
-serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
-    TFs = [Name || {Name, _SubOpts} <- emqx_packet:info(topic_filters, PktVar)],
-    emqx_utils_json:encode(TFs);
-serialize_topic_filters(?PACKET(?UNSUBSCRIBE, PktVar)) ->
-    emqx_utils_json:encode(emqx_packet:info(topic_filters, PktVar)).
-
 %% ====================
 %% Trace Context in message
 put_ctx(