Przeglądaj źródła

fix: otel integration as enterprise feature

JimMoen 1 rok temu
rodzic
commit
df4566a8d5

+ 57 - 11
apps/emqx/include/emqx_external_trace.hrl

@@ -20,30 +20,76 @@
 %% --------------------------------------------------------------------
 %% --------------------------------------------------------------------
 %% Macros
 %% Macros
 
 
--define(ATTRS_META, attrs_meta).
+-define(EXT_TRACE_START, '$ext_trace_start').
+-define(EXT_TRACE_STOP, '$ext_trace_stop').
+
+-define(EMQX_EXTERNAL_MODULE, emqx_external_trace).
+-define(PROVIDER, {?EMQX_EXTERNAL_MODULE, trace_provider}).
 
 
--define(EXT_TRACE_ATTRS_META(Meta),
-    {emqx_ext_trace, ?ATTRS_META, Meta}
+-if(?EMQX_RELEASE_EDITION == ee).
+
+-define(res_without_provider(TraceAction, Any),
+    case TraceAction of
+        ?EXT_TRACE_START ->
+            Any;
+        ?EXT_TRACE_STOP ->
+            ok
+    end
 ).
 ).
 
 
--define(ext_trace_add_attrs(ATTRS_OR_META),
-    emqx_external_trace:add_span_attrs(ATTRS_OR_META)
+-define(with_provider(IfRegistered, IfNotRegistered),
+    case persistent_term:get(?PROVIDER, undefined) of
+        undefined ->
+            IfNotRegistered;
+        Provider ->
+            Provider:IfRegistered
+    end
 ).
 ).
 
 
--define(ext_trace_add_event(EVENT_NAME, TRACE_ATTRS),
-    emqx_external_trace:add_span_event(EVENT_NAME, TRACE_ATTRS)
+-define(EXT_TRACE_ANY(FuncName, Any, Attrs),
+    ?with_provider(
+        FuncName(Any, Attrs),
+        Any
+    )
 ).
 ).
 
 
--define(EXT_TRACE_START, '$ext_trace_start').
--define(EXT_TRACE_STOP, '$ext_trace_stop').
+-define(EXT_TRACE_ADD_ATTRS(Attrs),
+    ?with_provider(add_span_attrs(Attrs), ok)
+).
+
+-define(EXT_TRACE_ADD_ATTRS(Attrs, Ctx),
+    ?with_provider(add_span_attrs(Attrs, Ctx), ok)
+).
+
+-define(EXT_TRACE_WITH_ACTION(FuncName, TraceAction, Any, Attrs),
+    ?with_provider(
+        FuncName(TraceAction, Any, Attrs),
+        ?res_without_provider(TraceAction, Any)
+    )
+).
+
+-define(EXT_TRACE_WITH_PROCESS_FUN(FuncName, Any, Attrs, ProcessFun),
+    ?with_provider(
+        FuncName(Any, Attrs, ProcessFun),
+        ProcessFun(Any)
+    )
+).
+
+-else.
+
+-define(EXT_TRACE_ANY(_FuncName, Any, _Attrs), Any).
+-define(EXT_TRACE_ADD_ATTRS(_Attrs), ok).
+-define(EXT_TRACE_ADD_ATTRS(_Attrs, _Ctx), ok).
+-define(EXT_TRACE_WITH_ACTION(_FuncName, _TraceAction, Any, _Attrs), Any).
+-define(EXT_TRACE_WITH_PROCESS_FUN(_FuncName, Any, _Attrs, ProcessFun), ProcessFun(Any)).
+
+-endif.
 
 
 %% --------------------------------------------------------------------
 %% --------------------------------------------------------------------
 %% types
 %% types
 
 
 -type attrs() :: #{atom() => _}.
 -type attrs() :: #{atom() => _}.
 
 
--type attrs_meta() :: {emqx_ext_trace, ?ATTRS_META, any()}.
-
 -type event_name() :: opentelemetry:event_name().
 -type event_name() :: opentelemetry:event_name().
 
 
 -endif.
 -endif.

+ 29 - 37
apps/emqx/src/emqx_broker.erl

@@ -341,21 +341,17 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 route(Routes, Delivery = #delivery{message = Msg}, PersistRes) ->
 route(Routes, Delivery = #delivery{message = Msg}, PersistRes) ->
-    TraceRouteAttrs = #{
-        'route.from' => node(),
-        'route.matched_result' => emqx_utils_json:encode([
-            route_result({TF, RouteTo})
-         || {TF, RouteTo} <- Routes
-        ])
-    },
-    emqx_external_trace:msg_route(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        msg_route,
         Delivery,
         Delivery,
-        TraceRouteAttrs,
+        (emqx_external_trace:msg_attrs(Msg))#{
+            'route.from' => node(),
+            'route.matched_result' => emqx_utils_json:encode([
+                route_result({TF, RouteTo})
+             || {TF, RouteTo} <- Routes
+            ])
+        },
         fun(DeliveryWithTrace) ->
         fun(DeliveryWithTrace) ->
-            add_route_attrs(
-                (not emqx_message:is_sys(Msg)) andalso
-                    emqx_external_trace:msg_attrs(Msg)
-            ),
             do_route(Routes, DeliveryWithTrace, PersistRes)
             do_route(Routes, DeliveryWithTrace, PersistRes)
         end
         end
     ).
     ).
