Преглед изворни кода

fix: remove span `message.dispatch`

JimMoen пре 1 година
родитељ
комит
74484772cd

+ 2 - 10
apps/emqx/src/emqx_broker.erl

@@ -379,7 +379,7 @@ do_route(Routes, Delivery, PersistRes) ->
     ).
 
 do_route2({To, Node}, Delivery) when Node =:= node() ->
-    {Node, To, dispatch_local(To, Delivery)};
+    {Node, To, do_dispatch(To, Delivery)};
 do_route2({To, Node}, Delivery) when is_atom(Node) ->
     {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
 do_route2({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
@@ -452,15 +452,6 @@ do_forward(Node, To, Delivery, sync) ->
 %% `emqx_broker_proto_v1:forward_async/3`
 dispatch(Topic, Delivery = #delivery{message = Msg}) ->
     emqx_external_trace:msg_handle_forward(
-        Delivery,
-        emqx_external_trace:msg_attrs(Msg),
-        fun(DeliveryWithTrace) ->
-            dispatch_local(Topic, DeliveryWithTrace)
-        end
-    ).
-
-dispatch_local(Topic, Delivery = #delivery{message = Msg}) ->
-    emqx_external_trace:msg_dispatch(
         Delivery,
         emqx_external_trace:msg_attrs(Msg),
         fun(DeliveryWithTrace) ->
@@ -468,6 +459,7 @@ dispatch_local(Topic, Delivery = #delivery{message = Msg}) ->
         end
     ).
 
+%% @doc Dispatch message to local subscribers.
 -spec do_dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
     emqx_types:deliver_result().
 do_dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->

+ 1 - 14
apps/emqx/src/emqx_external_trace.erl

@@ -53,7 +53,7 @@
     Res :: term().
 
 %% Message Processing Spans
-%% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DISPATCH -> DELIVER(to Subscribers)
+%% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
 -callback client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),
     InitAttrs :: attrs(),
@@ -84,11 +84,6 @@
     Delivery :: emqx_types:delivery(),
     Res :: term().
 
--callback msg_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
-    InitAttrs :: attrs(),
-    Delivery :: emqx_types:delivery(),
-    Res :: term().
-
 %% @doc Trace message forwarding
 %% The span `message.forward` always starts in the publisher process and ends in the subscriber process.
 %% They are logically two unrelated processes. So the SpanCtx always need to be propagated.
@@ -163,7 +158,6 @@ when
     client_pubrel/3,
     client_pubcomp/3,
     msg_route/3,
-    msg_dispatch/3,
     msg_forward/3,
     msg_handle_forward/3,
     msg_deliver/2,
@@ -344,13 +338,6 @@ client_pubcomp(Packet, InitAttrs, ProcessFun) ->
 msg_route(Delivery, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
 
--spec msg_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
-    Delivery :: emqx_types:delivery(),
-    InitAttrs :: attrs(),
-    Res :: term().
-msg_dispatch(Delivery, InitAttrs, ProcessFun) ->
-    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
-
 %% @doc Trace message forwarding
 %% `Span' is the smallest unit in tracing and `CANNOT' be propagated across nodes.
 %%  Divide the message forwarding process into two spans: `message.forward` and `message.handle_forward`.

+ 0 - 1
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -36,7 +36,6 @@
 -define(CLIENT_PUBCOMP_SPAN_NAME, <<"client.pubcomp">>).
 
 -define(MSG_ROUTE_SPAN_NAME, <<"message.route">>).
--define(MSG_DISPATCH_SPAN_NAME, <<"message.dispatch">>).
 -define(MSG_FORWARD_SPAN_NAME, <<"message.forward">>).
 -define(MSG_HANDLE_FORWARD_SPAN_NAME, <<"message.handle_forward">>).
 -define(MSG_DELIVER_SPAN_NAME, <<"message.deliver">>).

+ 1 - 27
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -37,14 +37,13 @@
     client_authz/3,
 
     %% Message Processing Spans (From Client)
-    %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DISPATCH -> DELIVER(to Subscribers)
+    %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
     client_publish/3,
     client_puback/3,
     client_pubrec/3,
     client_pubrel/3,
     client_pubcomp/3,
     msg_route/3,
-    msg_dispatch/3,
     msg_forward/3,
     msg_handle_forward/3,
     msg_deliver/2,
@@ -401,31 +400,6 @@ msg_route(Delivery, Attrs, Fun) ->
             )
     end.
 
--spec msg_dispatch(
-    Delivery,
-    Attrs,
-    fun((Delivery) -> Res)
-) ->
-    Res
-when
-    Delivery :: emqx_types:delivery(),
-    Attrs :: attrs(),
-    Res :: emqx_types:deliver_result().
-msg_dispatch(Delivery, Attrs, Fun) ->
-    case ignore_delivery(Delivery) of
-        true ->
-            Fun(Delivery);
-        false ->
-            _ = otel_ctx:detach(get_ctx(Delivery)),
-            ?with_span(
-                ?MSG_DISPATCH_SPAN_NAME,
-                #{attributes => Attrs},
-                fun(_SpanCtx) ->
-                    Fun(put_ctx(otel_ctx:get_current(), Delivery))
-                end
-            )
-    end.
-
 -spec msg_forward(
     Delivery,
     Attrs,