|
|
@@ -24,13 +24,11 @@
|
|
|
start_link/1
|
|
|
]).
|
|
|
|
|
|
--behaviour(gen_server).
|
|
|
+-behaviour(gen_statem).
|
|
|
-export([
|
|
|
+ callback_mode/0,
|
|
|
init/1,
|
|
|
- handle_continue/2,
|
|
|
- handle_call/3,
|
|
|
- handle_cast/2,
|
|
|
- handle_info/2
|
|
|
+ handle_event/4
|
|
|
]).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -38,7 +36,7 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
start_link(ShareTopic) ->
|
|
|
- gen_server:start_link(?MODULE, {elect, ShareTopic}, []).
|
|
|
+ gen_statem:start_link(?MODULE, {elect, ShareTopic}, []).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
@@ -50,28 +48,24 @@ start_link(ShareTopic) ->
|
|
|
alive_until :: non_neg_integer()
|
|
|
}).
|
|
|
|
|
|
+callback_mode() ->
|
|
|
+ handle_event_function.
|
|
|
+
|
|
|
init(Elect = {elect, _ShareTopic}) ->
|
|
|
%% NOTE
|
|
|
%% Important to have it here, because this process can become
|
|
|
%% `emqx_ds_shared_sub_leader`, which has `terminate/2` logic.
|
|
|
_ = erlang:process_flag(trap_exit, true),
|
|
|
- {ok, #{}, {continue, Elect}}.
|
|
|
-
|
|
|
-handle_continue({elect, ShareTopic}, _State) ->
|
|
|
- elect(ShareTopic, _TS = emqx_message:timestamp_now()).
|
|
|
-
|
|
|
-handle_call(_Request, _From, State) ->
|
|
|
- {reply, {error, unknown_request}, State}.
|
|
|
-
|
|
|
-handle_cast(_Cast, State) ->
|
|
|
- {noreply, State}.
|
|
|
+ {ok, electing, undefined, {next_event, internal, Elect}}.
|
|
|
|
|
|
-handle_info(?agent_connect_leader_match(Agent, AgentMetadata, _ShareTopic), State) ->
|
|
|
+handle_event(internal, {elect, ShareTopic}, electing, _) ->
|
|
|
+ elect(ShareTopic, _TS = emqx_message:timestamp_now());
|
|
|
+handle_event(info, ?agent_connect_leader_match(Agent, Metadata, _ShareTopic), follower, Data) ->
|
|
|
%% NOTE: Redirecting to the known leader.
|
|
|
- ok = connect_leader(Agent, AgentMetadata, State),
|
|
|
- {noreply, State};
|
|
|
-handle_info({timeout, _TRef, invalidate}, State) ->
|
|
|
- {stop, {shutdown, invalidate}, State}.
|
|
|
+ ok = connect_leader(Agent, Metadata, Data),
|
|
|
+ keep_state_and_data;
|
|
|
+handle_event(state_timeout, invalidate, follower, _Data) ->
|
|
|
+ {stop, {shutdown, invalidate}}.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Internal functions
|
|
|
@@ -100,13 +94,12 @@ elect(ShareTopic, TS) ->
|
|
|
until => AliveUntil
|
|
|
}),
|
|
|
TTL = AliveUntil - TS,
|
|
|
- _TRef = erlang:start_timer(max(0, TTL), self(), invalidate),
|
|
|
- St = #follower{
|
|
|
+ Data = #follower{
|
|
|
topic = ShareTopic,
|
|
|
leader = emqx_ds_shared_sub_store:leader_id(LeaderClaim),
|
|
|
alive_until = AliveUntil
|
|
|
},
|
|
|
- {noreply, St};
|
|
|
+ {next_state, follower, Data, {state_timeout, max(0, TTL), invalidate}};
|
|
|
{error, Class, Reason} = Error ->
|
|
|
?tp(warning, "Shared subscription leader election failed", #{
|
|
|
id => ShareTopic,
|
|
|
@@ -117,7 +110,7 @@ elect(ShareTopic, TS) ->
|
|
|
recoverable -> StopReason = {shutdown, Reason};
|
|
|
unrecoverable -> StopReason = Error
|
|
|
end,
|
|
|
- {stop, StopReason, ShareTopic}
|
|
|
+ {stop, StopReason}
|
|
|
end.
|
|
|
|
|
|
connect_leader(Agent, AgentMetadata, #follower{topic = ShareTopic, leader = Pid}) ->
|