Ver código fonte

Merge pull request #13539 from thalesmg/20240729-r58-ds-session-data

feat(session): move session data to DS
Thales Macedo Garitezi 1 ano atrás
pai
commit
044b4f0e3a

+ 19 - 6
apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl

@@ -14,6 +14,8 @@
 
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 -import(emqx_common_test_helpers, [on_exit/1]).
 
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% CT boilerplate
 %% CT boilerplate
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -206,6 +208,14 @@ force_last_alive_at(ClientId, Time) ->
     _ = emqx_persistent_session_ds_state:commit(S),
     _ = emqx_persistent_session_ds_state:commit(S),
     ok.
     ok.
 
 
+stop_and_commit(Client) ->
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            emqtt:stop(Client),
+            #{?snk_kind := persistent_session_ds_terminate}
+        ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Testcases
 %% Testcases
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
@@ -327,7 +337,7 @@ t_session_unsubscription_idempotency(Config) ->
                     15_000
                     15_000
                 ),
                 ),
 
 
-            ok = emqtt:stop(Client1),
+            ok = stop_and_commit(Client1),
 
 
             ok
             ok
         end,
         end,
@@ -659,11 +669,14 @@ t_session_replay_retry(_Config) ->
 
 
     %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
     %% Make `emqx_ds` believe that roughly half of the shards are unavailable.
     ok = emqx_ds_test_helpers:mock_rpc_result(
     ok = emqx_ds_test_helpers:mock_rpc_result(
-        fun(_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
-            case erlang:phash2(Shard) rem 2 of
-                0 -> unavailable;
-                1 -> passthrough
-            end
+        fun
+            (_Node, emqx_ds_replication_layer, _Function, [?DURABLE_SESSION_STATE, _Shard | _]) ->
+                passthrough;
+            (_Node, emqx_ds_replication_layer, _Function, [_DB, Shard | _]) ->
+                case erlang:phash2(Shard) rem 2 of
+                    0 -> unavailable;
+                    1 -> passthrough
+                end
         end
         end
     ),
     ),
 
 

+ 12 - 1
apps/emqx/src/emqx_persistent_message.erl

@@ -47,7 +47,7 @@ init() ->
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
         ?SLOG(notice, #{msg => "Session durability is enabled"}),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, get_db_config()),
         ok = emqx_persistent_session_ds_router:init_tables(),
         ok = emqx_persistent_session_ds_router:init_tables(),
-        ok = emqx_persistent_session_ds:create_tables(),
+        ok = initialize_session_ds_state(),
         ok
         ok
     end).
     end).
 
 
@@ -69,6 +69,17 @@ get_db_config() ->
 force_ds(Zone) ->
 force_ds(Zone) ->
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
     emqx_config:get_zone_conf(Zone, [durable_sessions, force_persistence]).
 
 
+-ifdef(STORE_STATE_IN_DS).
+initialize_session_ds_state() ->
+    Config = emqx_ds_schema:db_config([durable_storage, sessions]),
+    ok = emqx_persistent_session_ds_state:open_db(Config).
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
+initialize_session_ds_state() ->
+    ok = emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 -spec add_handler() -> ok.
 -spec add_handler() -> ok.

+ 9 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -82,7 +82,11 @@
 ]).
 ]).
 
 
 %% session table operations
 %% session table operations
--export([create_tables/0, sync/1]).
+-export([sync/1]).
+-ifndef(STORE_STATE_IN_DS).
+-export([create_tables/0]).
+%% END ifndef(STORE_STATE_IN_DS).
+-endif.
 
 
 %% internal export used by session GC process
 %% internal export used by session GC process
 -export([destroy_session/1]).
 -export([destroy_session/1]).
@@ -832,8 +836,11 @@ get_client_subscription(ClientId, TopicFilter) ->
 %% Session tables operations
 %% Session tables operations
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
+-ifndef(STORE_STATE_IN_DS).
 create_tables() ->
 create_tables() ->
     emqx_persistent_session_ds_state:create_tables().
     emqx_persistent_session_ds_state:create_tables().
