Просмотр исходного кода

feat(ds): hardwire emqx_durable_storage message persistence

Only message persistence is currently implemented, irrespectively of
whether there are persistent sessions around or not.
Andrew Mayorov 2 лет назад
Родитель
Сommit
7e76914599

+ 1 - 0
apps/emqx/rebar.config

@@ -23,6 +23,7 @@
 %% `git_subdir` dependency in other projects.
 {deps, [
     {emqx_utils, {path, "../emqx_utils"}},
+    {emqx_durable_storage, {path, "../emqx_durable_storage"}},
     {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
     {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
     {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},

+ 2 - 1
apps/emqx/src/emqx.app.src

@@ -16,7 +16,8 @@
         sasl,
         os_mon,
         lc,
-        hocon
+        hocon,
+        emqx_durable_storage
     ]},
     {mod, {emqx_app, []}},
     {env, []},

+ 1 - 0
apps/emqx/src/emqx_app.erl

@@ -37,6 +37,7 @@
 start(_Type, _Args) ->
     ok = maybe_load_config(),
     ok = emqx_persistent_session:init_db_backend(),
+    _ = emqx_persistent_session_ds:init(),
     ok = maybe_start_quicer(),
     ok = emqx_bpapi:start(),
     ok = emqx_alarm_handler:load(),

+ 1 - 0
apps/emqx/src/emqx_broker.erl

@@ -225,6 +225,7 @@ publish(Msg) when is_record(Msg, message) ->
             [];
         Msg1 = #message{topic = Topic} ->
             emqx_persistent_session:persist_message(Msg1),
+            _ = emqx_persistent_session_ds:persist_message(Msg1),
             route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
 

+ 87 - 0
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -0,0 +1,87 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_session_ds).
+
+-export([init/0]).
+
+-export([persist_message/1]).
+
+-export([
+    serialize_message/1,
+    deserialize_message/1
+]).
+
+%% FIXME
+-define(DS_SHARD, <<"local">>).
+
+-define(WHEN_ENABLED(DO),
+    case is_store_enabled() of
+        true -> DO;
+        false -> {skipped, disabled}
+    end
+).
+
+%%
+
+init() ->
+    ?WHEN_ENABLED(
+        begin
+            _ = emqx_ds_storage_layer_sup:start_shard(?DS_SHARD),
+            ok
+        end
+    ).
+
+%%
+
+-spec persist_message(emqx_types:message()) ->
+    ok | {skipped, _Reason} | {error, _TODO}.
+persist_message(Msg) ->
+    ?WHEN_ENABLED(
+        case needs_persistence(Msg) andalso find_subscribers(Msg) of
+            [_ | _] ->
+                store_message(Msg);
+            % [] ->
+            %     {skipped, no_subscribers};
+            false ->
+                {skipped, needs_no_persistence}
+        end
+    ).
+
+needs_persistence(Msg) ->
+    not (emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg)).
+
+store_message(Msg) ->
+    ID = emqx_message:id(Msg),
+    Timestamp = emqx_guid:timestamp(ID),
+    Topic = emqx_topic:words(emqx_message:topic(Msg)),
+    emqx_ds_storage_layer:store(?DS_SHARD, ID, Timestamp, Topic, serialize_message(Msg)).
+
+find_subscribers(_Msg) ->
+    [node()].
+
+%%
+
+serialize_message(Msg) ->
+    term_to_binary(emqx_message:to_map(Msg)).
+
+deserialize_message(Bin) ->
+    emqx_message:from_map(binary_to_term(Bin)).
+
+%%
+
+is_store_enabled() ->
+    emqx_config:get([persistent_session_store, ds]).

+ 8 - 0
apps/emqx/src/emqx_schema.erl

@@ -319,6 +319,14 @@ fields("persistent_session_store") ->
                     desc => ?DESC(persistent_session_store_enabled)
                 }
             )},
