Explorar o código

Merge pull request #11191 from keynslug/ft/EMQX-9591/msg-persistence-minimal

feat(ds): hardwire emqx_durable_storage message persistence
Andrew Mayorov %!s(int64=2) %!d(string=hai) anos
pai
achega
21fbf79c29

+ 1 - 0
apps/emqx/rebar.config

@@ -23,6 +23,7 @@
 %% `git_subdir` dependency in other projects.
 %% `git_subdir` dependency in other projects.
 {deps, [
 {deps, [
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_durable_storage, {path, "../emqx_durable_storage"}},
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},

+ 2 - 1
apps/emqx/src/emqx.app.src

@@ -16,7 +16,8 @@
         sasl,
         sasl,
         os_mon,
         os_mon,
         lc,
         lc,
-        hocon
+        hocon,
+        emqx_durable_storage
     ]},
     ]},
     {mod, {emqx_app, []}},
     {mod, {emqx_app, []}},
     {env, []},
     {env, []},

+ 1 - 0
apps/emqx/src/emqx_app.erl

@@ -39,6 +39,7 @@
 start(_Type, _Args) ->
 start(_Type, _Args) ->
     ok = maybe_load_config(),
     ok = maybe_load_config(),
     ok = emqx_persistent_session:init_db_backend(),
     ok = emqx_persistent_session:init_db_backend(),
+    _ = emqx_persistent_session_ds:init(),
     ok = maybe_start_quicer(),
     ok = maybe_start_quicer(),
     ok = emqx_bpapi:start(),
     ok = emqx_bpapi:start(),
     ok = emqx_alarm_handler:load(),
     ok = emqx_alarm_handler:load(),

+ 1 - 0
apps/emqx/src/emqx_broker.erl

@@ -225,6 +225,7 @@ publish(Msg) when is_record(Msg, message) ->
             [];
             [];
         Msg1 = #message{topic = Topic} ->
         Msg1 = #message{topic = Topic} ->
             emqx_persistent_session:persist_message(Msg1),
             emqx_persistent_session:persist_message(Msg1),
+            _ = emqx_persistent_session_ds:persist_message(Msg1),
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
     end.
 
 

+ 86 - 0
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -0,0 +1,86 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds).
+
+-export([init/0]).
+
+-export([persist_message/1]).
+
+-export([
+    serialize_message/1,
+    deserialize_message/1
+]).
+
+%% FIXME
+-define(DS_SHARD, <<"local">>).
+
+-define(WHEN_ENABLED(DO),
+    case is_store_enabled() of
+        true -> DO;
+        false -> {skipped, disabled}
+    end
+).
+
+%%
+
+init() ->
+    ?WHEN_ENABLED(
+        ok = emqx_ds:ensure_shard(?DS_SHARD, #{
+            dir => filename:join([emqx:data_dir(), ds, messages, ?DS_SHARD])
+        })
+    ).
+
+%%
+
+-spec persist_message(emqx_types:message()) ->
+    ok | {skipped, _Reason} | {error, _TODO}.
+persist_message(Msg) ->
+    ?WHEN_ENABLED(
+        case needs_persistence(Msg) andalso find_subscribers(Msg) of
+            [_ | _] ->
+                store_message(Msg);
+            % [] ->
+            %     {skipped, no_subscribers};
+            false ->
+                {skipped, needs_no_persistence}
+        end
+    ).
+
+needs_persistence(Msg) ->
+    not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
+
+store_message(Msg) ->
+    ID = emqx_message:id(Msg),
+    Timestamp = emqx_guid:timestamp(ID),
+    Topic = emqx_topic:words(emqx_message:topic(Msg)),
+    emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
+
+find_subscribers(_Msg) ->
+    [node()].
+
+%%
+
+serialize_message(Msg) ->
+    term_to_binary(emqx_message:to_map(Msg)).
+
+deserialize_message(Bin) ->
+    emqx_message:from_map(binary_to_term(Bin)).
+
+%%
+
+is_store_enabled() ->
+    emqx_config:get([persistent_session_store, ds]).

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -319,6 +319,14 @@ fields("persistent_session_store") ->
                     desc => ?DESC(persistent_session_store_enabled)
                     desc => ?DESC(persistent_session_store_enabled)
                 }
                 }
             )},
             )},
