فهرست منبع

Merge pull request #13565 from savonarola/0801-shared-subs-compact-structures

Reduce size of shared sub protocol structures
Ilia Averianov 1 سال پیش
والد
کامیت
6bfddd9952

+ 12 - 7
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl

@@ -56,6 +56,11 @@
     cold_get_subscription/2
 ]).
 
+-export([
+    format_lease_events/1,
+    format_stream_progresses/1
+]).
+
 -define(schedule_subscribe, schedule_subscribe).
 -define(schedule_unsubscribe, schedule_unsubscribe).
 
@@ -236,14 +241,14 @@ schedule_subscribe(
             ScheduledActions1 = ScheduledActions0#{
                 ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
             },
-            ?tp(warning, shared_subs_schedule_subscribe_override, #{
+            ?tp(debug, shared_subs_schedule_subscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => {?schedule_subscribe, SubOpts},
                 old_action => format_schedule_action(ScheduledAction)
             }),
             SharedSubS0#{scheduled_actions := ScheduledActions1};
         _ ->
-            ?tp(warning, shared_subs_schedule_subscribe_new, #{
+            ?tp(debug, shared_subs_schedule_subscribe_new, #{
                 share_topic_filter => ShareTopicFilter, subopts => SubOpts
             }),
             Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
@@ -294,7 +299,7 @@ schedule_unsubscribe(
             ScheduledActions1 = ScheduledActions0#{
                 ShareTopicFilter => ScheduledAction1
             },
-            ?tp(warning, shared_subs_schedule_unsubscribe_override, #{
+            ?tp(debug, shared_subs_schedule_unsubscribe_override, #{
                 share_topic_filter => ShareTopicFilter,
                 new_type => ?schedule_unsubscribe,
                 old_action => format_schedule_action(ScheduledAction0)
@@ -309,7 +314,7 @@ schedule_unsubscribe(
                     progresses => []
                 }
             },
-            ?tp(warning, shared_subs_schedule_unsubscribe_new, #{
+            ?tp(debug, shared_subs_schedule_unsubscribe_new, #{
                 share_topic_filter => ShareTopicFilter,
                 stream_keys => format_stream_keys(StreamKeys)
             }),
@@ -334,7 +339,7 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh
         Agent0
     ),
     StreamLeaseEvents =/= [] andalso
-        ?tp(warning, shared_subs_new_stream_lease_events, #{
+        ?tp(debug, shared_subs_new_stream_lease_events, #{
             stream_lease_events => format_lease_events(StreamLeaseEvents)
         }),
     S1 = lists:foldl(
@@ -501,7 +506,7 @@ run_scheduled_action(
     Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
     case StreamKeysToWait1 of
         [] ->
-            ?tp(warning, shared_subs_schedule_action_complete, #{
+            ?tp(debug, shared_subs_schedule_action_complete, #{
                 share_topic_filter => ShareTopicFilter,
                 progresses => format_stream_progresses(Progresses1),
                 type => Type
@@ -525,7 +530,7 @@ run_scheduled_action(
             end;
         _ ->
             Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
-            ?tp(warning, shared_subs_schedule_action_continue, #{
+            ?tp(debug, shared_subs_schedule_action_continue, #{
                 share_topic_filter => ShareTopicFilter,
                 new_action => format_schedule_action(Action1)
             }),

+ 8 - 8
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl

@@ -100,7 +100,7 @@ open(TopicSubscriptions, Opts) ->
     State0 = init_state(Opts),
     State1 = lists:foldl(
         fun({ShareTopicFilter, #{}}, State) ->
-            ?tp(warning, ds_agent_open_subscription, #{
+            ?tp(debug, ds_agent_open_subscription, #{
                 topic_filter => ShareTopicFilter
             }),
             add_shared_subscription(State, ShareTopicFilter)
@@ -120,7 +120,7 @@ can_subscribe(_State, _ShareTopicFilter, _SubOpts) ->
 
 -spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
 on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
-    ?tp(warning, ds_agent_on_subscribe, #{
+    ?tp(debug, ds_agent_on_subscribe, #{
         share_topic_filter => ShareTopicFilter
     }),
     add_shared_subscription(State0, ShareTopicFilter).
@@ -163,7 +163,7 @@ on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
 
 -spec on_info(t(), term()) -> t().
 on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_lease_streams,
         group_id => GroupId,
         streams => StreamProgresses,
@@ -176,7 +176,7 @@ on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Ve
         )
     end);
 on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_renew_stream_lease,
         group_id => GroupId,
         version => Version
@@ -185,7 +185,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
         emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
     end);
 on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_renew_stream_lease,
         group_id => GroupId,
         version_old => VersionOld,
@@ -195,7 +195,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)
         emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
     end);
 on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_update_streams,
         group_id => GroupId,
         version_old => VersionOld,
@@ -208,7 +208,7 @@ on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, Str
         )
     end);
 on_info(State, ?leader_invalidate_match(GroupId)) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_invalidate,
         group_id => GroupId
     }),
