Quellcode durchsuchen

fix(sessds): Save protocol name and version in the session metadata

ieQu1 vor 1 Jahr
Ursprung
Commit
e439a2e0f2

+ 1 - 1
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -184,7 +184,7 @@ list_all_pubranges(Node) ->
 
 session_open(Node, ClientId) ->
     ClientInfo = #{},
-    ConnInfo = #{peername => {undefined, undefined}},
+    ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5},
     WillMsg = undefined,
     erpc:call(
         Node,

+ 13 - 4
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -767,7 +767,12 @@ sync(ClientId) ->
 %% the broker.
 -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
     session() | false.
-session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
+session_open(
+    SessionId,
+    ClientInfo,
+    NewConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer},
+    MaybeWillMsg
+) ->
     NowMS = now_ms(),
     case emqx_persistent_session_ds_state:open(SessionId) of
         {ok, S0} ->
@@ -787,7 +792,8 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
                     ),
                     S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
                     S5 = set_clientinfo(ClientInfo, S4),
-                    S = emqx_persistent_session_ds_state:commit(S5),
+                    S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
+                    S = emqx_persistent_session_ds_state:commit(S6),
                     Inflight = emqx_persistent_session_ds_inflight:new(
                         receive_maximum(NewConnInfo)
                     ),
@@ -810,7 +816,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
     emqx_session:conf()
 ) ->
     session().
-session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
+session_ensure_new(
+    Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf
+) ->
     ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
     Now = now_ms(),
     S0 = emqx_persistent_session_ds_state:create_new(Id),
@@ -834,7 +842,8 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
     ),
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S6 = set_clientinfo(ClientInfo, S5),
-    S = emqx_persistent_session_ds_state:commit(S6),
+    S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
+    S = emqx_persistent_session_ds_state:commit(S7),
     #{
         id => Id,
         props => Conf,

+ 3 - 1
apps/emqx/src/emqx_persistent_session_ds.hrl

@@ -74,10 +74,12 @@
 -define(created_at, created_at).
 -define(last_alive_at, last_alive_at).
 -define(expiry_interval, expiry_interval).
-%% Unique integer used to create unique identities
+%% Unique integer used to create unique identities:
 -define(last_id, last_id).
+%% Connection info (relevent for the dashboard):
 -define(peername, peername).
 -define(will_message, will_message).
 -define(clientinfo, clientinfo).
+-define(protocol, protocol).
 
 -endif.

+ 15 - 2
apps/emqx/src/emqx_persistent_session_ds_state.erl

@@ -33,6 +33,7 @@
 -export([get_clientinfo/1, set_clientinfo/2]).
 -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
 -export([get_peername/1, set_peername/2]).
+-export([get_protocol/1, set_protocol/2]).
 -export([new_id/1]).
 -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
 -export([get_seqno/2, put_seqno/3]).
@@ -66,7 +67,8 @@
     seqno_type/0,
     stream_key/0,
     rank_key/0,
-    session_iterator/0
+    session_iterator/0,
+    protocol/0
 ]).
 
 -include("emqx_mqtt.hrl").
@@ -108,13 +110,16 @@
         dirty :: #{K => dirty | del}
     }.
 
+-type protocol() :: {binary(), emqx_types:proto_ver()}.
+
 -type metadata() ::
     #{
         ?created_at => emqx_persistent_session_ds:timestamp(),
         ?last_alive_at => emqx_persistent_session_ds:timestamp(),
         ?expiry_interval => non_neg_integer(),
         ?last_id => integer(),
-        ?peername => emqx_types:peername()
+        ?peername => emqx_types:peername(),
+        ?protocol => protocol()
     }.
 
 -type seqno_type() ::
@@ -321,6 +326,14 @@ get_peername(Rec) ->
 set_peername(Val, Rec) ->
     set_meta(?peername, Val, Rec).
 
+-spec get_protocol(t()) -> protocol() | undefined.
+get_protocol(Rec) ->
+    get_meta(?protocol, Rec).
+
+-spec set_protocol(protocol(), t()) -> t().
+set_protocol(Val, Rec) ->
+    set_meta(?protocol, Val, Rec).
+
 -spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()).
 get_clientinfo(Rec) ->
     get_meta(?clientinfo, Rec).

+ 4 - 1
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -1747,6 +1747,7 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
 
 format_persistent_session_info(ClientId, PSInfo0) ->
     Metadata = maps:get(metadata, PSInfo0, #{}),
+    {ProtoName, ProtoVer} = maps:get(protocol, Metadata),
     PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
     CreatedAt = maps:get(created_at, PSInfo1),
     case Metadata of
@@ -1765,7 +1766,9 @@ format_persistent_session_info(ClientId, PSInfo0) ->
         is_persistent => true,
         port => Port,
         heap_size => 0,
-        mqueue_len => 0
+        mqueue_len => 0,
+        proto_name => ProtoName,
+        proto_ver => ProtoVer
     },
     PSInfo = lists:foldl(
         fun result_format_time_fun/2,