+        {"ds",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {"on_disc",
         {"on_disc",
             sc(
             sc(
                 boolean(),
                 boolean(),

+ 116 - 0
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -0,0 +1,116 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_messages_SUITE).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(NOW,
+    (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
+).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_durable_storage),
+    ok = emqx_common_test_helpers:start_apps([], fun
+        (emqx) ->
+            emqx_common_test_helpers:boot_modules(all),
+            emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>),
+            emqx_app:set_config_loader(?MODULE);
+        (_) ->
+            ok
+    end),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([]),
+    application:stop(emqx_durable_storage),
+    ok.
+
+t_messages_persisted(_Config) ->
+    C1 = connect(<<?MODULE_STRING "1">>, true, 30),
+    C2 = connect(<<?MODULE_STRING "2">>, false, 60),
+    C3 = connect(<<?MODULE_STRING "3">>, false, undefined),
+    C4 = connect(<<?MODULE_STRING "4">>, false, 0),
+
+    CP = connect(<<?MODULE_STRING "-pub">>, true, undefined),
+
+    {ok, _, [1]} = emqtt:subscribe(C1, <<"client/+/topic">>, qos1),
+    {ok, _, [0]} = emqtt:subscribe(C2, <<"client/+/topic">>, qos0),
+    {ok, _, [1]} = emqtt:subscribe(C2, <<"random/+">>, qos1),
+    {ok, _, [2]} = emqtt:subscribe(C3, <<"client/#">>, qos2),
+    {ok, _, [0]} = emqtt:subscribe(C4, <<"random/#">>, qos0),
+
+    Messages = [
+        M1 = {<<"client/1/topic">>, <<"1">>},
+        M2 = {<<"client/2/topic">>, <<"2">>},
+        M3 = {<<"client/3/topic/sub">>, <<"3">>},
+        M4 = {<<"client/4">>, <<"4">>},
+        M5 = {<<"random/5">>, <<"5">>},
+        M6 = {<<"random/6/topic">>, <<"6">>},
+        M7 = {<<"client/7/topic">>, <<"7">>},
+        M8 = {<<"client/8/topic/sub">>, <<"8">>},
+        M9 = {<<"random/9">>, <<"9">>},
+        M10 = {<<"random/10">>, <<"10">>}
+    ],
+
+    Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages],
+
+    ct:pal("Results = ~p", [Results]),
+
+    Persisted = consume(<<"local">>, {['#'], 0}),
+
+    ct:pal("Persisted = ~p", [Persisted]),
+
+    ?assertEqual(
+        % [M1, M2, M5, M7, M9, M10],
+        [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10],
+        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+    ),
+
+    ok.
+
+%%
+
+connect(ClientId, CleanStart, EI) ->
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {clean_start, CleanStart},
+        {properties,
+            maps:from_list(
+                [{'Session-Expiry-Interval', EI} || is_integer(EI)]
+            )}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+consume(Shard, Replay) ->
+    {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
+    consume(It).
+
+consume(It) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Msg, NIt} ->
+            [emqx_persistent_session_ds:deserialize_message(Msg) | consume(NIt)];
+        none ->
+            []
+    end.

+ 13 - 0
apps/emqx_durable_storage/src/emqx_ds.erl

@@ -16,6 +16,7 @@
 -module(emqx_ds).
 -module(emqx_ds).
 
 
 %% API:
 %% API:
+-export([ensure_shard/2]).
 %%   Messages:
 %%   Messages:
 -export([message_store/2, message_store/1, message_stats/0]).
 -export([message_store/2, message_store/1, message_stats/0]).
 %%   Iterator:
 %%   Iterator:
@@ -79,6 +80,18 @@
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
 
 
+-spec ensure_shard(shard(), emqx_ds_storage_layer:options()) ->
+    ok | {error, _Reason}.
+ensure_shard(Shard, Options) ->
+    case emqx_ds_storage_layer_sup:start_shard(Shard, Options) of
+        {ok, _Pid} ->
+            ok;
+        {error, {already_started, _Pid}} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
 %%--------------------------------------------------------------------------------
 %%--------------------------------------------------------------------------------
 %% Message
 %% Message
 %%--------------------------------------------------------------------------------
 %%--------------------------------------------------------------------------------

+ 1 - 1
apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl

@@ -175,7 +175,7 @@
     cf :: rocksdb:cf_handle(),
     cf :: rocksdb:cf_handle(),
     keymapper :: keymapper(),
     keymapper :: keymapper(),
     write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
     write_options = [{sync, true}] :: emqx_ds_storage_layer:db_write_options(),
-    read_options = [] :: emqx_ds_storage_layer:db_write_options()
+    read_options = [] :: emqx_ds_storage_layer:db_read_options()
 }).
 }).
 
 
 -record(it, {
 -record(it, {

+ 23 - 17
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -6,7 +6,7 @@
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
 %% API:
 %% API:
--export([start_link/1]).
+-export([start_link/2]).
 -export([create_generation/3]).
 -export([create_generation/3]).
 
 
 -export([store/5]).
 -export([store/5]).
@@ -18,7 +18,8 @@
 %% behaviour callbacks:
 %% behaviour callbacks:
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
 
 
--export_type([cf_refs/0, gen_id/0, db_write_options/0, state/0, iterator/0]).
+-export_type([cf_refs/0, gen_id/0, options/0, state/0, iterator/0]).
+-export_type([db_options/0, db_write_options/0, db_read_options/0]).
 
 
 -compile({inline, [meta_lookup/2]}).
 -compile({inline, [meta_lookup/2]}).
 
 
@@ -26,10 +27,16 @@
 %% Type declarations
 %% Type declarations
 %%================================================================================
 %%================================================================================
 
 
-%% see rocksdb:db_options()
-% -type options() :: proplists:proplist().
+-type options() :: #{
+    dir => file:filename()
+}.
 
 
+%% see rocksdb:db_options()
+-type db_options() :: proplists:proplist().
+%% see rocksdb:write_options()
 -type db_write_options() :: proplists:proplist().
 -type db_write_options() :: proplists:proplist().
+%% see rocksdb:read_options()
+-type db_read_options() :: proplists:proplist().
 
 
 -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
 -type cf_refs() :: [{string(), rocksdb:cf_handle()}].
 
 
@@ -110,18 +117,16 @@
 %% API funcions
 %% API funcions
 %%================================================================================
 %%================================================================================
 
 
--spec start_link(emqx_ds:shard()) -> {ok, pid()}.
-start_link(Shard) ->
-    gen_server:start_link(?REF(Shard), ?MODULE, [Shard], []).
+-spec start_link(emqx_ds:shard(), emqx_ds_storage_layer:options()) -> {ok, pid()}.
+start_link(Shard, Options) ->
+    gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
 
 
 -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
 -spec create_generation(emqx_ds:shard(), emqx_ds:time(), emqx_ds_conf:backend_config()) ->
     {ok, gen_id()} | {error, nonmonotonic}.
     {ok, gen_id()} | {error, nonmonotonic}.
 create_generation(Shard, Since, Config = {_Module, _Options}) ->
 create_generation(Shard, Since, Config = {_Module, _Options}) ->
     gen_server:call(?REF(Shard), {create_generation, Since, Config}).
     gen_server:call(?REF(Shard), {create_generation, Since, Config}).
 
 
--spec store(
-    emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()
-) ->
+-spec store(emqx_ds:shard(), emqx_guid:guid(), emqx_ds:time(), emqx_ds:topic(), binary()) ->
     ok | {error, _}.
     ok | {error, _}.
 store(Shard, GUID, Time, Topic, Msg) ->
 store(Shard, GUID, Time, Topic, Msg) ->
     {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
     {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time),
@@ -181,9 +186,9 @@ discard_iterator(Shard, ReplayID) ->
 %% behaviour callbacks
 %% behaviour callbacks
 %%================================================================================
 %%================================================================================
 
 
-init([Shard]) ->
+init({Shard, Options}) ->
     process_flag(trap_exit, true),
     process_flag(trap_exit, true),
-    {ok, S0} = open_db(Shard),
+    {ok, S0} = open_db(Shard, Options),
     S = ensure_current_generation(S0),
     S = ensure_current_generation(S0),
     ok = populate_metadata(S),
     ok = populate_metadata(S),
     {ok, S}.
     {ok, S}.
@@ -265,16 +270,17 @@ create_gen(GenId, Since, {Module, Options}, S = #s{db = DBHandle, cf_generations
     },
     },
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
     {ok, Gen, S#s{cf_generations = NewCFs ++ CFs}}.
 
 
--spec open_db(emqx_ds:shard()) -> {ok, state()} | {error, _TODO}.
-open_db(Shard) ->
-    Filename = binary_to_list(Shard),
+-spec open_db(emqx_ds:shard(), options()) -> {ok, state()} | {error, _TODO}.
+open_db(Shard, Options) ->
+    DBDir = unicode:characters_to_list(maps:get(dir, Options, Shard)),
     DBOptions = [
     DBOptions = [
         {create_if_missing, true},
         {create_if_missing, true},
         {create_missing_column_families, true}
         {create_missing_column_families, true}
         | emqx_ds_conf:db_options()
         | emqx_ds_conf:db_options()
     ],
     ],
+    _ = filelib:ensure_dir(DBDir),
     ExistingCFs =
     ExistingCFs =
-        case rocksdb:list_column_families(Filename, DBOptions) of
+        case rocksdb:list_column_families(DBDir, DBOptions) of
             {ok, CFs} ->
             {ok, CFs} ->
                 [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
                 [{Name, []} || Name <- CFs, Name /= ?DEFAULT_CF, Name /= ?ITERATOR_CF];
             % DB is not present. First start
             % DB is not present. First start
@@ -286,7 +292,7 @@ open_db(Shard) ->
         {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
         {?ITERATOR_CF, ?ITERATOR_CF_OPTS}
         | ExistingCFs
         | ExistingCFs
     ],
     ],
-    case rocksdb:open(Filename, DBOptions, ColumnFamilies) of
+    case rocksdb:open(DBDir, DBOptions, ColumnFamilies) of
         {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
         {ok, DBHandle, [_CFDefault, CFIterator | CFRefs]} ->
             {CFNames, _} = lists:unzip(ExistingCFs),
             {CFNames, _} = lists:unzip(ExistingCFs),
             {ok, #s{
             {ok, #s{

+ 9 - 7
apps/emqx_durable_storage/src/emqx_ds_storage_layer_sup.erl

@@ -6,7 +6,7 @@
 -behaviour(supervisor).
 -behaviour(supervisor).
 
 
 %% API:
 %% API:
--export([start_link/0, start_shard/1, stop_shard/1]).
+-export([start_link/0, start_shard/2, stop_shard/1]).
 
 
 %% behaviour callbacks:
 %% behaviour callbacks:
 -export([init/1]).
 -export([init/1]).
@@ -25,9 +25,10 @@
 start_link() ->
 start_link() ->
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
     supervisor:start_link({local, ?SUP}, ?MODULE, []).
 
 
--spec start_shard(emqx_ds:shard()) -> supervisor:startchild_ret().
-start_shard(Shard) ->
-    supervisor:start_child(?SUP, shard_child_spec(Shard)).
+-spec start_shard(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+    supervisor:startchild_ret().
+start_shard(Shard, Options) ->
+    supervisor:start_child(?SUP, shard_child_spec(Shard, Options)).
 
 
 -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
 -spec stop_shard(emqx_ds:shard()) -> ok | {error, _}.
 stop_shard(Shard) ->
 stop_shard(Shard) ->
@@ -51,11 +52,12 @@ init([]) ->
 %% Internal functions
 %% Internal functions
 %%================================================================================
 %%================================================================================
 
 
--spec shard_child_spec(emqx_ds:shard()) -> supervisor:child_spec().
-shard_child_spec(Shard) ->
+-spec shard_child_spec(emqx_ds:shard(), emqx_ds_storage_layer:options()) ->
+    supervisor:child_spec().
+shard_child_spec(Shard, Options) ->
     #{
     #{
         id => Shard,
         id => Shard,
-        start => {emqx_ds_storage_layer, start_link, [Shard]},
+        start => {emqx_ds_storage_layer, start_link, [Shard, Options]},
         shutdown => 5_000,
         shutdown => 5_000,
         restart => permanent,
         restart => permanent,
         type => worker
         type => worker

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
     % strict semver, bump manually!
-    {vsn, "0.1.0"},
+    {vsn, "0.1.1"},
     {modules, []},
     {modules, []},
     {registered, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria]},
     {applications, [kernel, stdlib, rocksdb, gproc, mria]},

+ 2 - 2
apps/emqx_durable_storage/test/emqx_ds_storage_layer_SUITE.erl

@@ -33,7 +33,7 @@
 %% Smoke test for opening and reopening the database
 %% Smoke test for opening and reopening the database
 t_open(_Config) ->
 t_open(_Config) ->
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
     ok = emqx_ds_storage_layer_sup:stop_shard(?SHARD),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD).
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(?SHARD, #{}).
 
 
 %% Smoke test of store function
 %% Smoke test of store function
 t_store(_Config) ->
 t_store(_Config) ->
@@ -263,7 +263,7 @@ end_per_suite(_Config) ->
 
 
 init_per_testcase(TC, Config) ->
 init_per_testcase(TC, Config) ->
     ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
     ok = set_shard_config(shard(TC), ?DEFAULT_CONFIG),
-    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC)),
+    {ok, _} = emqx_ds_storage_layer_sup:start_shard(shard(TC), #{}),
     Config.
     Config.
 
 
 end_per_testcase(TC, _Config) ->
 end_per_testcase(TC, _Config) ->