@@ -245,7 +245,7 @@ delete_shared_subscription(State, ShareTopicFilter, GroupProgress) ->
 add_shared_subscription(
     #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
 ) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => agent_add_shared_subscription,
         share_topic_filter => ShareTopicFilter
     }),

+ 14 - 23
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl

@@ -120,7 +120,7 @@ new(#{
     send_after := SendAfter
 }) ->
     ?SLOG(
-        info,
+        debug,
         #{
             msg => group_sm_new,
             agent => Agent,
@@ -133,7 +133,7 @@ new(#{
         agent => Agent,
         send_after => SendAfter
     },
-    ?tp(warning, group_sm_new, #{
+    ?tp(debug, group_sm_new, #{
         agent => Agent,
         share_topic_filter => ShareTopicFilter
     }),
@@ -176,7 +176,7 @@ handle_disconnect(
 %% Connecting state
 
 handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
-    ?tp(warning, group_sm_enter_connecting, #{
+    ?tp(debug, group_sm_enter_connecting, #{
         agent => Agent,
         share_topic_filter => ShareTopicFilter
     }),
@@ -264,11 +264,13 @@ handle_leader_update_streams(
     VersionNew,
     StreamProgresses
 ) ->
-    ?tp(warning, shared_sub_group_sm_leader_update_streams, #{
+    ?tp(debug, shared_sub_group_sm_leader_update_streams, #{
         id => Id,
         version_old => VersionOld,
         version_new => VersionNew,
-        stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses)
+        stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses(
+            StreamProgresses
+        )
     }),
     {AddEvents, Streams1} = lists:foldl(
         fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
@@ -303,9 +305,11 @@ handle_leader_update_streams(
         maps:keys(Streams1)
     ),
     StreamLeaseEvents = AddEvents ++ RevokeEvents,
-    ?tp(warning, shared_sub_group_sm_leader_update_streams, #{
+    ?tp(debug, shared_sub_group_sm_leader_update_streams, #{
         id => Id,
-        stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents)
+        stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
+            StreamLeaseEvents
+        )
     }),
     transition(
         GSM,
@@ -431,24 +435,11 @@ handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilte
 %% Internal API
 %%-----------------------------------------------------------------------
 
-handle_state_timeout(
-    #{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM,
-    find_leader_timeout,
-    _Message
-) ->
-    ?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}),
+handle_state_timeout(#{state := ?connecting} = GSM, find_leader_timeout, _Message) ->
     handle_find_leader_timeout(GSM);
-handle_state_timeout(
-    #{state := ?replaying} = GSM,
-    renew_lease_timeout,
-    _Message
-) ->
+handle_state_timeout(#{state := ?replaying} = GSM, renew_lease_timeout, _Message) ->
     handle_renew_lease_timeout(GSM);
-handle_state_timeout(
-    GSM,
-    update_stream_state_timeout,
-    _Message
-) ->
+handle_state_timeout(GSM, update_stream_state_timeout, _Message) ->
     ?tp(debug, update_stream_state_timeout, #{}),
     handle_stream_progress(GSM, []).
 

+ 18 - 18
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl

@@ -164,7 +164,7 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist
 %%--------------------------------------------------------------------
 %% repalying state
 handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
-    ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}),
+    ?tp(debug, shared_sub_leader_enter_actve, #{topic => Topic}),
     {keep_state_and_data, [
         {{timeout, #renew_streams{}}, 0, #renew_streams{}},
         {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
@@ -174,7 +174,7 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
 %% timers
 %% renew_streams timer
 handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) ->
-    % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}),
+    ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_streams}),
     Data1 = renew_streams(Data0),
     {keep_state, Data1,
         {
@@ -184,7 +184,7 @@ handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data
         }};
 %% renew_leases timer
 handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) ->
-    % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}),
+    ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_leases}),
     Data1 = renew_leases(Data0),
     {keep_state, Data1,
         {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}};
