Browse Source

feat: client end-to-end tracing by OpenTelemetry

JimMoen 1 year ago
parent
commit
ca0c8b0a21

+ 49 - 0
apps/emqx/include/emqx_external_trace.hrl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_EXT_TRACE_HRL).
+-define(EMQX_EXT_TRACE_HRL, true).
+
+%% --------------------------------------------------------------------
+%% Macros
+
+-define(ATTRS_META, attrs_meta).
+
+-define(EXT_TRACE_ATTRS_META(Meta),
+    {emqx_ext_trace, ?ATTRS_META, Meta}
+).
+
+-define(ext_trace_add_attrs(META),
+    emqx_external_trace:add_span_attrs(META)
+).
+
+-define(ext_trace_add_event(EVENT_NAME, TRACE_ATTRS),
+    emqx_external_trace:add_span_event(
+        EVENT_NAME,
+        TRACE_ATTRS
+    )
+).
+
+%% --------------------------------------------------------------------
+%% types
+
+-type attrs() :: #{atom() => _}.
+
+-type attrs_meta() :: {emqx_ext_trace, ?ATTRS_META, any()}.
+
+-type event_name() :: opentelemetry:event_name().
+
+-endif.

+ 178 - 100
apps/emqx/src/emqx_channel.erl

@@ -24,6 +24,7 @@
 -include("emqx_access_control.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("emqx_external_trace.hrl").
 
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
@@ -157,6 +158,13 @@
 -define(chan_terminating, chan_terminating).
 -define(RAND_CLIENTID_BYTES, 16).
 
+-define(trace_connect, '$trace.connect.attrs').
+-define(trace_disconnect, '$trace.disconnect.attrs').
+-define(trace_subscribe, '$trace.subscribe.attrs').
+-define(trace_unsubscribe, '$trace.unsubscribe.attrs').
+-define(trace_publish, '$trace.publish.attrs').
+-define(trace_deliver, '$trace.deliver.attrs').
+
 -dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}).
 
 %%--------------------------------------------------------------------
@@ -348,47 +356,17 @@ take_conn_info_fields(Fields, ClientInfo, ConnInfo) ->
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
     ?IS_CONNECTED_OR_REAUTHENTICATING(ConnState)
 ->
+    %% TODO: trace these two cases
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
     handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