@@ -370,12 +366,18 @@ route_result({TF, Group}) ->
 do_route([], #delivery{message = Msg}, _PersistRes = []) ->
 do_route([], #delivery{message = Msg}, _PersistRes = []) ->
     ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = inc_dropped_cnt(Msg),
     ok = inc_dropped_cnt(Msg),
-    add_route_attrs(
-        (not emqx_message:is_sys(Msg)) andalso
-            #{
-                'route.dropped.node' => node(),
-                'route.dropped.reason' => no_subscribers
-            }
+    ?EXT_TRACE_ADD_ATTRS(
+        begin
+            case Msg of
+                #message{flags = #{sys := true}} ->
+                    #{};
+                _ ->
+                    #{
+                        'route.dropped.node' => node(),
+                        'route.dropped.reason' => no_subscribers
+                    }
+            end
+        end
     ),
     ),
     [];
     [];
 do_route([], _Delivery, PersistRes = [_ | _]) ->
 do_route([], _Delivery, PersistRes = [_ | _]) ->
@@ -418,14 +420,13 @@ do_forward_external(Delivery, RouteRes) ->
     emqx_external_broker:forward(Delivery) ++ RouteRes.
     emqx_external_broker:forward(Delivery) ++ RouteRes.
 
 
 forward(Node, To, Delivery = #delivery{message = Msg}, RpcMode) ->
 forward(Node, To, Delivery = #delivery{message = Msg}, RpcMode) ->
-    ForwardAttrs = #{
-        'forward.from' => node(),
-        'forward.to' => Node
-    },
-    MsgAttrs = emqx_external_trace:msg_attrs(Msg),
-    emqx_external_trace:msg_forward(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        msg_forward,
         Delivery,
         Delivery,
-        maps:merge(ForwardAttrs, MsgAttrs),
+        (emqx_external_trace:msg_attrs(Msg))#{
+            'forward.from' => node(),
+            'forward.to' => Node
+        },
         fun(DeliveryWithTrace) ->
         fun(DeliveryWithTrace) ->
             do_forward(Node, To, DeliveryWithTrace, RpcMode)
             do_forward(Node, To, DeliveryWithTrace, RpcMode)
         end
         end
@@ -461,7 +462,8 @@ do_forward(Node, To, Delivery, sync) ->
 %% `emqx_broker_proto_v1:forward/3` or
 %% `emqx_broker_proto_v1:forward/3` or
 %% `emqx_broker_proto_v1:forward_async/3`
 %% `emqx_broker_proto_v1:forward_async/3`
 dispatch(Topic, Delivery = #delivery{message = Msg}) ->
 dispatch(Topic, Delivery = #delivery{message = Msg}) ->
-    emqx_external_trace:msg_handle_forward(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        msg_handle_forward,
         Delivery,
         Delivery,
         emqx_external_trace:msg_attrs(Msg),
         emqx_external_trace:msg_attrs(Msg),
         fun(DeliveryWithTrace) ->
         fun(DeliveryWithTrace) ->
@@ -494,12 +496,6 @@ inc_dropped_cnt(Msg) ->
             emqx_metrics:inc('messages.dropped.no_subscribers')
             emqx_metrics:inc('messages.dropped.no_subscribers')
     end.
     end.
 
 
--compile({inline, [add_route_attrs/1]}).
-add_route_attrs(false) ->
-    ok;
-add_route_attrs(Attrs) ->
-    ?ext_trace_add_attrs(Attrs).
-
 -compile({inline, [subscribers/1]}).
 -compile({inline, [subscribers/1]}).
 -spec subscribers(
 -spec subscribers(
     emqx_types:topic()
     emqx_types:topic()
@@ -742,10 +738,6 @@ do_dispatch2(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
     case erlang:is_process_alive(SubPid) of
         true ->
         true ->
             SubPid ! {deliver, Topic, Msg},
             SubPid ! {deliver, Topic, Msg},
-            ?ext_trace_add_attrs(#{
-                'dispatch.to_clientid' => maps:get(subid, get_subopts(SubPid, Topic), undefined),
-                'dispatch.to_topic' => Topic
-            }),
             1;
             1;
         false ->
         false ->
             0
             0

+ 51 - 41
apps/emqx/src/emqx_channel.erl

@@ -361,7 +361,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
     handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
     handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
 handle_in(?PACKET(?CONNECT) = Packet, Channel) ->
 handle_in(?PACKET(?CONNECT) = Packet, Channel) ->
-    emqx_external_trace:client_connect(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_connect,
         Packet,
         Packet,
         connect_trace_attrs(Packet, Channel),
         connect_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->
         fun(PacketWithTrace) ->
@@ -424,7 +425,8 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
 handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) ->
     case emqx_packet:check(Packet) of
     case emqx_packet:check(Packet) of
         ok ->
         ok ->
-            emqx_external_trace:client_publish(
+            ?EXT_TRACE_WITH_PROCESS_FUN(
+                client_publish,
                 Packet,
                 Packet,
                 basic_trace_attrs(Channel),
                 basic_trace_attrs(Channel),
                 fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
                 fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
@@ -436,7 +438,8 @@ handle_in(
     ?PACKET(?PUBACK) = Packet,
     ?PACKET(?PUBACK) = Packet,
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_puback(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_puback,
         Packet,
         Packet,
         basic_trace_attrs(Channel),
         basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_puback(PacketWithTrace, Channel) end
         fun(PacketWithTrace) -> process_puback(PacketWithTrace, Channel) end
@@ -445,7 +448,8 @@ handle_in(
     ?PACKET(?PUBREC) = Packet,
     ?PACKET(?PUBREC) = Packet,
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_pubrec(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_pubrec,
         Packet,
         Packet,
         basic_trace_attrs(Channel),
         basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubrec(PacketWithTrace, Channel) end
         fun(PacketWithTrace) -> process_pubrec(PacketWithTrace, Channel) end
@@ -454,7 +458,8 @@ handle_in(
     ?PACKET(?PUBREL) = Packet,
     ?PACKET(?PUBREL) = Packet,
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_pubrel(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_pubrel,
         Packet,
         Packet,
         basic_trace_attrs(Channel),
         basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubrel(PacketWithTrace, Channel) end
         fun(PacketWithTrace) -> process_pubrel(PacketWithTrace, Channel) end
@@ -463,13 +468,15 @@ handle_in(
     ?PACKET(?PUBCOMP) = Packet,
     ?PACKET(?PUBCOMP) = Packet,
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_pubcomp(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_pubcomp,
         Packet,
         Packet,
         basic_trace_attrs(Channel),
         basic_trace_attrs(Channel),
         fun(PacketWithTrace) -> process_pubcomp(PacketWithTrace, Channel) end
         fun(PacketWithTrace) -> process_pubcomp(PacketWithTrace, Channel) end
     );
     );
 handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
 handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
-    emqx_external_trace:client_subscribe(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_subscribe,
         Packet,
         Packet,
         (basic_trace_attrs(Channel))#{
         (basic_trace_attrs(Channel))#{
             'client.subscribe.topics' => serialize_topic_filters(Packet)
             'client.subscribe.topics' => serialize_topic_filters(Packet)
@@ -480,7 +487,8 @@ handle_in(
     Packet = ?UNSUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters),
     Packet = ?UNSUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters),
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_unsubscribe(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_unsubscribe,
         Packet,
         Packet,
         (basic_trace_attrs(Channel))#{
         (basic_trace_attrs(Channel))#{
             'client.unsubscribe.topics' => serialize_topic_filters(Packet)
             'client.unsubscribe.topics' => serialize_topic_filters(Packet)
@@ -495,7 +503,8 @@ handle_in(
     ?PACKET(?DISCONNECT, PktVar) = Packet,
     ?PACKET(?DISCONNECT, PktVar) = Packet,
     Channel
     Channel
 ) ->
 ) ->
-    emqx_external_trace:client_disconnect(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_disconnect,
         Packet,
         Packet,
         (basic_trace_attrs(Channel))#{
         (basic_trace_attrs(Channel))#{
             'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
             'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
@@ -601,7 +610,7 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
     of
     of
         {ok, NPacket, NChannel} ->
         {ok, NPacket, NChannel} ->
             Msg = packet_to_message(NPacket, NChannel),
             Msg = packet_to_message(NPacket, NChannel),
-            ?ext_trace_add_attrs(emqx_external_trace:msg_attrs(Msg)),
+            ok = ?EXT_TRACE_ADD_ATTRS(emqx_external_trace:msg_attrs(Msg)),
             do_publish(PacketId, Msg, NChannel);
             do_publish(PacketId, Msg, NChannel);
         {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
         {error, Rc = ?RC_NOT_AUTHORIZED, NChannel} ->
             ?SLOG_THROTTLE(
             ?SLOG_THROTTLE(
@@ -1100,10 +1109,12 @@ handle_deliver(
     %% we need to update stats here, as the stats_timer is canceled after disconnected
     %% we need to update stats here, as the stats_timer is canceled after disconnected
     {ok, {event, updated}, Channel#channel{session = NSession}};
     {ok, {event, updated}, Channel#channel{session = NSession}};
 handle_deliver(Delivers, Channel) ->
 handle_deliver(Delivers, Channel) ->
-    Delivers1 = emqx_external_trace:broker_publish(
-        Delivers,
-        basic_trace_attrs(Channel)
-    ),
+    Delivers1 =
+        ?EXT_TRACE_ANY(
+            broker_publish,
+            Delivers,
+            basic_trace_attrs(Channel)
+        ),
     do_handle_deliver(Delivers1, Channel).
     do_handle_deliver(Delivers1, Channel).
 
 
 do_handle_deliver(
 do_handle_deliver(
@@ -1246,28 +1257,36 @@ handle_out(publish, Publishes, Channel) ->
     {ok, ?REPLY_OUTGOING(Packets), NChannel};
     {ok, ?REPLY_OUTGOING(Packets), NChannel};
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
 handle_out(puback, {PacketId, ReasonCode}, Channel) ->
     {ok,
     {ok,
-        start_outgoing_trace(
+        ?EXT_TRACE_WITH_ACTION(
+            outgoing,
+            ?EXT_TRACE_START,
             ?PUBACK_PACKET(PacketId, ReasonCode),
             ?PUBACK_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
             basic_trace_attrs(Channel)
         ),
         ),
         Channel};
         Channel};
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
 handle_out(pubrec, {PacketId, ReasonCode}, Channel) ->
     {ok,
     {ok,
-        start_outgoing_trace(
+        ?EXT_TRACE_WITH_ACTION(
+            outgoing,
+            ?EXT_TRACE_START,
             ?PUBREC_PACKET(PacketId, ReasonCode),
             ?PUBREC_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
             basic_trace_attrs(Channel)
         ),
         ),
         Channel};
         Channel};
 handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
 handle_out(pubrel, {PacketId, ReasonCode}, Channel) ->
     {ok,
     {ok,
-        start_outgoing_trace(
+        ?EXT_TRACE_WITH_ACTION(
+            outgoing,
+            ?EXT_TRACE_START,
             ?PUBREL_PACKET(PacketId, ReasonCode),
             ?PUBREL_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
             basic_trace_attrs(Channel)
         ),
         ),
         Channel};
         Channel};
 handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
 handle_out(pubcomp, {PacketId, ReasonCode}, Channel) ->
     {ok,
     {ok,
-        start_outgoing_trace(
+        ?EXT_TRACE_WITH_ACTION(
+            outgoing,
+            ?EXT_TRACE_START,
             ?PUBCOMP_PACKET(PacketId, ReasonCode),
             ?PUBCOMP_PACKET(PacketId, ReasonCode),
             basic_trace_attrs(Channel)
             basic_trace_attrs(Channel)
         ),
         ),
@@ -1301,13 +1320,8 @@ handle_out(Type, Data, Channel) ->
 %% Return ConnAck
 %% Return ConnAck
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-return_connack(?CONNACK_PACKET(RC, SessPresent) = AckPacket, Channel) ->
-    ?ext_trace_add_attrs(#{'client.connack.reason_code' => RC}),
-    ?ext_trace_add_event('client.connack', #{
-        reason_code => RC,
-        msg => connack_in_queue,
-        session_present => bool(SessPresent)
-    }),
+return_connack(?CONNACK_PACKET(RC, _SessPresent) = AckPacket, Channel) ->
+    ?EXT_TRACE_ADD_ATTRS(#{'client.connack.reason_code' => RC}),
     do_return_connack(AckPacket, Channel).
     do_return_connack(AckPacket, Channel).
 
 
 do_return_connack(AckPacket, Channel) ->
 do_return_connack(AckPacket, Channel) ->
@@ -1330,10 +1344,6 @@ do_return_connack(AckPacket, Channel) ->
             {ok, Replies ++ Outgoing, NChannel2}
             {ok, Replies ++ Outgoing, NChannel2}
     end.
     end.
 
 
--compile({inline, [bool/1]}).
-bool(0) -> false;
-bool(1) -> true.
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Deliver publish: broker -> client
 %% Deliver publish: broker -> client
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -1923,7 +1933,7 @@ maybe_assign_clientid(_ConnPkt, ClientInfo = #{clientid := ClientId}) when
     {ok, ClientInfo};
     {ok, ClientInfo};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
 maybe_assign_clientid(#mqtt_packet_connect{clientid = <<>>}, ClientInfo) ->
     RandClientId = emqx_utils:rand_id(?RAND_CLIENTID_BYTES),
     RandClientId = emqx_utils:rand_id(?RAND_CLIENTID_BYTES),
-    ?ext_trace_add_attrs(#{'client.clientid' => RandClientId}),
+    ?EXT_TRACE_ADD_ATTRS(#{'client.clientid' => RandClientId}),
     {ok, ClientInfo#{clientid => RandClientId}};
     {ok, ClientInfo#{clientid => RandClientId}};
 maybe_assign_clientid(#mqtt_packet_connect{clientid = ClientId}, ClientInfo) ->
 maybe_assign_clientid(#mqtt_packet_connect{clientid = ClientId}, ClientInfo) ->
     {ok, ClientInfo#{clientid => ClientId}}.
     {ok, ClientInfo#{clientid => ClientId}}.
@@ -2074,7 +2084,8 @@ authenticate(?PACKET(?AUTH) = Packet, Channel) ->
     process_authenticate(Packet, Channel);
     process_authenticate(Packet, Channel);
 authenticate(Packet, Channel) ->
 authenticate(Packet, Channel) ->
     %% Authenticate by CONNECT Packet
     %% Authenticate by CONNECT Packet
-    emqx_external_trace:client_authn(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_authn,
         Packet,
         Packet,
         #{
         #{
             'client.clientid' => info(clientid, Channel),
             'client.clientid' => info(clientid, Channel),
@@ -2083,7 +2094,7 @@ authenticate(Packet, Channel) ->
         fun(PacketWithTrace) ->
         fun(PacketWithTrace) ->
             Res = process_authenticate(PacketWithTrace, Channel),
             Res = process_authenticate(PacketWithTrace, Channel),
             %% TODO: which authenticator is used
             %% TODO: which authenticator is used
-            ?ext_trace_add_attrs(authn_attrs(Res)),
+            ?EXT_TRACE_ADD_ATTRS(authn_attrs(Res)),
             Res
             Res
         end
         end
     ).
     ).
@@ -2354,7 +2365,8 @@ authz_action(#message{qos = QoS}) ->
 %% Check Pub Authorization
 %% Check Pub Authorization
 
 
 check_pub_authz(Packet, Channel) ->
 check_pub_authz(Packet, Channel) ->
-    emqx_external_trace:client_authz(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_authz,
         Packet,
         Packet,
         (basic_trace_attrs(Channel))#{'authz.action_type' => publish},
         (basic_trace_attrs(Channel))#{'authz.action_type' => publish},
         fun(PacketWithTrace) ->
         fun(PacketWithTrace) ->
@@ -2371,13 +2383,13 @@ do_check_pub_authz(
     Action = authz_action(Packet),
     Action = authz_action(Packet),
     case emqx_access_control:authorize(ClientInfo, Action, Topic) of
     case emqx_access_control:authorize(ClientInfo, Action, Topic) of
         allow ->
         allow ->
-            ?ext_trace_add_attrs(#{
+            ?EXT_TRACE_ADD_ATTRS(#{
                 'authz.publish.topic' => Topic,
                 'authz.publish.topic' => Topic,
                 'authz.publish.result' => allow
                 'authz.publish.result' => allow
             }),
             }),
             ok;
             ok;
         deny ->
         deny ->
-            ?ext_trace_add_attrs(#{
+            ?EXT_TRACE_ADD_ATTRS(#{
                 'authz.publish.topic' => Topic,
                 'authz.publish.topic' => Topic,
                 'authz.publish.result' => deny,
                 'authz.publish.result' => deny,
                 'authz.reason_code' => ?RC_NOT_AUTHORIZED
                 'authz.reason_code' => ?RC_NOT_AUTHORIZED
@@ -2413,7 +2425,8 @@ check_subscribe(SubPkt, _Channel) ->
 %% Check Sub Authorization
 %% Check Sub Authorization
 
 
 check_sub_authzs(Packet, Channel) ->
 check_sub_authzs(Packet, Channel) ->
-    emqx_external_trace:client_authz(
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        client_authz,
         Packet,
         Packet,
         (basic_trace_attrs(Channel))#{'authz.action_type' => subscribe},
         (basic_trace_attrs(Channel))#{'authz.action_type' => subscribe},
         fun(PacketWithTrace) ->
         fun(PacketWithTrace) ->
@@ -2435,13 +2448,13 @@ do_check_sub_authzs(
     DenyAction = emqx:get_config([authorization, deny_action], ignore),
     DenyAction = emqx:get_config([authorization, deny_action], ignore),
     case DenyAction =:= disconnect andalso HasAuthzDeny of
     case DenyAction =:= disconnect andalso HasAuthzDeny of
         true ->
         true ->
-            ?ext_trace_add_attrs(#{
+            ?EXT_TRACE_ADD_ATTRS(#{
                 'authz.deny_action' => disconnect,
                 'authz.deny_action' => disconnect,
                 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
                 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
             }),
             }),
             {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
             {error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
         false ->
         false ->
-            ?ext_trace_add_attrs(#{
+            ?EXT_TRACE_ADD_ATTRS(#{
                 'authz.deny_action' => ignore,
                 'authz.deny_action' => ignore,
                 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
                 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
             }),
             }),
@@ -3143,9 +3156,6 @@ proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
 proto_ver(_, _) ->
 proto_ver(_, _) ->
     ?MQTT_PROTO_V4.
     ?MQTT_PROTO_V4.
 
 
-start_outgoing_trace(Packet, Attrs) ->
-    emqx_external_trace:outgoing(?EXT_TRACE_START, Packet, Attrs).
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% For CT tests
 %% For CT tests
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 2 - 1
apps/emqx/src/emqx_connection.erl

@@ -848,7 +848,8 @@ with_channel(Fun, Args, State = #state{channel = Channel}) ->
 
 
 handle_outgoing(Packets, State) ->
 handle_outgoing(Packets, State) ->
     Res = do_handle_outgoing(Packets, State),
     Res = do_handle_outgoing(Packets, State),
-    emqx_external_trace:outgoing(
+    ?EXT_TRACE_WITH_ACTION(
+        outgoing,
         ?EXT_TRACE_STOP,
         ?EXT_TRACE_STOP,
         Packets,
         Packets,
         _Attrs = #{}
         _Attrs = #{}

+ 29 - 258
apps/emqx/src/emqx_external_trace.erl

@@ -97,22 +97,11 @@
     Delivery :: emqx_types:delivery(),
     Delivery :: emqx_types:delivery(),
     Res :: term().
     Res :: term().
 
 
--callback broker_publish(
-    list(Deliver),
-    Attrs
-) ->
-    list(Deliver)
-when
+-callback broker_publish(list(Deliver), Attrs) -> list(Deliver) when
     Deliver :: emqx_types:deliver(),
     Deliver :: emqx_types:deliver(),
     Attrs :: attrs().
     Attrs :: attrs().
 
 
--callback outgoing(
-    TraceAction,
-    Packet,
-    Attrs
-) ->
-    Res
-when
+-callback outgoing(TraceAction, Packet, Attrs) -> Res when
     TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
     TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
     Packet :: emqx_types:packet(),
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Attrs :: attrs(),
@@ -122,55 +111,42 @@ when
 %% Span enrichments APIs
 %% Span enrichments APIs
 
 
 -callback add_span_attrs(Attrs) -> ok when
 -callback add_span_attrs(Attrs) -> ok when
-    Attrs :: attrs() | attrs_meta().
-
--callback add_span_event(EventName, Attrs) -> ok when
-    EventName :: event_name(),
-    Attrs :: attrs() | attrs_meta().
-
-%% --------------------------------------------------------------------
-%% Legacy mode callbacks
+    Attrs :: attrs().
 
 
-%% -callback end_trace_send(Packet | [Packet]) -> ok when
-%%     Packet :: emqx_types:packet().
+-callback add_span_attrs(Attrs, Ctx) -> ok when
+    Attrs :: attrs(),
+    Ctx :: otel_ctx:t().
 
 
 -optional_callbacks(
 -optional_callbacks(
     [
     [
         add_span_attrs/1,
         add_span_attrs/1,
-        add_span_event/2,
+        add_span_attrs/2,
         client_authn/3,
         client_authn/3,
         client_authz/3
         client_authz/3
     ]
     ]
 ).
 ).
 
 
 %% --------------------------------------------------------------------
 %% --------------------------------------------------------------------
+%% Legacy mode callbacks
 
 
--export([
-    client_connect/3,
-    client_disconnect/3,
-    client_subscribe/3,
-    client_unsubscribe/3,
-    client_authn/3,
-    client_authz/3,
-    client_publish/3,
-    client_puback/3,
-    client_pubrec/3,
-    client_pubrel/3,
-    client_pubcomp/3,
-    msg_route/3,
-    msg_forward/3,
-    msg_handle_forward/3,
-    broker_publish/2,
-
-    %% Start Span when Reply PACKETs generated
-    %% as when `emqx_channel:handle_out/3` called.
-    %% Stop when `emqx_channel:handle_outgoing/3` returned
-    outgoing/3
-]).
+%% TODO: legacy mode compatible
+%% XXX: not implemented by callback
+%% -callback trace_process_publish(Packet, ChannelInfo, fun((Packet) -> Res)) -> Res when
+%%     Packet :: emqx_types:packet(),
+%%     ChannelInfo :: channel_info(),
+%%     Res :: term().
+
+%% -callback start_trace_send(list(emqx_types:deliver()), channel_info()) ->
+%%     list(emqx_types:deliver()).
+
+%% -callback end_trace_send(emqx_types:packet() | [emqx_types:packet()]) -> ok.
+%% -export([
+%%     trace_process_publish/3,
+%%     start_trace_send/2,
+%%     end_trace_send/1
+%% ]).
 
 
 -export([
 -export([
-    add_span_attrs/1,
-    add_span_event/2,
     msg_attrs/1
     msg_attrs/1
 ]).
 ]).
 
 
@@ -180,28 +156,6 @@ when
     unregister_provider/1
     unregister_provider/1
 ]).
 ]).
 
 
--define(PROVIDER, {?MODULE, trace_provider}).
-
-%% TODO:
-%% check both trace_mode and trace_provider
--define(with_provider(IfRegistered, IfNotRegistered),
-    case persistent_term:get(?PROVIDER, undefined) of
-        undefined ->
-            IfNotRegistered;
-        Provider ->
-            Provider:IfRegistered
-    end
-).
-
--define(with_provider_attrs(IfRegistered),
-    case persistent_term:get(?PROVIDER, undefined) of
-        undefined ->
-            #{};
-        _Provider ->
-            IfRegistered
-    end
-).
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% provider API
 %% provider API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
@@ -233,189 +187,11 @@ provider() ->
 %% Trace in Rich mode API
 %% Trace in Rich mode API
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-%% @doc Start a trace event for Client CONNECT
--spec client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_connect(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Start a trace event for Client DISCONNECT
--spec client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_disconnect(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Start a trace event for Client SUBSCRIBE
--spec client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_subscribe(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Start a trace event for Client UNSUBSCRIBE
--spec client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_unsubscribe(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Start a sub-span for Client AUTHN
--spec client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_authn(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Start a sub-span for Client AUTHZ
--spec client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_authz(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% TODO:
-%% split to:
-%% `client_publish/3` for legacy_mode
-%% `trace_client_publish/3` for end_to_end_mode
-
-%% @doc Trace message PUBLISH (QoS=0, QoS=1, QoS=2)
-%% Client(Publisher) -> Broker: PUBLISH
--spec client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_publish(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Trace message PUBACK (QoS=1)
-%% Client(Subscriber) -> Broker: PUBACK
--spec client_puback(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_puback(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Trace message PUBREC (QoS=2)
-%% Client(Subscriber) -> Broker: PUBREC
--spec client_pubrec(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_pubrec(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Trace message PUBREL (QoS=2)
-%% Client(Publisher) -> Broker: PUBREL
--spec client_pubrel(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_pubrel(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
-%% @doc Trace message PUBCOMP (QoS=2)
-%% Client(Subscriber) -> Broker: PUBCOMP
--spec client_pubcomp(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
-    Packet :: emqx_types:packet(),
-    InitAttrs :: attrs(),
-    Res :: term().
-client_pubcomp(Packet, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
-
--spec msg_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
-    Delivery :: emqx_types:delivery(),
-    InitAttrs :: attrs(),
-    Res :: term().
-msg_route(Delivery, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
-
-%% @doc Trace message forwarding
-%% `Span' is the smallest unit in tracing and `CANNOT' be propagated across nodes.
-%%  Divide the message forwarding process into two spans: `message.forward` and `message.handle_forward`.
-%% The span `message.forward` starts on the publisher process and ends on the same one.
-%% Broker(Publisher) -> Broker(Subscriber): FORWARD on Broker(Publisher)
--spec msg_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
-    Delivery :: emqx_types:delivery(),
-    InitAttrs :: attrs(),
-    Res :: term().
-msg_forward(Delivery, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
-
-%% @doc Trace message forward handling
-%% The span `message.handle_forward` starts on the RPC process and ends on the same one.
-%% Broker(Publisher) -> Broker(Subscriber): HANDLE FORWARD on Broker(Subscriber)
--spec msg_handle_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
-    InitAttrs :: attrs(),
-    Delivery :: emqx_types:delivery(),
-    Res :: term().
-msg_handle_forward(Delivery, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
-
-%% TODO:
-%% split to:
-%% `broker_publish/3` for end_to_end_mode
-%% `start_trace_send/2` and `end_trace_send/1` for legacy_mode
-
-%% @doc Start Trace message delivery to subscriber
--spec broker_publish(
-    list(Deliver),
-    Attrs
-) -> list(Deliver) when
-    Deliver :: emqx_types:deliver(),
-    Attrs :: attrs().
-broker_publish(DeliverOrPackets, Attrs) ->
-    ?with_provider(
-        ?FUNCTION_NAME(DeliverOrPackets, Attrs),
-        DeliverOrPackets
-    ).
-
--spec outgoing(
-    TraceAction,
-    Packet,
-    Attrs
-) ->
-    Res
-when
-    TraceAction :: ?EXT_TRACE_START | ?EXT_TRACE_STOP,
-    Packet :: emqx_types:packet(),
-    Attrs :: attrs(),
-    Res :: term().
-outgoing(TraceAction, Packet, Attrs) ->
-    ?with_provider(
-        ?FUNCTION_NAME(TraceAction, Packet, Attrs),
-        res_without_provider(TraceAction, Packet)
-    ).
-
-%% --------------------------------------------------------------------
-%% Span enrichments APIs
-%% --------------------------------------------------------------------
-
-%% @doc Enrich trace attributes
--spec add_span_attrs(AttrsOrMeta) -> ok when
-    AttrsOrMeta :: attrs() | attrs_meta().
-add_span_attrs(AttrsOrMeta) ->
-    _ = catch ?with_provider(?FUNCTION_NAME(AttrsOrMeta), ok),
-    ok.
-
-%% @doc Add trace event
--spec add_span_event(EventName, AttrsOrMeta) -> ok when
-    EventName :: event_name(),
-    AttrsOrMeta :: attrs() | attrs_meta().
-add_span_event(EventName, AttrsOrMeta) ->
-    _ = catch ?with_provider(?FUNCTION_NAME(EventName, AttrsOrMeta), ok),
-    ok.
-
+msg_attrs(_Msg = #message{flags = #{sys := true}}) ->
+    %% Skip system messages
+    #{};
 msg_attrs(Msg = #message{}) ->
 msg_attrs(Msg = #message{}) ->
-    ?with_provider_attrs(#{
+    #{
         'message.msgid' => emqx_guid:to_hexstr(Msg#message.id),
         'message.msgid' => emqx_guid:to_hexstr(Msg#message.id),
         'message.qos' => Msg#message.qos,
         'message.qos' => Msg#message.qos,
         'message.from' => Msg#message.from,
         'message.from' => Msg#message.from,
@@ -425,17 +201,12 @@ msg_attrs(Msg = #message{}) ->
             maps:get(properties, Msg#message.headers, #{})
             maps:get(properties, Msg#message.headers, #{})
         ),
         ),
         'message.payload_size' => size(Msg#message.payload)
         'message.payload_size' => size(Msg#message.payload)
-    }).
+    }.
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
-res_without_provider(?EXT_TRACE_START, Any) ->
-    Any;
-res_without_provider(?EXT_TRACE_STOP, _Packets) ->
-    ok.
-
 %% TODO:
 %% TODO:
 %% 1. Add more checks for the provider module and functions
 %% 1. Add more checks for the provider module and functions
 %% 2. Add more checks for the trace functions
 %% 2. Add more checks for the trace functions

+ 2 - 23
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -58,8 +58,7 @@
 %% Span enrichments APIs
 %% Span enrichments APIs
 
 
 -export([
 -export([
-    add_span_attrs/1,
-    add_span_event/2
+    add_span_attrs/1
 ]).
 ]).
 
 
 -include("emqx_otel_trace.hrl").
 -include("emqx_otel_trace.hrl").
@@ -724,11 +723,7 @@ awaiting_span_name(?PUBCOMP) ->
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec add_span_attrs(AttrsOrMeta) -> ok when
 -spec add_span_attrs(AttrsOrMeta) -> ok when
-    AttrsOrMeta :: attrs() | attrs_meta().
-add_span_attrs(?EXT_TRACE_ATTRS_META(_Meta)) ->
-    %% TODO
-    %% add_span_attrs(meta_to_attrs(Meta));
-    ok;
+    AttrsOrMeta :: attrs().
 add_span_attrs(Attrs) ->
 add_span_attrs(Attrs) ->
     true = ?set_attributes(Attrs),
     true = ?set_attributes(Attrs),
     ok.
     ok.
@@ -738,22 +733,6 @@ add_span_attrs(Attrs, Ctx) ->
     otel_span:set_attributes(CurrentSpanCtx, Attrs),
     otel_span:set_attributes(CurrentSpanCtx, Attrs),
     ok.
     ok.
 
 
-%% TODO: remove me
--spec add_span_event(EventName, Attrs) -> ok when
-    EventName :: event_name(),
-    Attrs :: attrs() | attrs_meta().
-add_span_event(_EventName, ?EXT_TRACE_ATTRS_META(_Meta)) ->
-    %% TODO
-    %% add_span_event(_EventName, meta_to_attrs(_Meta));
-    ok;
-add_span_event(EventName, Attrs) ->
-    %% TODO
-    %% The otel ctx is in Packet or Delivery
-    %% not in the current process dictionary
-    %% get it by internal_extra_key(Packet)
-    true = ?add_event(EventName, Attrs),
-    ok.
-
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% Internal functions
 %% Internal functions
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------

+ 1 - 0
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl

@@ -140,6 +140,7 @@ setup(Opts) ->
 description(_Opts) ->
 description(_Opts) ->
     <<"AttributeSampler">>.
     <<"AttributeSampler">>.
 
 
+%% TODO: remote sampled
 should_sample(Ctx, _TraceId, _Links, ?MSG_ROUTE_SPAN_NAME, _SpanKind, _Attributes, _Opts) ->
 should_sample(Ctx, _TraceId, _Links, ?MSG_ROUTE_SPAN_NAME, _SpanKind, _Attributes, _Opts) ->
     ParentSpanCtx = otel_tracer:current_span_ctx(Ctx),
     ParentSpanCtx = otel_tracer:current_span_ctx(Ctx),
     case ParentSpanCtx of
     case ParentSpanCtx of