Explorar o código

feat(otel): trace puback(QoS=1) pubrec/pubrel/pubcomp(QoS=2)

JimMoen hai 1 ano
pai
achega
bf1aeebb86

+ 179 - 78
apps/emqx/src/emqx_channel.erl

@@ -158,7 +158,8 @@
 -define(chan_terminating, chan_terminating).
 -define(RAND_CLIENTID_BYTES, 16).
 
--define(msg_deliver, '$trace.deliver.attrs').
+-define(DELIVER_TRACE_ATTRS, '$deliver_trace_attrs').
+-define(OUTGOING_TRACE_ATTRS, '$outgoing_trace_attrs').
 
 -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
 
@@ -423,7 +424,7 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
     case emqx_packet:check(Packet) of
         ok ->
-            emqx_external_trace:msg_publish(
+            emqx_external_trace:client_publish(
                 Packet,
                 %% More info can be added in future, but for now only clientid is used
                 init_trace_attrs(Packet, Channel),
@@ -433,81 +434,45 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
             handle_out(disconnect, ReasonCode, Channel)
     end;
 handle_in(
-    ?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
-    Channel =
-        #channel{clientinfo = ClientInfo, session = Session}
+    ?PACKET(?PUBACK) = Packet,
+    Channel
 ) ->
-    case emqx_session:puback(ClientInfo, PacketId, Session) of
-        {ok, Msg, [], NSession} ->
-            ok = after_message_acked(ClientInfo, Msg, Properties),
-            {ok, Channel#channel{session = NSession}};
-        {ok, Msg, Publishes, NSession} ->
-            ok = after_message_acked(ClientInfo, Msg, Properties),
-            handle_out(publish, Publishes, Channel#channel{session = NSession});
-        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
-            ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.puback.inuse'),
-            {ok, Channel};
-        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.puback.missed'),
-            {ok, Channel}
-    end;
+    emqx_external_trace:client_puback(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        init_trace_attrs(Packet, Channel),
+        fun(PacketWithTrace) -> process_puback(PacketWithTrace, Channel) end
+    );
 handle_in(
-    %% TODO: Why discard the Reason Code?
-    ?PUBREC_PACKET(PacketId, _ReasonCode, Properties),
-    Channel =
-        #channel{clientinfo = ClientInfo, session = Session}
+    ?PACKET(?PUBREC) = Packet,
+    Channel
 ) ->
-    case emqx_session:pubrec(ClientInfo, PacketId, Session) of
-        {ok, Msg, NSession} ->
-            ok = after_message_acked(ClientInfo, Msg, Properties),
-            NChannel = Channel#channel{session = NSession},
-            handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
-        {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
-            ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.pubrec.inuse'),
-            handle_out(pubrel, {PacketId, RC}, Channel);
-        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.pubrec.missed'),
-            handle_out(pubrel, {PacketId, RC}, Channel)
-    end;
+    emqx_external_trace:client_pubrec(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        init_trace_attrs(Packet, Channel),
+        fun(PacketWithTrace) -> process_pubrec(PacketWithTrace, Channel) end
+    );
 handle_in(
-    ?PUBREL_PACKET(PacketId, _ReasonCode),
-    Channel = #channel{
-        clientinfo = ClientInfo,
-        session = Session
-    }
+    ?PACKET(?PUBREL) = Packet,
+    Channel
 ) ->
-    case emqx_session:pubrel(ClientInfo, PacketId, Session) of
-        {ok, NSession} ->
-            NChannel = Channel#channel{session = NSession},
-            handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
-        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?SLOG(warning, #{msg => "pubrel_packetId_not_found", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.pubrel.missed'),
-            handle_out(pubcomp, {PacketId, RC}, Channel)
-    end;
+    emqx_external_trace:client_pubrel(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        init_trace_attrs(Packet, Channel),
+        fun(PacketWithTrace) -> process_pubrel(PacketWithTrace, Channel) end
+    );
 handle_in(
-    ?PUBCOMP_PACKET(PacketId, _ReasonCode),
-    Channel = #channel{
-        clientinfo = ClientInfo, session = Session
-    }
+    ?PACKET(?PUBCOMP) = Packet,
+    Channel
 ) ->
-    case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
-        {ok, [], NSession} ->
-            {ok, Channel#channel{session = NSession}};
-        {ok, Publishes, NSession} ->
-            handle_out(publish, Publishes, Channel#channel{session = NSession});
-        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
-            ok = emqx_metrics:inc('packets.pubcomp.inuse'),
-            {ok, Channel};
-        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?SLOG(warning, #{msg => "pubcomp_packetId_not_found", packetId => PacketId}),
-            ok = emqx_metrics:inc('packets.pubcomp.missed'),
-            {ok, Channel}
-    end;
+    emqx_external_trace:client_pubcomp(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        init_trace_attrs(Packet, Channel),
+        fun(PacketWithTrace) -> process_pubcomp(PacketWithTrace, Channel) end
+    );
 handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
     emqx_external_trace:client_subscribe(
         Packet,
@@ -791,6 +756,108 @@ puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
 puback_reason_code(_PacketId, _Msg, disconnect) ->
     disconnect.
 
+%%--------------------------------------------------------------------
+%% Process PUBACK
+%%--------------------------------------------------------------------
+
+process_puback(
+    ?PUBACK_PACKET(PacketId, _ReasonCode, Properties),
+    Channel =
+        #channel{clientinfo = ClientInfo, session = Session}
+) ->
+    case emqx_session:puback(ClientInfo, PacketId, Session) of
+        {ok, Msg, [], NSession} ->
+            ok = after_message_acked(ClientInfo, Msg, Properties),
+            {ok, Channel#channel{session = NSession}};
+        {ok, Msg, Publishes, NSession} ->
+            ok = after_message_acked(ClientInfo, Msg, Properties),
+            handle_out(publish, Publishes, Channel#channel{session = NSession});
+        {error, ?RC_PROTOCOL_ERROR} ->
+            handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
+        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
+            ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.puback.inuse'),
+            {ok, Channel};
+        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
+            ?SLOG(warning, #{msg => "puback_packetId_not_found", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.puback.missed'),
+            {ok, Channel}
+    end.
+
+%%--------------------------------------------------------------------
+%% Process PUBREC
+%%--------------------------------------------------------------------
+
+process_pubrec(
+    %% TODO: Why discard the Reason Code?
+    ?PUBREC_PACKET(PacketId, _ReasonCode, Properties),
+    Channel =
+        #channel{clientinfo = ClientInfo, session = Session}
+) ->
+    case emqx_session:pubrec(ClientInfo, PacketId, Session) of
+        {ok, Msg, NSession} ->
+            ok = after_message_acked(ClientInfo, Msg, Properties),
+            NChannel = Channel#channel{session = NSession},
+            handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
+        {error, RC = ?RC_PROTOCOL_ERROR} ->
+            handle_out(disconnect, RC, Channel);
+        {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
+            ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.pubrec.inuse'),
+            handle_out(pubrel, {PacketId, RC}, Channel);
+        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
+            ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.pubrec.missed'),
+            handle_out(pubrel, {PacketId, RC}, Channel)
+    end.
+
+%%--------------------------------------------------------------------
+%% Process PUBREL
+%%--------------------------------------------------------------------
+
+process_pubrel(
+    ?PUBREL_PACKET(PacketId, _ReasonCode),
+    Channel = #channel{
+        clientinfo = ClientInfo,
+        session = Session
+    }
+) ->
+    case emqx_session:pubrel(ClientInfo, PacketId, Session) of
+        {ok, NSession} ->
+            NChannel = Channel#channel{session = NSession},
+            handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
+        {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
+            ?SLOG(warning, #{msg => "pubrel_packetId_not_found", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.pubrel.missed'),
+            handle_out(pubcomp, {PacketId, RC}, Channel)
+    end.
+
+%%--------------------------------------------------------------------
+%% Process PUBCOMP
+%%--------------------------------------------------------------------
+
+process_pubcomp(
+    ?PUBCOMP_PACKET(PacketId, _ReasonCode),
+    Channel = #channel{
+        clientinfo = ClientInfo, session = Session
+    }
+) ->
+    case emqx_session:pubcomp(ClientInfo, PacketId, Session) of
+        {ok, [], NSession} ->
+            {ok, Channel#channel{session = NSession}};
+        {ok, Publishes, NSession} ->
+            handle_out(publish, Publishes, Channel#channel{session = NSession});
+        {error, ?RC_PROTOCOL_ERROR} ->
+            handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
+        {error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
+            ok = emqx_metrics:inc('packets.pubcomp.inuse'),
+            {ok, Channel};
+        {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
+            ?SLOG(warning, #{msg => "pubcomp_packetId_not_found", packetId => PacketId}),
+            ok = emqx_metrics:inc('packets.pubcomp.missed'),
+            {ok, Channel}
+    end.
+
 -compile({inline, [after_message_acked/3]}).
 after_message_acked(ClientInfo, Msg, PubAckProps) ->
     ok = emqx_metrics:inc('messages.acked'),
@@ -1030,9 +1097,8 @@ handle_deliver(
     {ok, {event, updated}, Channel#channel{session = NSession}};
 handle_deliver(Delivers, Channel) ->
     Delivers1 = emqx_external_trace:msg_deliver(
-        ?EXT_TRACE_START,
         Delivers,
-        init_trace_attrs(?msg_deliver, Channel)
+        init_trace_attrs(?DELIVER_TRACE_ATTRS, Channel)
     ),
     do_handle_deliver(Delivers1, Channel).
 
@@ -1175,13 +1241,33 @@ handle_out(publish, Publishes, Channel) ->
     {Packets, NChannel} = do_deliver(Publishes, Channel),
     {ok, ?REPLY_OUTGOING(Packets), NChannel};
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
-    {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel};
+    {ok,
+        start_outgoing_trace(
+            ?PUBACK_PACKET(PacketId, ReasonCode),
+            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+        ),
+        Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
-    {ok, ?PUBREC_PACKET(PacketId, ReasonCode), Channel};
+    {ok,
+        start_outgoing_trace(
+            ?PUBREC_PACKET(PacketId, ReasonCode),
+            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+        ),
+        Channel};
 handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
-    {ok, ?PUBREL_PACKET(PacketId, ReasonCode), Channel};
+    {ok,
+        start_outgoing_trace(
+            ?PUBREL_PACKET(PacketId, ReasonCode),
+            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+        ),
+        Channel};
 handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
-    {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel};
+    {ok,
+        start_outgoing_trace(
+            ?PUBCOMP_PACKET(PacketId, ReasonCode),
+            init_trace_attrs(?OUTGOING_TRACE_ATTRS, Channel)
+        ),
+        Channel};
 handle_out(suback, {PacketId, ReasonCodes}, Channel = ?IS_MQTT_V5) ->
     return_sub_unsub_ack(?SUBACK_PACKET(PacketId, ReasonCodes), Channel);
 handle_out(suback, {PacketId, ReasonCodes}, Channel) ->
@@ -1667,6 +1753,11 @@ init_trace_attrs(
     Channel
 ) ->
     maps:from_list(info([clientid], Channel));
+init_trace_attrs(
+    ?PACKET(?PUBACK, _PktVar),
+    Channel
+) ->
+    maps:from_list(info([clientid], Channel));
 init_trace_attrs(
     ?PACKET(?SUBSCRIBE, _PktVar),
     Channel
@@ -1678,10 +1769,17 @@ init_trace_attrs(
 ) ->
     maps:from_list(info([clientid], Channel));
 init_trace_attrs(
-    ?msg_deliver,
+    ?DELIVER_TRACE_ATTRS,
     Channel
 ) ->
-    maps:from_list(info([clientid], Channel)).
+    maps:from_list(info([clientid], Channel));
+init_trace_attrs(
+    ?OUTGOING_TRACE_ATTRS,
+    Channel
+) ->
+    maps:from_list(info([clientid, username], Channel));
+init_trace_attrs(_, _) ->
+    #{}.
 
 %%--------------------------------------------------------------------
 %% Enrich MQTT Connect Info
@@ -3033,6 +3131,9 @@ proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
 proto_ver(_, _) ->
     ?MQTT_PROTO_V4.
 
+start_outgoing_trace(Packet, Attrs) ->
+    emqx_external_trace:outgoing(?EXT_TRACE_START, Packet, Attrs).
+
 %%--------------------------------------------------------------------
 %% For CT tests
 %%--------------------------------------------------------------------

+ 1 - 1
apps/emqx/src/emqx_connection.erl

@@ -848,7 +848,7 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
 
 handle_outgoing(Packets, State) ->
     Res = do_handle_outgoing(Packets, State),
-    emqx_external_trace:msg_deliver(
+    emqx_external_trace:outgoing(
         ?EXT_TRACE_STOP,
         Packets,
         _Attrs = #{}

+ 117 - 17
apps/emqx/src/emqx_external_trace.erl

@@ -53,7 +53,27 @@
 
 %% Message Processing Spans
 %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DISPATCH -> DELIVER(to Subscribers)
--callback msg_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback client_puback(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback client_pubrec(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback client_pubrel(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback client_pubcomp(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
@@ -82,15 +102,25 @@
     Res :: term().
 
 -callback msg_deliver(
-    TraceAction,
     list(Deliver) | Packet | list(Packet),
     Attrs
 ) -> Deliver when
-    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
     Deliver :: emqx_types:deliver(),
     Packet :: emqx_types:packet(),
     Attrs :: attrs().
 
+-callback outgoing(
+    TraceAction,
+    Packet,
+    Attrs
+) ->
+    Res
+when
+    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+
 %% --------------------------------------------------------------------
 %% Span enrichments APIs
 
@@ -125,12 +155,21 @@
     client_unsubscribe/3,
     client_authn/3,
     client_authz/3,
-    msg_publish/3,
+    client_publish/3,
+    client_puback/3,
+    client_pubrec/3,
+    client_pubrel/3,
+    client_pubcomp/3,
     msg_route/3,
     msg_dispatch/3,
     msg_forward/3,
     msg_handle_forward/3,
-    msg_deliver/3
+    msg_deliver/2,
+
+    %% Start Span when Reply PACKETs generated
+    %% as when `emqx_channel:handle_out/3` called.
+    %% Stop when `emqx_channel:handle_outgoing/3` returned
+    outgoing/3
 ]).
 
 -export([
@@ -238,14 +277,52 @@ client_authz(Packet, InitAttrs, ProcessFun) ->
 
 %% TODO:
 %% split to:
-%% `msg_publish/3` for legacy_mode
+%% `client_publish/3` for legacy_mode
 %% `trace_client_publish/3` for end_to_end_mode
-%% @doc Trace message processing from publisher
--spec msg_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+
+%% @doc Trace message PUBLISH (QoS=0, QoS=1, QoS=2)
+%% Client(Publisher) -> Broker: PUBLISH
+-spec client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-msg_publish(Packet, InitAttrs, ProcessFun) ->
+client_publish(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Trace message PUBACK (QoS=1)
+%% Client(Subscriber) -> Broker: PUBACK
+-spec client_puback(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+client_puback(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Trace message PUBREC (QoS=2)
+%% Client(Subscriber) -> Broker: PUBREC
+-spec client_pubrec(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+client_pubrec(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Trace message PUBREL (QoS=2)
+%% Client(Publisher) -> Broker: PUBREL
+-spec client_pubrel(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+client_pubrel(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Trace message PUBCOMP (QoS=2)
+%% Client(Subscriber) -> Broker: PUBCOMP
+-spec client_pubcomp(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+client_pubcomp(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 -spec msg_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
@@ -262,6 +339,11 @@ msg_route(Delivery, InitAttrs, ProcessFun) ->
 msg_dispatch(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
+%% @doc Trace message forwarding
+%% `Span' is the smallest unit in tracing and `CANNOT' be propagated across nodes.
+%%  Divide the message forwarding process into two spans: `message.forward` and `message.handle_forward`.
+%% The span `message.forward` starts on the publisher process and ends on the same one.
+%% Broker(Publisher) -> Broker(Subscriber): FORWARD on Broker(Publisher)
 -spec msg_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     Delivery :: emqx_types:delivery(),
     InitAttrs :: attrs(),
@@ -269,6 +351,9 @@ msg_dispatch(Delivery, InitAttrs, ProcessFun) ->
 msg_forward(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
+%% @doc Trace message forward handling
+%% The span `message.handle_forward` starts on the RPC process and ends on the same one.
+%% Broker(Publisher) -> Broker(Subscriber): HANDLE FORWARD on Broker(Subscriber)
 -spec msg_handle_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     InitAttrs :: attrs(),
     Delivery :: emqx_types:delivery(),
@@ -283,18 +368,33 @@ msg_handle_forward(Delivery, InitAttrs, ProcessFun) ->
 
 %% @doc Start Trace message delivery to subscriber
 -spec msg_deliver(
-    TraceAction,
     list(Deliver) | Packet | list(Packet),
     Attrs
 ) -> Deliver when
-    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
     Deliver :: emqx_types:deliver(),
     Packet :: emqx_types:packet(),
     Attrs :: attrs().
-msg_deliver(TraceAction, DeliverOrPackets, Attrs) ->
+msg_deliver(DeliverOrPackets, Attrs) ->
+    ?with_provider(
+        ?FUNCTION_NAME(DeliverOrPackets, Attrs),
+        DeliverOrPackets
+    ).
+
+-spec outgoing(
+    TraceAction,
+    Packet,
+    Attrs
+) ->
+    Res
+when
+    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+outgoing(TraceAction, Packet, Attrs) ->
     ?with_provider(
-        ?FUNCTION_NAME(TraceAction, DeliverOrPackets, Attrs),
-        deliver_without_provider(TraceAction, DeliverOrPackets)
+        ?FUNCTION_NAME(TraceAction, Packet, Attrs),
+        res_without_provider(TraceAction, Packet)
     ).
 
 %% --------------------------------------------------------------------
@@ -320,9 +420,9 @@ add_span_event(EventName, AttrsOrMeta) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-deliver_without_provider(?EXT_TRACE_START, DeliverOrPackets) ->
-    DeliverOrPackets;
-deliver_without_provider(?EXT_TRACE_STOP, _Packets) ->
+res_without_provider(?EXT_TRACE_START, Any) ->
+    Any;
+res_without_provider(?EXT_TRACE_STOP, _Packets) ->
     ok.
 
 %% TODO:

+ 11 - 3
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -24,13 +24,21 @@
 -define(CLIENT_SUBSCRIBE_SPAN_NAME, <<"client.subscribe">>).
 -define(CLIENT_UNSUBSCRIBE_SPAN_NAME, <<"client.unsubscribe">>).
 
--define(MSG_PUBLISH_SPAN_NAME, <<"message.publish">>).
+-define(EMQX_PUBACK_SPAN_NAME, <<"emqx.puback">>).
+-define(EMQX_PUBREC_SPAN_NAME, <<"emqx.pubrec">>).
+-define(EMQX_PUBREL_SPAN_NAME, <<"emqx.pubrel">>).
+-define(EMQX_PUBCOMP_SPAN_NAME, <<"emqx.pubcomp">>).
+
+-define(CLIENT_PUBLISH_SPAN_NAME, <<"client.publish">>).
+-define(CLIENT_PUBACK_SPAN_NAME, <<"client.puback">>).
+-define(CLIENT_PUBREC_SPAN_NAME, <<"client.pubrec">>).
+-define(CLIENT_PUBREL_SPAN_NAME, <<"client.pubrel">>).
+-define(CLIENT_PUBCOMP_SPAN_NAME, <<"client.pubcomp">>).
+
 -define(MSG_ROUTE_SPAN_NAME, <<"message.route">>).
 -define(MSG_DISPATCH_SPAN_NAME, <<"message.dispatch">>).
 -define(MSG_FORWARD_SPAN_NAME, <<"message.forward">>).
 -define(MSG_HANDLE_FORWARD_SPAN_NAME, <<"message.handle_forward">>).
 -define(MSG_DELIVER_SPAN_NAME, <<"message.deliver">>).
 
-%% -define(MSG_SEND_SPAN_NAME, <<"message.send">>).
-
 -endif.

+ 354 - 40
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -36,14 +36,22 @@
     client_authn/3,
     client_authz/3,
 
-    %% Message Processing Spans
+    %% Message Processing Spans (From Client)
     %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DISPATCH -> DELIVER(to Subscribers)
-    msg_publish/3,
+    client_publish/3,
+    client_puback/3,
+    client_pubrec/3,
+    client_pubrel/3,
+    client_pubcomp/3,
     msg_route/3,
     msg_dispatch/3,
     msg_forward/3,
     msg_handle_forward/3,
-    msg_deliver/3
+    msg_deliver/2,
+
+    %% Start Span when `emqx_channel:handle_out/3` called.
+    %% Stop when `emqx_channel:handle_outgoing/3` returned
+    outgoing/3
 ]).
 
 %% --------------------------------------------------------------------
@@ -56,10 +64,11 @@
 
 -include("emqx_otel_trace.hrl").
 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("emqx/include/emqx_external_trace.hrl").
--include_lib("opentelemetry_api/include/opentelemetry.hrl").
 -include_lib("opentelemetry_api/include/otel_tracer.hrl").
+-include_lib("opentelemetry_api/include/opentelemetry.hrl").
 
 -define(EMQX_OTEL_CTX, otel_ctx).
 
@@ -276,7 +285,7 @@ client_authz(Packet, _Attrs, ProcessFun) ->
         end
     ).
 
--spec msg_publish(
+-spec client_publish(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -286,23 +295,107 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-msg_publish(Packet, Attrs, ProcessFun) ->
+client_publish(Packet, Attrs, ProcessFun) ->
+    %% XXX: should trace for durable sessions?
     RootCtx = otel_ctx:new(),
     SpanCtx = otel_tracer:start_span(
         RootCtx,
         ?current_tracer,
-        ?MSG_PUBLISH_SPAN_NAME,
+        ?CLIENT_PUBLISH_SPAN_NAME,
         #{attributes => gen_attrs(Packet, Attrs)}
     ),
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
+    %% XXX: Remove otel_ctx attach
+    %% use maybe_attach_for_outgoing/2 only
     _ = otel_ctx:attach(Ctx),
+    %% Attach before ProcessFun evaluated
+    attach_by_maybe_outgoing(Packet, Ctx),
     try
         ProcessFun(Packet)
     after
+        %% XXX: Remove otel_ctx detach and `?end_span()`
+        %% end current span by
+        %% otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
         _ = ?end_span(),
         clear()
     end.
 
+%% to `Publisher'
+attach_by_maybe_outgoing(?PACKET(?PUBACK), _Ctx) ->
+    ok;
+attach_by_maybe_outgoing(?PUBREC_PACKET(PacketId), Ctx) ->
+    attach_internal_ctx({?PUBREC, PacketId}, Ctx);
+attach_by_maybe_outgoing(?PACKET(?PUBCOMP), _Ctx) ->
+    ok;
+%% to `Subscriber'
+attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_0), _Ctx) ->
+    ok;
+attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId), Ctx) ->
+    attach_internal_ctx({?PUBACK, PacketId}, Ctx),
+    ok;
+attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId), Ctx) ->
+    attach_internal_ctx({?PUBREC, PacketId}, Ctx),
+    ok;
+attach_by_maybe_outgoing(?PUBREL_PACKET(PacketId), Ctx) ->
+    attach_internal_ctx({?PUBCOMP, PacketId}, Ctx),
+    ok;
+attach_by_maybe_outgoing(Packet, Ctx) ->
+    %% TODO: SUBACK, UNSUBACK
+    ?SLOG(error, #{msg => "Unknown Packet", packet => Packet, ctx => Ctx}),
+    ok.
+
+-spec client_puback(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+client_puback(Packet, Attrs, ProcessFun) ->
+    client_incoming(Packet, Attrs, ProcessFun).
+
+-spec client_pubrec(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+client_pubrec(Packet, Attrs, ProcessFun) ->
+    client_incoming(Packet, Attrs, ProcessFun).
+
+-spec client_pubrel(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+client_pubrel(Packet, Attrs, ProcessFun) ->
+    client_incoming(Packet, Attrs, ProcessFun).
+
+-spec client_pubcomp(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+client_pubcomp(Packet, Attrs, ProcessFun) ->
+    client_incoming(Packet, Attrs, ProcessFun).
+
 -spec msg_route(
     Delivery,
     Attrs,
@@ -401,25 +494,20 @@ msg_handle_forward(Delivery, Attrs, Fun) ->
             )
     end.
 
-%% --------------------------------------------------------------------
-%% Legacy trace API
-%% --------------------------------------------------------------------
-
+%% NOTE:
+%% Span starts in Delivers(Msg) and stops when outgoing(Packets)
+%% Only for `PUBLISH(Qos=0|1|2)`
 -spec msg_deliver(
-    TraceAction,
     list(Deliver) | Packet | list(Packet),
     Attrs
 ) ->
-    %% Attach Ctx into Delivers (Publisher Process)
+    %% Delivers with Ctx Attached
     list(Deliver)
-    %% Detach Ctx from Packets (Subscriber Processes)
-    | ok
 when
-    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
     Deliver :: emqx_types:deliver(),
     Packet :: emqx_types:packet(),
     Attrs :: attrs().
-msg_deliver(?EXT_TRACE_START, Delivers, Attrs) ->
+msg_deliver(Delivers, Attrs) ->
     lists:map(
         fun({deliver, Topic, Msg} = Deliver) ->
             case get_ctx(Msg) of
@@ -438,23 +526,224 @@ msg_deliver(?EXT_TRACE_START, Delivers, Attrs) ->
             end
         end,
         Delivers
-    );
-msg_deliver(?EXT_TRACE_STOP, Packets, _Attrs) when
-    is_list(Packets)
+    ).
+
+-spec outgoing(
+    TraceAction,
+    Packet,
+    Attrs
+) ->
+    Res
+when
+    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+outgoing(?EXT_TRACE_START, Packet, Attrs) ->
+    %% Note: the function case only for
+    %% `PUBACK`, `PUBREC`, `PUBREL`, `PUBCOMP`
+    start_outgoing_trace(Packet, Attrs);
+outgoing(?EXT_TRACE_STOP, Any, Attrs) ->
+    stop_outgoing_trace(Any, Attrs).
+
+%% --------------------------------------------------------------------
+%% Internal Helpers
+%% --------------------------------------------------------------------
+
+%% ====================
+%% Broker -> Client(`Publisher'):
+start_outgoing_trace(?PUBACK_PACKET(PacketId) = Packet, Attrs) ->
+    start_outgoing_trace(Packet, Attrs, detach_internal_ctx({?PUBACK, PacketId}));
+start_outgoing_trace(?PUBREC_PACKET(PacketId) = Packet, Attrs) ->
+    start_outgoing_trace(Packet, Attrs, detach_internal_ctx({?PUBREC, PacketId}));
+start_outgoing_trace(?PUBCOMP_PACKET(PacketId) = Packet, Attrs) ->
+    start_outgoing_trace(Packet, Attrs, detach_internal_ctx({?PUBREL, PacketId}));
+%% ====================
+%% Broker -> Client(`Subscriber'):
+start_outgoing_trace(?PUBREL_PACKET(PacketId) = Packet, Attrs) ->
+    start_outgoing_trace(Packet, Attrs, detach_internal_ctx({?PUBREC, PacketId}));
+%% The Incoming span is still being recorded and Ctx has not been erased
+%% when the following outgoing spans starting.
+%% `SUBACK' / `UNSUBACK' / `PUBACK'
+start_outgoing_trace(Packet, Attrs) ->
+    start_outgoing_trace(Packet, Attrs, otel_ctx:get_current()).
+
+start_outgoing_trace(Packet, Attrs, ParentCtx) ->
+    SpanCtx = otel_tracer:start_span(
+        ParentCtx,
+        ?current_tracer,
+        outgoing_span_name(Packet),
+        #{attributes => gen_attrs(Packet, Attrs)}
+    ),
+    NCtx = otel_tracer:set_current_span(ParentCtx, SpanCtx),
+    _NPacketWithCtx = put_ctx(NCtx, Packet).
+
+%% Ctx attached in Delivers in `msg_deliver/2` and
+%% transformed to Packet when outgoing
+stop_outgoing_trace(Anys, Attrs) when is_list(Anys) ->
+    lists:foreach(fun(Any) -> stop_outgoing_trace(Any, Attrs) end, Anys);
+stop_outgoing_trace(Packet, _Attrs) when is_record(Packet, mqtt_packet) ->
+    %% Maybe pending for next Packet
+    %% The current outgoing Packet SHOULD NOT be modified
+    ok = outgoing_maybe_pending_next(Packet),
+    end_span(get_ctx(Packet));
+stop_outgoing_trace(Any, _Attrs) ->
+    end_span(get_ctx(Any)).
+
+%% -compile({inline, [end_span/1]}).
+end_span(Ctx) when
+    is_map(Ctx)
 ->
-    lists:foreach(
-        fun(Packet) ->
-            case get_ctx(Packet) of
-                Ctx when is_map(Ctx) ->
-                    otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
-                _ ->
-                    ok
-            end
-        end,
-        Packets
+    otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
+end_span(_) ->
+    ok.
+
+%% Note:
+%% When replying PUBACK/PUBREC/PUBCOMP to the `Publisher' or sending PUBLISH/PUBREL to the `Subscriber',
+%% the current `PacketId' is stored and pending-trace begins.
+%% At this time, a new span begins and the span ends after receiving and processing the reply
+%% from Client(might be Publisher or Subscriber).
+
+%% -compile(
+%%     {inline, [
+%%         start_next_packet_trace/1,
+%%         stop_packet_trace/2
+%%     ]}
+%% ).
+%% ====================
+%% Broker -> Client(`Publisher'):
+
+outgoing_maybe_pending_next(?PACKET(?PUBACK)) ->
+    %% PUBACK (QoS=1), Ignore
+    ok;
+outgoing_maybe_pending_next(?PUBREC_PACKET(PacketId) = OutgoingPubrecPacket) ->
+    %% PUBREC (QoS=2), Pending PUBREL
+    start_pending_trace(?PUBREL, PacketId, get_ctx(OutgoingPubrecPacket));
+outgoing_maybe_pending_next(?PACKET(?PUBCOMP)) ->
+    %% PUBCOMP (QoS=2), Ignore
+    ok;
+%% ====================
+%% Broker -> Client(`Subscriber'):
+outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_0)) ->
+    %% PUBLISH (QoS=0), Ignore
+    ok;
+outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_1, PacketId) = Packet) ->
+    %% PUBLISH (QoS=1), Pending PUBACK
+    start_pending_trace(?PUBACK, PacketId, get_ctx(Packet));
+outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_2, PacketId) = Packet) ->
+    %% PUBLISH (QoS=2), Pending PUBREC
+    start_pending_trace(?PUBREC, PacketId, get_ctx(Packet));
+outgoing_maybe_pending_next(?PACKET(?PUBREL, PacketId) = Packet) ->
+    %% PUBREL (QoS=2), Pending PUBCOMP
+    start_pending_trace(?PUBCOMP, PacketId, get_ctx(Packet));
+%% ====================
+outgoing_maybe_pending_next(_) ->
+    %% TODO: Pending AUTH
+    ok.
+
+start_pending_trace(PendingType, PacketId, ParentCtx) ->
+    PendingCtxKey = internal_extra_key(PendingType, PacketId),
+    PendingSpanCtx = otel_tracer:start_span(
+        ParentCtx,
+        ?current_tracer,
+        pending_span_name(PendingType),
+        #{}
+    ),
+    NCtx = otel_tracer:set_current_span(ParentCtx, PendingSpanCtx),
+    _ = attach_internal_ctx(PendingCtxKey, NCtx),
+    ok.
+
+client_incoming(?PACKET(PendingType, PktVar) = Packet, _Attrs, ProcessFun) ->
+    try
+        ProcessFun(Packet)
+    after
+        end_pending_client_packet(
+            internal_extra_key(PendingType, PktVar)
+        )
+    end.
+
+end_pending_client_packet(PendingCtxKey) ->
+    case detach_internal_ctx(PendingCtxKey) of
+        Ctx when is_map(Ctx) ->
+            _ = end_span(Ctx),
+            earse_internal_ctx(PendingCtxKey),
+            ok;
+        _ ->
+            %% TODO:
+            %% erlang:get() for all internal_extra_key and erase all unknown PendingType
+            earse_internal_ctx(PendingCtxKey),
+            ok
+    end,
+    ok.
+
+%%--------------------------------------------------------------------
+
+attach_internal_ctx({Type, PktVarOrPacketId}, Ctx) ->
+    attach_internal_ctx(internal_extra_key(Type, PktVarOrPacketId), Ctx);
+attach_internal_ctx(Key, Ctx) ->
+    erlang:put(Key, Ctx).
+
+detach_internal_ctx({Type, PktVarOrPacketId}) ->
+    detach_internal_ctx(internal_extra_key(Type, PktVarOrPacketId));
+detach_internal_ctx(Key) ->
+    erlang:get(Key).
+
+earse_internal_ctx({Type, PktVarOrPacketId}) ->
+    earse_internal_ctx(internal_extra_key(Type, PktVarOrPacketId));
+earse_internal_ctx(Key) ->
+    erlang:erase(Key).
+
+%% -compile(
+%%     {inline, [
+%%         internal_extra_key_tuple/2,
+%%         emqx_outgoing_span_name/1,
+%%         client_incoming_span_name/1
+%%     ]}
+%% ).
+
+internal_extra_key(Type, PktVar) when is_tuple(PktVar) ->
+    internal_extra_key(
+        Type,
+        emqx_packet:info(packet_id, PktVar)
     );
-msg_deliver(?EXT_TRACE_STOP, Packet, Attrs) ->
-    msg_deliver(?EXT_TRACE_STOP, [Packet], Attrs).
+internal_extra_key(Type, PacketId) ->
+    {
+        ?MQTT_INTERNAL_EXTRA,
+        emqx_packet:type_name(Type),
+        PacketId
+    }.
+
+%% Outgoing Packet Span Name
+%% Broker -> Client(`Subscriber'):
+outgoing_span_name(?PACKET(?PUBREL)) ->
+    %% PUBREL (QoS=2)
+    ?EMQX_PUBREL_SPAN_NAME;
+%% Broker -> Client(`Publisher'):
+outgoing_span_name(?PACKET(?PUBACK)) ->
+    %% PUBACK (QoS=1)
+    ?EMQX_PUBACK_SPAN_NAME;
+outgoing_span_name(?PACKET(?PUBREC)) ->
+    %% PUBREC (QoS=2)
+    ?EMQX_PUBREC_SPAN_NAME;
+outgoing_span_name(?PACKET(?PUBCOMP)) ->
+    %% PUBCOMP (QoS=2)
+    ?EMQX_PUBCOMP_SPAN_NAME.
+
+%% Incoming Packet Span Name
+%% Client(`Publisher') -> Broker:
+pending_span_name(?PUBREL) ->
+    %% PUBREL (QoS=2)
+    ?CLIENT_PUBREL_SPAN_NAME;
+%% Client(`Subscriber') -> Broker:
+pending_span_name(?PUBACK) ->
+    %% PUBACK (QoS=1)
+    ?CLIENT_PUBACK_SPAN_NAME;
+pending_span_name(?PUBREC) ->
+    %% PUBREC (QoS=2)
+    ?CLIENT_PUBREC_SPAN_NAME;
+pending_span_name(?PUBCOMP) ->
+    %% PUBCOMP (QoS=2)
+    ?CLIENT_PUBCOMP_SPAN_NAME.
 
 %%--------------------------------------------------------------------
 %% Span Attributes API
@@ -478,6 +767,10 @@ add_span_event(_EventName, ?EXT_TRACE_ATTRS_META(_Meta)) ->
     %% add_span_event(_EventName, meta_to_attrs(_Meta));
     ok;
 add_span_event(EventName, Attrs) ->
+    %% TODO
+    %% The otel ctx is in Packet or Delivery
+    %% not in the current process dictionary
+    %% get it by internal_extra_key(Packet)
     true = ?add_event(EventName, Attrs),
     ok.
 
@@ -549,6 +842,12 @@ gen_attrs(
         'message.publish.qos' => QoS,
         'message.publish.from' => maps:get(clientid, Attrs, undefined)
     };
+gen_attrs(?PACKET(?PUBACK, PktVar), Attrs) ->
+    #{
+        'emqx.puback.reason_code' => emqx_packet:info(reason_code, PktVar),
+        'emqx.puback.to_clientid' => maps:get(clientid, Attrs, undefined),
+        'emqx.puback.to_username' => maps:get(username, Attrs, undefined)
+    };
 gen_attrs(#message{} = Msg, Attrs) ->
     #{
         %% XXX: maybe use `to_topic_filter` as the subscribed
@@ -561,8 +860,14 @@ gen_attrs(#message{} = Msg, Attrs) ->
     };
 gen_attrs(#delivery{}, Attrs) ->
     Attrs;
-gen_attrs(_, Attrs) ->
-    Attrs.
+gen_attrs(_, Attrs) when is_map(Attrs) ->
+    Attrs;
+gen_attrs(_, InvalidAttrs) ->
+    ?SLOG(warning, #{
+        msg => "invalid_attributes",
+        attrs => InvalidAttrs
+    }),
+    #{}.
 
 serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
     TFs = [Name || {Name, _SubOpts} <- emqx_packet:info(topic_filters, PktVar)],
@@ -594,9 +899,14 @@ put_ctx(
     OtelCtx,
     #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
 ) ->
-    Extra = maps:get(?MQTT_INTERNAL_EXTRA, Props, #{}),
-    Props1 = Props#{?MQTT_INTERNAL_EXTRA => Extra#{?EMQX_OTEL_CTX => OtelCtx}},
-    Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}};
+    NProps = to_properties(OtelCtx, Props),
+    Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = NProps}};
+put_ctx(
+    OtelCtx,
+    #mqtt_packet{variable = #mqtt_packet_puback{properties = Props} = PubAckPacket} = Packet
+) ->
+    NProps = to_properties(OtelCtx, Props),
+    Packet#mqtt_packet{variable = PubAckPacket#mqtt_packet_puback{properties = NProps}};
 put_ctx(
     _OtelCtx,
     Any
@@ -608,9 +918,9 @@ get_ctx(#message{extra = Extra}) ->
 get_ctx(#delivery{message = #message{extra = Extra}}) ->
     from_extra(Extra);
 get_ctx(#mqtt_packet{
-    variable = #mqtt_packet_publish{properties = #{?MQTT_INTERNAL_EXTRA := Extra}}
-}) ->
-    from_extra(Extra);
+    variable = PktVar
+}) when is_tuple(PktVar) ->
+    from_extra(maps:get(?MQTT_INTERNAL_EXTRA, emqx_packet:info(properties, PktVar), #{}));
 get_ctx(_) ->
     undefined.
 
@@ -619,6 +929,10 @@ from_extra(#{?EMQX_OTEL_CTX := OtelCtx}) ->
 from_extra(_) ->
     undefined.
 
+to_properties(OtelCtx, Props) ->
+    Extra = maps:get(?MQTT_INTERNAL_EXTRA, Props, #{}),
+    Props#{?MQTT_INTERNAL_EXTRA => Extra#{?EMQX_OTEL_CTX => OtelCtx}}.
+
 clear() ->
     otel_ctx:clear().