+%% END ifndef(STORE_STATE_IN_DS).
+-endif.
 
 
 %% @doc Force syncing of the transient state to persistent storage
 %% @doc Force syncing of the transient state to persistent storage
 sync(ClientId) ->
 sync(ClientId) ->
@@ -939,7 +946,7 @@ session_ensure_new(
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
     S6 = set_clientinfo(ClientInfo, S5),
     S6 = set_clientinfo(ClientInfo, S5),
     S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
     S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
-    S = emqx_persistent_session_ds_state:commit(S7),
+    S = emqx_persistent_session_ds_state:commit(S7, #{ensure_new => true}),
     #{
     #{
         id => Id,
         id => Id,
         props => Conf,
         props => Conf,

Diferenças do arquivo suprimidas por serem muito extensas
+ 855 - 12
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl


+ 2 - 0
apps/emqx/src/emqx_persistent_session_ds/session_internals.hrl

@@ -19,6 +19,8 @@
 -include("emqx_persistent_message.hrl").
 -include("emqx_persistent_message.hrl").
 -include("emqx_durable_session_metadata.hrl").
 -include("emqx_durable_session_metadata.hrl").
 
 
+-define(DURABLE_SESSION_STATE, emqx_persistent_session).
+
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_TAB, emqx_ds_session).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).
 -define(SESSION_STREAM_TAB, emqx_ds_stream_tab).

+ 154 - 4
apps/emqx/test/emqx_persistent_session_ds_state_tests.erl

@@ -21,12 +21,13 @@
 -include_lib("proper/include/proper.hrl").
 -include_lib("proper/include/proper.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
 
--define(tab, ?MODULE).
-
 %%================================================================================
 %%================================================================================
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
 
 
+-define(tab, ?MODULE).
+-define(DB, emqx_persistent_session).
+
 %% Note: here `committed' != `dirty'. It means "has been committed at
 %% Note: here `committed' != `dirty'. It means "has been committed at
 %% least once since the creation", and it's used by the iteration
 %% least once since the creation", and it's used by the iteration
 %% test.
 %% test.
@@ -34,6 +35,15 @@
 
 
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
 -type state() :: #{emqx_persistent_session_ds:id() => #s{}}.
 
 
+-define(metadata_domain, metadata).
+-define(metadata_domain_bin, <<"metadata">>).
+-define(subscription_domain, subscription).
+-define(subscription_state_domain, subscription_state).
+-define(stream_domain, stream).
+-define(rank_domain, rank).
+-define(seqno_domain, seqno).
+-define(awaiting_rel_domain, awaiting_rel).
+
 %%================================================================================
 %%================================================================================
 %% Properties
 %% Properties
 %%================================================================================
 %%================================================================================
@@ -62,6 +72,51 @@ prop_consistency() ->
         end
         end
     ).
     ).
 
 
