Bladeren bron

feat(queue): compact protocol structures, organize formatting

Ilya Averyanov 1 jaar geleden
bovenliggende
commit
0e8e5a04b7

+ 5 - 0
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -56,6 +56,11 @@
     cold_get_subscription/2
 ]).
 
+-export([
+    format_lease_events/1,
+    format_stream_progresses/1
+]).
+
 -define(schedule_subscribe, schedule_subscribe).
 -define(schedule_unsubscribe, schedule_unsubscribe).
 

+ 6 - 2
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -268,7 +268,9 @@ handle_leader_update_streams(
         id => Id,
         version_old => VersionOld,
         version_new => VersionNew,
-        stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses)
+        stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses(
+            StreamProgresses
+        )
     }),
     {AddEvents, Streams1} = lists:foldl(
         fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
@@ -305,7 +307,9 @@ handle_leader_update_streams(
     StreamLeaseEvents = AddEvents ++ RevokeEvents,
     ?tp(warning, shared_sub_group_sm_leader_update_streams, #{
         id => Id,
-        stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents)
+        stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
+            StreamLeaseEvents
+        )
     }),
     transition(
         GSM,

+ 32 - 140
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl

@@ -22,12 +22,6 @@
 ]).
 
 -export([
-    format_stream_progresses/1,
-    format_stream_progress/1,
-    format_stream_key/1,
-    format_stream_keys/1,
-    format_lease_event/1,
-    format_lease_events/1,
     agent/2
 ]).
 
@@ -57,6 +51,20 @@
     agent_metadata/0
 ]).
 
+-define(log_agent_msg(ToLeader, Msg),
+    ?tp(debug, shared_sub_proto_msg, #{
+        to_leader => ToLeader,
+        msg => emqx_ds_shared_sub_proto_format:format_agent_msg(Msg)
+    })
+).
+
+-define(log_leader_msg(ToAgent, Msg),
+    ?tp(debug, shared_sub_proto_msg, #{
+        to_agent => ToAgent,
+        msg => emqx_ds_shared_sub_proto_format:format_leader_msg(Msg)
+    })
+).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -67,15 +75,7 @@
 agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_connect_leader,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        agent_metadata => AgentMetadata,
-        share_topic_filter => ShareTopicFilter
-    }),
-    _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter));
 agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
     emqx_ds_shared_sub_proto_v1:agent_connect_leader(
         ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
@@ -85,15 +85,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_update_stream_states,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version => Version
-    }),
-    _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version));
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
     emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@@ -105,18 +97,9 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_update_stream_states,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version_old => VersionOld,
-        version_new => VersionNew
-    }),
-    _ = erlang:send(
+    send_agent_msg(
         ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
-    ),
-    ok;
+    );
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
     emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
@@ -125,15 +108,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
 agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_disconnect,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version => Version
-    }),
-    _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version));
 agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
     emqx_ds_shared_sub_proto_v1:agent_disconnect(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@@ -144,19 +119,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
 -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) ->
     ok.
 leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_lease_streams,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        leader => Leader,
-        streams => format_stream_progresses(Streams),
-        version => Version
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_lease_streams(OfGroup, Leader, Streams, Version)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version));
 leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
     emqx_ds_shared_sub_proto_v1:leader_lease_streams(
         ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version
@@ -164,17 +127,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
 
 -spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
 leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_renew_stream_lease,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version => Version
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_renew_stream_lease(OfGroup, Version)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, Version));
 leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
     emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
         ?agent_node(ToAgent), ToAgent, OfGroup, Version
@@ -182,18 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
 
 -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
 leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_renew_stream_lease,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version_old => VersionOld,
-        version_new => VersionNew
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew));
 leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
     emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
         ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
@@ -204,19 +146,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
 leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
     ?is_local_agent(ToAgent)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_update_streams,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version_old => VersionOld,
