Просмотр исходного кода

refactor: external trace callbacks func rename

JimMoen 1 год назад
Родитель
Сommit
7dec71c94d

+ 3 - 3
apps/emqx/src/emqx_broker.erl

@@ -341,7 +341,7 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 %%--------------------------------------------------------------------
 
 route_with_trace(Routes, Delivery, PersistRes) ->
-    emqx_external_trace:trace_route(
+    emqx_external_trace:msg_route(
         Delivery,
         #{
             'message.route.msg_from_node' => node(),
@@ -400,7 +400,7 @@ do_forward_external(Delivery, RouteRes) ->
     emqx_external_broker:forward(Delivery) ++ RouteRes.
 
 forward_with_trace(Node, To, Delivery, RpcMode) ->
-    emqx_external_trace:trace_forward(
+    emqx_external_trace:msg_forward(
         Delivery,
         #{
             'message.forward.to_topic' => To,
@@ -439,7 +439,7 @@ forward(Node, To, Delivery, sync) ->
     end.
 
 dispatch_with_trace(Topic, Delivery) ->
-    emqx_external_trace:trace_dispatch(
+    emqx_external_trace:msg_dispatch(
         Delivery,
         #{
             'message.dispatch.from' => pid_to_binary(Delivery#delivery.sender),

+ 7 - 7
apps/emqx/src/emqx_channel.erl

@@ -360,7 +360,7 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = ConnState}) when
 handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
     handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
 handle_in(?PACKET(?CONNECT) = Packet, Channel) ->
-    emqx_external_trace:trace_client_connect(
+    emqx_external_trace:client_connect(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->
@@ -509,7 +509,7 @@ handle_in(
             {ok, Channel}
     end;
 handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Channel) ->
-    emqx_external_trace:trace_client_subscribe(
+    emqx_external_trace:client_subscribe(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) -> process_subscribe(PacketWithTrace, Channel) end
@@ -518,7 +518,7 @@ handle_in(
     Packet = ?UNSUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters),
     Channel
 ) ->
-    emqx_external_trace:trace_client_unsubscribe(
+    emqx_external_trace:client_unsubscribe(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) -> process_unsubscribe(PacketWithTrace, Channel) end
@@ -531,7 +531,7 @@ handle_in(
     ?DISCONNECT_PACKET() = Packet,
     Channel
 ) ->
-    emqx_external_trace:trace_client_disconnect(
+    emqx_external_trace:client_disconnect(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
@@ -1958,7 +1958,7 @@ authenticate(?PACKET(?AUTH) = Packet, Channel) ->
     %% TODO: extended authentication sub-span
     process_authenticate(Packet, Channel);
 authenticate(Packet, Channel) ->
-    emqx_external_trace:trace_client_authn(
+    emqx_external_trace:client_authn(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->
@@ -2234,7 +2234,7 @@ authz_action(#message{qos = QoS}) ->
 %% Check Pub Authorization
 
 check_pub_authz(Packet, Channel) ->
-    emqx_external_trace:trace_client_authz(
+    emqx_external_trace:client_authz(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->
@@ -2292,7 +2292,7 @@ check_subscribe(SubPkt, _Channel) ->
 %% Check Sub Authorization
 
 check_sub_authzs(Packet, Channel) ->
-    emqx_external_trace:trace_client_authz(
+    emqx_external_trace:client_authz(
         Packet,
         init_trace_attrs(Packet, Channel),
         fun(PacketWithTrace) ->

+ 38 - 38
apps/emqx/src/emqx_external_trace.erl

@@ -21,47 +21,47 @@
 %% Trace in Rich mode callbacks
 
 %% Client Connect/Disconnect
--callback trace_client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-callback client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
 
--callback trace_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-callback msg_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     InitAttrs :: attrs(),
     Delivery :: emqx_types:delivery(),
     Res :: term().
 
--callback trace_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-callback msg_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     InitAttrs :: attrs(),
     Delivery :: emqx_types:delivery(),
     Res :: term().
 
--callback trace_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-callback msg_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     InitAttrs :: attrs(),
     Delivery :: emqx_types:delivery(),
     Res :: term().
@@ -95,23 +95,23 @@
     [
         add_span_attrs/1,
         add_span_event/2,
-        trace_client_authn/3,
-        trace_client_authz/3
+        client_authn/3,
+        client_authz/3
     ]
 ).
 
 %% --------------------------------------------------------------------
 
 -export([
-    trace_client_connect/3,
-    trace_client_disconnect/3,
-    trace_client_authn/3,
-    trace_client_authz/3,
-    trace_client_subscribe/3,
-    trace_client_unsubscribe/3,
-    trace_route/3,
-    trace_dispatch/3,
-    trace_forward/3
+    client_connect/3,
+    client_disconnect/3,
+    client_subscribe/3,
+    client_unsubscribe/3,
+    client_authn/3,
+    client_authz/3,
+    msg_route/3,
+    msg_dispatch/3,
+    msg_forward/3
 ]).
 
 -export([
@@ -173,72 +173,72 @@ provider() ->
 %%--------------------------------------------------------------------
 
 %% @doc Start a trace event for Client CONNECT
--spec trace_client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_connect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_connect(Packet, InitAttrs, ProcessFun) ->
+client_connect(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 %% @doc Start a trace event for Client DISCONNECT
--spec trace_client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_disconnect(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_disconnect(Packet, InitAttrs, ProcessFun) ->
+client_disconnect(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 %% @doc Start a trace event for Client SUBSCRIBE
--spec trace_client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_subscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_subscribe(Packet, InitAttrs, ProcessFun) ->
+client_subscribe(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 %% @doc Start a trace event for Client UNSUBSCRIBE
--spec trace_client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_unsubscribe(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_unsubscribe(Packet, InitAttrs, ProcessFun) ->
+client_unsubscribe(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 %% @doc Start a sub-span for Client AUTHN
--spec trace_client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_authn(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_authn(Packet, InitAttrs, ProcessFun) ->
+client_authn(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
 %% @doc Start a sub-span for Client AUTHZ
--spec trace_client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
+-spec client_authz(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_client_authz(Packet, InitAttrs, ProcessFun) ->
+client_authz(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
--spec trace_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-spec msg_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     Delivery :: emqx_types:delivery(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_route(Delivery, InitAttrs, ProcessFun) ->
+msg_route(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
--spec trace_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-spec msg_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     Delivery :: emqx_types:delivery(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_dispatch(Delivery, InitAttrs, ProcessFun) ->
+msg_dispatch(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
--spec trace_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+-spec msg_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
     Delivery :: emqx_types:delivery(),
     InitAttrs :: attrs(),
     Res :: term().
-trace_forward(Delivery, InitAttrs, ProcessFun) ->
+msg_forward(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
 %% --------------------------------------------------------------------

+ 27 - 28
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -29,15 +29,15 @@
 %% Rich Trace Mode callbacks
 
 -export([
-    trace_client_connect/3,
-    trace_client_disconnect/3,
-    trace_client_subscribe/3,
-    trace_client_unsubscribe/3,
-    trace_client_authn/3,
-    trace_client_authz/3,
-    trace_route/3,
-    trace_dispatch/3,
-    trace_forward/3
+    client_connect/3,
+    client_disconnect/3,
+    client_subscribe/3,
+    client_unsubscribe/3,
+    client_authn/3,
+    client_authz/3,
+    msg_route/3,
+    msg_dispatch/3,
+    msg_forward/3
 ]).
 
 %% --------------------------------------------------------------------
@@ -120,7 +120,7 @@ stop() ->
 %% Rich mode trace API
 %%--------------------------------------------------------------------
 
--spec trace_client_connect(
+-spec client_connect(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -130,7 +130,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_connect(Packet, Attrs, ProcessFun) ->
+client_connect(Packet, Attrs, ProcessFun) ->
     RootCtx = otel_ctx:new(),
     SpanCtx = otel_tracer:start_span(
         RootCtx,
@@ -143,7 +143,6 @@ trace_client_connect(Packet, Attrs, ProcessFun) ->
     Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
     _ = otel_ctx:attach(Ctx),
     CurrentSpanCtx = otel_tracer:current_span_ctx(),
-    io:format("current span ctx: ~p~n", [CurrentSpanCtx]),
     try
         ProcessFun(Packet)
     after
@@ -151,7 +150,7 @@ trace_client_connect(Packet, Attrs, ProcessFun) ->
         clear()
     end.
 
--spec trace_client_disconnect(
+-spec client_disconnect(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -161,7 +160,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_disconnect(Packet, Attrs, ProcessFun) ->
+client_disconnect(Packet, Attrs, ProcessFun) ->
     RootCtx = otel_ctx:new(),
     SpanCtx = otel_tracer:start_span(
         RootCtx,
@@ -180,7 +179,7 @@ trace_client_disconnect(Packet, Attrs, ProcessFun) ->
         clear()
     end.
 
--spec trace_client_subscribe(
+-spec client_subscribe(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -190,7 +189,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_subscribe(Packet, Attrs, ProcessFun) ->
+client_subscribe(Packet, Attrs, ProcessFun) ->
     RootCtx = otel_ctx:new(),
     SpanCtx = otel_tracer:start_span(
         RootCtx,
@@ -210,7 +209,7 @@ trace_client_subscribe(Packet, Attrs, ProcessFun) ->
         clear()
     end.
 
--spec trace_client_unsubscribe(
+-spec client_unsubscribe(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -220,7 +219,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_unsubscribe(Packet, Attrs, ProcessFun) ->
+client_unsubscribe(Packet, Attrs, ProcessFun) ->
     RootCtx = otel_ctx:new(),
     SpanCtx = otel_tracer:start_span(
         RootCtx,
@@ -240,7 +239,7 @@ trace_client_unsubscribe(Packet, Attrs, ProcessFun) ->
         clear()
     end.
 
--spec trace_client_authn(
+-spec client_authn(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -250,7 +249,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_authn(Packet, _Attrs, ProcessFun) ->
+client_authn(Packet, _Attrs, ProcessFun) ->
     ?with_span(
         ?CLIENT_AUTHN_SPAN_NAME,
         #{
@@ -263,7 +262,7 @@ trace_client_authn(Packet, _Attrs, ProcessFun) ->
         end
     ).
 
--spec trace_client_authz(
+-spec client_authz(
     Packet,
     Attrs,
     fun((Packet) -> Res)
@@ -273,7 +272,7 @@ when
     Packet :: emqx_types:packet(),
     Attrs :: attrs(),
     Res :: term().
-trace_client_authz(Packet, _Attrs, ProcessFun) ->
+client_authz(Packet, _Attrs, ProcessFun) ->
     ?with_span(
         ?CLIENT_AUTHZ_SPAN_NAME,
         #{
@@ -285,7 +284,7 @@ trace_client_authz(Packet, _Attrs, ProcessFun) ->
         end
     ).
 
--spec trace_route(
+-spec msg_route(
     Delivery,
     Attrs,
     fun(() -> Res)
@@ -295,7 +294,7 @@ when
     Delivery :: emqx_types:delivery(),
     Attrs :: attrs(),
     Res :: term().
-trace_route(Delivery, Attrs, Fun) ->
+msg_route(Delivery, Attrs, Fun) ->
     case ignore_delivery(Delivery) of
         true ->
             Fun(Delivery);
@@ -309,7 +308,7 @@ trace_route(Delivery, Attrs, Fun) ->
             )
     end.
 
--spec trace_dispatch(
+-spec msg_dispatch(
     Delivery,
     Attrs,
     fun(() -> Res)
@@ -319,7 +318,7 @@ when
     Delivery :: emqx_types:delivery(),
     Attrs :: attrs(),
     Res :: term().
-trace_dispatch(Delivery, Attrs, Fun) ->
+msg_dispatch(Delivery, Attrs, Fun) ->
     case ignore_delivery(Delivery) of
         true ->
             Fun(Delivery);
@@ -333,7 +332,7 @@ trace_dispatch(Delivery, Attrs, Fun) ->
             )
     end.
 
--spec trace_forward(
+-spec msg_forward(
     Delivery,
     Attrs,
     fun(() -> Res)
@@ -343,7 +342,7 @@ when
     Delivery :: emqx_types:delivery(),
     Attrs :: attrs(),
     Res :: term().
-trace_forward(Delivery, Attrs, Fun) ->
+msg_forward(Delivery, Attrs, Fun) ->
     case ignore_delivery(Delivery) of
         true ->
             Fun(Delivery);