Просмотр исходного кода

refactor(ds): Use a simple improper list to represent the streams

ieQu1 2 лет назад
Родитель
Commit
2e56810ea2

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -23,6 +23,7 @@
 {emqx_ds,1}.
 {emqx_ds,2}.
 {emqx_ds,3}.
+{emqx_ds,4}.
 {emqx_eviction_agent,1}.
 {emqx_eviction_agent,2}.
 {emqx_exhook,1}.

+ 47 - 24
apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl

@@ -43,7 +43,9 @@
     do_drop_db_v1/1,
     do_store_batch_v1/4,
     do_get_streams_v1/4,
+    do_get_streams_v2/4,
     do_make_iterator_v1/5,
+    do_make_iterator_v2/5,
     do_update_iterator_v2/4,
     do_next_v1/4,
     do_add_generation_v2/1,
@@ -51,7 +53,9 @@
     do_drop_generation_v3/3
 ]).
 
--export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]).
+-export_type([
+    shard_id/0, builtin_db_opts/0, stream_v1/0, stream/0, iterator/0, message_id/0, batch/0
+]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include("emqx_ds_replication_layer.hrl").
@@ -72,17 +76,19 @@
 
 %% This encapsulates the stream entity from the replication level.
 %%
-%% TODO: currently the stream is hardwired to only support the
-%% internal rocksdb storage. In the future we want to add another
-%% implementations for emqx_ds, so this type has to take this into
-%% account.
--opaque stream() ::
+%% TODO: this type is obsolete and is kept only for compatibility with
+%% v3 BPAPI. Remove it when emqx_ds_proto_v3 is gone (EMQX 5.6)
+-opaque stream_v1() ::
     #{
         ?tag := ?STREAM,
         ?shard := emqx_ds_replication_layer:shard_id(),
-        ?enc := emqx_ds_storage_layer:stream()
+        ?enc := emqx_ds_storage_layer:stream_v1()
     }.
 
+-define(stream_v2(SHARD, INNER), [2, SHARD | INNER]).
+
+-opaque stream() :: nonempty_maybe_improper_list().
+
 -opaque iterator() ::
     #{
         ?tag := ?IT,
@@ -121,7 +127,7 @@ open_db(DB, CreateOpts) ->
 -spec add_generation(emqx_ds:db()) -> ok | {error, _}.
 add_generation(DB) ->
     Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
-    _ = emqx_ds_proto_v3:add_generation(Nodes, DB),
+    _ = emqx_ds_proto_v4:add_generation(Nodes, DB),
     ok.
 
 -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
@@ -140,7 +146,7 @@ list_generations_with_lifetimes(DB) ->
                     AccInner#{{Shard, GenId} => Data}
                 end,
                 GensAcc,
-                emqx_ds_proto_v3:list_generations_with_lifetimes(Node, DB, Shard)
+                emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)
             )
         end,
         #{},
@@ -152,12 +158,12 @@ drop_generation(DB, {Shard, GenId}) ->
     %% TODO: drop generation in all nodes in the replica set, not only in the leader,
     %% after we have proper replication in place.
     Node = node_of_shard(DB, Shard),
-    emqx_ds_proto_v3:drop_generation(Node, DB, Shard, GenId).
+    emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
 
 -spec drop_db(emqx_ds:db()) -> ok | {error, _}.
 drop_db(DB) ->
     Nodes = list_nodes(),
-    _ = emqx_ds_proto_v3:drop_db(Nodes, DB),
+    _ = emqx_ds_proto_v4:drop_db(Nodes, DB),
     _ = emqx_ds_replication_layer_meta:drop_db(DB),
     emqx_ds_builtin_sup:stop_db(DB),
     ok.
@@ -174,16 +180,12 @@ get_streams(DB, TopicFilter, StartTime) ->
     lists:flatmap(
         fun(Shard) ->
             Node = node_of_shard(DB, Shard),
-            Streams = emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, StartTime),
+            Streams = emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime),
             lists:map(
-                fun({RankY, Stream}) ->
+                fun({RankY, StorageLayerStream}) ->
                     RankX = Shard,
                     Rank = {RankX, RankY},
-                    {Rank, #{
-                        ?tag => ?STREAM,
-                        ?shard => Shard,
-                        ?enc => Stream
-                    }}
+                    {Rank, ?stream_v2(Shard, StorageLayerStream)}
                 end,
                 Streams
             )