+-ifdef(STORE_STATE_IN_DS).
+%% Verifies that our internal keys generated for stream keys preserve the order relation
+%% between them.
+stream_order_internal_keys_proper_test_() ->
+    Props = [prop_stream_order_internal_keys()],
+    Opts = [{numtests, 100}, {to_file, user}, {max_size, 100}],
+    {timeout, 300, [?_assert(proper:quickcheck(Prop, Opts)) || Prop <- Props]}.
+
+prop_stream_order_internal_keys() ->
+    ?FORALL(
+        {Id, Streams0},
+        {session_id(), list({non_neg_integer(), value_gen(), stream_state()})},
+        try
+            init(),
+            Streams = lists:uniq(Streams0),
+            StreamKeys = [{R, S} || {R, S, _SS} <- Streams],
+            ExpectedRanks = lists:sort([R || {R, _S, _SS} <- Streams]),
+            S = lists:foldl(
+                fun({R, S, SS}, Acc) ->
+                    emqx_persistent_session_ds_state:put_stream({R, S}, SS, Acc)
+                end,
+                emqx_persistent_session_ds_state:create_new(Id),
+                Streams
+            ),
+            RevRanks = emqx_persistent_session_ds_state:fold_streams(
+                fun({R, _S}, _SS, Acc) -> [R | Acc] end,
+                [],
+                S
+            ),
+            Ranks = lists:reverse(RevRanks),
+            ?WHENFAIL(
+                io:format(
+                    user,
+                    "Expected ranks:\n  ~p\nRanks:\n  ~p\nStream keys:\n  ~p\n",
+                    [ExpectedRanks, Ranks, StreamKeys]
+                ),
+                ExpectedRanks =:= Ranks
+            )
+        after
+            clean()
+        end
+    ).
+%% -ifdef(STORE_STATE_IN_DS).
+-endif.
+
 %%================================================================================
 %%================================================================================
 %% Generators
 %% Generators
 %%================================================================================
 %%================================================================================
@@ -109,17 +164,26 @@ seqno_track() ->
 seqno() ->
 seqno() ->
     range(1, 100).
     range(1, 100).
 
 
