ieQu1 1 год назад
Родитель
Сommit
59a09fb86f

+ 3 - 3
apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl

@@ -115,11 +115,11 @@ get_servers_local_preferred(DB, Shard) ->
         Servers when is_list(Servers) ->
             ok
     end,
-    case lists:keyfind(node(), 2, Servers) of
+    case lists:keytake(node(), 2, Servers) of
         false ->
             Servers;
-        Local when is_tuple(Local) ->
-            [Local | lists:delete(Local, Servers)]
+        {value, Local, Rest} ->
+            [Local | Rest]
     end.
 
 lookup_leader(DB, Shard) ->

+ 13 - 0
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -553,6 +553,17 @@ delete_next_until(
     end.
 
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
+    %% If the last message was published more than one epoch ago, and
+    %% the shard remains idle, we need to advance safety cutoff
+    %% interval to make sure the last epoch becomes visible to the
+    %% readers.
+    %%
+    %% We do so by emitting a dummy event that will be persisted by
+    %% the replication layer. Processing it will advance the
+    %% replication layer's clock.
+    %%
+    %% This operation is latched to avoid publishing events on every
+    %% tick.
     case ets:lookup(Gvars, ?IDLE_DETECT) of
         [{?IDLE_DETECT, Latch, LastWrittenTs}] ->
             ok;
@@ -562,6 +573,8 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     end,
     case Latch of
         false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
+            %% Note: + 1 above delays the event by one epoch to add a
+            %% safety margin.
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
             [dummy_event];
         _ ->

+ 10 - 0
changes/ce/fix-13072.en.md

@@ -0,0 +1,10 @@
+Various fixes related to the `durable_sessions` feature:
+
+- Add an option to execute read operations on the leader.
+- `drop_generation` operation can be replayed multiple times by the replication layer, but it's not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed. In the future, however, we want to make `drop_generation` idempotent in a nicer way.
+- Wrap storage layer events in a small structure containing the generation ID, to make sure events are handled by the same layout CBM & context that produced them.
+- Fix crash when storage event arrives to the dropped generation (now removed `storage_layer:generation_at` function didn't handle the case of dropped generations).
+- Implement `format_status` callback for several workers to minimize log spam
+- Move the responsibility of `end_of_stream` detection to the layout CBM. Previously storage layer used a heuristic: old generations that return an empty batch won't produce more data. This was, obviously, incorrect: for example, bitfield-LTS layout MAY return empty batch while waiting for safe cutoff time.
+- `reference` layout has been enabled in prod build. It could be useful for integration testing.
+- Fix incorrect epoch calculation in `bitfield_lts:handle_event` callback that lead to missed safe cutoff time updates, and effectively, subscribers being unable to fetch messages until a fresh batch was published.