Просмотр исходного кода

feat(otel): trace message route, forward and dispatch

JimMoen 1 год назад
Родитель
Commit
a82fd56275

+ 2 - 2
apps/emqx/include/emqx_external_trace.hrl

@@ -26,8 +26,8 @@
     {emqx_ext_trace, ?ATTRS_META, Meta}
 ).
 
--define(ext_trace_add_attrs(META),
-    emqx_external_trace:add_span_attrs(META)
+-define(ext_trace_add_attrs(ATTRS_OR_META),
+    emqx_external_trace:add_span_attrs(ATTRS_OR_META)
 ).
 
 -define(ext_trace_add_event(EVENT_NAME, TRACE_ATTRS),

+ 54 - 3
apps/emqx/src/emqx_broker.erl

@@ -20,6 +20,7 @@
 
 -include("emqx.hrl").
 -include("emqx_router.hrl").
+-include("emqx_external_trace.hrl").
 
 -include("logger.hrl").
 -include("types.hrl").
@@ -291,7 +292,7 @@ do_publish(#message{topic = Topic} = Msg) ->
     PersistRes = persist_publish(Msg),
     Routes = aggre(emqx_router:match_routes(Topic)),
     Delivery = delivery(Msg),
-    RouteRes = route(Routes, Delivery, PersistRes),
+    RouteRes = route_with_trace(Routes, Delivery, PersistRes),
     do_forward_external(Delivery, RouteRes).
 
 persist_publish(Msg) ->
@@ -339,11 +340,27 @@ delivery(Msg) -> #delivery{sender = self(), message = Msg}.
 %% Route
 %%--------------------------------------------------------------------
 
+%% Runs route/3 through emqx_external_trace:trace_route/3.
+%% The initial span attributes record the local node and the id of the
+%% message carried by the delivery; routing itself is done on whatever
+%% delivery the trace layer hands back to the closure.
+route_with_trace(Routes, Delivery, PersistRes) ->
+    RouteAttrs = #{
+        'message.route.msg_from_node' => node(),
+        'message.route.msg_id' => Delivery#delivery.message#message.id
+    },
+    DoRoute = fun(TracedDelivery) -> route(Routes, TracedDelivery, PersistRes) end,
+    emqx_external_trace:trace_route(Delivery, RouteAttrs, DoRoute).
+
 -spec route([emqx_types:route_entry()], emqx_types:delivery(), nil() | [persisted]) ->
     emqx_types:publish_result().
 route([], #delivery{message = Msg}, _PersistRes = []) ->
     ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
     ok = inc_dropped_cnt(Msg),
+    ?ext_trace_add_attrs(#{
+        'message.route.dropped.node' => node(),
+        'message.route.dropped.reason' => no_subscribers
+    }),
     [];
 route([], _Delivery, PersistRes = [_ | _]) ->
     PersistRes;
@@ -357,10 +374,11 @@ route(Routes, Delivery, PersistRes) ->
     ).
 
 do_route({To, Node}, Delivery) when Node =:= node() ->
-    {Node, To, dispatch(To, Delivery)};
+    {Node, To, dispatch_with_trace(To, Delivery)};
 do_route({To, Node}, Delivery) when is_atom(Node) ->
-    {Node, To, forward(Node, To, Delivery, emqx:get_config([rpc, mode]))};
+    {Node, To, forward_with_trace(Node, To, Delivery, emqx:get_config([rpc, mode]))};
 do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
+    %% TODO: trace shared-sub dispatch
     {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->
@@ -384,6 +402,19 @@ aggre([], true, Acc) ->
 %% Hands the delivery to emqx_external_broker:forward/1 and prepends
 %% whatever that call returns to the already collected RouteRes list.
 do_forward_external(Delivery, RouteRes) ->
     emqx_external_broker:forward(Delivery) ++ RouteRes.
 
+%% Runs forward/4 through emqx_external_trace:trace_forward/3.
+%% The initial span attributes carry the target topic, the target node and
+%% the RPC mode used for forwarding.
+forward_with_trace(Node, To, Delivery, RpcMode) ->
+    ForwardAttrs = #{
+        'message.forward.to_topic' => To,
+        'message.forward.to_node' => Node,
+        'message.forward.mode' => RpcMode
+    },
+    DoForward = fun(TracedDelivery) -> forward(Node, To, TracedDelivery, RpcMode) end,
+    emqx_external_trace:trace_forward(Delivery, ForwardAttrs, DoForward).
+
 %% @doc Forward message to another node.
 -spec forward(
     node(), emqx_types:topic() | emqx_types:share(), emqx_types:delivery(), RpcMode :: sync | async
@@ -410,6 +441,18 @@ forward(Node, To, Delivery, sync) ->
             Result
     end.
 
+%% Runs dispatch/2 through emqx_external_trace:trace_dispatch/3.
+%% The initial span attributes carry the delivery sender (rendered as a
+%% binary by pid_to_binary/1) and the topic being dispatched to.
+dispatch_with_trace(Topic, Delivery) ->
+    DispatchAttrs = #{
+        'message.dispatch.from' => pid_to_binary(Delivery#delivery.sender),
+        'message.dispatch.to_topic' => Topic
+    },
+    DoDispatch = fun(TracedDelivery) -> dispatch(Topic, TracedDelivery) end,
+    emqx_external_trace:trace_dispatch(Delivery, DispatchAttrs, DoDispatch).
+
 -spec dispatch(emqx_types:topic() | emqx_types:share(), emqx_types:delivery()) ->
     emqx_types:deliver_result().
 dispatch(Topic, Delivery = #delivery{}) when is_binary(Topic) ->
@@ -676,6 +719,9 @@ do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
         true ->
             SubPid ! {deliver, Topic, Msg},
+            ?ext_trace_add_attrs(#{
+                'message.dispatch.to_subscriber' => pid_to_binary(SubPid)
+            }),
             1;
         false ->
             0
@@ -736,3 +782,8 @@ regular_sync_route(add, Topic) ->
     emqx_router:do_add_route(Topic, node());
 regular_sync_route(delete, Topic) ->
     emqx_router:do_delete_route(Topic, node()).
+
+%% Renders a pid in its textual form (as produced by pid_to_list/1) as a
+%% binary. Any term that is not a pid yields the empty binary.
+pid_to_binary(Pid) when is_pid(Pid) ->
+    list_to_binary(pid_to_list(Pid));
+pid_to_binary(_NotAPid) ->
+    <<>>.

+ 48 - 6
apps/emqx/src/emqx_external_trace.erl

@@ -51,6 +51,21 @@
     InitAttrs :: attrs(),
     Res :: term().
 
+-callback trace_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    InitAttrs :: attrs(),
+    Delivery :: emqx_types:delivery(),
+    Res :: term().
+
+-callback trace_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    InitAttrs :: attrs(),
+    Delivery :: emqx_types:delivery(),
+    Res :: term().
+
+-callback trace_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    InitAttrs :: attrs(),
+    Delivery :: emqx_types:delivery(),
+    Res :: term().
+
 %% --------------------------------------------------------------------
 %% Span enrichments APIs
 
@@ -90,13 +105,13 @@
 -export([
     trace_client_connect/3,
     trace_client_disconnect/3,
-    trace_client_subscribe/3,
-    trace_client_unsubscribe/3
-]).
-
--export([
     trace_client_authn/3,
-    trace_client_authz/3
+    trace_client_authz/3,
+    trace_client_subscribe/3,
+    trace_client_unsubscribe/3,
+    trace_route/3,
+    trace_dispatch/3,
+    trace_forward/3
 ]).
 
 -export([
@@ -115,6 +130,8 @@
 
 -define(PROVIDER, {?MODULE, trace_provider}).
 
+%% TODO:
+%% check both trace_mode and trace_provider
 -define(with_provider(IfRegistered, IfNotRegistered),
     case persistent_term:get(?PROVIDER, undefined) of
         undefined ->
@@ -203,6 +220,27 @@ trace_client_authn(Packet, InitAttrs, ProcessFun) ->
 trace_client_authz(Packet, InitAttrs, ProcessFun) ->
     ?with_provider(?FUNCTION_NAME(Packet, InitAttrs, ProcessFun), ProcessFun(Packet)).
 
+%% @doc Trace message routing for a delivery.
+%% The first macro argument (same function name and arguments) is the
+%% `IfRegistered` expression of ?with_provider; `ProcessFun(Delivery)` is
+%% the `IfNotRegistered` expression, i.e. the untraced path.
+%% NOTE(review): the macro body is only partly visible here; presumably the
+%% first expression is dispatched to the registered provider module. Confirm.
+-spec trace_route(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    Delivery :: emqx_types:delivery(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_route(Delivery, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
+
+%% @doc Trace message dispatch for a delivery.
+%% The first macro argument (same function name and arguments) is the
+%% `IfRegistered` expression of ?with_provider; `ProcessFun(Delivery)` is
+%% the `IfNotRegistered` expression, i.e. the untraced path.
+%% NOTE(review): the macro body is only partly visible here; presumably the
+%% first expression is dispatched to the registered provider module. Confirm.
+-spec trace_dispatch(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    Delivery :: emqx_types:delivery(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_dispatch(Delivery, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
+
+%% @doc Trace message forwarding for a delivery.
+%% The first macro argument (same function name and arguments) is the
+%% `IfRegistered` expression of ?with_provider; `ProcessFun(Delivery)` is
+%% the `IfNotRegistered` expression, i.e. the untraced path.
+%% NOTE(review): the macro body is only partly visible here; presumably the
+%% first expression is dispatched to the registered provider module. Confirm.
+-spec trace_forward(Delivery, InitAttrs, fun((Delivery) -> Res)) -> Res when
+    Delivery :: emqx_types:delivery(),
+    InitAttrs :: attrs(),
+    Res :: term().
+trace_forward(Delivery, InitAttrs, ProcessFun) ->
+    ?with_provider(?FUNCTION_NAME(Delivery, InitAttrs, ProcessFun), ProcessFun(Delivery)).
+
 %% --------------------------------------------------------------------
 %% Span enrichments APIs
 %% --------------------------------------------------------------------
@@ -226,6 +264,10 @@ add_span_event(EventName, AttrsOrMeta) ->
 %% Legacy trace API
 %%--------------------------------------------------------------------
 
+%% TODO:
+%% split to:
+%% `trace_process_publish/3` for legacy_mode
+%% `trace_client_publish/3` for end_to_end_mode
 %% @doc Trace message processing from publisher
 -spec trace_process_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when
     Packet :: emqx_types:packet(),

+ 3 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -23,6 +23,9 @@
 -define(CLIENT_AUTHZ_SPAN_NAME, <<"client.authz">>).
 -define(CLIENT_SUBSCRIBE_SPAN_NAME, <<"client.subscribe">>).
 -define(CLIENT_UNSUBSCRIBE_SPAN_NAME, <<"client.unsubscribe">>).
+-define(MSG_ROUTE_SPAN_NAME, <<"message.route">>).
+-define(MSG_DISPATCH_SPAN_NAME, <<"message.dispatch">>).
+-define(MSG_FORWARD_SPAN_NAME, <<"message.forward">>).
 
 %% -define(MSG_SEND_SPAN_NAME, <<"message.send">>).
 

+ 80 - 2
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -31,10 +31,13 @@
 -export([
     trace_client_connect/3,
     trace_client_disconnect/3,
+    trace_client_subscribe/3,
+    trace_client_unsubscribe/3,
     trace_client_authn/3,
     trace_client_authz/3,
-    trace_client_subscribe/3,
-    trace_client_unsubscribe/3
+    trace_route/3,
+    trace_dispatch/3,
+    trace_forward/3
 ]).
 
 %% --------------------------------------------------------------------
@@ -282,6 +285,78 @@ trace_client_authz(Packet, _Attrs, ProcessFun) ->
         end
     ).
 
+%% @doc Run Fun on Delivery, wrapped in a ?MSG_ROUTE_SPAN_NAME span.
+%% Deliveries whose message topic is a `$SYS/` topic (see ignore_delivery/1)
+%% are not traced: Fun is applied directly and no span is started.
+%% Otherwise Fun runs inside ?with_span with Attrs as the span attributes.
+%% In both cases Fun receives the Delivery exactly as it was passed in.
+%% NOTE(review): assumes ?with_span returns the result of the wrapped fun,
+%% as the spec states. Confirm against the macro definition.
+-spec trace_route(
+    Delivery,
+    Attrs,
+    fun((Delivery) -> Res)
+) ->
+    Res
+when
+    Delivery :: emqx_types:delivery(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_route(Delivery, Attrs, Fun) ->
+    case ignore_delivery(Delivery) of
+        true ->
+            Fun(Delivery);
+        false ->
+            ?with_span(
+                ?MSG_ROUTE_SPAN_NAME,
+                #{attributes => Attrs},
+                fun(_SpanCtx) ->
+                    Fun(Delivery)
+                end
+            )
+    end.
+
+%% @doc Run Fun on Delivery, wrapped in a ?MSG_DISPATCH_SPAN_NAME span.
+%% Deliveries whose message topic is a `$SYS/` topic (see ignore_delivery/1)
+%% are not traced: Fun is applied directly and no span is started.
+%% Otherwise Fun runs inside ?with_span with Attrs as the span attributes.
+%% In both cases Fun receives the Delivery exactly as it was passed in.
+%% NOTE(review): assumes ?with_span returns the result of the wrapped fun,
+%% as the spec states. Confirm against the macro definition.
+-spec trace_dispatch(
+    Delivery,
+    Attrs,
+    fun((Delivery) -> Res)
+) ->
+    Res
+when
+    Delivery :: emqx_types:delivery(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_dispatch(Delivery, Attrs, Fun) ->
+    case ignore_delivery(Delivery) of
+        true ->
+            Fun(Delivery);
+        false ->
+            ?with_span(
+                ?MSG_DISPATCH_SPAN_NAME,
+                #{attributes => Attrs},
+                fun(_SpanCtx) ->
+                    Fun(Delivery)
+                end
+            )
+    end.
+
+%% @doc Run Fun on Delivery, wrapped in a ?MSG_FORWARD_SPAN_NAME span.
+%% Deliveries whose message topic is a `$SYS/` topic (see ignore_delivery/1)
+%% are not traced: Fun is applied directly and no span is started.
+%% Otherwise Fun runs inside ?with_span with Attrs as the span attributes.
+%% In both cases Fun receives the Delivery exactly as it was passed in.
+%% NOTE(review): assumes ?with_span returns the result of the wrapped fun,
+%% as the spec states. Confirm against the macro definition.
+-spec trace_forward(
+    Delivery,
+    Attrs,
+    fun((Delivery) -> Res)
+) ->
+    Res
+when
+    Delivery :: emqx_types:delivery(),
+    Attrs :: attrs(),
+    Res :: term().
+trace_forward(Delivery, Attrs, Fun) ->
+    case ignore_delivery(Delivery) of
+        true ->
+            Fun(Delivery);
+        false ->
+            ?with_span(
+                ?MSG_FORWARD_SPAN_NAME,
+                #{attributes => Attrs},
+                fun(_SpanCtx) ->
+                    Fun(Delivery)
+                end
+            )
+    end.
+
 %% --------------------------------------------------------------------
 %% Legacy trace API
 %% --------------------------------------------------------------------
@@ -423,6 +498,9 @@ should_trace_context(RootCtx) ->
 should_trace_packet(Packet) ->
     not is_sys(emqx_packet:info(topic_name, Packet)).
 
+%% Returns true when the topic of the message carried by the delivery is a
+%% `$SYS/` topic (decided by is_sys/1), false otherwise.
+ignore_delivery(#delivery{message = #message{topic = Topic}}) ->
+    is_sys(Topic).
+
 %% Returns true when the topic is a binary starting with the `$SYS/`
 %% prefix, and false for every other term.
 %% TODO: move to emqx_topic module?
 is_sys(<<"$SYS/", _/binary>> = _Topic) -> true;
 is_sys(_Topic) -> false.