+        {"ds",
+            sc(
+                boolean(),
+                #{
+                    default => false,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
         {"on_disc",
             sc(
                 boolean(),

+ 123 - 0
apps/emqx/test/emqx_persistent_messages_SUITE.erl

@@ -0,0 +1,123 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_persistent_messages_SUITE).
+
+-include_lib("stdlib/include/assert.hrl").
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-define(NOW,
+    (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}]))
+).
+
+-define(HERE(FMT, ARGS),
+    io:format(
+        user,
+        "*** " ?MODULE_STRING ":~p/~p ~s @ ~p *** " ++ FMT ++ "~n",
+        [?FUNCTION_NAME, ?FUNCTION_ARITY, ?NOW, node() | ARGS]
+    )
+).
+
+all() ->
+    [t_messages_persisted].
+
+init_per_suite(Config) ->
+    {ok, _} = application:ensure_all_started(emqx_durable_storage),
+    ok = emqx_common_test_helpers:start_apps([], fun
+        (emqx) ->
+            emqx_common_test_helpers:boot_modules(all),
+            emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>),
+            emqx_app:set_config_loader(?MODULE);
+        (_) ->
+            ok
+    end),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_common_test_helpers:stop_apps([]),
+    ok.
+
+t_messages_persisted(_Config) ->
+    C1 = connect(<<?MODULE_STRING "1">>, true, 30),
+    C2 = connect(<<?MODULE_STRING "2">>, false, 60),
+    C3 = connect(<<?MODULE_STRING "3">>, false, undefined),
+    C4 = connect(<<?MODULE_STRING "4">>, false, 0),
+
+    CP = connect(<<?MODULE_STRING "-pub">>, true, undefined),
+
+    {ok, _, [1]} = emqtt:subscribe(C1, <<"client/+/topic">>, qos1),
+    {ok, _, [0]} = emqtt:subscribe(C2, <<"client/+/topic">>, qos0),
+    {ok, _, [1]} = emqtt:subscribe(C2, <<"random/+">>, qos1),
+    {ok, _, [2]} = emqtt:subscribe(C3, <<"client/#">>, qos2),
+    {ok, _, [0]} = emqtt:subscribe(C4, <<"random/#">>, qos0),
+
+    Messages = [
+        M1 = {<<"client/1/topic">>, <<"1">>},
+        M2 = {<<"client/2/topic">>, <<"2">>},
+        M3 = {<<"client/3/topic/sub">>, <<"3">>},
+        M4 = {<<"client/4">>, <<"4">>},
+        M5 = {<<"random/5">>, <<"5">>},
+        M6 = {<<"random/6/topic">>, <<"6">>},
+        M7 = {<<"client/7/topic">>, <<"7">>},
+        M8 = {<<"client/8/topic/sub">>, <<"8">>},
+        M9 = {<<"random/9">>, <<"9">>},
+        M10 = {<<"random/10">>, <<"10">>}
+    ],
+
+    Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages],
+
+    ?HERE("Results = ~p", [Results]),
+
+    Persisted = consume(<<"local">>, {['#'], 0}),
+
+    ?HERE("Persisted = ~p", [Persisted]),
+
+    ?assertEqual(
+        % [M1, M2, M5, M7, M9, M10],
+        [M1, M2, M3, M4, M5, M6, M7, M8, M9, M10],
+        [{emqx_message:topic(M), emqx_message:payload(M)} || M <- Persisted]
+    ),
+
+    ok.
+
+%%
+
+connect(ClientId, CleanStart, EI) ->
+    {ok, Client} = emqtt:start_link([
+        {clientid, ClientId},
+        {proto_ver, v5},
+        {clean_start, CleanStart},
+        {properties,
+            maps:from_list(
+                [{'Session-Expiry-Interval', EI} || is_integer(EI)]
+            )}
+    ]),
+    {ok, _} = emqtt:connect(Client),
+    Client.
+
+consume(Shard, Replay) ->
+    {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay),
+    consume(It).
+
+consume(It) ->
+    case emqx_ds_storage_layer:next(It) of
+        {value, Msg, NIt} ->
+            [emqx_persistent_session_ds:deserialize_message(Msg) | consume(NIt)];
+        none ->
+            []
+    end.