-handle_in(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
-    case
-        pipeline(
-            [
-                fun overload_protection/2,
-                fun enrich_conninfo/2,
-                fun run_conn_hooks/2,
-                fun check_connect/2,
-                fun enrich_client/2,
-                %% set_log_meta should happen after enrich_client
-                %% because client ID assign and override
-                fun set_log_meta/2,
-                fun check_banned/2,
-                fun count_flapping_event/2
-            ],
-            ConnPkt,
-            Channel#channel{conn_state = connecting}
-        )
-    of
-        {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
-            ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
-            NChannel1 = NChannel#channel{
-                alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
-            },
-            case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
-                {ok, Properties, NChannel2} ->
-                    %% only store will_msg after successful authn
-                    %% fix for: https://github.com/emqx/emqx/issues/8886
-                    NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)},
-                    process_connect(Properties, NChannel3);
-                {continue, Properties, NChannel2} ->
-                    handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
-                {error, ReasonCode} ->
-                    handle_out(connack, ReasonCode, NChannel1)
-            end;
-        {error, ReasonCode, NChannel} ->
-            handle_out(connack, ReasonCode, NChannel)
-    end;
+handle_in(?CONNECT_PACKET() = Packet, Channel) ->
+    emqx_external_trace:trace_client_connect(
+        Packet,
+        trace_info(?trace_connect, Channel),
+        fun(PacketWithTrace) -> process_connect(PacketWithTrace, Channel) end
+    );
+%% TODO: trace CONNECT with AUTH
 handle_in(
     Packet = ?AUTH_PACKET(ReasonCode, _Properties),
     Channel = #channel{conn_state = ConnState}
@@ -404,9 +382,9 @@ handle_in(
             {ok, NProperties, NChannel} ->
                 case ConnState of
                     connecting ->
-                        process_connect(NProperties, NChannel);
+                        post_process_connect(NProperties, NChannel);
                     reauthenticating ->
-                        process_connect(NProperties, NChannel);
+                        post_process_connect(NProperties, NChannel);
                     _ ->
                         handle_out(
                             auth,
@@ -447,7 +425,7 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
             emqx_external_trace:trace_process_publish(
                 Packet,
                 %% More info can be added in future, but for now only clientid is used
-                trace_info(Channel),
+                trace_info(?trace_publish, Channel),
                 fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
             );
         {error, ReasonCode} ->
@@ -529,55 +507,37 @@ handle_in(
             ok = emqx_metrics:inc('packets.pubcomp.missed'),
             {ok, Channel}
     end;
-handle_in(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) ->
-    Pipe = pipeline(
-        [
-            fun check_subscribe/2,
-            fun enrich_subscribe/2,
-            %% TODO && FIXME (EMQX-10786): mount topic before authz check.
-            fun check_sub_authzs/2,
-            fun check_sub_caps/2
-        ],
-        SubPkt,
-        Channel0
-    ),
-    case Pipe of
-        {ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} ->
-            {TFSubedWithNRC, NChannel} = process_subscribe(run_sub_hooks(NPkt, Channel), Channel),
-            ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC),
-            handle_out(suback, {PacketId, ReasonCodes}, NChannel);
-        {error, {disconnect, RC}, Channel} ->
-            %% funcs in pipeline always cause action: `disconnect`
-            %% And Only one ReasonCode in DISCONNECT packet
-            handle_out(disconnect, RC, Channel)
-    end;
+handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
+    emqx_external_trace:trace_client_subscribe(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        trace_info(?trace_subscribe, Channel),
+        fun(PacketWithTrace) -> process_subscribe(PacketWithTrace, Channel) end
+    );
 handle_in(
-    Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
-    Channel = #channel{clientinfo = ClientInfo}
+    Packet = ?UNSUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters),
+    Channel
 ) ->
-    case emqx_packet:check(Packet) of
-        ok ->
-            TopicFilters1 = run_hooks(
-                'client.unsubscribe',
-                [ClientInfo, Properties],
-                parse_raw_topic_filters(TopicFilters)
-            ),
-            {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel),
-            handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
-        {error, ReasonCode} ->
-            handle_out(disconnect, ReasonCode, Channel)
-    end;
+    emqx_external_trace:trace_client_unsubscribe(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        trace_info(?trace_unsubscribe, Channel),
+        fun(PacketWithTrace) -> process_unsubscribe(PacketWithTrace, Channel) end
+    );
 handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
     {ok, NKeepalive} = emqx_keepalive:check(Keepalive),
     NChannel = Channel#channel{keepalive = NKeepalive},
     {ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
 handle_in(
-    ?DISCONNECT_PACKET(ReasonCode, Properties),
-    Channel = #channel{conninfo = ConnInfo}
+    ?DISCONNECT_PACKET() = Packet,
+    Channel
 ) ->
-    NConnInfo = ConnInfo#{disconn_props => Properties},
-    NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = NConnInfo}),
-    process_disconnect(ReasonCode, Properties, NChannel);
+    emqx_external_trace:trace_client_disconnect(
+        Packet,
+        %% More info can be added in future, but for now only clientid is used
+        trace_info(?trace_disconnect, Channel),
+        fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
+    );
 handle_in(?AUTH_PACKET(), Channel) ->
     handle_out(disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR, Channel);
 handle_in({frame_error, Reason}, Channel) ->
@@ -590,7 +550,48 @@ handle_in(Packet, Channel) ->
 %% Process Connect
 %%--------------------------------------------------------------------
 
-process_connect(
+process_connect(?CONNECT_PACKET(ConnPkt) = Packet, Channel) ->
+    case
+        pipeline(
+            [
+                fun overload_protection/2,
+                fun enrich_conninfo/2,
+                fun run_conn_hooks/2,
+                fun check_connect/2,
+                fun enrich_client/2,
+                %% set_log_meta should happen after enrich_client
+                %% because client ID assign and override
+                fun set_log_meta/2,
+                fun check_banned/2,
+                fun count_flapping_event/2
+            ],
+            ConnPkt,
+            Channel#channel{conn_state = connecting}
+        )
+    of
+        {ok, NConnPkt, NChannel = #channel{clientinfo = ClientInfo}} ->
+            ?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
+            NChannel1 = NChannel#channel{
+                alias_maximum = init_alias_maximum(NConnPkt, ClientInfo)
+            },
+            case authenticate(?CONNECT_PACKET(NConnPkt), NChannel1) of
+                {ok, Properties, NChannel2} ->
+                    %% only store will_msg after successful authn
+                    %% fix for: https://github.com/emqx/emqx/issues/8886
+                    NChannel3 = NChannel2#channel{will_msg = emqx_packet:will_msg(NConnPkt)},
+                    %% TODO: enrich event attrs
+                    ?ext_trace_add_event('client.connect.authenticate', #{result => success}),
+                    post_process_connect(Properties, NChannel3);
+                {continue, Properties, NChannel2} ->
+                    handle_out(auth, {?RC_CONTINUE_AUTHENTICATION, Properties}, NChannel2);
+                {error, ReasonCode} ->
+                    handle_out(connack, ReasonCode, NChannel1)
+            end;
+        {error, ReasonCode, NChannel} ->
+            handle_out(connack, ReasonCode, NChannel)
+    end.
+
+post_process_connect(
     AckProps,
     Channel = #channel{
         conninfo = ConnInfo,
@@ -806,14 +807,40 @@ after_message_acked(ClientInfo, Msg, PubAckProps) ->
 %% Process Subscribe
 %%--------------------------------------------------------------------
 
-process_subscribe(TopicFilters, Channel) ->
-    process_subscribe(TopicFilters, Channel, []).
+process_subscribe(SubPkt = ?SUBSCRIBE_PACKET(PacketId, _Properties, _TopicFilters0), Channel0) ->
+    Pipe = pipeline(
+        [
+            fun check_subscribe/2,
+            fun enrich_subscribe/2,
+            %% TODO && FIXME (EMQX-10786): mount topic before authz check.
+            fun check_sub_authzs/2,
+            fun check_sub_caps/2
+        ],
+        SubPkt,
+        Channel0
+    ),
+    case Pipe of
+        {ok, NPkt = ?SUBSCRIBE_PACKET(_PacketId, TFChecked), Channel} ->
+            {TFSubedWithNRC, NChannel} = post_process_subscribe(
+                run_sub_hooks(NPkt, Channel), Channel
+            ),
+            ReasonCodes = gen_reason_codes(TFChecked, TFSubedWithNRC),
+            handle_out(suback, {PacketId, ReasonCodes}, NChannel);
+        {error, {disconnect, RC}, Channel} ->
+            %% funcs in pipeline always cause action: `disconnect`
+            %% And Only one ReasonCode in DISCONNECT packet
+            handle_out(disconnect, RC, Channel)
+    end.
 
-process_subscribe([], Channel, Acc) ->
+-compile({inline, [post_process_subscribe/2]}).
+post_process_subscribe(TopicFilters, Channel) ->
+    post_process_subscribe(TopicFilters, Channel, []).
+
+post_process_subscribe([], Channel, Acc) ->
     {lists:reverse(Acc), Channel};
-process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) ->
+post_process_subscribe([Filter = {TopicFilter, SubOpts} | More], Channel, Acc) ->
     {NReasonCode, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel),
-    process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]).
+    post_process_subscribe(More, NChannel, [{Filter, NReasonCode} | Acc]).
 
 do_subscribe(
     TopicFilter,
@@ -871,15 +898,32 @@ do_gen_reason_codes(
 %% Process Unsubscribe
 %%--------------------------------------------------------------------
 
--compile({inline, [process_unsubscribe/3]}).
-process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
-    process_unsubscribe(TopicFilters, UnSubProps, Channel, []).
+process_unsubscribe(
+    Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters),
+    Channel = #channel{clientinfo = ClientInfo}
+) ->
+    case emqx_packet:check(Packet) of
+        ok ->
+            TopicFilters1 = run_hooks(
+                'client.unsubscribe',
+                [ClientInfo, Properties],
+                parse_raw_topic_filters(TopicFilters)
+            ),
+            {ReasonCodes, NChannel} = post_process_unsubscribe(TopicFilters1, Properties, Channel),
+            handle_out(unsuback, {PacketId, ReasonCodes}, NChannel);
+        {error, ReasonCode} ->
+            handle_out(disconnect, ReasonCode, Channel)
+    end.
 
-process_unsubscribe([], _UnSubProps, Channel, Acc) ->
+-compile({inline, [post_process_unsubscribe/3]}).
+post_process_unsubscribe(TopicFilters, UnSubProps, Channel) ->
+    post_process_unsubscribe(TopicFilters, UnSubProps, Channel, []).
+
+post_process_unsubscribe([], _UnSubProps, Channel, Acc) ->
     {lists:reverse(Acc), Channel};
-process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) ->
+post_process_unsubscribe([{TopicFilter, SubOpts} | More], UnSubProps, Channel, Acc) ->
     {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel),
-    process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]).
+    post_process_unsubscribe(More, UnSubProps, NChannel, [RC | Acc]).
 
 do_unsubscribe(
     TopicFilter,
@@ -909,8 +953,16 @@ maybe_clean_will_msg(?RC_SUCCESS, Channel = #channel{session = Session0}) ->
 maybe_clean_will_msg(_ReasonCode, Channel) ->
     Channel.
 
-%% MQTT-v5.0: 3.14.2.2.2 Session Expiry Interval
 process_disconnect(
+    ?DISCONNECT_PACKET(ReasonCode, Properties),
+    Channel = #channel{conninfo = ConnInfo}
+) ->
+    NConnInfo = ConnInfo#{disconn_props => Properties},
+    NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = NConnInfo}),
+    post_process_disconnect(ReasonCode, Properties, NChannel).
+
+%% MQTT-v5.0: 3.14.2.2.2 Session Expiry Interval
+post_process_disconnect(
     _ReasonCode,
     #{'Session-Expiry-Interval' := Interval},
     Channel = #channel{conninfo = #{expiry_interval := 0}}
@@ -918,7 +970,7 @@ process_disconnect(
     Interval > 0
 ->
     handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
-process_disconnect(ReasonCode, Properties, Channel) ->
+post_process_disconnect(ReasonCode, Properties, Channel) ->
     NChannel = maybe_update_expiry_interval(Properties, Channel),
     {ok, {close, disconnect_reason(ReasonCode)}, NChannel}.
 
@@ -981,7 +1033,7 @@ handle_deliver(
     %% we need to update stats here, as the stats_timer is canceled after disconnected
     {ok, {event, updated}, Channel#channel{session = NSession}};
 handle_deliver(Delivers, Channel) ->
-    Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(Channel)),
+    Delivers1 = emqx_external_trace:start_trace_send(Delivers, trace_info(?trace_deliver, Channel)),
     do_handle_deliver(Delivers1, Channel).
 
 do_handle_deliver(
@@ -1327,7 +1379,7 @@ handle_info({subscribe, TopicFilters}, Channel) ->
     {_TopicFiltersWithRC, NChannel} = process_subscribe(NTopicFilters, Channel),
     {ok, NChannel};
 handle_info({unsubscribe, TopicFilters}, Channel) ->
-    {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel),
+    {_RC, NChannel} = post_process_unsubscribe(TopicFilters, #{}, Channel),
     {ok, NChannel};
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
@@ -1577,8 +1629,34 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
     emqx_olp:backoff(Zone),
     ok.
 
-trace_info(Channel) ->
-    %% More info can be added in future, but for now only clientid is used
+%% Typically it's the init trace attrs
+%% And it's designed for indexing the trace events
+%% It is designed to be used to index all related traces
+%% that have __different TraceIDs__(OF COURSE DIFFERENT).
+%% TODO: implement as init trace attrs callback
+
+%% More info can be added in future, but for now only clientid is used
+trace_info(?trace_connect, Channel) ->
+    maps:from_list(
+        info(
+            [
+                clientid,
+                peername
+            ],
+            Channel
+        )
+    );
+%% TODO:
+%% For now, only clientid is used
+trace_info(?trace_disconnect, Channel) ->
+    maps:from_list(info([clientid], Channel));
+trace_info(?trace_subscribe, Channel) ->
+    maps:from_list(info([clientid], Channel));
+trace_info(?trace_unsubscribe, Channel) ->
+    maps:from_list(info([clientid], Channel));
+trace_info(?trace_publish, Channel) ->
+    maps:from_list(info([clientid], Channel));
+trace_info(?trace_deliver, Channel) ->
     maps:from_list(info([clientid], Channel)).
 
 %%--------------------------------------------------------------------

+ 142 - 18
apps/emqx/src/emqx_external_trace.erl

@@ -15,17 +15,80 @@
 %%--------------------------------------------------------------------
 -module(emqx_external_trace).
 
--callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+-include("emqx_external_trace.hrl").
+
+%% --------------------------------------------------------------------
+%% Trace in Rich mode callbacks
+
+%% Client Connect/Disconnect
+-callback trace_client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback trace_client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback trace_client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback trace_client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback trace_client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+-callback trace_client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+
+%% --------------------------------------------------------------------
+%% Span enrichments APIs
+
+-callback add_span_attrs(Attrs) -> ok when
+    Attrs :: attrs() | attrs_meta().
+
+-callback add_span_event(EventName, Attrs) -> ok when
+    EventName :: event_name(),
+    Attrs :: attrs() | attrs_meta().
+
+%% --------------------------------------------------------------------
+%% Legacy mode callbacks
+
+-callback trace_process_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
-    ChannelInfo :: channel_info(),
+    InitAttrs :: attrs(),
     Res :: term().
 
--callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
-    list(emqx_types:deliver()).
+-callback start_trace_send(Delivers, Attrs) -> Delivers when
+    Delivers :: [emqx_types:deliver()],
+    Attrs :: attrs().
+
+-callback end_trace_send(Packet | [Packet]) -> ok when
+    Packet :: emqx_types:packet().
 
--callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+%% --------------------------------------------------------------------
 
--type channel_info() :: #{atom() => _}.
+-export([
+    trace_client_connect/3,
+    trace_client_disconnect/3,
+    trace_client_subscribe/3,
+    trace_client_unsubscribe/3
+]).
+
+-export([
+    add_span_attrs/1,
+    add_span_event/2
+]).
 
 -export([
     provider/0,
@@ -36,8 +99,6 @@
     end_trace_send/1
 ]).
 
--export_type([channel_info/0]).
-
 -define(PROVIDER, {?MODULE, trace_provider}).
 
 -define(with_provider(IfRegistered, IfNotRegistered),
@@ -77,22 +138,82 @@ provider() ->
     persistent_term:get(?PROVIDER, undefined).
 
 %%--------------------------------------------------------------------
-%% trace API
+%% Trace in Rich mode API
 %%--------------------------------------------------------------------
 
--spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+%% @doc Start a trace event for Client CONNECT
+-spec trace_client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_client_connect(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Start a trace event for Client DISCONNECT
+-spec trace_client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_client_disconnect(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Start a trace event for Client SUBSCRIBE
+-spec trace_client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
-    ChannelInfo :: channel_info(),
+    InitAttrs :: attrs(),
     Res :: term().
-trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, ChannelInfo, ProcessFun), ProcessFun(Packet)).
+trace_client_subscribe(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
--spec start_trace_send(list(emqx_types:deliver()), channel_info()) ->
-    list(emqx_types:deliver()).
-start_trace_send(Delivers, ChannelInfo) ->
-    ?with_provider(?FUNCTION_NAME(Delivers, ChannelInfo), Delivers).
+%% @doc Start a trace event for Client UNSUBSCRIBE
+-spec trace_client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_client_unsubscribe(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% --------------------------------------------------------------------
+%% Span enrichments APIs
+%% --------------------------------------------------------------------
+
+%% @doc Enrich trace attributes
+-spec add_span_attrs(AttrsOrMeta) -> ok when
+    AttrsOrMeta :: attrs() | attrs_meta().
+add_span_attrs(AttrsOrMeta) ->
+    _ = catch ?with_provider(?FUNCTION_NAME(AttrsOrMeta), ok),
+    ok.
+
+%% @doc Add trace event
+-spec add_span_event(EventName, AttrsOrMeta) -> ok when
+    EventName :: event_name(),
+    AttrsOrMeta :: attrs() | attrs_meta().
+add_span_event(EventName, AttrsOrMeta) ->
+    _ = catch ?with_provider(?FUNCTION_NAME(EventName, AttrsOrMeta), ok),
+    ok.
 
--spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+%%--------------------------------------------------------------------
+%% Legacy trace API
+%%--------------------------------------------------------------------
+
+%% @doc Trace message processing from publisher
+-spec trace_process_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+    Packet :: emqx_types:packet(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_process_publish(Packet, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
+
+%% @doc Start Trace message delivery to subscriber
+-spec start_trace_send(Delivers, Attrs) -> Delivers when
+    Delivers :: [emqx_types:deliver()],
+    Attrs :: attrs().
+start_trace_send(Delivers, Attrs) ->
+    ?with_provider(?FUNCTION_NAME(Delivers, Attrs), Delivers).
+
+%% @doc End Trace message delivery
+-spec end_trace_send(Packet | [Packet]) -> ok when
+    Packet :: emqx_types:packet().
 end_trace_send(Packets) ->
     ?with_provider(?FUNCTION_NAME(Packets), ok).
 
@@ -100,6 +221,9 @@ end_trace_send(Packets) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+%% TODO:
+%% enrich_trace_attrs/1 and add_trace_event/2
+%% might be optional for providers
 is_valid_provider(Module) ->
     lists:all(
         fun({F, A}) -> erlang:function_exported(Module, F, A) end,

+ 1 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -458,7 +458,7 @@ t_handle_in_expected_packet(_) ->
 t_process_connect(_) ->
     mock_cm_open_session(),
     {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
-        emqx_channel:process_connect(#{}, channel(#{conn_state => idle})).
+        emqx_channel:post_process_connect(#{}, channel(#{conn_state => idle})).
 
 t_process_publish_qos0(_) ->
     ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),

+ 25 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -0,0 +1,25 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_OTEL_TRACE_HRL).
+-define(EMQX_OTEL_TRACE_HRL, true).
+
+-define(CLIENT_CONNECT_SPAN_NAME, <<"client.connect">>).
+-define(CLIENT_DISCONNECT_SPAN_NAME, <<"client.disconnect">>).
+-define(CLIENT_SUBSCRIBE_SPAN_NAME, <<"client.subscribe">>).
+-define(CLIENT_UNSUBSCRIBE_SPAN_NAME, <<"client.unsubscribe">>).
+
+-endif.

+ 1 - 1
apps/emqx_opentelemetry/src/emqx_opentelemetry.app.src

@@ -1,6 +1,6 @@
 {application, emqx_opentelemetry, [
     {description, "OpenTelemetry for EMQX Broker"},
-    {vsn, "0.2.7"},
+    {vsn, "0.2.8"},
     {registered, []},
     {mod, {emqx_otel_app, []}},
     {applications, [

+ 249 - 25
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -25,14 +25,38 @@
 
 -export([toggle_registered/1]).
 
+%% --------------------------------------------------------------------
+%% Rich Trace Mode callbacks
+
+-export([
+    trace_client_connect/3,
+    %% trace authentification
+    trace_client_disconnect/3,
+    trace_client_subscribe/3,
+    trace_client_unsubscribe/3
+]).
+
+%% --------------------------------------------------------------------
+%% Span enrichments APIs
+
+-export([
+    add_span_attrs/1,
+    add_span_event/2
+]).
+
+%% --------------------------------------------------------------------
+%% Legacy Mode callbacks
+
 -export([
     trace_process_publish/3,
     start_trace_send/2,
     end_trace_send/1
 ]).
 
+-include("emqx_otel_trace.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/emqx_mqtt.hrl").
+-include_lib("emqx/include/emqx_external_trace.hrl").
 -include_lib("opentelemetry_api/include/otel_tracer.hrl").
 
 -define(EMQX_OTEL_CTX, otel_ctx).
@@ -41,6 +65,8 @@
 -define(TRACE_ALL_KEY, {?MODULE, trace_all}).
 -define(TRACE_ALL, persistent_term:get(?TRACE_ALL_KEY, false)).
 
+-define(trace_connect, '$trace.connect.attrs').
+
 %%--------------------------------------------------------------------
 %% config
 %%--------------------------------------------------------------------
@@ -89,21 +115,156 @@ stop() ->
     safe_stop_default_tracer().
 
 %%--------------------------------------------------------------------
-%% trace API
+%% Rich mode trace API
 %%--------------------------------------------------------------------
 
--spec trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+-spec trace_client_connect(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_client_connect(Packet, Attrs, ProcessFun) ->
+    RootCtx = otel_ctx:new(),
+    NAttrs = maps:merge(packet_attributes(Packet), channel_attributes(?trace_connect, Attrs)),
+    SpanCtx = otel_tracer:start_span(
+        RootCtx,
+        ?current_tracer,
+        ?CLIENT_CONNECT_SPAN_NAME,
+        #{
+            attributes => NAttrs
+        }
+    ),
+    Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
+    %% put ctx to packet, so it can be further propagated
+    _ = otel_ctx:attach(Ctx),
+    try
+        ProcessFun(Packet)
+    after
+        _ = ?end_span(),
+        clear()
+    end.
+
+-spec trace_client_disconnect(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_client_disconnect(Packet, Attrs, ProcessFun) ->
+    RootCtx = otel_ctx:new(),
+    %% FIXME: channel attributes fields
+    NAttrs = maps:merge(packet_attributes(Packet), channel_attributes(Attrs)),
+    SpanCtx = otel_tracer:start_span(
+        RootCtx,
+        ?current_tracer,
+        ?CLIENT_DISCONNECT_SPAN_NAME,
+        #{
+            attributes => NAttrs
+        }
+    ),
+    Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
+    %% put ctx to packet, so it can be further propagated
+    _ = otel_ctx:attach(Ctx),
+    try
+        ProcessFun(Packet)
+    after
+        _ = ?end_span(),
+        clear()
+    end.
+
+-spec trace_client_subscribe(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_client_subscribe(Packet, Attrs, ProcessFun) ->
+    RootCtx = otel_ctx:new(),
+    NAttrs = maps:merge(packet_attributes(Packet), channel_attributes(Attrs)),
+    SpanCtx = otel_tracer:start_span(
+        RootCtx,
+        ?current_tracer,
+        ?CLIENT_SUBSCRIBE_SPAN_NAME,
+        #{
+            attributes => NAttrs
+        }
+    ),
+    Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
+    %% put ctx to packet, so it can be further propagated
+    _ = otel_ctx:attach(Ctx),
+    try
+        ProcessFun(Packet)
+    after
+        _ = ?end_span(),
+        clear()
+    end.
+
+-spec trace_client_unsubscribe(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
+    Packet :: emqx_types:packet(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_client_unsubscribe(Packet, Attrs, ProcessFun) ->
+    RootCtx = otel_ctx:new(),
+    NAttrs = maps:merge(packet_attributes(Packet), channel_attributes(Attrs)),
+    SpanCtx = otel_tracer:start_span(
+        RootCtx,
+        ?current_tracer,
+        ?CLIENT_UNSUBSCRIBE_SPAN_NAME,
+        #{
+            attributes => NAttrs
+        }
+    ),
+    Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
+    %% put ctx to packet, so it can be further propagated
+    _ = otel_ctx:attach(Ctx),
+    try
+        ProcessFun(Packet)
+    after
+        _ = ?end_span(),
+        clear()
+    end.
+
+%% --------------------------------------------------------------------
+%% Legacy trace API
+%% --------------------------------------------------------------------
+
+-spec trace_process_publish(
+    Packet,
+    Attrs,
+    fun((Packet) -> Res)
+) ->
+    Res
+when
     Packet :: emqx_types:packet(),
-    ChannelInfo :: emqx_external_trace:channel_info(),
+    Attrs :: attrs(),
     Res :: term().
 trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
     case maybe_init_ctx(Packet) of
         false ->
             ProcessFun(Packet);
         RootCtx ->
-            Attrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)),
+            NAttrs = maps:merge(packet_attributes(Packet), channel_attributes(ChannelInfo)),
             SpanCtx = otel_tracer:start_span(RootCtx, ?current_tracer, process_message, #{
-                attributes => Attrs
+                attributes => NAttrs
             }),
             Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
             %% put ctx to packet, so it can be further propagated
@@ -117,17 +278,23 @@ trace_process_publish(Packet, ChannelInfo, ProcessFun) ->
             end
     end.
 
--spec start_trace_send(list(emqx_types:deliver()), emqx_external_trace:channel_info()) ->
-    list(emqx_types:deliver()).
-start_trace_send(Delivers, ChannelInfo) ->
+-spec start_trace_send(
+    list(Delivers),
+    Attrs
+) ->
+    list(Delivers)
+when
+    Delivers :: emqx_types:deliver(),
+    Attrs :: attrs().
+start_trace_send(Delivers, Attrs) ->
     lists:map(
         fun({deliver, Topic, Msg} = Deliver) ->
             case get_ctx_from_msg(Msg) of
                 Ctx when is_map(Ctx) ->
-                    Attrs = maps:merge(
-                        msg_attributes(Msg), sub_channel_attributes(ChannelInfo)
+                    NAttrs = maps:merge(
+                        msg_attributes(Msg), sub_channel_attributes(Attrs)
                     ),
-                    StartOpts = #{attributes => Attrs},
+                    StartOpts = #{attributes => NAttrs},
                     SpanCtx = otel_tracer:start_span(
                         Ctx, ?current_tracer, send_published_message, StartOpts
                     ),
@@ -142,8 +309,15 @@ start_trace_send(Delivers, ChannelInfo) ->
         Delivers
     ).
 
--spec end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
-end_trace_send(Packets) ->
+-spec end_trace_send(
+    Packet | [Packet]
+) ->
+    ok
+when
+    Packet :: emqx_types:packet().
+end_trace_send(Packets) when
+    is_list(Packets)
+->
     lists:foreach(
         fun(Packet) ->
             case get_ctx_from_packet(Packet) of
@@ -153,18 +327,37 @@ end_trace_send(Packets) ->
                     ok
             end
         end,
-        packets_list(Packets)
-    ).
+        Packets
+    );
+end_trace_send(Packet) ->
+    end_trace_send([Packet]).
 
 %%--------------------------------------------------------------------
-%% Internal functions
+%% Span Attributes API
 %%--------------------------------------------------------------------
 
-packets_list(Packets) when is_list(Packets) ->
-    Packets;
-packets_list(Packet) ->
-    [Packet].
+-spec add_span_attrs(AttrsOrMeta) -> ok when
+    AttrsOrMeta :: attrs() | attrs_meta().
+add_span_attrs(?EXT_TRACE_ATTRS_META(_Meta)) ->
+    %% TODO
+    %% add_span_attrs(meta_to_attrs(Meta));
+    ok;
+add_span_attrs(_Attrs) ->
+    ok.
 
+-spec add_span_event(EventName, Attrs) -> ok when
+    EventName :: event_name(),
+    Attrs :: attrs() | attrs_meta().
+add_span_event(_EventName, ?EXT_TRACE_ATTRS_META(_Meta)) ->
+    %% TODO
+    %% add_span_event(_EventName, meta_to_attrs(_Meta));
+    ok;
+add_span_event(_EventName, _Attrs) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
 maybe_init_ctx(#mqtt_packet{variable = Packet}) ->
     case should_trace_packet(Packet) of
         true ->
@@ -199,21 +392,52 @@ msg_attributes(Msg) ->
         'messaging.client_id' => emqx_message:from(Msg)
     }.
 
-packet_attributes(#mqtt_packet{variable = Packet}) ->
-    #{'messaging.destination.name' => emqx_packet:info(topic_name, Packet)}.
+%% TODO: refactor to use raw `Attrs` or `AttrsMeta`
+%% FIXME:
+%% function_clause for DISCONNECT packet
+%% XXX:
+%% emqx_packet:info/2 as utils
+packet_attributes(?CONNECT_PACKET(#mqtt_packet_connect{clientid = ClientId})) ->
+    #{'client.client_id' => ClientId};
+packet_attributes(#mqtt_packet{
+    header = #mqtt_packet_header{type = ?DISCONNECT}
+}) ->
+    #{};
+packet_attributes(#mqtt_packet{
+    header = #mqtt_packet_header{type = ?PUBLISH},
+    variable = PubPacket
+}) ->
+    #{'messaging.destination.name' => emqx_packet:info(topic_name, PubPacket)};
+packet_attributes(#mqtt_packet{
+    header = #mqtt_packet_header{type = Type}
+}) when Type =:= ?SUBSCRIBE orelse Type =:= ?UNSUBSCRIBE ->
+    #{'client.unsubscribe.topic_filters' => <<"TOPIC_FILTER_PH">>}.
 
 channel_attributes(ChannelInfo) ->
     #{'messaging.client_id' => maps:get(clientid, ChannelInfo, undefined)}.
 
+channel_attributes(?trace_connect, ChannelInfo) ->
+    #{
+        'client.client_id' => maps:get(clientid, ChannelInfo, undefined),
+        'client.peername' => maps:get(peername, ChannelInfo, undefined)
+    }.
+
 sub_channel_attributes(ChannelInfo) ->
     channel_attributes(ChannelInfo).
 
+%% ====================
+%% Trace Context in message
 put_ctx_to_msg(OtelCtx, Msg = #message{extra = Extra}) when is_map(Extra) ->
     Msg#message{extra = Extra#{?EMQX_OTEL_CTX => OtelCtx}};
 %% extra field has not being used previously and defaulted to an empty list, it's safe to overwrite it
 put_ctx_to_msg(OtelCtx, Msg) when is_record(Msg, message) ->
     Msg#message{extra = #{?EMQX_OTEL_CTX => OtelCtx}}.
 
+get_ctx_from_msg(#message{extra = Extra}) ->
+    from_extra(Extra).
+
+%% ====================
+%% Trace Context in packet
 put_ctx_to_packet(
     OtelCtx, #mqtt_packet{variable = #mqtt_packet_publish{properties = Props} = PubPacket} = Packet
 ) ->
@@ -221,9 +445,6 @@ put_ctx_to_packet(
     Props1 = Props#{internal_extra => Extra#{?EMQX_OTEL_CTX => OtelCtx}},
     Packet#mqtt_packet{variable = PubPacket#mqtt_packet_publish{properties = Props1}}.
 
-get_ctx_from_msg(#message{extra = Extra}) ->
-    from_extra(Extra).
-
 get_ctx_from_packet(#mqtt_packet{
     variable = #mqtt_packet_publish{properties = #{internal_extra := Extra}}
 }) ->
@@ -256,3 +477,6 @@ assert_started({error, Reason}) -> {error, Reason}.
 
 set_trace_all(TraceAll) ->
     persistent_term:put({?MODULE, trace_all}, TraceAll).
+
+%%--------------------------------------------------------------------
+%%