Преглед изворни кода

fix: attach attrs in packet/message/dictionary when starting span

JimMoen пре 1 година
родитељ
комит
e06b1e3c62
2 измењених фајлова са 66 додато и 43 уклоњено
  1. 0 1
      apps/emqx/src/emqx_broker.erl
  2. 66 42
      apps/emqx_opentelemetry/src/emqx_otel_trace.erl

+ 0 - 1
apps/emqx/src/emqx_broker.erl

@@ -394,7 +394,6 @@ do_route2({To, Node}, Delivery) when Node =:= node() ->
 do_route2({To, Node}, Delivery) when is_atom(Node) ->
     {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
 do_route2({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
-    %% TODO: trace shared-sub dispatch
     {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->

+ 66 - 42
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -70,7 +70,7 @@
 -include_lib("opentelemetry_api/include/otel_tracer.hrl").
 -include_lib("opentelemetry_api/include/opentelemetry.hrl").
 
--define(EMQX_OTEL_CTX, otel_ctx).
+-define(EMQX_OTEL_CTX, emqx_otel_ctx).
 -define(EMQX_OTEL_ATTRS, emqx_otel_attrs).
 
 %%--------------------------------------------------------------------
@@ -299,7 +299,7 @@ client_publish(Packet, Attrs, ProcessFun) ->
     %% Otel attach for next spans (client_authz, msg_route... etc )
     _ = otel_ctx:attach(Ctx),
     %% Attach in process dictionary for outgoing/awaiting packets
-    attach_outgoing(Packet, Ctx),
+    attach_outgoing(Packet, Ctx, Attrs),
     try
         ProcessFun(Packet)
     after
@@ -309,14 +309,20 @@ client_publish(Packet, Attrs, ProcessFun) ->
     end.
 
 %% TODO: CONNACK, AUTH, SUBACK, UNSUBACK, DISCONNECT
--compile({inline, [attach_outgoing/2, erase_outgoing/1]}).
-attach_outgoing(?PUBLISH_PACKET(?QOS_0), _Ctx) ->
+-compile({inline, [attach_outgoing/3, erase_outgoing/1]}).
+attach_outgoing(?PUBLISH_PACKET(?QOS_0), _Ctx, _Attrs) ->
     ok;
-attach_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId), Ctx) ->
-    attach_internal_ctx({?PUBACK, PacketId}, Ctx),
+attach_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId), Ctx, Attrs) ->
+    attach_internal_ctx(
+        {?PUBACK, PacketId},
+        #{?EMQX_OTEL_CTX => Ctx, ?EMQX_OTEL_ATTRS => Attrs}
+    ),
     ok;
-attach_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId), Ctx) ->
-    attach_internal_ctx({?PUBREC, PacketId}, Ctx),
+attach_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId), Ctx, Attrs) ->
+    attach_internal_ctx(
+        {?PUBREC, PacketId},
+        #{?EMQX_OTEL_CTX => Ctx, ?EMQX_OTEL_ATTRS => Attrs}
+    ),
     ok.
 
 erase_outgoing(?PUBLISH_PACKET(?QOS_0)) ->
