Bläddra i källkod

fix(dsstate): stream iteration and printing sessions with internal keys

Thales Macedo Garitezi 1 år sedan
förälder
incheckning
d8cd0621ec

+ 10 - 5
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -14,6 +14,8 @@
 
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %% CT boilerplate
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -667,11 +669,14 @@ t_session_replay_retry(_Config) ->
 
 
     %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
     %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
     ok = emqx_ds_test_helpers:mock_rpc_result(
     ok = emqx_ds_test_helpers:mock_rpc_result(
-        fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
-            case erlang:phash2(Shard) rem 2 of
-                0 -> unavailable;
-                1 -> passthrough
-            end
+        fun
+            (_Node, emqx_ds_replication_layer, _Function, [?DURABLE_SESSION_STATE, _Shard | _]) ->
+                passthrough;
+            (_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
+                case erlang:phash2(Shard) rem 2 of
+                    0 -> unavailable;
+                    1 -> passthrough
+                end
         end
         end
     ),
     ),
 
 

+ 82 - 3
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -114,7 +114,15 @@
 
 
 -type message() :: emqx_types:message().
 -type message() :: emqx_types:message().
 
 
+-ifdef(STORE_STATE_IN_DS).
+-opaque iter(K, V) :: #{
+    it := gb_trees:iter(internal_key(K), V), inv_key_mapping := #{internal_key(K) => K}
+}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 -opaque iter(K, V) :: gb_trees:iter(K, V).
 -opaque iter(K, V) :: gb_trees:iter(K, V).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -ifdef(STORE_STATE_IN_DS).
 -ifdef(STORE_STATE_IN_DS).
 -opaque session_iterator() :: #{its := [emqx_ds:iterator()]} | '$end_of_table'.
 -opaque session_iterator() :: #{its := [emqx_ds:iterator()]} | '$end_of_table'.
@@ -837,11 +845,18 @@ fold_streams(Fun, Acc, Rec) ->
 
 
 -spec iter_streams(_StartAfter :: stream_key() | beginning, t()) ->
 -spec iter_streams(_StartAfter :: stream_key() | beginning, t()) ->
     iter(stream_key(), emqx_persistent_session_ds:stream_state()).
     iter(stream_key(), emqx_persistent_session_ds:stream_state()).
+-ifdef(STORE_STATE_IN_DS).
+iter_streams(After, Rec) ->
+    gen_iter_after(?streams, After, Rec).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 iter_streams(After, Rec) ->
 iter_streams(After, Rec) ->
     %% NOTE
     %% NOTE
     %% No special handling for `beginning', as it always compares less
     %% No special handling for `beginning', as it always compares less
     %% than any `stream_key()'.
     %% than any `stream_key()'.
     gen_iter_after(?streams, After, Rec).
     gen_iter_after(?streams, After, Rec).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec n_streams(t()) -> non_neg_integer().
 -spec n_streams(t()) -> non_neg_integer().
 n_streams(Rec) ->
 n_streams(Rec) ->
@@ -902,8 +917,21 @@ n_awaiting_rel(Rec) ->
 %%
 %%
 
 
 -spec iter_next(iter(K, V)) -> {K, V, iter(K, V)} | none.
 -spec iter_next(iter(K, V)) -> {K, V, iter(K, V)} | none.
