Просмотр исходного кода

fix(ds): Allow to write batches to older generations

ieQu1 1 год назад
Родитель
Сommit
6eb04f90a3
1 измененных файлов с 38 добавлено и 15 удалено
  1. 38 15
      apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

+ 38 - 15
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -301,26 +301,30 @@ store_batch(Shard, Messages, Options) ->
     [{emqx_ds:time(), emqx_types:message()}],
     emqx_ds:message_store_opts()
 ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
-prepare_batch(Shard, Messages = [_ | _], Options) ->
+prepare_batch(Shard, Messages = [{Time, _} | _], Options) ->
     %% NOTE
     %% We assume that batches do not span generations. Callers should enforce this.
     ?tp(emqx_ds_storage_layer_prepare_batch, #{
         shard => Shard, messages => Messages, options => Options
     }),
-    GenId = generation_current(Shard),
-    #{module := Mod, data := GenData} = generation_get(Shard, GenId),
-    T0 = erlang:monotonic_time(microsecond),
-    Result =
-        case Mod:prepare_batch(Shard, GenData, Messages, Options) of
-            {ok, CookedBatch} ->
-                {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
-            Error = {error, _, _} ->
-                Error
-        end,
-    T1 = erlang:monotonic_time(microsecond),
-    %% TODO store->prepare
-    emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
-    Result;
+    %% FIXME: always store messages in the current generation
+    case generation_at(Shard, Time) of
+        {GenId, #{module := Mod, data := GenData}} ->
+            T0 = erlang:monotonic_time(microsecond),
+            Result =
+                case Mod:prepare_batch(Shard, GenData, Messages, Options) of
+                    {ok, CookedBatch} ->
+                        {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}};
+                    Error = {error, _, _} ->
+                        Error
+                end,
+            T1 = erlang:monotonic_time(microsecond),
+            %% TODO store->prepare
+            emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
+            Result;
+        not_found ->
+            ignore
+    end;
 prepare_batch(_Shard, [], _Options) ->
     ignore.
 
@@ -964,6 +968,25 @@ generation_current(Shard) ->
     #{current_generation := Current} = get_schema_runtime(Shard),
     Current.
 
+%% TODO: remove me
+-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()} | not_found.
+generation_at(Shard, Time) ->
+    Schema = #{current_generation := Current} = get_schema_runtime(Shard),
+    generation_at(Time, Current, Schema).
+
+generation_at(Time, GenId, Schema) ->
+    case Schema of
+        #{?GEN_KEY(GenId) := Gen} ->
+            case Gen of
+                #{since := Since} when Time < Since andalso GenId > 0 ->
+                    generation_at(Time, GenId - 1, Schema);
+                _ ->
+                    {GenId, Gen}
+            end;
+        _ ->
+            not_found
+    end.
+
 -spec generation_get(shard_id(), gen_id()) -> generation() | not_found.
 generation_get(Shard, GenId) ->
     case get_schema_runtime(Shard) of