Просмотр исходного кода

feat(sessds): Expose relevant durable session info in the REST API

ieQu1 2 лет назад
Родитель
Сommit
cada944350

+ 32 - 1
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -162,12 +162,21 @@
 -type replies() :: emqx_session:replies().
 
 -define(STATS_KEYS, [
+    durable,
     subscriptions_cnt,
     subscriptions_max,
     inflight_cnt,
     inflight_max,
     mqueue_len,
-    mqueue_dropped
+    mqueue_dropped,
+    seqno_q1_comm,
+    seqno_q1_dup,
+    seqno_q1_next,
+    seqno_q2_comm,
+    seqno_q2_dup,
+    seqno_q2_rec,
+    seqno_q2_next,
+    n_streams
 ]).
 
 %%
@@ -214,6 +223,8 @@ info(id, #{id := ClientID}) ->
     ClientID;
 info(clientid, #{id := ClientID}) ->
     ClientID;
+info(durable, _) ->
+    true;
 info(created_at, #{s := S}) ->
     emqx_persistent_session_ds_state:get_created_at(S);
 info(is_persistent, #{}) ->
@@ -249,6 +260,26 @@ info(mqueue_dropped, _Session) ->
 %     AwaitingRel;
 %% info(awaiting_rel_cnt, #{s := S}) ->
 %%     seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S);
+info(seqno_q1_comm, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S);
+info(seqno_q1_dup, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S);
+info(seqno_q1_next, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?next(?QOS_1), S);
+info(seqno_q2_comm, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S);
+info(seqno_q2_dup, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S);
+info(seqno_q2_rec, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?rec, S);
+info(seqno_q2_next, #{s := S}) ->
+    emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S);
+info(n_streams, #{s := S}) ->
+    emqx_persistent_session_ds_state:fold_streams(
+        fun(_, _, Acc) -> Acc + 1 end,
+        0,
+        S
+    );
 info(awaiting_rel_max, #{props := Conf}) ->
     maps:get(max_awaiting_rel, Conf);
 info(await_rel_timeout, #{props := _Conf}) ->

+ 3 - 0
apps/emqx/src/emqx_session_mem.erl

@@ -132,6 +132,7 @@
 -type replies() :: emqx_session:replies().
 
 -define(STATS_KEYS, [
+    durable,
     subscriptions_cnt,
     subscriptions_max,
     inflight_cnt,
@@ -254,6 +255,8 @@ info(created_at, #session{created_at = CreatedAt}) ->
     CreatedAt;
 info(is_persistent, #session{is_persistent = IsPersistent}) ->
     IsPersistent;
+info(durable, _) ->
+    false;
 info(subscriptions, #session{subscriptions = Subs}) ->
     Subs;
 info(subscriptions_cnt, #session{subscriptions = Subs}) ->

+ 63 - 2
apps/emqx_management/src/emqx_mgmt_api_clients.erl

@@ -621,7 +621,67 @@ fields(client) ->
                         " Maximum number of subscriptions allowed by this client">>
             })},
         {username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
-        {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}
+        {mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})},
+        {durable, hoconsc:mk(boolean(), #{desc => <<"Session is durable">>})},
+        {n_streams,
+            hoconsc:mk(non_neg_integer(), #{
+                desc => <<"Number of streams used by the durable session">>
+            })},
+
+        {seqno_q1_comm,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of the last PUBACK received from the client "
+                        "(Durable sessions only)"
+                    >>
+            })},
+        {seqno_q1_dup,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of the last QoS1 message sent to the client, that hasn't been acked "
+                        "(Durable sessions only)"
+                    >>
+            })},
+        {seqno_q1_next,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of next QoS1 message to be added to the batch "
+                        "(Durable sessions only)"
+                    >>
+            })},
+
+        {seqno_q2_comm,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of the last PUBCOMP received from the client "
+                        "(Durable sessions only)"
+                    >>
+            })},
+        {seqno_q2_dup,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of last unacked QoS2 PUBLISH message sent to the client "
+                        "(Durable sessions only)"
+                    >>
+            })},
+        {seqno_q2_rec,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<"Sequence number of last PUBREC received from the client (Durable sessions only)">>
+            })},
+        {seqno_q2_next,
+            hoconsc:mk(non_neg_integer(), #{
+                desc =>
+                    <<
+                        "Sequence number of next QoS2 message to be added to the batch "
+                        "(Durable sessions only)"
+                    >>
+            })}
     ];
 fields(authz_cache) ->
     [
@@ -1588,7 +1648,8 @@ client_example() ->
         <<"recv_msg">> => 0,
         <<"recv_pkt">> => 4,
         <<"recv_cnt">> => 4,
-        <<"recv_msg.qos0">> => 0
+        <<"recv_msg.qos0">> => 0,
+        <<"durable">> => false
     }.
 
 message_example() ->