|
|
@@ -132,10 +132,10 @@ init_data(#share{topic = Topic} = ShareTopicFilter, StartTime) ->
|
|
|
Group = group_name(ShareTopicFilter),
|
|
|
case emqx_ds_shared_sub_leader_store:open(Group) of
|
|
|
Store when Store =/= false ->
|
|
|
- ?tp(warning, shared_sub_leader_store_open, #{topic => ShareTopicFilter, store => Store}),
|
|
|
+ ?tp(debug, shared_sub_leader_store_open, #{topic => ShareTopicFilter, store => Store}),
|
|
|
ok;
|
|
|
false ->
|
|
|
- ?tp(warning, shared_sub_leader_store_init, #{topic => ShareTopicFilter}),
|
|
|
+ ?tp(debug, shared_sub_leader_store_init, #{topic => ShareTopicFilter}),
|
|
|
RankProgress = emqx_ds_shared_sub_leader_rank_progress:init(),
|
|
|
Store0 = emqx_ds_shared_sub_leader_store:init(Group),
|
|
|
Store1 = emqx_ds_shared_sub_leader_store:set(start_time, StartTime, Store0),
|
|
|
@@ -162,7 +162,7 @@ init_claim_renewal(_Data = #{leader_claim := Claim}) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% repalying state
|
|
|
handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
|
|
|
- ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}),
|
|
|
+ ?tp(debug, shared_sub_leader_enter_actve, #{topic => Topic}),
|
|
|
{keep_state_and_data, [
|
|
|
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
|
|
|
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
|
|
|
@@ -407,7 +407,7 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) ->
|
|
|
false ->
|
|
|
AgentState0;
|
|
|
true ->
|
|
|
- ?tp(warning, shared_sub_leader_revoke_streams, #{
|
|
|
+ ?tp(debug, shared_sub_leader_revoke_streams, #{
|
|
|
agent => Agent,
|
|
|
agent_stream_count => length(Streams0),
|
|
|
revoke_count => RevokeCount,
|
|
|
@@ -460,7 +460,7 @@ assign_lacking_streams(Data0, Agent, DesiredCount) ->
|
|
|
false ->
|
|
|
Data0;
|
|
|
true ->
|
|
|
- ?tp(warning, shared_sub_leader_assign_streams, #{
|
|
|
+ ?tp(debug, shared_sub_leader_assign_streams, #{
|
|
|
agent => Agent,
|
|
|
agent_stream_count => length(Streams0),
|
|
|
assign_count => AssignCount,
|
|
|
@@ -488,7 +488,7 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
|
|
|
%% renew_leases - send lease confirmations to agents
|
|
|
|
|
|
renew_leases(#{agents := AgentStates} = Data) ->
|
|
|
- ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
|
|
|
+ ?tp(debug, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}),
|
|
|
ok = lists:foreach(
|
|
|
fun({Agent, AgentState}) ->
|
|
|
renew_lease(Data, Agent, AgentState)
|
|
|
@@ -562,7 +562,7 @@ connect_agent(
|
|
|
}),
|
|
|
case Agents of
|
|
|
#{Agent := AgentState} ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_already_connected, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_already_connected, #{
|
|
|
agent => Agent
|
|
|
}),
|
|
|
reconnect_agent(Data, Agent, AgentMetadata, AgentState);
|
|
|
@@ -585,7 +585,7 @@ reconnect_agent(
|
|
|
AgentMetadata,
|
|
|
#{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState
|
|
|
) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_reconnect, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_reconnect, #{
|
|
|
agent => Agent,
|
|
|
agent_metadata => AgentMetadata,
|
|
|
inherited_streams => OldStreams
|
|
|
@@ -820,7 +820,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
|
|
|
disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) ->
|
|
|
case get_agent_state(Data0, Agent) of
|
|
|
#{version := Version} ->
|
|
|
- ?tp(warning, shared_sub_leader_disconnect_agent, #{
|
|
|
+ ?tp(debug, shared_sub_leader_disconnect_agent, #{
|
|
|
agent => Agent,
|
|
|
version => Version
|
|
|
}),
|
|
|
@@ -847,7 +847,7 @@ agent_transition_to_waiting_updating(
|
|
|
Streams,
|
|
|
RevokedStreams
|
|
|
) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_state_transition, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_state_transition, #{
|
|
|
agent => Agent,
|
|
|
old_state => OldState,
|
|
|
new_state => ?waiting_updating
|
|
|
@@ -871,7 +871,7 @@ agent_transition_to_waiting_updating(
|
|
|
agent_transition_to_waiting_replaying(
|
|
|
#{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
|
|
|
) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_state_transition, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_state_transition, #{
|
|
|
agent => Agent,
|
|
|
old_state => OldState,
|
|
|
new_state => ?waiting_replaying
|
|
|
@@ -886,7 +886,7 @@ agent_transition_to_waiting_replaying(
|
|
|
agent_transition_to_initial_waiting_replaying(
|
|
|
#{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams
|
|
|
) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_state_transition, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_state_transition, #{
|
|
|
agent => Agent,
|
|
|
old_state => none,
|
|
|
new_state => ?waiting_replaying
|
|
|
@@ -909,7 +909,7 @@ agent_transition_to_initial_waiting_replaying(
|
|
|
renew_no_replaying_deadline(AgentState).
|
|
|
|
|
|
agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_state_transition, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_state_transition, #{
|
|
|
agent => Agent,
|
|
|
old_state => ?waiting_replaying,
|
|
|
new_state => ?replaying
|
|
|
@@ -921,7 +921,7 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState
|
|
|
}.
|
|
|
|
|
|
agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) ->
|
|
|
- ?tp(warning, shared_sub_leader_agent_state_transition, #{
|
|
|
+ ?tp(debug, shared_sub_leader_agent_state_transition, #{
|
|
|
agent => Agent,
|
|
|
old_state => ?waiting_updating,
|
|
|
new_state => ?updating
|
|
|
@@ -1047,7 +1047,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) ->
|
|
|
#{streams := Streams, revoked_streams := RevokedStreams} = AgentState,
|
|
|
AllStreams = Streams ++ RevokedStreams,
|
|
|
Data1 = unassign_streams(Data0, AllStreams),
|
|
|
- ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}),
|
|
|
+ ?tp(debug, shared_sub_leader_drop_agent, #{agent => Agent}),
|
|
|
Data1#{agents => maps:remove(Agent, Agents)}.
|
|
|
|
|
|
invalidate_agent(#{group_id := GroupId}, Agent) ->
|