+-ifdef(STORE_STATE_IN_DS).
+stream_id() ->
+    {range(1, 3), oneof([#{}, {}])}.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 stream_id() ->
 stream_id() ->
+    %% Note: this does not match the stream id type used in practice, which is a
+    %% `{emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}'
     range(1, 1).
     range(1, 1).
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
-stream() ->
+stream_state() ->
     oneof([#{}]).
     oneof([#{}]).
 
 
 put_req() ->
 put_req() ->
     oneof([
     oneof([
         ?LET(
         ?LET(
             {Id, Stream},
             {Id, Stream},
-            {stream_id(), stream()},
+            {stream_id(), stream_state()},
             {#s.streams, put_stream, Id, Stream}
             {#s.streams, put_stream, Id, Stream}
         ),
         ),
         ?LET(
         ?LET(
@@ -147,6 +211,47 @@ del_req() ->
         {#s.subs, del_subscription, topic()}
         {#s.subs, del_subscription, topic()}
     ]).
     ]).
 
 
+value_gen() ->
+    oneof([#{}, loose_tuple(oneof([range(1, 3), binary()]))]).
+
+session_id_gen() ->
+    frequency([
+        {5, clientid()},
+        {1, <<"a/">>},
+        {1, <<"a/b">>},
+        {1, <<"a/+">>},
+        {1, <<"a/+/#">>},
+        {1, <<"#">>},
+        {1, <<"+">>},
+        {1, <<"/">>}
+    ]).
+
+clientid() ->
+    %% empty string is not valid...
+    ?SUCHTHAT(ClientId, emqx_proper_types:clientid(), ClientId =/= <<>>).
+
+domain_gen() ->
+    oneof([
+        ?metadata_domain,
+        ?subscription_domain,
+        ?subscription_state_domain,
+        ?stream_domain,
+        ?rank_domain,
+        ?seqno_domain,
+        ?awaiting_rel_domain
+    ]).
+
+key_gen(?metadata_domain) ->
+    <<"metadata">>;
+key_gen(?stream_domain) ->
+    ?LET(
+        {Rank, X},
+        {integer(), integer()},
+        <<Rank:64, X:64>>
+    );
+key_gen(_) ->
+    integer().
+
 command(S) ->
 command(S) ->
     case maps:size(S) > 0 of
     case maps:size(S) > 0 of
         true ->
         true ->
@@ -316,12 +421,57 @@ get_state(SessionId) ->
 put_state(SessionId, S) ->
 put_state(SessionId, S) ->
     ets:insert(?tab, {SessionId, S}).
     ets:insert(?tab, {SessionId, S}).
 
 
+-ifdef(STORE_STATE_IN_DS).
+init() ->
+    _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
+    mria:start(),
+    {ok, _} = application:ensure_all_started(emqx_ds_backends),
+    Dir = binary_to_list(filename:join(["/tmp", emqx_guid:to_hexstr(emqx_guid:gen())])),
+    persistent_term:put({?MODULE, data_dir}, Dir),
+    application:set_env(emqx_durable_storage, db_data_dir, Dir),
+    Defaults = #{
+        backend => builtin_local,
+        force_monotonic_timestamps => false,
+        atomic_batches => true,
+        storage =>
+            {emqx_ds_storage_bitfield_lts, #{
+                topic_index_bytes => 4,
+                epoch_bits => 10,
+                bits_per_topic_level => 64
+            }},
+        n_shards => 16,
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
+    },
+    ok = emqx_persistent_session_ds_state:open_db(Defaults),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 init() ->
 init() ->
     _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
     _ = ets:new(?tab, [named_table, public, {keypos, 1}]),
     mria:start(),
     mria:start(),
     emqx_persistent_session_ds_state:create_tables().
     emqx_persistent_session_ds_state:create_tables().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.
 
 
+-ifdef(STORE_STATE_IN_DS).
+clean() ->
+    ets:delete(?tab),
+    emqx_ds:drop_db(?DB),
+    application:stop(emqx_ds_backends),
+    application:stop(emqx_ds_builtin_local),
+    mria:stop(),
+    mria_mnesia:delete_schema(),
+    Dir = persistent_term:get({?MODULE, data_dir}),
+    persistent_term:erase({?MODULE, data_dir}),
+    ok = file:del_dir_r(Dir),
+    ok.
+%% ELSE ifdef(STORE_STATE_IN_DS).
+-else.
 clean() ->
 clean() ->
     ets:delete(?tab),
     ets:delete(?tab),
     mria:stop(),
     mria:stop(),
     mria_mnesia:delete_schema().
     mria_mnesia:delete_schema().
+%% END ifdef(STORE_STATE_IN_DS).
+-endif.

+ 2 - 1
apps/emqx/test/emqx_proper_types.erl

@@ -50,7 +50,8 @@
     printable_utf8/0,
     printable_utf8/0,
     printable_codepoint/0,
     printable_codepoint/0,
     raw_duration/0,
     raw_duration/0,
-    large_raw_duration/0
+    large_raw_duration/0,
+    clientid/0
 ]).
 ]).
 
 
 %% Generic Types
 %% Generic Types

+ 28 - 26
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -104,27 +104,32 @@ init_per_group(general, Config) ->
         | Config
         | Config
     ];
     ];
 init_per_group(persistent_sessions, Config) ->
 init_per_group(persistent_sessions, Config) ->
-    AppSpecs = [
-        {emqx,
-            "durable_sessions.enable = true\n"
-            "durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
-        emqx_management
-    ],
-    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(),
-    Cluster = [
-        {emqx_mgmt_api_clients_SUITE1, #{apps => AppSpecs ++ [Dashboard]}},
-        {emqx_mgmt_api_clients_SUITE2, #{apps => AppSpecs}}
-    ],
-    Nodes =
-        [N1 | _] = emqx_cth_cluster:start(
-            Cluster,
-            #{work_dir => emqx_cth_suite:work_dir(Config)}
-        ),
-    [
-        {nodes, Nodes},
-        {api_auth_header, erpc:call(N1, emqx_mgmt_api_test_util, auth_header_, [])}
-        | Config
-    ];
+    case emqx_ds_test_helpers:skip_if_norepl() of
+        false ->
+            AppSpecs = [
+                {emqx,
+                    "durable_sessions.enable = true\n"
+                    "durable_sessions.disconnected_session_count_refresh_interval = 100ms"},
+                emqx_management
+            ],
+            Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(),
+            Cluster = [
+                {emqx_mgmt_api_clients_SUITE1, #{apps => AppSpecs ++ [Dashboard]}},
+                {emqx_mgmt_api_clients_SUITE2, #{apps => AppSpecs}}
+            ],
+            Nodes =
+                [N1 | _] = emqx_cth_cluster:start(
+                    Cluster,
+                    #{work_dir => emqx_cth_suite:work_dir(Config)}
+                ),
+            [
+                {nodes, Nodes},
+                {api_auth_header, erpc:call(N1, emqx_mgmt_api_test_util, auth_header_, [])}
+                | Config
+            ];
+        Yes ->
+            Yes
+    end;
 init_per_group(non_persistent_cluster, Config) ->
 init_per_group(non_persistent_cluster, Config) ->
     AppSpecs = [
     AppSpecs = [
         emqx,
         emqx,
@@ -327,11 +332,8 @@ t_persistent_sessions1(Config) ->
             C2 = connect_client(#{port => Port1, clientid => ClientId}),
             C2 = connect_client(#{port => Port1, clientid => ClientId}),
             assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config),
             assert_single_client(#{node => N1, clientid => ClientId, status => connected}, Config),
             %% 4) Client disconnects.
             %% 4) Client disconnects.
-            ok = emqtt:stop(C2),
             %% 5) Session is GC'ed, client is removed from list.
             %% 5) Session is GC'ed, client is removed from list.
-            ?tp(notice, "gc", #{}),
-            %% simulate GC
-            ok = erpc:call(N1, emqx_persistent_session_ds, destroy_session, [ClientId]),
+            disconnect_and_destroy_session(C2),
             ?retry(
             ?retry(
                 100,
                 100,
                 20,
                 20,
@@ -511,7 +513,7 @@ t_persistent_sessions5(Config) ->
                 list_request(#{limit => 2, page => 1}, Config)
                 list_request(#{limit => 2, page => 1}, Config)
             ),
             ),
             %% Disconnect persistent sessions
             %% Disconnect persistent sessions
-            lists:foreach(fun emqtt:stop/1, [C1, C2]),
+            lists:foreach(fun stop_and_commit/1, [C1, C2]),
 
 
             P3 =
             P3 =
                 ?retry(200, 10, begin
                 ?retry(200, 10, begin

+ 6 - 1
mix.exs

@@ -467,7 +467,12 @@ defmodule EMQXUmbrella.MixProject do
       {:d, :snk_kind, :msg}
       {:d, :snk_kind, :msg}
     ] ++
     ] ++
       singleton(test_env?(), {:d, :TEST}) ++
       singleton(test_env?(), {:d, :TEST}) ++
-      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC})
+      singleton(not enable_quicer?(), {:d, :BUILD_WITHOUT_QUIC}) ++
+      singleton(store_state_in_ds?(), {:d, :STORE_STATE_IN_DS, true})
+  end
+
+  defp store_state_in_ds?() do
+    "1" == System.get_env("STORE_STATE_IN_DS")
   end
   end
 
 
   defp singleton(false, _value), do: []
   defp singleton(false, _value), do: []

+ 1 - 0
rebar.config.erl

@@ -223,6 +223,7 @@ common_compile_opts(Edition, _RelType, Vsn) ->
         {d, 'EMQX_RELEASE_EDITION', Edition}
         {d, 'EMQX_RELEASE_EDITION', Edition}
     ] ++
     ] ++
         [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
         [{d, 'EMQX_BENCHMARK'} || os:getenv("EMQX_BENCHMARK") =:= "1"] ++
+        [{d, 'STORE_STATE_IN_DS'} || os:getenv("STORE_STATE_IN_DS") =:= "1"] ++
         [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
         [{d, 'BUILD_WITHOUT_QUIC'} || not is_quicer_supported()].
 
 
 warn_profile_env() ->
 warn_profile_env() ->