|
|
@@ -102,7 +102,12 @@ start_link(Options) ->
|
|
|
become(ShareTopicFilter, StartTime, Claim) ->
|
|
|
Data0 = init_data(ShareTopicFilter, StartTime),
|
|
|
Data1 = attach_claim(Claim, Data0),
|
|
|
- Actions = init_claim_renewal(Data1),
|
|
|
+ case store_is_dirty(Data1) of
|
|
|
+ true ->
|
|
|
+ Actions = force_claim_renewal(Data1);
|
|
|
+ false ->
|
|
|
+ Actions = init_claim_renewal(Data1)
|
|
|
+ end,
|
|
|
gen_statem:enter_loop(?MODULE, [], ?leader_active, Data1, Actions).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -147,6 +152,9 @@ init_data(#share{topic = Topic} = ShareTopicFilter, StartTime) ->
|
|
|
attach_claim(Claim, Data) ->
|
|
|
Data#{leader_claim => Claim}.
|
|
|
|
|
|
+force_claim_renewal(_Data = #{}) ->
|
|
|
+ [{{timeout, #renew_leader_claim{}}, 0, #renew_leader_claim{}}].
|
|
|
+
|
|
|
init_claim_renewal(_Data = #{leader_claim := Claim}) ->
|
|
|
Interval = emqx_ds_shared_sub_leader_store:heartbeat_interval(Claim),
|
|
|
[{{timeout, #renew_leader_claim{}}, Interval, #renew_leader_claim{}}].
|
|
|
@@ -1068,6 +1076,9 @@ make_iterator(Stream, TopicFilter, StartTime) ->
|
|
|
|
|
|
%% Leader store
|
|
|
|
|
|
+store_is_dirty(#{store := Store}) ->
|
|
|
+ emqx_ds_shared_sub_leader_store:dirty(Store).
|
|
|
+
|
|
|
store_get_stream(#{store := Store}, ID) ->
|
|
|
emqx_ds_shared_sub_leader_store:get(stream, ID, Store).
|
|
|
|