Просмотр исходного кода

chore(sessds): consolidate `ifdef`s to make code easier to follow

Andrew Mayorov 1 год назад
Родитель
Сommit
2bd4f7ddfc

+ 75 - 210
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl

@@ -219,6 +219,7 @@
 }.
 
 -ifdef(STORE_STATE_IN_DS).
+
 -define(session_topic_ns, <<"session">>).
 -define(metadata_domain, metadata).
 -define(metadata_domain_bin, <<"metadata">>).
@@ -240,7 +241,6 @@
     | ?awaiting_rel_domain.
 
 -type sub_id() :: nil().
--type srs() :: #srs{}.
 -type data() ::
     #{
         domain := ?metadata_domain,
@@ -264,7 +264,7 @@
         domain := ?stream_domain,
         session_id := emqx_persistent_session_ds:id(),
         key := {non_neg_integer(), emqx_ds:stream()},
-        val := srs()
+        val := emqx_persistent_session_ds:stream_state()
     }
     | #{
         domain := ?rank_domain,
@@ -293,8 +293,12 @@
     {?ranks, ?rank_domain},
     {?awaiting_rel, ?awaiting_rel_domain}
 ]).
+
+-define(stream_tab, ?stream_domain).
+
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
+
 -define(session_tab, emqx_ds_session_tab).
 -define(subscription_tab, emqx_ds_session_subscriptions).
 -define(subscription_states_tab, emqx_ds_session_subscription_states).
@@ -311,6 +315,7 @@
     {?ranks, ?rank_tab},
     {?awaiting_rel, ?awaiting_rel_tab}
 ]).
+
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 
@@ -854,14 +859,6 @@ del_stream(Key, Rec) ->
 fold_streams(Fun, Acc, Rec) ->
     gen_fold(?streams, Fun, Acc, Rec).
 
--ifdef(STORE_STATE_IN_DS).
--spec iter_streams(_StartAfter :: stream_key() | beginning, t()) ->
-    iter(stream_key(), emqx_persistent_session_ds:stream_state()).
-iter_streams(After, Rec) ->
-    gen_iter_after(?streams, After, Rec).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
-
 -spec n_streams(t()) -> non_neg_integer().
 n_streams(Rec) ->
     gen_size(?streams, Rec).
@@ -920,21 +917,6 @@ n_awaiting_rel(Rec) ->
 
 %%
 
--ifdef(STORE_STATE_IN_DS).
--spec iter_next(iter(K, V)) -> {K, V, iter(K, V)} | none.
-iter_next(#{it := InnerIt0, inv_key_mapping := InvKeyMapping} = It0) ->
-    case gen_iter_next(InnerIt0) of
-        none ->
-            none;
-        {IntKey, Value, InnerIt} ->
-            Key = maps:get(IntKey, InvKeyMapping),
-            {Key, Value, It0#{it := InnerIt}}
-    end.
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
-
-%%
-
 -spec make_session_iterator() -> session_iterator().
 -ifdef(STORE_STATE_IN_DS).
 make_session_iterator() ->
@@ -1116,14 +1098,17 @@ update_pmaps(Fun, Map) ->
         ?pmaps
     ).
 
-%%
+%% PMaps
+
+-spec pmap_get(K, pmap(K, V)) -> V | undefined.
+-spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
+-spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
+-spec pmap_format(pmap(_K, _V)) -> map().
+-spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
 
-%% @doc Open a PMAP and fill the clean area with the data from DB.
-%% This functtion should be ran in a transaction.
 -ifdef(STORE_STATE_IN_DS).
-invert_key_mapping(KeyMapping) ->
-    maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping).
 
+%% @doc Open a PMAP and fill the clean area with the data from DB.
 -spec pmap_open(domain(), [data()], #{K => internal_key(K)}) -> pmap(K, _V).
 pmap_open(Domain, Data0, KeyMapping0) ->
     InvKeyMapping = invert_key_mapping(KeyMapping0),
@@ -1145,21 +1130,10 @@ pmap_open(Domain, Data0, KeyMapping0) ->
         cache = Clean,
         dirty = #{}
     }.
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
--spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
-pmap_open(Table, SessionId) ->
-    Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
-    #pmap{
-        table = Table,
-        cache = Clean,
-        dirty = #{}
-    }.
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_get(K, pmap(K, V)) -> V | undefined.
--ifdef(STORE_STATE_IN_DS).
+invert_key_mapping(KeyMapping) ->
+    maps:fold(fun(K, IntK, AccIn) -> AccIn#{IntK => K} end, #{}, KeyMapping).
+
 pmap_get(K, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) when
     is_map_key(K, KeyMapping)
 ->
@@ -1167,15 +1141,7 @@ pmap_get(K, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) when
     cache_get(Table, IntK, Cache);
 pmap_get(_K, _Pmap) ->
     undefined.
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
-pmap_get(K, #pmap{table = Table, cache = Cache}) ->
-    cache_get(Table, K, Cache).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V).
--ifdef(STORE_STATE_IN_DS).
 pmap_put(
     K, V, Pmap = #pmap{table = Table, key_mapping = KeyMapping0, dirty = Dirty, cache = Cache}
 ) ->
@@ -1207,18 +1173,7 @@ gen_internal_key(?stream_domain, {Rank, _Stream}) ->
     <<Rank:64, LSB:64>>;
 gen_internal_key(_Domain, _K) ->
     erlang:unique_integer().
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
-pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->
-    Pmap#pmap{
-        cache = cache_put(Table, K, V, Cache),
-        dirty = Dirty#{K => dirty}
-    }.
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
--ifdef(STORE_STATE_IN_DS).
 pmap_del(
     Key,
     Pmap = #pmap{table = Table, key_mapping = KeyMapping, dirty = Dirty, cache = Cache}
@@ -1230,36 +1185,13 @@ pmap_del(
     };
 pmap_del(_Key, Pmap) ->
     Pmap.
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
-pmap_del(
-    Key,
-    Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}
-) ->
-    Pmap#pmap{
-        cache = cache_remove(Table, Key, Cache),
-        dirty = Dirty#{Key => del}
-    }.
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
--ifdef(STORE_STATE_IN_DS).
 pmap_fold(Fun, Acc, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
     cache_fold(Table, Fun, Acc, KeyMapping, Cache).
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
-pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
-    cache_fold(Table, Fun, Acc, Cache).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--ifdef(STORE_STATE_IN_DS).
 -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) ->
     {[emqx_ds:operation()], pmap(K, V)}.
