浏览代码

fix(dsstate): preserve internal stream key order relations

Thales Macedo Garitezi 1 年之前
父节点
当前提交
bf7cf8efb8

+ 18 - 2
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -160,7 +160,11 @@
 -record(pmap, {table, key_mapping = #{}, cache, dirty}).
 
 -ifdef(STORE_STATE_IN_DS).
--type internal_key(_K) :: integer().
+-type internal_key(_K) ::
+    %% `?stream_domain'
+    binary()
+    %% other domains
+    | integer().
 -else.
 -type internal_key(K) :: K.
 -endif.
@@ -1061,12 +1065,18 @@ val_decode(_Domain, Bin) ->
 -spec key_encode(domain(), term()) -> binary().
 key_encode(?metadata_domain, _Key) ->
     ?metadata_domain_bin;
+key_encode(?stream_domain, Key) ->
+    %% The generated binary might still contain `$/', which would be confused with an
+    %% extra topic level.
+    binary:encode_hex(Key, uppercase);
 key_encode(_Domain, Key) ->
     integer_to_binary(Key).
 
 -spec key_decode(domain(), binary()) -> term().
 key_decode(?metadata_domain, Bin) ->
     Bin;
+key_decode(?stream_domain, Bin) ->
+    binary:decode_hex(Bin);
 key_decode(_Domain, Bin) ->
     binary_to_integer(Bin).
 %% ELSE ifdef(STORE_STATE_IN_DS).
@@ -1212,7 +1222,7 @@ get_or_gen_internal_key(K, KeyMapping, _Domain, _Cache) when
     IntK = maps:get(K, KeyMapping),
     {IntK, KeyMapping};
 get_or_gen_internal_key(K, KeyMapping0, Domain, Cache) ->
-    IntK = erlang:unique_integer(),
+    IntK = gen_internal_key(Domain, K),
     case cache_has_key(Domain, IntK, Cache) of
         true ->
             %% collision (node restarted?); just try again
@@ -1221,6 +1231,12 @@ get_or_gen_internal_key(K, KeyMapping0, Domain, Cache) ->
             KeyMapping = KeyMapping0#{K => IntK},
             {IntK, KeyMapping}
     end.
+
+gen_internal_key(?stream_domain, {Rank, _Stream}) ->
+    LSB = erlang:unique_integer(),
+    <<Rank:64, LSB:64>>;
+gen_internal_key(_Domain, _K) ->
+    erlang:unique_integer().
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
 pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->

+ 50 - 29
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -73,11 +73,48 @@ prop_consistency() ->
     ).
 
 -ifdef(STORE_STATE_IN_DS).
-domain_msg_roundtrip_proper_test_() ->
-    Props = [prop_domain_msg_roundtrip()],
-    Opts = [{numtests, 1000}, {to_file, user}, {max_size, 100}],
+%% Verifies that our internal keys generated for stream keys preserve the order relation
+%% between them.
+stream_order_internal_keys_proper_test_() ->
+    Props = [prop_stream_order_internal_keys()],
+    Opts = [{numtests, 100}, {to_file, user}, {max_size, 100}],
     {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
 
+prop_stream_order_internal_keys() ->
+    ?FORALL(
+        {Id, Streams0},
+        {session_id(), list({non_neg_integer(), value_gen(), stream_state()})},
+        try
+            init(),
+            Streams = lists:uniq(Streams0),
+            StreamKeys = [{R, S} || {R, S, _SS} <- Streams],
+            ExpectedRanks = lists:sort([R || {R, _S, _SS} <- Streams]),
+            S = lists:foldl(
+                fun({R, S, SS}, Acc) ->
+                    emqx_persistent_session_ds_state:put_stream({R, S}, SS, Acc)
+                end,
+                emqx_persistent_session_ds_state:create_new(Id),
+                Streams
+            ),
+            RevRanks = emqx_persistent_session_ds_state:fold_streams(
+                fun({R, _S}, _SS, Acc) -> [R | Acc] end,
+                [],
+                S
+            ),
+            Ranks = lists:reverse(RevRanks),
+            ?WHENFAIL(
+                io:format(
+                    user,
+                    "Expected ranks:\n  ~p\nRanks:\n  ~p\nStream keys:\n  ~p\n",
+                    [ExpectedRanks, Ranks, StreamKeys]
+                ),
+                ExpectedRanks =:= Ranks
+            )
+        after
+            clean()
+        end
+    ).
+
 prop_domain_msg_roundtrip() ->
     ?FORALL(
         {SessionId, Domain, Val},
@@ -156,16 +193,16 @@ seqno() ->
     range(1, 100).
 
 stream_id() ->
-    range(1, 1).
+    {range(1, 3), oneof([#{}, {}])}.
 
-stream() ->
+stream_state() ->
     oneof([#{}]).
 
 put_req() ->
     oneof([
         ?LET(
             {Id, Stream},
-            {stream_id(), stream()},
+            {stream_id(), stream_state()},
             {#s.streams, put_stream, Id, Stream}
         ),
         ?LET(
@@ -194,7 +231,7 @@ del_req() ->
     ]).
 
 value_gen() ->
-    oneof([proper_types:map(), tuple()]).
+    oneof([#{}, loose_tuple(oneof([range(1, 3), binary()]))]).
 
 session_id_gen() ->
     frequency([
@@ -225,28 +262,12 @@ domain_gen() ->
 
 key_gen(?metadata_domain) ->
     <<"metadata">>;
-%% key_gen(?subscription_domain) ->
-%%     {emqx_proper_types:normal_topic_filter(), []};
-%% key_gen(?subscription_state_domain) ->
-%%     integer();
-%% key_gen(?stream_domain) ->
-%%     ?LET(
-%%         {Id, X, Y, Z, T},
-%%         {
-%%             integer(),
-%%             oneof([integer(), binary()]),
-%%             oneof([integer(), binary()]),
-%%             oneof([integer(), binary()]),
-%%             tuple()
-%%         },
-%%         {Id, [X, Y, Z | T]}
-%%     );
-%% key_gen(?rank_domain) ->
-%%     {integer(), binary()};
-%% key_gen(?seqno_domain) ->
-%%     integer();
-%% key_gen(?awaiting_rel_domain) ->
-%%     range(1, 16#FFFF).
+key_gen(?stream_domain) ->
+    ?LET(
+        {Rank, X},
+        {integer(), integer()},
+        <<Rank:64, X:64>>
+    );
 key_gen(_) ->
     integer().