Просмотр исходного кода

Merge pull request #14310 from JimMoen/fix-e2e-broker-trace

fix: trace broker send disconnect to clients
JianBo He 1 год назад
Родитель
Сommit
d806ca9938

+ 49 - 15
apps/emqx/src/emqx_channel.erl

@@ -205,6 +205,8 @@ info(clientid, #channel{clientinfo = ClientInfo}) ->
     maps:get(clientid, ClientInfo, undefined);
 info(username, #channel{clientinfo = ClientInfo}) ->
     maps:get(username, ClientInfo, undefined);
+info(is_bridge, #channel{clientinfo = ClientInfo}) ->
+    maps:get(is_bridge, ClientInfo, undefined);
 info(session, #channel{session = Session}) ->
     maybe_apply(fun emqx_session:info/1, Session);
 info({session, Info}, #channel{session = Session}) ->
@@ -505,8 +507,11 @@ handle_in(
         client_disconnect,
         Packet,
         (basic_trace_attrs(Channel))#{
-            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
+            'client.proto_name' => info(proto_name, Channel),
+            'client.proto_ver' => info(proto_ver, Channel),
+            'client.is_bridge' => info(is_bridge, Channel),
             'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
+            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
             'client.disconnect.reason_code' => emqx_packet:info(reason_code, _PktVar)
         },
         fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
@@ -1492,12 +1497,26 @@ handle_call(Req, Channel) ->
     ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}.
 
 handle_info({subscribe, TopicFilters}, Channel) ->
-    NTopicFilters = enrich_subscribe(TopicFilters, Channel),
-    {_TopicFiltersWithRC, NChannel} = post_process_subscribe(NTopicFilters, Channel),
-    {ok, NChannel};
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_subscribe,
+        [],
+        maps:merge(basic_trace_attrs(Channel), topic_filters_attrs({subscribe, TopicFilters})),
+        fun([]) ->
+            NTopicFilters = enrich_subscribe(TopicFilters, Channel),
+            {_TopicFiltersWithRC, NChannel} = post_process_subscribe(NTopicFilters, Channel),
+            {ok, NChannel}
+        end
+    );
 handle_info({unsubscribe, TopicFilters}, Channel) ->
-    {_RC, NChannel} = post_process_unsubscribe(TopicFilters, #{}, Channel),
-    {ok, NChannel};
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_unsubscribe,
+        [],
+        maps:merge(basic_trace_attrs(Channel), topic_filters_attrs({unsubscribe, TopicFilters})),
+        fun([]) ->
+            {_RC, NChannel} = post_process_unsubscribe(TopicFilters, #{}, Channel),
+            {ok, NChannel}
+        end
+    );
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
     shutdown(Reason, Channel);
 handle_info({sock_closed, Reason}, Channel = #channel{conn_state = connecting}) ->
@@ -3144,23 +3163,38 @@ basic_trace_attrs(Channel) ->
     }.
 
 topic_filters_attrs(?PACKET(?SUBSCRIBE, PktVar)) ->
-    {TFs, SubOpts} = lists:foldl(
-        fun({Topic, SubOpts}, {AccTFs, AccSubOpts}) ->
-            {[emqx_topic:maybe_format_share(Topic) | AccTFs], [SubOpts | AccSubOpts]}
-        end,
-        {[], []},
-        emqx_packet:info(topic_filters, PktVar)
-    ),
+    {TFs, SubOpts} = do_topic_filters_attrs(subscribe, emqx_packet:info(topic_filters, PktVar)),
     #{
         'client.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
         'client.subscribe.sub_opts' => emqx_utils_json:encode(lists:reverse(SubOpts))
     };
 topic_filters_attrs(?PACKET(?UNSUBSCRIBE, PktVar)) ->
+    {TFs, _} = do_topic_filters_attrs(unsubscribe, emqx_packet:info(topic_filters, PktVar)),
+    #{'client.unsubscribe.topics' => emqx_utils_json:encode(TFs)};
+topic_filters_attrs({subscribe, TopicFilters}) ->
+    {TFs, SubOpts} = do_topic_filters_attrs(subscribe, TopicFilters),
+    #{
+        'broker.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
+        'broker.subscribe.sub_opts' => emqx_utils_json:encode(lists:reverse(SubOpts))
+    };
+topic_filters_attrs({unsubscribe, TopicFilters}) ->
+    {TFs, _} = do_topic_filters_attrs(unsubscribe, [TF || {TF, _} <- TopicFilters]),
+    #{'broker.unsubscribe.topics' => emqx_utils_json:encode(TFs)}.
+
+do_topic_filters_attrs(subscribe, TopicFilters) ->
+    {_TFs, _SubOpts} = lists:foldl(
+        fun({Topic, SubOpts}, {AccTFs, AccSubOpts}) ->
+            {[emqx_topic:maybe_format_share(Topic) | AccTFs], [SubOpts | AccSubOpts]}
+        end,
+        {[], []},
+        TopicFilters
+    );
+do_topic_filters_attrs(unsubscribe, TopicFilters) ->
     TFs = [
         emqx_topic:maybe_format_share(Name)
-     || Name <- emqx_packet:info(topic_filters, PktVar)
+     || Name <- TopicFilters
     ],
-    #{'client.unsubscribe.topics' => emqx_utils_json:encode(TFs)}.
+    {TFs, undefined}.
 
 authn_attrs({continue, _Properties, _Channel}) ->
     %% TODO

+ 66 - 1
apps/emqx/src/emqx_cm.erl

@@ -22,6 +22,8 @@
 -include("emqx_cm.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("emqx_mqtt.hrl").
+-include("emqx_external_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -457,6 +459,14 @@ discard_session(ClientId) when is_binary(ClientId) ->
 when
     Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
 request_stepdown(Action, ConnMod, Pid) ->
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_disconnect,
+        [],
+        maps:merge(basic_trace_attrs(Pid), action_to_reason(Action)),
+        fun([]) -> do_request_stepdown(Action, ConnMod, Pid) end
+    ).
+
+do_request_stepdown(Action, ConnMod, Pid) ->
     Timeout =
         case Action == kick orelse Action == discard of
             true -> ?T_KICK;
@@ -696,7 +706,9 @@ lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
 -spec lookup_client(
-    {clientid, emqx_types:clientid()} | {username, emqx_types:username()} | {chan_pid, chan_pid()}
+    {clientid, emqx_types:clientid()}
+    | {username, emqx_types:username()}
+    | {chan_pid, chan_pid()}
 ) ->
     [channel_info()].
 lookup_client({username, Username}) ->
@@ -842,3 +854,56 @@ kick_session_chans(ClientId, ChanPids) ->
             ok
     end,
     lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+
+basic_trace_attrs(Pid) ->
+    %% io:format("lookup_client({chan_pid, Pid}): ~p", [lookup_client({chan_pid, Pid})]),
+    case lookup_client({chan_pid, Pid}) of
+        [] ->
+            #{'channel.pid' => iolist_to_binary(io_lib:format("~p", [Pid]))};
+        [{_Chan, #{clientinfo := ClientInfo, conninfo := ConnInfo}, _Stats}] ->
+            #{
+                'client.clientid' => maps:get(clientid, ClientInfo, undefined),
+                'client.username' => maps:get(username, ClientInfo, undefined),
+                'client.proto_name' => maps:get(proto_name, ConnInfo, undefined),
+                'client.proto_ver' => maps:get(proto_ver, ConnInfo, undefined),
+                'client.is_bridge' => maps:get(is_bridge, ClientInfo, undefined),
+                'client.sockname' => ntoa(maps:get(sockname, ConnInfo, undefined)),
+                'client.peername' => ntoa(maps:get(peername, ConnInfo, undefined))
+            };
+        _ ->
+            #{}
+    end.
+
+action_to_reason(Action) when
+    Action =:= kick orelse
+        Action =:= takeover_kick
+->
+    #{
+        'client.disconnect.reason_code' => ?RC_ADMINISTRATIVE_ACTION,
+        'client.disconnect.reason' => kick
+    };
+action_to_reason(discard) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => discard
+    };
+action_to_reason({takeover, 'begin'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => takeover_begin
+    };
+action_to_reason({takeover, 'end'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'disconnect.reason' => takeover_end
+    }.
+
+ntoa(undefined) ->
+    undefined;
+ntoa(IpPort) ->
+    emqx_utils:ntoa(IpPort).
+
+-else.
+-endif.

+ 15 - 0
apps/emqx/src/emqx_external_trace.erl

@@ -56,6 +56,21 @@
     InitAttrs :: attrs(),
     Res :: term().
 
+-callback broker_disconnect(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
+-callback broker_subscribe(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
+-callback broker_unsubscribe(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
 %% Message Processing Spans
 %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
 -callback client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when

+ 4 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -13,6 +13,10 @@
 -define(CLIENT_UNSUBSCRIBE_SPAN_NAME, 'client.unsubscribe').
 -define(CLIENT_PUBLISH_SPAN_NAME, 'client.publish').
 
+-define(BROKER_DISCONNECT_SPAN_NAME, 'broker.disconnect').
+-define(BROKER_SUBSCRIBE_SPAN_NAME, 'broker.subscribe').
+-define(BROKER_UNSUBSCRIBE_SPAN_NAME, 'broker.unsubscribe').
+
 -define(CLIENT_AUTHN_SPAN_NAME, 'client.authn').
 -define(CLIENT_AUTHZ_SPAN_NAME, 'client.authz').
 

+ 70 - 0
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -24,6 +24,10 @@
     client_authn/3,
     client_authz/3,
 
+    broker_disconnect/3,
+    broker_subscribe/3,
+    broker_unsubscribe/3,
+
     %% Message Processing Spans (From Client)
     %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
     client_publish/3,
@@ -325,6 +329,72 @@ client_authz(Packet, Attrs, ProcessFun) ->
         )
     ).
 
+-spec broker_disconnect(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_disconnect(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_DISCONNECT_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
+-spec broker_subscribe(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_subscribe(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_SUBSCRIBE_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
+-spec broker_unsubscribe(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_unsubscribe(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_UNSUBSCRIBE_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
 -spec client_publish(
     Packet,
     Attrs,

+ 9 - 3
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl

@@ -180,7 +180,10 @@ should_sample(
         SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
         SpanName =:= ?CLIENT_SUBSCRIBE_SPAN_NAME orelse
         SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME
+        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME orelse
+        SpanName =:= ?BROKER_SUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_UNSUBSCRIBE_SPAN_NAME
 ->
     Desicion =
         decide_by_match_rule(Attributes, Opts) orelse
@@ -241,12 +244,15 @@ decide_by_traceid_ratio(TraceId, SpanName, #{id_upper := IdUpperBound} = Opts) -
 
 span_name_to_config_key(SpanName) when
     SpanName =:= ?CLIENT_CONNECT_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME
+        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME
 ->
     client_connect_disconnect;
 span_name_to_config_key(SpanName) when
     SpanName =:= ?CLIENT_SUBSCRIBE_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME
+        SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_SUBSCRIBE_SPAN_NAME orelse
+        SpanName =:= ?BROKER_UNSUBSCRIBE_SPAN_NAME
 ->
     client_subscribe_unsubscribe;
 span_name_to_config_key(?CLIENT_PUBLISH_SPAN_NAME) ->