@@ -279,7 +279,7 @@ renew_streams(
     Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1},
     Data3 = revoke_streams(Data2),
     Data4 = assign_streams(Data3),
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_renew_streams,
         topic_filter => TopicFilter,
         new_streams => length(NewStreamsWRanks)
@@ -368,7 +368,7 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) ->
             false ->
                 AgentState0;
             true ->
-                ?tp(warning, shared_sub_leader_revoke_streams, #{
+                ?tp(debug, shared_sub_leader_revoke_streams, #{
                     agent => Agent,
                     agent_stream_count => length(Streams0),
                     revoke_count => RevokeCount,
@@ -421,7 +421,7 @@ assign_lacking_streams(Data0, Agent, DesiredCount) ->
         false ->
             Data0;
         true ->
-            ?tp(warning, shared_sub_leader_assign_streams, #{
+            ?tp(debug, shared_sub_leader_assign_streams, #{
                 agent => Agent,
                 agent_stream_count => length(Streams0),
                 assign_count => AssignCount,
@@ -449,7 +449,7 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
 %% renew_leases - send lease confirmations to agents
 
 renew_leases(#{agents := AgentStates} = Data) ->
-    ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
+    ?tp(debug, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
     ok = lists:foreach(
         fun({Agent, AgentState}) ->
             renew_lease(Data, Agent, AgentState)
@@ -492,7 +492,7 @@ drop_timeout_agents(#{agents := Agents} = Data) ->
                     (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now)
             of
                 true ->
-                    ?SLOG(info, #{
+                    ?SLOG(debug, #{
                         msg => leader_agent_timeout,
                         now => Now,
                         update_deadline => UpdateDeadline,
@@ -516,14 +516,14 @@ connect_agent(
     Agent,
     AgentMetadata
 ) ->
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => leader_agent_connected,
         agent => Agent,
         group_id => GroupId
     }),
     case Agents of
         #{Agent := AgentState} ->
-            ?tp(warning, shared_sub_leader_agent_already_connected, #{
+            ?tp(debug, shared_sub_leader_agent_already_connected, #{
                 agent => Agent
             }),
             reconnect_agent(Data, Agent, AgentMetadata, AgentState);
@@ -546,7 +546,7 @@ reconnect_agent(
     AgentMetadata,
     #{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState
 ) ->
-    ?tp(warning, shared_sub_leader_agent_reconnect, #{
+    ?tp(debug, shared_sub_leader_agent_reconnect, #{
         agent => Agent,
         agent_metadata => AgentMetadata,
         inherited_streams => OldStreams
@@ -767,7 +767,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
 disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
     case get_agent_state(Data0, Agent) of
         #{version := Version} ->
-            ?tp(warning, shared_sub_leader_disconnect_agent, #{
+            ?tp(debug, shared_sub_leader_disconnect_agent, #{
                 agent => Agent,
                 version => Version
             }),
@@ -794,7 +794,7 @@ agent_transition_to_waiting_updating(
     Streams,
     RevokedStreams
 ) ->
-    ?tp(warning, shared_sub_leader_agent_state_transition, #{
+    ?tp(debug, shared_sub_leader_agent_state_transition, #{
         agent => Agent,
         old_state => OldState,
         new_state => ?waiting_updating
@@ -818,7 +818,7 @@ agent_transition_to_waiting_updating(
 agent_transition_to_waiting_replaying(
     #{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
 ) ->
-    ?tp(warning, shared_sub_leader_agent_state_transition, #{
+    ?tp(debug, shared_sub_leader_agent_state_transition, #{
         agent => Agent,
         old_state => OldState,
         new_state => ?waiting_replaying
@@ -833,7 +833,7 @@ agent_transition_to_waiting_replaying(
 agent_transition_to_initial_waiting_replaying(
     #{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams
 ) ->
-    ?tp(warning, shared_sub_leader_agent_state_transition, #{
+    ?tp(debug, shared_sub_leader_agent_state_transition, #{
         agent => Agent,
         old_state => none,
         new_state => ?waiting_replaying
@@ -856,7 +856,7 @@ agent_transition_to_initial_waiting_replaying(
     renew_no_replaying_deadline(AgentState).
 
 agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
-    ?tp(warning, shared_sub_leader_agent_state_transition, #{
+    ?tp(debug, shared_sub_leader_agent_state_transition, #{
         agent => Agent,
         old_state => ?waiting_replaying,
         new_state => ?replaying
@@ -868,7 +868,7 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState
     }.
 
 agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) ->
-    ?tp(warning, shared_sub_leader_agent_state_transition, #{
+    ?tp(debug, shared_sub_leader_agent_state_transition, #{
         agent => Agent,
         old_state => ?waiting_updating,
         new_state => ?updating
@@ -995,7 +995,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) ->
     #{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
     AllStreams = Streams ++ RevokedStreams,
     Data1 = unassign_streams(Data0, AllStreams),
-    ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}),
+    ?tp(debug, shared_sub_leader_drop_agent, #{agent => Agent}),
     Data1#{agents => maps:remove(Agent, Agents)}.
 
 invalidate_agent(#{group_id := GroupId}, Agent) ->

+ 1 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl

@@ -55,7 +55,7 @@ set_replayed({{RankX, RankY}, Stream}, State) ->
             State#{RankX => #{min_y => MinY, ys => Ys2}};
         _ ->
             ?SLOG(
-                warning,
+                debug,
                 #{
                     msg => leader_rank_progress_double_or_invalid_update,
                     rank_x => RankX,

+ 32 - 140
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl

@@ -22,12 +22,6 @@
 ]).
 
 -export([
-    format_stream_progresses/1,
-    format_stream_progress/1,
-    format_stream_key/1,
-    format_stream_keys/1,
-    format_lease_event/1,
-    format_lease_events/1,
     agent/2
 ]).
 
@@ -57,6 +51,20 @@
     agent_metadata/0
 ]).
 
+-define(log_agent_msg(ToLeader, Msg),
+    ?tp(debug, shared_sub_proto_msg, #{
+        to_leader => ToLeader,
+        msg => emqx_ds_shared_sub_proto_format:format_agent_msg(Msg)
+    })
+).
+
+-define(log_leader_msg(ToAgent, Msg),
+    ?tp(debug, shared_sub_proto_msg, #{
+        to_agent => ToAgent,
+        msg => emqx_ds_shared_sub_proto_format:format_leader_msg(Msg)
+    })
+).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -67,15 +75,7 @@
 agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_connect_leader,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        agent_metadata => AgentMetadata,
-        share_topic_filter => ShareTopicFilter
-    }),
-    _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter));
 agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
     emqx_ds_shared_sub_proto_v1:agent_connect_leader(
         ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
@@ -85,15 +85,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_update_stream_states,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version => Version
-    }),
-    _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version));
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
     emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@@ -105,18 +97,9 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_update_stream_states,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version_old => VersionOld,
-        version_new => VersionNew
-    }),
-    _ = erlang:send(
+    send_agent_msg(
         ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
-    ),
-    ok;
+    );
 agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
     emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
@@ -125,15 +108,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
 agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
     ?is_local_leader(ToLeader)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => agent_disconnect,
-        to_leader => ToLeader,
-        from_agent => FromAgent,
-        stream_progresses => format_stream_progresses(StreamProgresses),
-        version => Version
-    }),
-    _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
-    ok;
+    send_agent_msg(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version));
 agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
     emqx_ds_shared_sub_proto_v1:agent_disconnect(
         ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@@ -144,19 +119,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
 -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) ->
     ok.
 leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_lease_streams,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        leader => Leader,
-        streams => format_stream_progresses(Streams),
-        version => Version
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_lease_streams(OfGroup, Leader, Streams, Version)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version));
 leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
     emqx_ds_shared_sub_proto_v1:leader_lease_streams(
         ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version
@@ -164,17 +127,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
 
 -spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
 leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_renew_stream_lease,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version => Version
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_renew_stream_lease(OfGroup, Version)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, Version));
 leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
     emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
         ?agent_node(ToAgent), ToAgent, OfGroup, Version