-        version_new => VersionNew,
-        streams_new => format_stream_progresses(StreamsNew)
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew));
 leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
     emqx_ds_shared_sub_proto_v1:leader_update_streams(
         ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
@@ -224,16 +154,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
 
 -spec leader_invalidate(agent(), group()) -> ok.
 leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_invalidate,
-        to_agent => ToAgent,
-        of_group => OfGroup
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_invalidate(OfGroup)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_invalidate(OfGroup));
 leader_invalidate(ToAgent, OfGroup) ->
     emqx_ds_shared_sub_proto_v1:leader_invalidate(
         ?agent_node(ToAgent), ToAgent, OfGroup
@@ -247,41 +168,12 @@ agent(Id, Pid) ->
     _ = Id,
     ?agent(Id, Pid).
 
-format_stream_progresses(Streams) ->
-    lists:map(
-        fun format_stream_progress/1,
-        Streams
-    ).
-
-format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
-    Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
-
-format_progress(#{iterator := Iterator} = Progress) ->
-    Progress#{iterator => format_opaque(Iterator)}.
-
-format_stream_key({SubId, Stream}) ->
-    {SubId, format_opaque(Stream)}.
-
-format_stream_keys(StreamKeys) ->
-    lists:map(
-        fun format_stream_key/1,
-        StreamKeys
-    ).
-
-format_lease_events(Events) ->
-    lists:map(
-        fun format_lease_event/1,
-        Events
-    ).
-
-format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
-    Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
-format_lease_event(#{stream := Stream} = Event) ->
-    Event#{stream => format_opaque(Stream)}.
-
-%%--------------------------------------------------------------------
-%% Helpers
-%%--------------------------------------------------------------------
+send_agent_msg(ToLeader, Msg) ->
+    ?log_agent_msg(ToLeader, Msg),
+    _ = erlang:send(ToLeader, Msg),
+    ok.
 
-format_opaque(Opaque) ->
-    erlang:phash2(Opaque).
+send_leader_msg(ToAgent, Msg) ->
+    ?log_leader_msg(ToAgent, Msg),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(?agent_pid(ToAgent), Msg),
+    ok.

+ 100 - 79
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl

@@ -12,146 +12,167 @@
 
 %% agent messages, sent from agent side to the leader
 
--define(agent_connect_leader_msg, agent_connect_leader).
--define(agent_update_stream_states_msg, agent_update_stream_states).
--define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
--define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
--define(agent_disconnect_msg, agent_disconnect).
+-define(agent_connect_leader_msg, 1).
+-define(agent_update_stream_states_msg, 2).
+-define(agent_connect_leader_timeout_msg, 3).
+-define(agent_renew_stream_lease_timeout_msg, 4).
+-define(agent_disconnect_msg, 5).
+
+%% message keys (used used not to send atoms over the network)
+-define(agent_msg_type, 1).
+-define(agent_msg_agent, 2).
+-define(agent_msg_share_topic_filter, 3).
+-define(agent_msg_agent_metadata, 4).
+-define(agent_msg_stream_states, 5).
+-define(agent_msg_version, 6).
+-define(agent_msg_version_old, 7).
+-define(agent_msg_version_new, 8).
 
 %% Agent messages sent to the leader.
 %% Leader talks to many agents, `agent` field is used to identify the sender.
 
 -define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{
-    type => ?agent_connect_leader_msg,
-    share_topic_filter => ShareTopicFilter,
-    agent_metadata => AgentMetadata,
-    agent => Agent
+    ?agent_msg_type => ?agent_connect_leader_msg,
+    ?agent_msg_share_topic_filter => ShareTopicFilter,
+    ?agent_msg_agent_metadata => AgentMetadata,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{
-    type := ?agent_connect_leader_msg,
-    share_topic_filter := ShareTopicFilter,
-    agent_metadata := AgentMetadata,
-    agent := Agent
+    ?agent_msg_type := ?agent_connect_leader_msg,
+    ?agent_msg_share_topic_filter := ShareTopicFilter,
+    ?agent_msg_agent_metadata := AgentMetadata,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_update_stream_states(Agent, StreamStates, Version), #{
-    type => ?agent_update_stream_states_msg,
-    stream_states => StreamStates,
-    version => Version,
-    agent => Agent
+    ?agent_msg_type => ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version => Version,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
-    type := ?agent_update_stream_states_msg,
-    stream_states := StreamStates,
-    version := Version,
-    agent := Agent
+    ?agent_msg_type := ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version := Version,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
-    type => ?agent_update_stream_states_msg,
-    stream_states => StreamStates,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    agent => Agent
+    ?agent_msg_type => ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version_old => VersionOld,
+    ?agent_msg_version_new => VersionNew,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
-    type := ?agent_update_stream_states_msg,
-    stream_states := StreamStates,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    agent := Agent
+    ?agent_msg_type := ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version_old := VersionOld,
+    ?agent_msg_version_new := VersionNew,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_disconnect(Agent, StreamStates, Version), #{
-    type => ?agent_disconnect_msg,
-    stream_states => StreamStates,
-    version => Version,
-    agent => Agent
+    ?agent_msg_type => ?agent_disconnect_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version => Version,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_disconnect_match(Agent, StreamStates, Version), #{
-    type := ?agent_disconnect_msg,
-    stream_states := StreamStates,
-    version := Version,
-    agent := Agent
+    ?agent_msg_type := ?agent_disconnect_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version := Version,
+    ?agent_msg_agent := Agent
 }).
 
 %% leader messages, sent from the leader to the agent
 %% Agent may have several shared subscriptions, so may talk to several leaders
 %% `group_id` field is used to identify the leader.
 
--define(leader_lease_streams_msg, leader_lease_streams).
--define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
+-define(leader_lease_streams_msg, 101).
+-define(leader_renew_stream_lease_msg, 102).
+-define(leader_update_streams, 103).
+-define(leader_invalidate, 104).
+
+-define(leader_msg_type, 101).
+-define(leader_msg_streams, 102).
+-define(leader_msg_version, 103).
+-define(leader_msg_version_old, 104).
+-define(leader_msg_version_new, 105).
+-define(leader_msg_streams_new, 106).
+-define(leader_msg_leader, 107).
+-define(leader_msg_group_id, 108).
 
 -define(leader_lease_streams(GrouId, Leader, Streams, Version), #{
-    type => ?leader_lease_streams_msg,
-    streams => Streams,
-    version => Version,
-    leader => Leader,
-    group_id => GrouId
+    ?leader_msg_type => ?leader_lease_streams_msg,
+    ?leader_msg_streams => Streams,
+    ?leader_msg_version => Version,
+    ?leader_msg_leader => Leader,
+    ?leader_msg_group_id => GrouId
 }).
 
 -define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{
-    type := ?leader_lease_streams_msg,
-    streams := Streams,
-    version := Version,
-    leader := Leader,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_lease_streams_msg,
+    ?leader_msg_streams := Streams,
+    ?leader_msg_version := Version,
+    ?leader_msg_leader := Leader,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_renew_stream_lease(GroupId, Version), #{
-    type => ?leader_renew_stream_lease_msg,
-    version => Version,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_renew_stream_lease_msg,
+    ?leader_msg_version => Version,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_renew_stream_lease_match(GroupId, Version), #{
-    type := ?leader_renew_stream_lease_msg,
-    version := Version,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_renew_stream_lease_msg,
+    ?leader_msg_version := Version,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{
-    type => ?leader_renew_stream_lease_msg,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_renew_stream_lease_msg,
+    ?leader_msg_version_old => VersionOld,
+    ?leader_msg_version_new => VersionNew,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{
-    type := ?leader_renew_stream_lease_msg,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_renew_stream_lease_msg,
+    ?leader_msg_version_old := VersionOld,
+    ?leader_msg_version_new := VersionNew,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{
-    type => leader_update_streams,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    streams_new => StreamsNew,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_update_streams,
+    ?leader_msg_version_old => VersionOld,
+    ?leader_msg_version_new => VersionNew,
+    ?leader_msg_streams_new => StreamsNew,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{
-    type := leader_update_streams,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    streams_new := StreamsNew,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_update_streams,
+    ?leader_msg_version_old := VersionOld,
+    ?leader_msg_version_new := VersionNew,
+    ?leader_msg_streams_new := StreamsNew,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_invalidate(GroupId), #{
-    type => leader_invalidate,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_invalidate,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_invalidate_match(GroupId), #{
-    type := leader_invalidate,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_invalidate,
+    ?leader_msg_group_id := GroupId
 }).
 
 %% Helpers

+ 82 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl

@@ -0,0 +1,82 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_proto_format).
+
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([format_agent_msg/1, format_leader_msg/1]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+format_agent_msg(Msg) ->
+    maps:from_list(
+        lists:map(
+            fun({K, V}) ->
+                FormattedKey = agent_msg_key(K),
+                {FormattedKey, format_agent_msg_value(FormattedKey, V)}
+            end,
+            maps:to_list(Msg)
+        )
+    ).
+
+format_leader_msg(Msg) ->
+    maps:from_list(
+        lists:map(
+            fun({K, V}) ->
+                FormattedKey = leader_msg_key(K),
+                {FormattedKey, format_leader_msg_value(FormattedKey, V)}
+            end,
+            maps:to_list(Msg)
+        )
+    ).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+format_agent_msg_value(agent_msg_type, Type) ->
+    agent_msg_type(Type);
+format_agent_msg_value(agent_msg_stream_states, StreamStates) ->
+    emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates);
+format_agent_msg_value(_, Value) ->
+    Value.
+
+format_leader_msg_value(leader_msg_type, Type) ->
+    leader_msg_type(Type);
+format_leader_msg_value(leader_msg_streams, Streams) ->
+    emqx_persistent_session_ds_shared_subs:format_lease_events(Streams);
+format_leader_msg_value(_, Value) ->
+    Value.
+
+agent_msg_type(?agent_connect_leader_msg) -> agent_connect_leader_msg;
+agent_msg_type(?agent_update_stream_states_msg) -> agent_update_stream_states_msg;
+agent_msg_type(?agent_connect_leader_timeout_msg) -> agent_connect_leader_timeout_msg;
+agent_msg_type(?agent_renew_stream_lease_timeout_msg) -> agent_renew_stream_lease_timeout_msg;
+agent_msg_type(?agent_disconnect_msg) -> agent_disconnect_msg.
+
+agent_msg_key(?agent_msg_type) -> agent_msg_type;
+agent_msg_key(?agent_msg_agent) -> agent_msg_agent;
+agent_msg_key(?agent_msg_share_topic_filter) -> agent_msg_share_topic_filter;
+agent_msg_key(?agent_msg_agent_metadata) -> agent_msg_agent_metadata;
+agent_msg_key(?agent_msg_stream_states) -> agent_msg_stream_states;
+agent_msg_key(?agent_msg_version) -> agent_msg_version;
+agent_msg_key(?agent_msg_version_old) -> agent_msg_version_old;
+agent_msg_key(?agent_msg_version_new) -> agent_msg_version_new.
+
+leader_msg_type(?leader_lease_streams_msg) -> leader_lease_streams_msg;
+leader_msg_type(?leader_renew_stream_lease_msg) -> leader_renew_stream_lease_msg;
+leader_msg_type(?leader_update_streams) -> leader_update_streams;
+leader_msg_type(?leader_invalidate) -> leader_invalidate.
+
+leader_msg_key(?leader_msg_type) -> leader_msg_type;
+leader_msg_key(?leader_msg_streams) -> leader_msg_streams;
+leader_msg_key(?leader_msg_version) -> leader_msg_version;
+leader_msg_key(?leader_msg_version_old) -> leader_msg_version_old;
+leader_msg_key(?leader_msg_version_new) -> leader_msg_version_new;
+leader_msg_key(?leader_msg_streams_new) -> leader_msg_streams_new;
+leader_msg_key(?leader_msg_leader) -> leader_msg_leader;
+leader_msg_key(?leader_msg_group_id) -> leader_msg_group_id.