@@ -467,18 +473,17 @@ when
     Attrs :: attrs().
 broker_publish(Delivers, Attrs) ->
     lists:map(
-        fun({deliver, Topic, Msg} = Deliver) ->
-            case get_ctx(Msg) of
+        fun({deliver, Topic, Msg0} = Deliver) ->
+            case get_ctx(Msg0) of
                 Ctx when is_map(Ctx) ->
+                    NAttrs = maps:merge(Attrs, emqx_external_trace:msg_attrs(Msg0)),
                     SpanCtx = otel_tracer:start_span(
-                        Ctx,
-                        ?current_tracer,
-                        ?BROKER_PUBLISH_SPAN_NAME,
-                        #{attributes => maps:merge(Attrs, emqx_external_trace:msg_attrs(Msg))}
+                        Ctx, ?current_tracer, ?BROKER_PUBLISH_SPAN_NAME, #{attributes => NAttrs}
                     ),
                     NCtx = otel_tracer:set_current_span(Ctx, SpanCtx),
-                    NMsg = put_ctx(NCtx, Msg),
-                    {deliver, Topic, NMsg};
+                    NMsg = put_attrs(NAttrs, Msg0),
+                    Msg = put_ctx(NCtx, NMsg),
+                    {deliver, Topic, Msg};
                 _ ->
                     Deliver
             end
@@ -519,6 +524,8 @@ start_outgoing_trace(?PUBCOMP_PACKET(PacketId) = Packet, Attrs) ->
 %% ====================
 %% Broker -> Client(`Subscriber'):
 start_outgoing_trace(?PUBREL_PACKET(PacketId) = Packet, Attrs) ->
+    %% Previous awaiting was PUBREC
+    %% the awaiting PUBREL Span's parent is outgoing PUBREC Span
     start_outgoing_trace(Packet, Attrs, detach_internal_ctx({?PUBREC, PacketId}));
 %% The Incoming span is still being recorded and Ctx has not been erased
 %% when the following outgoing spans starting.
@@ -526,16 +533,26 @@ start_outgoing_trace(?PUBREL_PACKET(PacketId) = Packet, Attrs) ->
 start_outgoing_trace(Packet, Attrs) ->
     start_outgoing_trace(Packet, Attrs, otel_ctx:get_current()).
 
-start_outgoing_trace(Packet, Attrs, ParentCtx) ->
+start_outgoing_trace(Packet, Attrs, #{
+    ?EMQX_OTEL_CTX := ParentCtx, ?EMQX_OTEL_ATTRS := AttachedAttrs
+}) ->
+    start_outgoing_trace(Packet, Attrs, ParentCtx, AttachedAttrs).
+
+start_outgoing_trace(
+    Packet,
+    Attrs,
+    ParentCtx,
+    AttachedAttrs
+) ->
     SpanCtx = otel_tracer:start_span(
         ParentCtx,
         ?current_tracer,
         outgoing_span_name(Packet),
-        #{attributes => Attrs}
+        #{attributes => maps:merge(Attrs, AttachedAttrs)}
     ),
     NCtx = otel_tracer:set_current_span(ParentCtx, SpanCtx),
-    Packet1WithCtx = put_ctx(NCtx, Packet),
-    _NPacketWithAttrsg = put_attrs(Attrs, Packet1WithCtx).
+    NPacket = put_attrs(Attrs, Packet),
+    _PacketWithCtx = put_ctx(NCtx, NPacket).
 
 %% Ctx attached in Delivers in `broker_publish/2` and
 %% transformed to Packet when outgoing
@@ -570,7 +587,7 @@ end_span(_) ->
 outgoing_maybe_awaiting_next(?PACKET(?PUBACK)) ->
     %% PUBACK (QoS=1), Ignore
     ok;
-outgoing_maybe_awaiting_next(?PUBREC_PACKET(PacketId) = Packet) ->
+outgoing_maybe_awaiting_next(?PUBREC_PACKET(PacketId, _ReasonCode) = Packet) ->
     %% PUBREC (QoS=2), Awaiting PUBREL
     start_awaiting_trace(?PUBREL, PacketId, Packet);
 outgoing_maybe_awaiting_next(?PACKET(?PUBCOMP)) ->
@@ -587,7 +604,7 @@ outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_1, PacketId) = Packet) ->
 outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_2, PacketId) = Packet) ->
     %% PUBLISH (QoS=2), Awaiting PUBREC
     start_awaiting_trace(?PUBREC, PacketId, Packet);
-outgoing_maybe_awaiting_next(?PACKET(?PUBREL, PacketId) = Packet) ->
+outgoing_maybe_awaiting_next(?PUBREL_PACKET(PacketId, _ReasonCode) = Packet) ->
     %% PUBREL (QoS=2), Awaiting PUBCOMP
     start_awaiting_trace(?PUBCOMP, PacketId, Packet);
 %% ====================
@@ -606,7 +623,7 @@ start_awaiting_trace(AwaitingType, PacketId, Packet) ->
         #{attributes => Attrs}
     ),
     NCtx = otel_tracer:set_current_span(ParentCtx, AwaitingSpanCtx),
-    _ = attach_internal_ctx(AwaitingCtxKey, NCtx),
+    _ = attach_internal_ctx(AwaitingCtxKey, #{?EMQX_OTEL_CTX => NCtx, ?EMQX_OTEL_ATTRS => Attrs}),
     ok.
 
 client_incoming(?PACKET(AwaitingType, PktVar) = Packet, Attrs, ProcessFun) ->
@@ -620,7 +637,7 @@ client_incoming(?PACKET(AwaitingType, PktVar) = Packet, Attrs, ProcessFun) ->
 
 end_awaiting_client_packet(AwaitingCtxKey, Attrs) ->
     case detach_internal_ctx(AwaitingCtxKey) of
-        Ctx when is_map(Ctx) ->
+        #{?EMQX_OTEL_CTX := Ctx} when is_map(Ctx) ->
             ok = add_span_attrs(Attrs, Ctx),
             _ = end_span(Ctx),
             earse_internal_ctx(AwaitingCtxKey),
@@ -635,10 +652,10 @@ end_awaiting_client_packet(AwaitingCtxKey, Attrs) ->
 
 %%--------------------------------------------------------------------
 
-attach_internal_ctx({Type, PktVarOrPacketId}, Ctx) ->
-    attach_internal_ctx(internal_extra_key(Type, PktVarOrPacketId), Ctx);
-attach_internal_ctx(Key, Ctx) ->
-    erlang:put(Key, Ctx).
+attach_internal_ctx({Type, PktVarOrPacketId}, Value) ->
+    attach_internal_ctx(internal_extra_key(Type, PktVarOrPacketId), Value);
+attach_internal_ctx(Key, Value) ->
+    erlang:put(Key, Value).
 
 detach_internal_ctx({Type, PktVarOrPacketId}) ->
     detach_internal_ctx(internal_extra_key(Type, PktVarOrPacketId));
@@ -755,19 +772,13 @@ put_ctx(
     Msg = #message{extra = Extra}
 ) when is_map(Extra) ->
     Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};
-%% extra field has not being used previously and defaulted to an empty list, it's safe to overwrite it
-put_ctx(
-    OtelCtx,
-    Msg
-) when is_record(Msg, message) ->
-    Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}};
 %% ====================
 %% Trace Context in delivery
 put_ctx(OtelCtx, #delivery{message = Msg} = Delivery) ->
     NMsg = put_ctx(OtelCtx, Msg),
     Delivery#delivery{message = NMsg};
 %% ====================
-%% Trace Context in packet
+%% Trace Context in packet PUBLISH or PUBACK(PUBREC, PUBREL, PUBCOMP distinguish by type and qos)
 put_ctx(
     OtelCtx,
     #mqtt_packet{
@@ -784,6 +795,8 @@ put_ctx(
 ) ->
     NProps = to_properties(?EMQX_OTEL_CTX, OtelCtx, Props),
     Packet#mqtt_packet{variable = PubAckPacket#mqtt_packet_puback{properties = NProps}};
+%% ====================
+%% ignore
 put_ctx(
     _OtelCtx,
     Any
@@ -808,13 +821,15 @@ get_ctx(#mqtt_packet{
 get_ctx(_) ->
     undefined.
 
-from_extra(Key, Map) ->
-    maps:get(Key, Map, undefined).
-
-to_properties(Key, OtelCtx, Props) ->
-    Extra = maps:get(?MQTT_INTERNAL_EXTRA, Props, #{}),
-    Props#{?MQTT_INTERNAL_EXTRA => Extra#{Key => OtelCtx}}.
-
+%% ====================
+%% Trace Attributes in message
+put_attrs(
+    Attrs,
+    Msg = #message{extra = Extra}
+) when is_map(Extra) ->
+    Msg#message{extra = Extra#{?EMQX_OTEL_ATTRS => Attrs}};
+%% ====================
+%% Trace Attributes in packet
 put_attrs(
     Attrs,
     #mqtt_packet{
@@ -831,6 +846,8 @@ put_attrs(
 ) ->
     NProps = to_properties(?EMQX_OTEL_ATTRS, Attrs, Props),
     Packet#mqtt_packet{variable = PubAckPacket#mqtt_packet_puback{properties = NProps}};
+%% ====================
+%% ignore
 put_attrs(_Attrs, Any) ->
     Any.
 
@@ -846,6 +863,13 @@ get_attrs(#mqtt_packet{
         )
     ).
 
+from_extra(Key, Map) ->
+    maps:get(Key, Map, undefined).
+
+to_properties(Key, Value, Props) ->
+    Extra = maps:get(?MQTT_INTERNAL_EXTRA, Props, #{}),
+    Props#{?MQTT_INTERNAL_EXTRA => Extra#{Key => Value}}.
+
 clear() ->
     otel_ctx:clear(),
     case logger:get_process_metadata() of