@@ -182,18 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
 
 -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
 leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_renew_stream_lease,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version_old => VersionOld,
-        version_new => VersionNew
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew));
 leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
     emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
         ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
@@ -204,19 +146,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
 leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
     ?is_local_agent(ToAgent)
 ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_update_streams,
-        to_agent => ToAgent,
-        of_group => OfGroup,
-        version_old => VersionOld,
-        version_new => VersionNew,
-        streams_new => format_stream_progresses(StreamsNew)
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew));
 leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
     emqx_ds_shared_sub_proto_v1:leader_update_streams(
         ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
@@ -224,16 +154,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
 
 -spec leader_invalidate(agent(), group()) -> ok.
 leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) ->
-    ?tp(warning, shared_sub_proto_msg, #{
-        type => leader_invalidate,
-        to_agent => ToAgent,
-        of_group => OfGroup
-    }),
-    _ = emqx_persistent_session_ds_shared_subs_agent:send(
-        ?agent_pid(ToAgent),
-        ?leader_invalidate(OfGroup)
-    ),
-    ok;
+    send_leader_msg(ToAgent, ?leader_invalidate(OfGroup));
 leader_invalidate(ToAgent, OfGroup) ->
     emqx_ds_shared_sub_proto_v1:leader_invalidate(
         ?agent_node(ToAgent), ToAgent, OfGroup
@@ -247,41 +168,12 @@ agent(Id, Pid) ->
     _ = Id,
     ?agent(Id, Pid).
 
-format_stream_progresses(Streams) ->
-    lists:map(
-        fun format_stream_progress/1,
-        Streams
-    ).
-
-format_stream_progress(#{stream := Stream, progress := Progress} = Value) ->
-    Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}.
-
-format_progress(#{iterator := Iterator} = Progress) ->
-    Progress#{iterator => format_opaque(Iterator)}.
-
-format_stream_key({SubId, Stream}) ->
-    {SubId, format_opaque(Stream)}.
-
-format_stream_keys(StreamKeys) ->
-    lists:map(
-        fun format_stream_key/1,
-        StreamKeys
-    ).
-
-format_lease_events(Events) ->
-    lists:map(
-        fun format_lease_event/1,
-        Events
-    ).
-
-format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
-    Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
-format_lease_event(#{stream := Stream} = Event) ->
-    Event#{stream => format_opaque(Stream)}.
-
-%%--------------------------------------------------------------------
-%% Helpers
-%%--------------------------------------------------------------------
+send_agent_msg(ToLeader, Msg) ->
+    ?log_agent_msg(ToLeader, Msg),
+    _ = erlang:send(ToLeader, Msg),
+    ok.
 
-format_opaque(Opaque) ->
-    erlang:phash2(Opaque).
+send_leader_msg(ToAgent, Msg) ->
+    ?log_leader_msg(ToAgent, Msg),
+    _ = emqx_persistent_session_ds_shared_subs_agent:send(?agent_pid(ToAgent), Msg),
+    ok.

+ 100 - 79
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl

@@ -12,146 +12,167 @@
 
 %% agent messages, sent from agent side to the leader
 
--define(agent_connect_leader_msg, agent_connect_leader).
--define(agent_update_stream_states_msg, agent_update_stream_states).
--define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout).
--define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout).
--define(agent_disconnect_msg, agent_disconnect).
+-define(agent_connect_leader_msg, 1).
+-define(agent_update_stream_states_msg, 2).
+-define(agent_connect_leader_timeout_msg, 3).
+-define(agent_renew_stream_lease_timeout_msg, 4).
+-define(agent_disconnect_msg, 5).
+
+%% message keys (used used not to send atoms over the network)
+-define(agent_msg_type, 1).
+-define(agent_msg_agent, 2).
+-define(agent_msg_share_topic_filter, 3).
+-define(agent_msg_agent_metadata, 4).
+-define(agent_msg_stream_states, 5).
+-define(agent_msg_version, 6).
+-define(agent_msg_version_old, 7).
+-define(agent_msg_version_new, 8).
 
 %% Agent messages sent to the leader.
 %% Leader talks to many agents, `agent` field is used to identify the sender.
 
 -define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{
-    type => ?agent_connect_leader_msg,
-    share_topic_filter => ShareTopicFilter,
-    agent_metadata => AgentMetadata,
-    agent => Agent
+    ?agent_msg_type => ?agent_connect_leader_msg,
+    ?agent_msg_share_topic_filter => ShareTopicFilter,
+    ?agent_msg_agent_metadata => AgentMetadata,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{
-    type := ?agent_connect_leader_msg,
-    share_topic_filter := ShareTopicFilter,
-    agent_metadata := AgentMetadata,
-    agent := Agent
+    ?agent_msg_type := ?agent_connect_leader_msg,
+    ?agent_msg_share_topic_filter := ShareTopicFilter,
+    ?agent_msg_agent_metadata := AgentMetadata,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_update_stream_states(Agent, StreamStates, Version), #{
-    type => ?agent_update_stream_states_msg,
-    stream_states => StreamStates,
-    version => Version,
-    agent => Agent
+    ?agent_msg_type => ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version => Version,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
-    type := ?agent_update_stream_states_msg,
-    stream_states := StreamStates,
-    version := Version,
-    agent := Agent
+    ?agent_msg_type := ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version := Version,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
-    type => ?agent_update_stream_states_msg,
-    stream_states => StreamStates,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    agent => Agent
+    ?agent_msg_type => ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version_old => VersionOld,
+    ?agent_msg_version_new => VersionNew,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
-    type := ?agent_update_stream_states_msg,
-    stream_states := StreamStates,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    agent := Agent
+    ?agent_msg_type := ?agent_update_stream_states_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version_old := VersionOld,
+    ?agent_msg_version_new := VersionNew,
+    ?agent_msg_agent := Agent
 }).
 
 -define(agent_disconnect(Agent, StreamStates, Version), #{
-    type => ?agent_disconnect_msg,
-    stream_states => StreamStates,
-    version => Version,
-    agent => Agent
+    ?agent_msg_type => ?agent_disconnect_msg,
+    ?agent_msg_stream_states => StreamStates,
+    ?agent_msg_version => Version,
+    ?agent_msg_agent => Agent
 }).
 
 -define(agent_disconnect_match(Agent, StreamStates, Version), #{
-    type := ?agent_disconnect_msg,
-    stream_states := StreamStates,
-    version := Version,
-    agent := Agent
+    ?agent_msg_type := ?agent_disconnect_msg,
+    ?agent_msg_stream_states := StreamStates,
+    ?agent_msg_version := Version,
+    ?agent_msg_agent := Agent
 }).
 
 %% leader messages, sent from the leader to the agent
 %% Agent may have several shared subscriptions, so may talk to several leaders
 %% `group_id` field is used to identify the leader.
 