@@ -194,9 +196,9 @@ get_streams(DB, TopicFilter, StartTime) ->
 -spec make_iterator(emqx_ds:db(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
-    #{?tag := ?STREAM, ?shard := Shard, ?enc := StorageStream} = Stream,
+    ?stream_v2(Shard, StorageStream) = Stream,
     Node = node_of_shard(DB, Shard),
-    case emqx_ds_proto_v3:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
+    case emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Err = {error, _} ->
@@ -213,7 +215,7 @@ update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
     Node = node_of_shard(DB, Shard),
     case
-        emqx_ds_proto_v3:update_iterator(
+        emqx_ds_proto_v4:update_iterator(
             Node,
             DB,
             Shard,
@@ -239,7 +241,7 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v3:next(Node, DB, Shard, StorageIter0, BatchSize) of
+    case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -311,14 +313,35 @@ do_drop_db_v1(DB) ->
 do_store_batch_v1(DB, Shard, #{?tag := ?BATCH, ?batch_messages := Messages}, Options) ->
     emqx_ds_storage_layer:store_batch({DB, Shard}, Messages, Options).
 
+%% Remove me in EMQX 5.6
+-dialyzer({nowarn_function, do_get_streams_v1/4}).
 -spec do_get_streams_v1(
     emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
+) ->
+    [{integer(), emqx_ds_storage_layer:stream_v1()}].
+do_get_streams_v1(_DB, _Shard, _TopicFilter, _StartTime) ->
+    error(obsolete_api).
+
+-spec do_get_streams_v2(
+    emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:topic_filter(), emqx_ds:time()
 ) ->
     [{integer(), emqx_ds_storage_layer:stream()}].
-do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
+do_get_streams_v2(DB, Shard, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:get_streams({DB, Shard}, TopicFilter, StartTime).
 
+-dialyzer({nowarn_function, do_make_iterator_v1/5}).
 -spec do_make_iterator_v1(
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream_v1(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+do_make_iterator_v1(_DB, _Shard, _Stream, _TopicFilter, _StartTime) ->
+    error(obsolete_api).
+
+-spec do_make_iterator_v2(
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
     emqx_ds_storage_layer:stream(),
@@ -326,7 +349,7 @@ do_get_streams_v1(DB, Shard, TopicFilter, StartTime) ->
     emqx_ds:time()
 ) ->
     {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
-do_make_iterator_v1(DB, Shard, Stream, TopicFilter, StartTime) ->
+do_make_iterator_v2(DB, Shard, Stream, TopicFilter, StartTime) ->
     emqx_ds_storage_layer:make_iterator({DB, Shard}, Stream, TopicFilter, StartTime).
 
 -spec do_update_iterator_v2(

+ 4 - 9
apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -89,11 +89,7 @@
 
 -type s() :: #s{}.
 
--type stream() ::
-    #{
-        ?tag := ?STREAM,
-        ?storage_key := emqx_ds_lts:msg_storage_key()
-    }.
+-type stream() :: emqx_ds_lts:msg_storage_key().
 
 -type iterator() ::
     #{
@@ -251,8 +247,7 @@ store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     emqx_ds:time()
 ) -> [stream()].
 get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
-    Indexes = emqx_ds_lts:match_topics(Trie, TopicFilter),
-    [#{?tag => ?STREAM, ?storage_key => I} || I <- Indexes].
+    emqx_ds_lts:match_topics(Trie, TopicFilter).
 
 -spec make_iterator(
     emqx_ds_storage_layer:shard_id(),
@@ -262,7 +257,7 @@ get_streams(_Shard, #s{trie = Trie}, TopicFilter, _StartTime) ->
     emqx_ds:time()
 ) -> {ok, iterator()}.
 make_iterator(
-    _Shard, _Data, #{?tag := ?STREAM, ?storage_key := StorageKey}, TopicFilter, StartTime
+    _Shard, _Data, StorageKey, TopicFilter, StartTime
 ) ->
     %% Note: it's a good idea to keep the iterator structure lean,
     %% since it can be stored on a remote node that could update its

+ 11 - 9
apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl

@@ -43,6 +43,7 @@
     generation/0,
     cf_refs/0,
     stream/0,
+    stream_v1/0,
     iterator/0,
     shard_id/0,
     options/0,
@@ -54,6 +55,8 @@
 
 -define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
 
+-define(stream_v2(GENERATION, INNER), [GENERATION | INNER]).
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -80,14 +83,17 @@
 
 -type gen_id() :: 0..16#ffff.
 
-%% Note: this might be stored permanently on a remote node.
--opaque stream() ::
+%% TODO: kept for BPAPI compatibility. Remove me on EMQX v5.6
+-opaque stream_v1() ::
     #{
         ?tag := ?STREAM,
         ?generation := gen_id(),
         ?enc := term()
     }.
 
+%% Note: this might be stored permanently on a remote node.
+-opaque stream() :: nonempty_maybe_improper_list(gen_id(), term()).
+
 %% Note: this might be stored permanently on a remote node.
 -opaque iterator() ::
     #{
@@ -221,12 +227,8 @@ get_streams(Shard, TopicFilter, StartTime) ->
                 {ok, #{module := Mod, data := GenData}} ->
                     Streams = Mod:get_streams(Shard, GenData, TopicFilter, StartTime),
                     [
-                        {GenId, #{
-                            ?tag => ?STREAM,
-                            ?generation => GenId,
-                            ?enc => Stream
-                        }}
-                     || Stream <- Streams
+                        {GenId, ?stream_v2(GenId, InnerStream)}
+                     || InnerStream <- Streams
                     ];
                 {error, not_found} ->
                     %% race condition: generation was dropped before getting its streams?
@@ -239,7 +241,7 @@ get_streams(Shard, TopicFilter, StartTime) ->
 -spec make_iterator(shard_id(), stream(), emqx_ds:topic_filter(), emqx_ds:time()) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(
-    Shard, #{?tag := ?STREAM, ?generation := GenId, ?enc := Stream}, TopicFilter, StartTime
+    Shard, ?stream_v2(GenId, Stream), TopicFilter, StartTime
 ) ->
     case generation_get_safe(Shard, GenId) of
         {ok, #{module := Mod, data := GenData}} ->

+ 7 - 4
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@
 ]).
 
 %% behavior callbacks:
--export([introduced_in/0]).
+-export([introduced_in/0, deprecated_since/0]).
 
 %%================================================================================
 %% API functions
@@ -45,7 +45,7 @@ drop_db(Node, DB) ->
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_storage_layer:stream()}].
+    [{integer(), emqx_ds_storage_layer:stream_v1()}].
 get_streams(Node, DB, Shard, TopicFilter, Time) ->
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
 
@@ -53,7 +53,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
     node(),
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream(),
+    emqx_ds_storage_layer:stream_v1(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
@@ -95,3 +95,6 @@ store_batch(Node, DB, Shard, Batch, Options) ->
 
 introduced_in() ->
     "5.4.0".
+
+deprecated_since() ->
+    "5.5.0".

+ 7 - 4
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v2.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -32,7 +32,7 @@
 ]).
 
 %% behavior callbacks:
--export([introduced_in/0]).
+-export([introduced_in/0, deprecated_since/0]).
 
 %%================================================================================
 %% API functions
@@ -50,7 +50,7 @@ drop_db(Node, DB) ->
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_storage_layer:stream()}].
+    [{integer(), emqx_ds_storage_layer:stream_v1()}].
 get_streams(Node, DB, Shard, TopicFilter, Time) ->
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
 
@@ -58,7 +58,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
     node(),
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream(),
+    emqx_ds_storage_layer:stream_v1(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
@@ -122,3 +122,6 @@ add_generation(Node, DB) ->
 
 introduced_in() ->
     "5.5.0".
+
+deprecated_since() ->
+    "5.5.0".

+ 7 - 4
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v3.erl

@@ -34,7 +34,7 @@
 ]).
 
 %% behavior callbacks:
--export([introduced_in/0]).
+-export([introduced_in/0, deprecated_since/0]).
 
 %%================================================================================
 %% API functions
@@ -52,7 +52,7 @@ drop_db(Node, DB) ->
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
-    [{integer(), emqx_ds_storage_layer:stream()}].
+    [{integer(), emqx_ds_storage_layer:stream_v1()}].
 get_streams(Node, DB, Shard, TopicFilter, Time) ->
     erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v1, [DB, Shard, TopicFilter, Time]).
 
@@ -60,7 +60,7 @@ get_streams(Node, DB, Shard, TopicFilter, Time) ->
     node(),
     emqx_ds:db(),
     emqx_ds_replication_layer:shard_id(),
-    emqx_ds_storage_layer:stream(),
+    emqx_ds_storage_layer:stream_v1(),
     emqx_ds:topic_filter(),
     emqx_ds:time()
 ) ->