-pmap_commit(
-    SessionId, Pmap = #pmap{table = Domain, dirty = Dirty, cache = Cache}
-) ->
+pmap_commit(SessionId, Pmap = #pmap{table = Domain, dirty = Dirty, cache = Cache}) ->
     Out =
         maps:fold(
             fun
@@ -1284,8 +1216,46 @@ matcher(SessionId, Domain, Key) ->
         payload = '_',
         timestamp = ?TS
     }.
+
+pmap_format(#pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
+    InvKeyMapping = invert_key_mapping(KeyMapping),
+    cache_format(Table, InvKeyMapping, Cache).
+
+pmap_size(#pmap{table = Table, cache = Cache}) ->
+    cache_size(Table, Cache).
+
 %% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
+
+%% @doc Open a PMAP and fill the clean area with the data from DB.
+%% This functtion should be ran in a transaction.
+-spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
+pmap_open(Table, SessionId) ->
+    Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
+    #pmap{
+        table = Table,
+        cache = Clean,
+        dirty = #{}
+    }.
+
+pmap_get(K, #pmap{table = Table, cache = Cache}) ->
+    cache_get(Table, K, Cache).
+
+pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->
+    Pmap#pmap{
+        cache = cache_put(Table, K, V, Cache),
+        dirty = Dirty#{K => dirty}
+    }.
+
+pmap_del(Key, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->
+    Pmap#pmap{
+        cache = cache_remove(Table, Key, Cache),
+        dirty = Dirty#{Key => del}
+    }.
+
+pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
+    cache_fold(Table, Fun, Acc, Cache).
+
 -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
 pmap_commit(
     SessionId, Pmap = #pmap{table = Tab, dirty = Dirty, cache = Cache}
@@ -1303,64 +1273,35 @@ pmap_commit(
     Pmap#pmap{
         dirty = #{}
     }.
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_format(pmap(_K, _V)) -> map().
--ifdef(STORE_STATE_IN_DS).
-pmap_format(#pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
-    InvKeyMapping = invert_key_mapping(KeyMapping),
-    cache_format(Table, InvKeyMapping, Cache).
-%% ELSE ifdef(STORE_STATE_IN_DS).
--else.
 pmap_format(#pmap{table = Table, cache = Cache}) ->
     cache_format(Table, Cache).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
 
--spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
 pmap_size(#pmap{table = Table, cache = Cache}) ->
     cache_size(Table, Cache).
 
--ifdef(STORE_STATE_IN_DS).
-pmap_iter_after(beginning, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
-    %% NOTE: Only valid for gbt-backed PMAPs.
-    gbt = cache_data_type(Table),
-    It = gb_trees:iterator(Cache),
-    InvKeyMapping = invert_key_mapping(KeyMapping),
-    #{it => It, inv_key_mapping => InvKeyMapping};
-pmap_iter_after(AfterExt, #pmap{table = Table, key_mapping = KeyMapping, cache = Cache}) ->
-    %% NOTE: Only valid for gbt-backed PMAPs.
-    gbt = cache_data_type(Table),
-    AfterInt = maps:get(AfterExt, KeyMapping, undefined),
-    It = gbt_iter_after(AfterInt, Cache),
-    InvKeyMapping = invert_key_mapping(KeyMapping),
-    #{it => It, inv_key_mapping => InvKeyMapping}.
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 
 %%
 
--ifdef(STORE_STATE_IN_DS).
--define(stream_tab, ?stream_domain).
-
-cache_data_type(?stream_tab) -> gbt;
-cache_data_type(_Table) -> map.
-
-cache_from_list(?stream_tab, L) ->
-    gbt_from_list(L);
 cache_from_list(_Table, L) ->
     maps:from_list(L).
-cache_put(?stream_tab, K, V, Cache) ->
-    gbt_put(K, V, Cache);
+
+cache_get(_Table, K, Cache) ->
+    maps:get(K, Cache, undefined).
+
 cache_put(_Table, K, V, Cache) ->
     maps:put(K, V, Cache).
-cache_remove(?stream_tab, K, Cache) ->
-    gbt_remove(K, Cache);
+
 cache_remove(_Table, K, Cache) ->
     maps:remove(K, Cache).
-cache_fold(?stream_tab, Fun, Acc, KeyMapping, Cache) ->
-    gbt_fold(Fun, Acc, KeyMapping, Cache);
+
+cache_size(_Table, Cache) ->
+    maps:size(Cache).
+
+-ifdef(STORE_STATE_IN_DS).
+
 cache_fold(_Table, FunIn, Acc, KeyMapping, Cache) ->
     InvKeyMapping = invert_key_mapping(KeyMapping),
     Fun = fun(IntK, V, AccIn) ->
@@ -1369,18 +1310,6 @@ cache_fold(_Table, FunIn, Acc, KeyMapping, Cache) ->
     end,
     maps:fold(Fun, Acc, Cache).
 
-cache_has_key(?stream_tab, Key, Cache) ->
-    gb_trees:is_defined(Key, Cache);
-cache_has_key(_Domain, Key, Cache) ->
-    is_map_key(Key, Cache).
-cache_format(?stream_tab, InvKeyMapping, Cache) ->
-    lists:map(
-        fun({IntK, V}) ->
-            K = maps:get(IntK, InvKeyMapping),
-            {K, V}
-        end,
-        gbt_format(Cache)
-    );
 cache_format(_Table, InvKeyMapping, Cache) ->
     maps:fold(
         fun(IntK, V, Acc) ->
@@ -1390,87 +1319,23 @@ cache_format(_Table, InvKeyMapping, Cache) ->
         #{},
         Cache
     ).
-cache_size(?stream_tab, Cache) ->
-    gbt_size(Cache);
-cache_size(_Table, Cache) ->
-    maps:size(Cache).
-%% Below ndef(STORE_STATE_IN_DS)
+
+cache_has_key(_Domain, Key, Cache) ->
+    is_map_key(Key, Cache).
+
+%% ELSE ifdef(STORE_STATE_IN_DS).
 -else.
-cache_from_list(_Table, L) ->
-    maps:from_list(L).
-cache_put(_Table, K, V, Cache) ->
-    maps:put(K, V, Cache).
-cache_remove(_Table, K, Cache) ->
-    maps:remove(K, Cache).
+
 cache_fold(_Table, Fun, Acc, Cache) ->
     maps:fold(Fun, Acc, Cache).
+
 cache_format(_Table, Cache) ->
     Cache.
-cache_size(_Table, Cache) ->
-    maps:size(Cache).
-%% END ifdef(STORE_STATE_IN_DS).
--endif.
-
-cache_get(_Table, K, Cache) ->
-    maps:get(K, Cache, undefined).
-
--ifdef(STORE_STATE_IN_DS).
-%% PMAP Cache implementation backed by `gb_trees'.
-%% Supports iteration starting from specific key.
-
-gbt_from_list(L) ->
-    lists:foldl(
-        fun({K, V}, Acc) -> gb_trees:insert(K, V, Acc) end,
-        gb_trees:empty(),
-        L
-    ).
-
-gbt_get(K, Cache, undefined) ->
-    case gb_trees:lookup(K, Cache) of
-        none -> undefined;
-        {_, V} -> V
-    end.
-
-gbt_put(K, V, Cache) ->
-    gb_trees:enter(K, V, Cache).
-
-gbt_remove(K, Cache) ->
-    gb_trees:delete_any(K, Cache).
-
-gbt_format(Cache) ->
-    gb_trees:to_list(Cache).
-
-gbt_fold(Fun, Acc, KeyMapping, Cache) ->
-    InvKeyMapping = invert_key_mapping(KeyMapping),
-    It = gb_trees:iterator(Cache),
-    gbt_fold_iter(Fun, Acc, InvKeyMapping, It).
 
-gbt_fold_iter(Fun, Acc, InvKeyMapping, It0) ->
-    case gb_trees:next(It0) of
-        {IntK, V, It} ->
-            K = maps:get(IntK, InvKeyMapping),
-            gbt_fold_iter(Fun, Fun(K, V, Acc), InvKeyMapping, It);
-        _ ->
-            Acc
-    end.
 %% END ifdef(STORE_STATE_IN_DS).
 -endif.
 
 -ifdef(STORE_STATE_IN_DS).
-gbt_size(Cache) ->
-    gb_trees:size(Cache).
-
-gbt_iter_after(After, Cache) ->
-    It0 = gb_trees:iterator_from(After, Cache),
-    case gb_trees:next(It0) of
-        {After, _, It} ->
-            It;
-        _ ->
-            It0
-    end.
-
-gbt_iter_next(It) ->
-    gb_trees:next(It).
 
 session_restore(SessionId) ->
     Empty = maps:from_keys(