ieQu1 1 год назад
Родитель
Сommit
d349f84f04

+ 14 - 7
apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl

@@ -135,16 +135,23 @@ t_05_update_iterator(Config) ->
 
 t_06_smoke_add_generation(Config) ->
     DB = ?FUNCTION_NAME,
+    BeginTime = os:system_time(millisecond),
+
     ?assertMatch(ok, emqx_ds:open_db(DB, opts(Config))),
-    ?assertMatch(
-        [{_, _}],
-        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
+    [{Gen1, #{created_at := Created1, since := Since1, until := undefined}}] = maps:to_list(
+        emqx_ds:list_generations_with_lifetimes(DB)
     ),
+
     ?assertMatch(ok, emqx_ds:add_generation(DB)),
-    ?assertMatch(
-        [{_, _}, {_, _}],
-        maps:to_list(emqx_ds:list_generations_with_lifetimes(DB))
-    ).
+    [
+        {Gen1, #{created_at := Created1, since := Since1, until := Until1}},
+        {Gen2, #{created_at := Created2, since := Since2, until := undefined}}
+    ] = maps:to_list(emqx_ds:list_generations_with_lifetimes(DB)),
+    %% Check units of the return values (+/- 10s from test begin time):
+    ?give_or_take(BeginTime, 10_000, Created1),
+    ?give_or_take(BeginTime, 10_000, Created2),
+    ?give_or_take(BeginTime, 10_000, Since2),
+    ?give_or_take(BeginTime, 10_000, Until1).
 
 t_07_smoke_update_config(Config) ->
     DB = ?FUNCTION_NAME,

+ 11 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -155,7 +155,12 @@ list_generations_with_lifetimes(DB) ->
     lists:foldl(
         fun(Shard, Acc) ->
             maps:fold(
-                fun(GenId, Data, Acc1) ->
+                fun(GenId, Data0, Acc1) ->
+                    Data = maps:update_with(
+                        until,
+                        fun timeus_to_timestamp/1,
+                        maps:update_with(since, fun timeus_to_timestamp/1, Data0)
+                    ),
                     Acc1#{{Shard, GenId} => Data}
                 end,
                 Acc,
@@ -370,3 +375,8 @@ current_timestamp(ShardId) ->
 
 timestamp_to_timeus(TimestampMs) ->
     TimestampMs * 1000.
+
+timeus_to_timestamp(undefined) ->
+    undefined;
+timeus_to_timestamp(TimestampUs) ->
+    TimestampUs div 1000.

+ 0 - 1
apps/emqx_durable_storage/src/emqx_ds_buffer.erl

@@ -287,7 +287,6 @@ do_flush(
             lists:foreach(fun(From) -> gen_server:reply(From, ok) end, Replies),
             erlang:garbage_collect(),
             S#s{
-                callback_state = CallbackS,
                 n = 0,
                 n_bytes = 0,
                 queue = queue:new(),