+-ifdef(STORE_STATE_IN_DS).
+iter_next(#{it := InnerIt0, inv_key_mapping := InvKeyMapping} = It0) ->
+    case gen_iter_next(InnerIt0) of
+        none ->
+            none;
+        {IntKey, Value, InnerIt} ->
+            Key = maps:get(IntKey, InvKeyMapping),
+            {Key, Value, It0#{it := InnerIt}}
+    end.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 iter_next(It0) ->
 iter_next(It0) ->
     gen_iter_next(It0).
     gen_iter_next(It0).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %%
 %%
 
 
@@ -1095,9 +1123,12 @@ update_pmaps(Fun, Map) ->
 %% @doc Open a PMAP and fill the clean area with the data from DB.
 %% @doc Open a PMAP and fill the clean area with the data from DB.
 %% This functtion should be ran in a transaction.
 %% This functtion should be ran in a transaction.
 -ifdef(STORE_STATE_IN_DS).
 -ifdef(STORE_STATE_IN_DS).
+invert_key_mapping(KeyMapping) ->
+    maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping).
+
 -spec pmap_open(domain(), [data()], #{K => internal_key(K)}) -> pmap(K, _V).
 -spec pmap_open(domain(), [data()], #{K => internal_key(K)}) -> pmap(K, _V).
 pmap_open(Domain, Data0, KeyMapping0) ->
 pmap_open(Domain, Data0, KeyMapping0) ->
-    InvKeyMapping = maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping0),
+    InvKeyMapping = invert_key_mapping(KeyMapping0),
     {Data, KeyMapping} =
     {Data, KeyMapping} =
         lists:mapfoldl(
         lists:mapfoldl(
             fun(#{key := IntK, val := V}, AccIn) ->
             fun(#{key := IntK, val := V}, AccIn) ->
@@ -1272,17 +1303,43 @@ pmap_commit(
 -endif.
 -endif.
 
 
 -spec pmap_format(pmap(_K, _V)) -> map().
 -spec pmap_format(pmap(_K, _V)) -> map().
+-ifdef(STORE_STATE_IN_DS).
+pmap_format(#pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
+    InvKeyMapping = invert_key_mapping(KeyMapping),
+    cache_format(Table, InvKeyMapping, Cache).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 pmap_format(#pmap{table = Table, cache = Cache}) ->
 pmap_format(#pmap{table = Table, cache = Cache}) ->
     cache_format(Table, Cache).
     cache_format(Table, Cache).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 -spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
 -spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
 pmap_size(#pmap{table = Table, cache = Cache}) ->
 pmap_size(#pmap{table = Table, cache = Cache}) ->
     cache_size(Table, Cache).
     cache_size(Table, Cache).
 
 
+-ifdef(STORE_STATE_IN_DS).
+pmap_iter_after(beginning, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
+    %% NOTE: Only valid for gbt-backed PMAPs.
+    gbt = cache_data_type(Table),
+    It = gb_trees:iterator(Cache),
+    InvKeyMapping = invert_key_mapping(KeyMapping),
+    #{it => It, inv_key_mapping => InvKeyMapping};
+pmap_iter_after(AfterExt, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
+    %% NOTE: Only valid for gbt-backed PMAPs.
+    gbt = cache_data_type(Table),
+    AfterInt = maps:get(AfterExt, KeyMapping, undefined),
+    It = gbt_iter_after(AfterInt, Cache),
+    InvKeyMapping = invert_key_mapping(KeyMapping),
+    #{it => It, inv_key_mapping => InvKeyMapping}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 pmap_iter_after(After, #pmap{table = Table, cache = Cache}) ->
 pmap_iter_after(After, #pmap{table = Table, cache = Cache}) ->
     %% NOTE: Only valid for gbt-backed PMAPs.
     %% NOTE: Only valid for gbt-backed PMAPs.
     gbt = cache_data_type(Table),
     gbt = cache_data_type(Table),
     gbt_iter_after(After, Cache).
     gbt_iter_after(After, Cache).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 %%
 %%
 
 
@@ -1317,7 +1374,7 @@ cache_remove(_Table, K, Cache) ->
 cache_fold(?stream_tab, Fun, Acc, KeyMapping, Cache) ->
 cache_fold(?stream_tab, Fun, Acc, KeyMapping, Cache) ->
     gbt_fold(Fun, Acc, KeyMapping, Cache);
     gbt_fold(Fun, Acc, KeyMapping, Cache);
 cache_fold(_Table, FunIn, Acc, KeyMapping, Cache) ->
 cache_fold(_Table, FunIn, Acc, KeyMapping, Cache) ->
-    InvKeyMapping = maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping),
+    InvKeyMapping = invert_key_mapping(KeyMapping),
     Fun = fun(IntK, V, AccIn) ->
     Fun = fun(IntK, V, AccIn) ->
         K = maps:get(IntK, InvKeyMapping),
         K = maps:get(IntK, InvKeyMapping),
         FunIn(K, V, AccIn)
         FunIn(K, V, AccIn)
@@ -1337,10 +1394,32 @@ cache_fold(_Table, Fun, Acc, Cache) ->
 %% END ifdef(STORE_STATE_IN_DS).
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 -endif.
 
 
+-ifdef(STORE_STATE_IN_DS).
+cache_format(?stream_tab, InvKeyMapping, Cache) ->
+    lists:map(
+        fun({IntK, V}) ->
+            K = maps:get(IntK, InvKeyMapping),
+            {K, V}
+        end,
+        gbt_format(Cache)
+    );
+cache_format(_Table, InvKeyMapping, Cache) ->
+    maps:fold(
+        fun(IntK, V, Acc) ->
+            K = maps:get(IntK, InvKeyMapping),
+            Acc#{K => V}
+        end,
+        #{},
+        Cache
+    ).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 cache_format(?stream_tab, Cache) ->
 cache_format(?stream_tab, Cache) ->
     gbt_format(Cache);
     gbt_format(Cache);
 cache_format(_Table, Cache) ->
 cache_format(_Table, Cache) ->
     Cache.
     Cache.
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
 cache_size(?stream_tab, Cache) ->
 cache_size(?stream_tab, Cache) ->
     gbt_size(Cache);
     gbt_size(Cache);
@@ -1374,7 +1453,7 @@ gbt_format(Cache) ->
 
 
 -ifdef(STORE_STATE_IN_DS).
 -ifdef(STORE_STATE_IN_DS).
 gbt_fold(Fun, Acc, KeyMapping, Cache) ->
 gbt_fold(Fun, Acc, KeyMapping, Cache) ->
-    InvKeyMapping = maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping),
+    InvKeyMapping = invert_key_mapping(KeyMapping),
     It = gb_trees:iterator(Cache),
     It = gb_trees:iterator(Cache),
     gbt_fold_iter(Fun, Acc, InvKeyMapping, It).
     gbt_fold_iter(Fun, Acc, InvKeyMapping, It).