Jelajahi Sumber

fix(ds): Fix idle event generation in bitfield_lts layout

ieQu1 1 tahun lalu
induk
melakukan
29345aaa30

+ 1 - 6
apps/emqx/src/emqx_ds_schema.erl

@@ -246,7 +246,7 @@ fields(layout_builtin_reference) ->
                 reference,
                 reference,
                 #{
                 #{
                     'readOnly' => true,
                     'readOnly' => true,
-                    importance => ?IMPORTANCE_HIDDEN
+                    importance => ?IMPORTANCE_LOW
                 }
                 }
             )}
             )}
     ].
     ].
@@ -273,17 +273,12 @@ ds_schema(Options) ->
         Options
         Options
     ).
     ).
 
 
--ifndef(TEST).
-builtin_layouts() ->
-    [ref(layout_builtin_wildcard_optimized)].
--else.
 builtin_layouts() ->
 builtin_layouts() ->
     %% Reference layout stores everything in one stream, so it's not
     %% Reference layout stores everything in one stream, so it's not
     %% suitable for production use. However, it's very simple and
     %% suitable for production use. However, it's very simple and
     %% produces a very predictabale replay order, which can be useful
     %% produces a very predictabale replay order, which can be useful
     %% for testing and debugging:
     %% for testing and debugging:
     [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
     [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
--endif.
 
 
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 sc(Type, Meta) -> hoconsc:mk(Type, Meta).
 
 

+ 5 - 3
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -161,7 +161,7 @@
 
 
 %% GVar used for idle detection:
 %% GVar used for idle detection:
 -define(IDLE_DETECT, idle_detect).
 -define(IDLE_DETECT, idle_detect).
--define(EPOCH(S, TS), (TS bsl S#s.ts_bits)).
+-define(EPOCH(S, TS), (TS bsr S#s.ts_offset)).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -458,7 +458,7 @@ next(
     SafeCutoffTime =
     SafeCutoffTime =
         case HasCutoff of
         case HasCutoff of
             true ->
             true ->
-                (Now bsr TSOffset) bsl TSOffset;
+                ?EPOCH(Schema, Now) bsl TSOffset;
             false ->
             false ->
                 1 bsl TSBits - 1
                 1 bsl TSBits - 1
         end,
         end,
@@ -561,13 +561,15 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
             LastWrittenTs = 0
             LastWrittenTs = 0
     end,
     end,
     case Latch of
     case Latch of
-        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) ->
+        false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
             [dummy_event];
             [dummy_event];
         _ ->
         _ ->
             []
             []
     end;
     end;
 handle_event(_ShardId, _Data, _Time, _Event) ->
 handle_event(_ShardId, _Data, _Time, _Event) ->
+    %% `dummy_event' goes here and does nothing. But it forces update
+    %% of `Time' in the replication layer.
     [].
     [].
 
 
 %%================================================================================
 %%================================================================================

+ 1 - 1
apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl

@@ -183,7 +183,7 @@ t_rebalance(Config) ->
             ],
             ],
             Stream1 = emqx_utils_stream:interleave(
             Stream1 = emqx_utils_stream:interleave(
                 [
                 [
-                    {50, Stream0},
+                    {10, Stream0},
                     emqx_utils_stream:const(add_generation)
                     emqx_utils_stream:const(add_generation)
                 ],
                 ],
                 false
                 false