Просмотр исходного кода

fix: put attrs in packet as awaiting spans initial attrs

JimMoen 1 год назад
Родитель
Сommit
df67389562
1 измененных файлов с 64 добавлено и 21 удалено
  1. 64 21
      apps/emqx_opentelemetry/src/emqx_otel_trace.erl

+ 64 - 21
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -531,7 +531,8 @@ start_outgoing_trace(Packet, Attrs, ParentCtx) ->
         #{attributes => Attrs}
     ),
     NCtx = otel_tracer:set_current_span(ParentCtx, SpanCtx),
-    _NPacketWithCtx = put_ctx(NCtx, Packet).
+    Packet1WithCtx = put_ctx(NCtx, Packet),
+    _NPacketWithAttrsg = put_attrs(Attrs, Packet1WithCtx).
 
 %% Ctx attached in Delivers in `broker_publish/2` and
 %% transformed to Packet when outgoing
@@ -566,9 +567,9 @@ end_span(_) ->
 outgoing_maybe_awaiting_next(?PACKET(?PUBACK)) ->
     %% PUBACK (QoS=1), Ignore
     ok;
-outgoing_maybe_awaiting_next(?PUBREC_PACKET(PacketId) = OutgoingPubrecPacket) ->
+outgoing_maybe_awaiting_next(?PUBREC_PACKET(PacketId) = Packet) ->
     %% PUBREC (QoS=2), Awaiting PUBREL
-    start_awaiting_trace(?PUBREL, PacketId, get_ctx(OutgoingPubrecPacket));
+    start_awaiting_trace(?PUBREL, PacketId, Packet);
 outgoing_maybe_awaiting_next(?PACKET(?PUBCOMP)) ->
     %% PUBCOMP (QoS=2), Ignore
     ok;
@@ -579,25 +580,27 @@ outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_0)) ->
     ok;
 outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_1, PacketId) = Packet) ->
     %% PUBLISH (QoS=1), Awaiting PUBACK
-    start_awaiting_trace(?PUBACK, PacketId, get_ctx(Packet));
+    start_awaiting_trace(?PUBACK, PacketId, Packet);
 outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_2, PacketId) = Packet) ->
     %% PUBLISH (QoS=2), Awaiting PUBREC
-    start_awaiting_trace(?PUBREC, PacketId, get_ctx(Packet));
+    start_awaiting_trace(?PUBREC, PacketId, Packet);
 outgoing_maybe_awaiting_next(?PACKET(?PUBREL, PacketId) = Packet) ->
     %% PUBREL (QoS=2), Awaiting PUBCOMP
-    start_awaiting_trace(?PUBCOMP, PacketId, get_ctx(Packet));
+    start_awaiting_trace(?PUBCOMP, PacketId, Packet);
 %% ====================
 outgoing_maybe_awaiting_next(_) ->
     %% TODO: Awaiting AUTH
     ok.
 
-start_awaiting_trace(AwaitingType, PacketId, ParentCtx) ->
+start_awaiting_trace(AwaitingType, PacketId, Packet) ->
     AwaitingCtxKey = internal_extra_key(AwaitingType, PacketId),
+    ParentCtx = get_ctx(Packet),
+    Attrs = get_attrs(Packet),
     AwaitingSpanCtx = otel_tracer:start_span(
         ParentCtx,
         ?current_tracer,
         awaiting_span_name(AwaitingType),
-        #{}
+        #{attributes => Attrs}
     ),
     NCtx = otel_tracer:set_current_span(ParentCtx, AwaitingSpanCtx),
     _ = attach_internal_ctx(AwaitingCtxKey, NCtx),
@@ -764,15 +767,19 @@ put_ctx(OtelCtx, #delivery{message = Msg} = Delivery) ->
 %% Trace Context in packet
 put_ctx(
     OtelCtx,
-    #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
+    #mqtt_packet{
+        variable = #mqtt_packet_publish{properties = Props} = PubPacket
+    } = Packet
 ) ->
-    NProps = to_properties(OtelCtx, Props),
+    NProps = to_properties(?EMQX_OTEL_CTX, OtelCtx, Props),
     Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = NProps}};
 put_ctx(
     OtelCtx,
-    #mqtt_packet{variable = #mqtt_packet_puback{properties = Props} = PubAckPacket} = Packet
+    #mqtt_packet{
+        variable = #mqtt_packet_puback{properties = Props} = PubAckPacket
+    } = Packet
 ) ->
-    NProps = to_properties(OtelCtx, Props),
+    NProps = to_properties(?EMQX_OTEL_CTX, OtelCtx, Props),
     Packet#mqtt_packet{variable = PubAckPacket#mqtt_packet_puback{properties = NProps}};
 put_ctx(
     _OtelCtx,
@@ -781,24 +788,60 @@ put_ctx(
     Any.
 
 get_ctx(#message{extra = Extra}) ->
-    from_extra(Extra);
+    from_extra(?EMQX_OTEL_CTX, Extra);
 get_ctx(#delivery{message = #message{extra = Extra}}) ->
-    from_extra(Extra);
+    from_extra(?EMQX_OTEL_CTX, Extra);
 get_ctx(#mqtt_packet{
     variable = PktVar
 }) when is_tuple(PktVar) ->
-    from_extra(maps:get(?MQTT_INTERNAL_EXTRA, emqx_packet:info(properties, PktVar), #{}));
+    from_extra(
+        ?EMQX_OTEL_CTX,
+        maps:get(
+            ?MQTT_INTERNAL_EXTRA,
+            emqx_packet:info(properties, PktVar),
+            #{}
+        )
+    );
 get_ctx(_) ->
     undefined.
 
-from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) ->
-    OtelCtx;
-from_extra(_) ->
-    undefined.
+from_extra(Key, Map) ->
+    maps:get(Key, Map, undefined).
 
-to_properties(OtelCtx, Props) ->
+to_properties(Key, OtelCtx, Props) ->
     Extra = maps:get(?MQTT_INTERNAL_EXTRA, Props, #{}),
-    Props#{?MQTT_INTERNAL_EXTRA => Extra#{?EMQX_OTEL_CTX => OtelCtx}}.
+    Props#{?MQTT_INTERNAL_EXTRA => Extra#{Key => OtelCtx}}.
+
+put_attrs(
+    Attrs,
+    #mqtt_packet{
+        variable = #mqtt_packet_publish{properties = Props} = PubPacket
+    } = Packet
+) ->
+    NProps = to_properties(?EMQX_OTEL_ATTRS, Attrs, Props),
+    Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = NProps}};
+put_attrs(
+    Attrs,
+    #mqtt_packet{
+        variable = #mqtt_packet_puback{properties = Props} = PubAckPacket
+    } = Packet
+) ->
+    NProps = to_properties(?EMQX_OTEL_ATTRS, Attrs, Props),
+    Packet#mqtt_packet{variable = PubAckPacket#mqtt_packet_puback{properties = NProps}};
+put_attrs(_Attrs, Any) ->
+    Any.
+
+get_attrs(#mqtt_packet{
+    variable = PktVar
+}) when is_tuple(PktVar) ->
+    from_extra(
+        ?EMQX_OTEL_ATTRS,
+        maps:get(
+            ?MQTT_INTERNAL_EXTRA,
+            emqx_packet:info(properties, PktVar),
+            #{}
+        )
+    ).
 
 clear() ->
     otel_ctx:clear().