Просмотр исходного кода

fix: trace broker send disconnect to clients

JimMoen 1 год назад
Родитель
Сommit
d70172b9bb

+ 6 - 1
apps/emqx/src/emqx_channel.erl

@@ -205,6 +205,8 @@ info(clientid, #channel{clientinfo = ClientInfo}) ->
     maps:get(clientid, ClientInfo, undefined);
 info(username, #channel{clientinfo = ClientInfo}) ->
     maps:get(username, ClientInfo, undefined);
+info(is_bridge, #channel{clientinfo = ClientInfo}) ->
+    maps:get(is_bridge, ClientInfo, undefined);
 info(session, #channel{session = Session}) ->
     maybe_apply(fun emqx_session:info/1, Session);
 info({session, Info}, #channel{session = Session}) ->
@@ -505,8 +507,11 @@ handle_in(
         client_disconnect,
         Packet,
         (basic_trace_attrs(Channel))#{
-            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
+            'client.proto_name' => info(proto_name, Channel),
+            'client.proto_ver' => info(proto_ver, Channel),
+            'client.is_bridge' => info(is_bridge, Channel),
             'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
+            'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
             'client.disconnect.reason_code' => emqx_packet:info(reason_code, _PktVar)
         },
         fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end

+ 66 - 1
apps/emqx/src/emqx_cm.erl

@@ -22,6 +22,8 @@
 -include("emqx_cm.hrl").
 -include("logger.hrl").
 -include("types.hrl").
+-include("emqx_mqtt.hrl").
+-include("emqx_external_trace.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -457,6 +459,14 @@ discard_session(ClientId) when is_binary(ClientId) ->
 when
     Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'} | takeover_kick.
 request_stepdown(Action, ConnMod, Pid) ->
+    ?EXT_TRACE_WITH_PROCESS_FUN(
+        broker_disconnect,
+        [],
+        maps:merge(basic_trace_attrs(Pid), action_to_reason(Action)),
+        fun([]) -> do_request_stepdown(Action, ConnMod, Pid) end
+    ).
+
+do_request_stepdown(Action, ConnMod, Pid) ->
     Timeout =
         case Action == kick orelse Action == discard of
             true -> ?T_KICK;
@@ -696,7 +706,9 @@ lookup_channels(local, ClientId) ->
     [ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
 
 -spec lookup_client(
-    {clientid, emqx_types:clientid()} | {username, emqx_types:username()} | {chan_pid, chan_pid()}
+    {clientid, emqx_types:clientid()}
+    | {username, emqx_types:username()}
+    | {chan_pid, chan_pid()}
 ) ->
     [channel_info()].
 lookup_client({username, Username}) ->
@@ -842,3 +854,56 @@ kick_session_chans(ClientId, ChanPids) ->
             ok
     end,
     lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids).
+
+-if(?EMQX_RELEASE_EDITION == ee).
+
+basic_trace_attrs(Pid) ->
+    %% io:format("lookup_client({chan_pid, Pid}): ~p", [lookup_client({chan_pid, Pid})]),
+    case lookup_client({chan_pid, Pid}) of
+        [] ->
+            #{'channel.pid' => iolist_to_binary(io_lib:format("~p", [Pid]))};
+        [{_Chan, #{clientinfo := ClientInfo, conninfo := ConnInfo}, _Stats}] ->
+            #{
+                'client.clientid' => maps:get(clientid, ClientInfo, undefined),
+                'client.username' => maps:get(username, ClientInfo, undefined),
+                'client.proto_name' => maps:get(proto_name, ConnInfo, undefined),
+                'client.proto_ver' => maps:get(proto_ver, ConnInfo, undefined),
+                'client.is_bridge' => maps:get(is_bridge, ClientInfo, undefined),
+                'client.sockname' => ntoa(maps:get(sockname, ConnInfo, undefined)),
+                'client.peername' => ntoa(maps:get(peername, ConnInfo, undefined))
+            };
+        _ ->
+            #{}
+    end.
+
+action_to_reason(Action) when
+    Action =:= kick orelse
+        Action =:= takeover_kick
+->
+    #{
+        'client.disconnect.reason_code' => ?RC_ADMINISTRATIVE_ACTION,
+        'client.disconnect.reason' => kick
+    };
+action_to_reason(discard) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => discard
+    };
+action_to_reason({takeover, 'begin'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'client.disconnect.reason' => takeover_begin
+    };
+action_to_reason({takeover, 'end'}) ->
+    #{
+        'client.disconnect.reason_code' => ?RC_SESSION_TAKEN_OVER,
+        'disconnect.reason' => takeover_end
+    }.
+
+ntoa(undefined) ->
+    undefined;
+ntoa(IpPort) ->
+    emqx_utils:ntoa(IpPort).
+
+-else.
+-endif.

+ 5 - 0
apps/emqx/src/emqx_external_trace.erl

@@ -56,6 +56,11 @@
     InitAttrs :: attrs(),
     Res :: term().
 
+-callback broker_disconnect(Any, InitAttrs, fun((Any) -> Res)) -> Res when
+    Any :: any(),
+    InitAttrs :: attrs(),
+    Res :: any().
+
 %% Message Processing Spans
 %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
 -callback client_publish(Packet, InitAttrs, fun((Packet) -> Res)) -> Res when

+ 2 - 0
apps/emqx_opentelemetry/include/emqx_otel_trace.hrl

@@ -13,6 +13,8 @@
 -define(CLIENT_UNSUBSCRIBE_SPAN_NAME, 'client.unsubscribe').
 -define(CLIENT_PUBLISH_SPAN_NAME, 'client.publish').
 
+-define(BROKER_DISCONNECT_SPAN_NAME, 'broker.disconnect').
+
 -define(CLIENT_AUTHN_SPAN_NAME, 'client.authn').
 -define(CLIENT_AUTHZ_SPAN_NAME, 'client.authz').
 

+ 24 - 0
apps/emqx_opentelemetry/src/emqx_otel_trace.erl

@@ -24,6 +24,8 @@
     client_authn/3,
     client_authz/3,
 
+    broker_disconnect/3,
+
     %% Message Processing Spans (From Client)
     %% PUBLISH(form Publisher) -> ROUTE -> FORWARD(optional) -> DELIVER(to Subscribers)
     client_publish/3,
@@ -325,6 +327,28 @@ client_authz(Packet, Attrs, ProcessFun) ->
         )
     ).
 
+-spec broker_disconnect(
+    Any,
+    Attrs,
+    fun((Any) -> Res)
+) ->
+    Res
+when
+    Any :: term(),
+    Attrs :: attrs(),
+    Res :: term().
+broker_disconnect(Any, Attrs, ProcessFun) ->
+    ?with_trace_mode(
+        ProcessFun(Any),
+        ?with_span(
+            ?BROKER_DISCONNECT_SPAN_NAME,
+            #{attributes => Attrs},
+            fun(_SpanCtx) ->
+                ProcessFun(Any)
+            end
+        )
+    ).
+
 -spec client_publish(
     Packet,
     Attrs,

+ 4 - 2
apps/emqx_opentelemetry/src/sampler/emqx_otel_sampler.erl

@@ -180,7 +180,8 @@ should_sample(
         SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
         SpanName =:= ?CLIENT_SUBSCRIBE_SPAN_NAME orelse
         SpanName =:= ?CLIENT_UNSUBSCRIBE_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME
+        SpanName =:= ?CLIENT_PUBLISH_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME
 ->
     Desicion =
         decide_by_match_rule(Attributes, Opts) orelse
@@ -241,7 +242,8 @@ decide_by_traceid_ratio(TraceId, SpanName, #{id_upper := IdUpperBound} = Opts) -
 
 span_name_to_config_key(SpanName) when
     SpanName =:= ?CLIENT_CONNECT_SPAN_NAME orelse
-        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME
+        SpanName =:= ?CLIENT_DISCONNECT_SPAN_NAME orelse
+        SpanName =:= ?BROKER_DISCONNECT_SPAN_NAME
 ->
     client_connect_disconnect;
 span_name_to_config_key(SpanName) when