@@ -144,4 +144,7 @@ drop_generation(Node, DB, Shard, GenId) ->
 %%================================================================================
 
 introduced_in() ->
-    "5.6.0".
+    "5.5.0".
+
+deprecated_since() ->
+    "5.5.1".

+ 147 - 0
apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl

@@ -0,0 +1,147 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_ds_proto_v4).
+
+-behavior(emqx_bpapi).
+
+-include_lib("emqx_utils/include/bpapi.hrl").
+%% API:
+-export([
+    drop_db/2,
+    store_batch/5,
+    get_streams/5,
+    make_iterator/6,
+    next/5,
+    update_iterator/5,
+    add_generation/2,
+
+    %% introduced in v3
+    list_generations_with_lifetimes/3,
+    drop_generation/4
+]).
+
+%% behavior callbacks:
+-export([introduced_in/0]).
+
+%%================================================================================
+%% API functions
+%%================================================================================
+
+-spec drop_db([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+drop_db(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]).
+
+-spec get_streams(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    [{integer(), emqx_ds_storage_layer:stream()}].
+get_streams(Node, DB, Shard, TopicFilter, Time) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_get_streams_v2, [DB, Shard, TopicFilter, Time]).
+
+-spec make_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:stream(),
+    emqx_ds:topic_filter(),
+    emqx_ds:time()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_make_iterator_v2, [
+        DB, Shard, Stream, TopicFilter, StartTime
+    ]).
+
+-spec next(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    pos_integer()
+) ->
+    {ok, emqx_ds_storage_layer:iterator(), [{emqx_ds:message_key(), [emqx_types:message()]}]}
+    | {ok, end_of_stream}
+    | {error, _}.
+next(Node, DB, Shard, Iter, BatchSize) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_next_v1, [DB, Shard, Iter, BatchSize]).
+
+-spec store_batch(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_replication_layer:batch(),
+    emqx_ds:message_store_opts()
+) ->
+    emqx_ds:store_batch_result().
+store_batch(Node, DB, Shard, Batch, Options) ->
+    emqx_rpc:call(Shard, Node, emqx_ds_replication_layer, do_store_batch_v1, [
+        DB, Shard, Batch, Options
+    ]).
+
+-spec update_iterator(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:iterator(),
+    emqx_ds:message_key()
+) ->
+    {ok, emqx_ds_storage_layer:iterator()} | {error, _}.
+update_iterator(Node, DB, Shard, OldIter, DSKey) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_update_iterator_v2, [
+        DB, Shard, OldIter, DSKey
+    ]).
+
+-spec add_generation([node()], emqx_ds:db()) ->
+    [{ok, ok} | {error, _}].
+add_generation(Node, DB) ->
+    erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v2, [DB]).
+
+%%--------------------------------------------------------------------------------
+%% Introduced in V3
+%%--------------------------------------------------------------------------------
+
+-spec list_generations_with_lifetimes(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id()
+) ->
+    #{
+        emqx_ds:ds_specific_generation_rank() => emqx_ds:generation_info()
+    }.
+list_generations_with_lifetimes(Node, DB, Shard) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_list_generations_with_lifetimes_v3, [DB, Shard]).
+
+-spec drop_generation(
+    node(),
+    emqx_ds:db(),
+    emqx_ds_replication_layer:shard_id(),
+    emqx_ds_storage_layer:gen_id()
+) ->
+    ok | {error, _}.
+drop_generation(Node, DB, Shard, GenId) ->
+    erpc:call(Node, emqx_ds_replication_layer, do_drop_generation_v3, [DB, Shard, GenId]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+introduced_in() ->
+    "5.5.1".