--define(leader_lease_streams_msg, leader_lease_streams).
--define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
+-define(leader_lease_streams_msg, 101).
+-define(leader_renew_stream_lease_msg, 102).
+-define(leader_update_streams, 103).
+-define(leader_invalidate, 104).
+
+-define(leader_msg_type, 101).
+-define(leader_msg_streams, 102).
+-define(leader_msg_version, 103).
+-define(leader_msg_version_old, 104).
+-define(leader_msg_version_new, 105).
+-define(leader_msg_streams_new, 106).
+-define(leader_msg_leader, 107).
+-define(leader_msg_group_id, 108).
 
 -define(leader_lease_streams(GrouId, Leader, Streams, Version), #{
-    type => ?leader_lease_streams_msg,
-    streams => Streams,
-    version => Version,
-    leader => Leader,
-    group_id => GrouId
+    ?leader_msg_type => ?leader_lease_streams_msg,
+    ?leader_msg_streams => Streams,
+    ?leader_msg_version => Version,
+    ?leader_msg_leader => Leader,
+    ?leader_msg_group_id => GrouId
 }).
 
 -define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{
-    type := ?leader_lease_streams_msg,
-    streams := Streams,
-    version := Version,
-    leader := Leader,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_lease_streams_msg,
+    ?leader_msg_streams := Streams,
+    ?leader_msg_version := Version,
+    ?leader_msg_leader := Leader,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_renew_stream_lease(GroupId, Version), #{
-    type => ?leader_renew_stream_lease_msg,
-    version => Version,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_renew_stream_lease_msg,
+    ?leader_msg_version => Version,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_renew_stream_lease_match(GroupId, Version), #{
-    type := ?leader_renew_stream_lease_msg,
-    version := Version,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_renew_stream_lease_msg,
+    ?leader_msg_version := Version,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{
-    type => ?leader_renew_stream_lease_msg,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_renew_stream_lease_msg,
+    ?leader_msg_version_old => VersionOld,
+    ?leader_msg_version_new => VersionNew,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{
-    type := ?leader_renew_stream_lease_msg,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_renew_stream_lease_msg,
+    ?leader_msg_version_old := VersionOld,
+    ?leader_msg_version_new := VersionNew,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{
-    type => leader_update_streams,
-    version_old => VersionOld,
-    version_new => VersionNew,
-    streams_new => StreamsNew,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_update_streams,
+    ?leader_msg_version_old => VersionOld,
+    ?leader_msg_version_new => VersionNew,
+    ?leader_msg_streams_new => StreamsNew,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{
-    type := leader_update_streams,
-    version_old := VersionOld,
-    version_new := VersionNew,
-    streams_new := StreamsNew,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_update_streams,
+    ?leader_msg_version_old := VersionOld,
+    ?leader_msg_version_new := VersionNew,
+    ?leader_msg_streams_new := StreamsNew,
+    ?leader_msg_group_id := GroupId
 }).
 
 -define(leader_invalidate(GroupId), #{
-    type => leader_invalidate,
-    group_id => GroupId
+    ?leader_msg_type => ?leader_invalidate,
+    ?leader_msg_group_id => GroupId
 }).
 
 -define(leader_invalidate_match(GroupId), #{
-    type := leader_invalidate,
-    group_id := GroupId
+    ?leader_msg_type := ?leader_invalidate,
+    ?leader_msg_group_id := GroupId
 }).
 
 %% Helpers

+ 82 - 0
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl

@@ -0,0 +1,82 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ds_shared_sub_proto_format).
+
+-include("emqx_ds_shared_sub_proto.hrl").
+
+-export([format_agent_msg/1, format_leader_msg/1]).
+
+%%--------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------
+
+format_agent_msg(Msg) ->
+    maps:from_list(
+        lists:map(
+            fun({K, V}) ->
+                FormattedKey = agent_msg_key(K),
+                {FormattedKey, format_agent_msg_value(FormattedKey, V)}
+            end,
+            maps:to_list(Msg)
+        )
+    ).
+
+format_leader_msg(Msg) ->
+    maps:from_list(
+        lists:map(
+            fun({K, V}) ->
+                FormattedKey = leader_msg_key(K),
+                {FormattedKey, format_leader_msg_value(FormattedKey, V)}
+            end,
+            maps:to_list(Msg)
+        )
+    ).
+
+%%--------------------------------------------------------------------
+%% Internal functions
+%%--------------------------------------------------------------------
+
+format_agent_msg_value(agent_msg_type, Type) ->
+    agent_msg_type(Type);
+format_agent_msg_value(agent_msg_stream_states, StreamStates) ->
+    emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates);
+format_agent_msg_value(_, Value) ->
+    Value.
+
+format_leader_msg_value(leader_msg_type, Type) ->
+    leader_msg_type(Type);
+format_leader_msg_value(leader_msg_streams, Streams) ->
+    emqx_persistent_session_ds_shared_subs:format_lease_events(Streams);
+format_leader_msg_value(_, Value) ->
+    Value.
+
+agent_msg_type(?agent_connect_leader_msg) -> agent_connect_leader_msg;
+agent_msg_type(?agent_update_stream_states_msg) -> agent_update_stream_states_msg;
+agent_msg_type(?agent_connect_leader_timeout_msg) -> agent_connect_leader_timeout_msg;
+agent_msg_type(?agent_renew_stream_lease_timeout_msg) -> agent_renew_stream_lease_timeout_msg;
+agent_msg_type(?agent_disconnect_msg) -> agent_disconnect_msg.
+
+agent_msg_key(?agent_msg_type) -> agent_msg_type;
+agent_msg_key(?agent_msg_agent) -> agent_msg_agent;
+agent_msg_key(?agent_msg_share_topic_filter) -> agent_msg_share_topic_filter;
+agent_msg_key(?agent_msg_agent_metadata) -> agent_msg_agent_metadata;
+agent_msg_key(?agent_msg_stream_states) -> agent_msg_stream_states;
+agent_msg_key(?agent_msg_version) -> agent_msg_version;
+agent_msg_key(?agent_msg_version_old) -> agent_msg_version_old;
+agent_msg_key(?agent_msg_version_new) -> agent_msg_version_new.
+
+leader_msg_type(?leader_lease_streams_msg) -> leader_lease_streams_msg;
+leader_msg_type(?leader_renew_stream_lease_msg) -> leader_renew_stream_lease_msg;
+leader_msg_type(?leader_update_streams) -> leader_update_streams;
+leader_msg_type(?leader_invalidate) -> leader_invalidate.
+
+leader_msg_key(?leader_msg_type) -> leader_msg_type;
+leader_msg_key(?leader_msg_streams) -> leader_msg_streams;
+leader_msg_key(?leader_msg_version) -> leader_msg_version;
+leader_msg_key(?leader_msg_version_old) -> leader_msg_version_old;
+leader_msg_key(?leader_msg_version_new) -> leader_msg_version_new;
+leader_msg_key(?leader_msg_streams_new) -> leader_msg_streams_new;
+leader_msg_key(?leader_msg_leader) -> leader_msg_leader;
+leader_msg_key(?leader_msg_group_id) -> leader_msg_group_id.

+ 1 - 1
apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl

@@ -113,7 +113,7 @@ do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) ->
             Pid ->
                 Pid
         end,
-    ?SLOG(info, #{
+    ?SLOG(debug, #{
         msg => lookup_leader,
         agent => Agent,
         share_topic_filter => ShareTopicFilter,

+ 1 - 1
apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl

@@ -417,7 +417,7 @@ t_lease_reconnect(_Config) ->
 
     ?assertWaitEvent(
         {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
-        #{?snk_kind := find_leader_timeout},
+        #{?snk_kind := group_sm_find_leader_timeout},
         5_000
     ),