Преглед изворни кода

feat(dsfdb): introduce new fdb backend

Co-authored-by: ieQu1 <99872536+ieQu1@users.noreply.github.com>
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
Ilya Averyanov пре 1 година
родитељ
комит
d51130e68c

+ 1 - 1
apps/emqx/src/emqx.app.src

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.4.1"},
+    {vsn, "5.4.2"},
     {modules, []},
     {registered, []},
     {applications, [

+ 2 - 2
apps/emqx/src/emqx_persistent_session_ds.erl

@@ -1115,13 +1115,13 @@ handle_ds_reply(AsyncReply, Session0 = #{s := S0, stream_scheduler_s := SchedS0}
                     S = emqx_persistent_session_ds_state:put_stream(StreamKey, Srs, S3),
                     pull_now(Session#{s := S});
                 {{error, recoverable, Reason}, _Srs, Session} ->
-                    ?SLOG(debug, #{
+                    ?SLOG(info, #{
                         msg => "failed_to_fetch_batch",
                         stream => StreamKey,
                         reason => Reason,
                         class => recoverable
                     }),
-                    Session;
+                    pull_now(Session);
                 {{error, unrecoverable, Reason}, Srs, Session} ->
                     skip_batch(StreamKey, Srs, Session, ClientInfo, Reason)
             end

+ 2 - 1
apps/emqx_ds_backends/rebar.config.script

@@ -22,7 +22,8 @@ EEDeps =
     ],
 PlatformDeps =
     [
-        {emqx_fdb_ds, {path, "../emqx_fdb_ds"}}
+        {emqx_fdb_ds, {path, "../emqx_fdb_ds"}},
+        {emqx_ds_fdb_backend, {path, "../emqx_ds_fdb_backend"}}
     ],
 case Profile of
   ee ->

+ 2 - 2
apps/emqx_ds_backends/src/emqx_ds_backends.app.src.script

@@ -17,7 +17,7 @@ Backends = case Profile of
              ee ->
                [emqx_ds_builtin_local, emqx_ds_builtin_raft];
              platform ->
-               [emqx_ds_builtin_local, emqx_ds_builtin_raft, emqx_fdb_ds]
+               [emqx_ds_builtin_local, emqx_ds_builtin_raft, emqx_fdb_ds, emqx_ds_fdb_backend]
            end,
 
 io:format(user, "DS backends available for this release (~p): ~0p~n", [Profile, Backends]),
@@ -29,5 +29,5 @@ io:format(user, "DS backends available for this release (~p): ~0p~n", [Profile,
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_durable_storage | Backends]},
-    {env, []}
+    {env, [{available_backends, Backends}]}
 ]}.

+ 133 - 52
apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl

@@ -25,8 +25,6 @@
 -include("../../emqx/include/asserts.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(N_SHARDS, 1).
-
 opts(Config) ->
     proplists:get_value(ds_conf, Config).
 
@@ -55,33 +53,108 @@ t_01_smoke_store(Config) ->
     ).
 
 %% A simple smoke test that verifies that getting the list of streams
-%% doesn't crash and that iterators can be opened.
-t_02_smoke_get_streams_start_iter(Config) ->
-    DB = ?FUNCTION_NAME,
-    ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
-    StartTime = 0,
-    TopicFilter = ['#'],
-    [{Rank, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
-    ?assertMatch({_, _}, Rank),
-    ?assertMatch({ok, _Iter}, emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime)).
-
-%% A simple smoke test that verifies that it's possible to iterate
+%% doesn't crash, iterators can be opened, and that it's possible to iterate
 %% over messages.
-t_03_smoke_iterate(Config) ->
+t_02_smoke_iterate(Config) ->
     DB = ?FUNCTION_NAME,
     ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
     StartTime = 0,
     TopicFilter = ['#'],
     Msgs = [
         message(<<"foo/bar">>, <<"1">>, 0),
-        message(<<"foo">>, <<"2">>, 1),
-        message(<<"bar/bar">>, <<"3">>, 2)
+        message(<<"foo/bar">>, <<"2">>, 1),
+        message(<<"foo/bar">>, <<"3">>, 2)
     ],
     ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs, #{sync => true})),
+    timer:sleep(1000),
     [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
     {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
     {ok, Iter, Batch} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
-    ?assertEqual(Msgs, Batch, {Iter0, Iter}).
+    emqx_ds_test_helpers:diff_messages(Msgs, Batch).
+
+%% A simple smoke test that verifies that poll request is fulfilled
+%% immediately when the new data is present at the time of poll
+%% request.
+t_03_smoke_poll_immediate(Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
+            %% Store one message to create a stream:
+            Msgs1 = [message(<<"foo/bar">>, <<"0">>, 0)],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1, #{sync => true})),
+            timer:sleep(1_000),
+            %% Create the iterator:
+            StartTime = 0,
+            TopicFilter = ['#'],
+            [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+            {ok, Iter1, Batch1} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+            emqx_ds_test_helpers:diff_messages(Msgs1, Batch1),
+            %% Publish some messages:
+            Msgs2 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/bar">>, <<"2">>, 1),
+                message(<<"foo/bar">>, <<"3">>, 2)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs2, #{sync => true})),
+            timer:sleep(1_000),
+            %% Now poll the iterator:
+            UserData = ?FUNCTION_NAME,
+            {ok, Ref} = emqx_ds:poll(DB, [{UserData, Iter1}], #{timeout => 5_000}),
+            receive
+                #poll_reply{ref = Ref, userdata = UserData, payload = Payload} ->
+                    {ok, Iter, Batch2} = Payload,
+                    emqx_ds_test_helpers:diff_messages(Msgs2, Batch2),
+                    %% Now verify that the received iterator is valid:
+                    ?assertMatch({ok, _, []}, emqx_ds:next(DB, Iter, 10))
+            after 1_000 ->
+                error(no_poll_reply)
+            end
+        end,
+        []
+    ).
+
+%% A simple test that verifies that poll request is fulfilled after
+%% new data is added to the stream
+t_03_smoke_poll_new_data(Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            ?assertMatch(ok, emqx_ds_open_db(DB, opts(Config))),
+            %% Store one message to create a stream:
+            Msgs1 = [message(<<"foo/bar">>, <<"0">>, 0)],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs1, #{sync => true})),
+            timer:sleep(1_000),
+            %% Create the iterator:
+            StartTime = 0,
+            TopicFilter = ['#'],
+            [{_, Stream}] = emqx_ds:get_streams(DB, TopicFilter, StartTime),
+            {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime),
+            {ok, Iter1, Batch1} = emqx_ds_test_helpers:consume_iter(DB, Iter0),
+            emqx_ds_test_helpers:diff_messages(Msgs1, Batch1),
+            %% Now poll the iterator:
+            UserData = ?FUNCTION_NAME,
+            {ok, Ref} = emqx_ds:poll(DB, [{UserData, Iter1}], #{timeout => 5_000}),
+            %% Publish some messages:
+            Msgs2 = [
+                message(<<"foo/bar">>, <<"1">>, 0),
+                message(<<"foo/bar">>, <<"2">>, 1),
+                message(<<"foo/bar">>, <<"3">>, 2)
+            ],
+            ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs2, #{sync => true})),
+            receive
+                #poll_reply{ref = Ref, userdata = UserData, payload = Payload} ->
+                    {ok, Iter, Batch2} = Payload,
+                    emqx_ds_test_helpers:diff_messages(Msgs2, Batch2),
+                    %% Now verify that the received iterator is valid:
+                    ?assertMatch({ok, _, []}, emqx_ds:next(DB, Iter, 10))
+            after 5_000 ->
+                error(no_poll_reply)
+            end
+        end,
+        []
+    ).
 
 %% Verify that iterators survive restart of the application. This is
 %% an important property, since the lifetime of the iterators is tied
@@ -233,6 +306,7 @@ t_08_smoke_list_drop_generation(Config) ->
     ok.
 
 t_09_atomic_store_batch(Config) ->
+    ct:pal("store batch ~p", [Config]),
     DB = ?FUNCTION_NAME,
     ?check_trace(
         begin
@@ -686,38 +760,39 @@ delete(DB, It0, Selector, BatchSize, Acc) ->
 %% CT callbacks
 
 all() ->
-    [{group, builtin_local}, {group, builtin_raft}].
+    [{group, Backend} || Backend <- backends()].
+
+exclude(emqx_ds_fdb_backend) ->
+    [
+        t_09_atomic_store_batch,
+        t_11_batch_preconditions,
+        t_12_batch_precondition_conflicts,
+        t_smoke_delete_next
+    ];
+exclude(_) ->
+    [].
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
+    [{Backend, TCs -- exclude(Backend)} || Backend <- backends()].
+
+init_per_group(emqx_fdb_ds, Config) ->
+    {skip, fixme};
+init_per_group(emqx_ds_builtin_raft, Config) ->
+    %% Raft backend is an odd one, as its main module is named
+    %% `emqx_ds_replication_layer' for historical reasons:
     [
-        {builtin_local, TCs},
-        {builtin_raft, TCs}
+        {backend, emqx_ds_replication_layer},
+        {ds_conf, emqx_ds_replication_layer:test_db_config(Config)}
+        | Config
+    ];
+init_per_group(Backend, Config) ->
+    [
+        {backend, Backend},
+        {ds_conf, Backend:test_db_config(Config)}
+        | Config
     ].
 
-init_per_group(builtin_local, Config) ->
-    Conf = #{
-        backend => builtin_local,
-        storage => {emqx_ds_storage_reference, #{}},
-        n_shards => ?N_SHARDS
-    },
-    [{ds_conf, Conf} | Config];
-init_per_group(builtin_raft, Config) ->
-    case emqx_ds_test_helpers:skip_if_norepl() of
-        false ->
-            Conf = #{
-                backend => builtin_raft,
-                storage => {emqx_ds_storage_reference, #{}},
-                n_shards => ?N_SHARDS,
-                n_sites => 1,
-                replication_factor => 3,
-                replication_options => #{}
-            },
-            [{ds_conf, Conf} | Config];
-        Yes ->
-            Yes
-    end.
-
 end_per_group(_Group, Config) ->
     Config.
 
@@ -731,17 +806,18 @@ suite() ->
     [{timetrap, 50_000}].
 
 init_per_testcase(TC, Config) ->
-    Apps = emqx_cth_suite:start(
-        [emqx_durable_storage, emqx_ds_backends],
-        #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
-    ),
-    ct:pal("Apps: ~p", [Apps]),
-    [{apps, Apps} | Config].
+    Backend = proplists:get_value(backend, Config),
+    Apps = emqx_cth_suite:start(Backend:test_applications(Config), #{
+        work_dir => emqx_cth_suite:work_dir(TC, Config)
+    }),
+    ct:pal("Started apps: ~p", [Apps]),
+    timer:sleep(1000),
+    Config ++ [{apps, Apps}].
 
 end_per_testcase(TC, Config) ->
-    ok = emqx_ds:drop_db(TC),
-    ok = emqx_cth_suite:stop(?config(apps, Config)),
-    _ = mnesia:delete_schema([node()]),
+    catch ok = emqx_ds:drop_db(TC),
+    catch ok = emqx_cth_suite:stop(?config(apps, Config)),
+    catch mnesia:delete_schema([node()]),
     snabbkaffe:stop(),
     ok.
 
@@ -750,3 +826,8 @@ emqx_ds_open_db(X1, X2) ->
         ok -> timer:sleep(1000);
         Other -> Other
     end.
+
+backends() ->
+    application:load(emqx_ds_backends),
+    {ok, L} = application:get_env(emqx_ds_backends, available_backends),
+    L.

+ 1 - 1
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.app.src

@@ -2,7 +2,7 @@
 {application, emqx_ds_builtin_local, [
     {description, "A DS backend that stores all data locally and thus doesn't support clustering."},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, gproc, mria, rocksdb, emqx_durable_storage, emqx_utils]},

+ 25 - 0
apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl

@@ -60,6 +60,10 @@
     make_batch/3
 ]).
 
+-ifdef(TEST).
+-export([test_applications/1, test_db_config/1]).
+-endif.
+
 -export_type([db_opts/0, shard/0, iterator/0, delete_iterator/0]).
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
@@ -520,3 +524,24 @@ timeus_to_timestamp(undefined) ->
     undefined;
 timeus_to_timestamp(TimestampUs) ->
     TimestampUs div 1000.
+
+%%================================================================================
+%% Common test options
+%%================================================================================
+
+-ifdef(TEST).
+
+test_applications(_Config) ->
+    [
+        emqx_durable_storage,
+        emqx_ds_backends
+    ].
+
+test_db_config(_Config) ->
+    #{
+        backend => builtin_local,
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => 1
+    }.
+
+-endif.

+ 1 - 1
apps/emqx_ds_builtin_raft/src/emqx_ds_builtin_raft.app.src

@@ -2,7 +2,7 @@
 {application, emqx_ds_builtin_raft, [
     {description, "Raft replication layer for the durable storage"},
     % strict semver, bump manually!
-    {vsn, "0.2.0"},
+    {vsn, "0.2.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, gproc, mria, ra, emqx_durable_storage]},

+ 24 - 0
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer.erl

@@ -78,6 +78,10 @@
     ]
 ).
 
+-ifdef(TEST).
+-export([test_applications/1, test_db_config/1]).
+-endif.
+
 -export_type([
     shard_id/0,
     builtin_db_opts/0,
@@ -1202,3 +1206,23 @@ clientid_size(ClientID) when is_binary(ClientID) ->
     byte_size(ClientID);
 clientid_size(ClientID) ->
     erlang:external_size(ClientID).
+
+-ifdef(TEST).
+
+test_db_config(_Config) ->
+    #{
+        backend => builtin_raft,
+        storage => {emqx_ds_storage_reference, #{}},
+        n_shards => 1,
+        n_sites => 1,
+        replication_factor => 3,
+        replication_options => #{}
+    }.
+
+test_applications(_Config) ->
+    [
+        emqx_durable_storage,
+        emqx_ds_backends
+    ].
+
+-endif.

+ 3 - 0
apps/emqx_durable_storage/include/emqx_ds_metrics.hrl

@@ -68,4 +68,7 @@
 %% by a single beam:
 -define(DS_POLL_REQUEST_SHARING, emqx_ds_poll_request_sharing).
 
+-define(DS_POLL_WAITING_QUEUE_LEN, emqx_ds_poll_waitq_len).
+-define(DS_POLL_PENDING_QUEUE_LEN, emqx_ds_poll_pendingq_len).
+
 -endif.

+ 9 - 7
apps/emqx_durable_storage/src/emqx_ds_beamformer.erl

@@ -218,9 +218,6 @@
 -spec poll(node(), return_addr(_ItKey), _Shard, _Iterator, emqx_ds:poll_opts()) ->
     ok.
 poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
-    ?tp(emqx_ds_beamformer_poll, #{
-        node => Node, return_addr => ReturnAddr, shard => Shard, it => Iterator, timeout => Timeout
-    }),
     CBM = emqx_ds_beamformer_sup:cbm(Shard),
     #{
         stream := Stream,
@@ -230,8 +227,8 @@ poll(Node, ReturnAddr, Shard, Iterator, Opts = #{timeout := Timeout}) ->
         message_matcher := MsgMatcher
     } = CBM:unpack_iterator(Shard, Iterator),
     Deadline = erlang:monotonic_time(millisecond) + Timeout,
-    logger:debug(#{
-        msg => poll, shard => Shard, key => DSKey, timeout => Timeout, deadline => Deadline
+    ?tp(beamformer_poll, #{
+        shard => Shard, key => DSKey, timeout => Timeout, deadline => Deadline
     }),
     %% Try to maximize likelyhood of sending similar iterators to the
     %% same worker:
@@ -322,8 +319,13 @@ handle_call(
     _From,
     S = #s{pending_queue = PendingTab, wait_queue = WaitingTab, metrics_id = Metrics}
 ) ->
-    NQueued = ets:info(PendingTab, size) + ets:info(WaitingTab, size),
-    case NQueued >= S#s.pending_request_limit of
+    %% FIXME
+    %% this is a potentially costly operation
+    PQLen = ets:info(PendingTab, size),
+    WQLen = ets:info(WaitingTab, size),
+    emqx_ds_builtin_metrics:set_pendingq_len(Metrics, PQLen),
+    emqx_ds_builtin_metrics:set_waitq_len(Metrics, WQLen),
+    case PQLen + WQLen >= S#s.pending_request_limit of
         true ->
             emqx_ds_builtin_metrics:inc_poll_requests_dropped(Metrics, 1),
             Reply = {error, recoverable, too_many_requests},

+ 11 - 1
apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl

@@ -35,6 +35,8 @@
     observe_next_time/2,
 
     observe_sharing/2,
+    set_waitq_len/2,
+    set_pendingq_len/2,
     inc_poll_requests/2,
     inc_poll_requests_fulfilled/2,
     inc_poll_requests_dropped/2,
@@ -97,7 +99,9 @@
     {counter, ?DS_POLL_REQUESTS_FULFILLED},
     {counter, ?DS_POLL_REQUESTS_DROPPED},
     {counter, ?DS_POLL_REQUESTS_EXPIRED},
-    {slide, ?DS_POLL_REQUEST_SHARING}
+    {slide, ?DS_POLL_REQUEST_SHARING},
+    {counter, ?DS_POLL_PENDING_QUEUE_LEN},
+    {counter, ?DS_POLL_WAITING_QUEUE_LEN}
 ]).
 
 -define(SHARD_METRICS, ?BEAMFORMER_METRICS ++ ?BUFFER_METRICS).
@@ -174,6 +178,12 @@ observe_next_time(DB, NextTime) ->
 observe_sharing(Id, Sharing) ->
     catch emqx_metrics_worker:observe(?WORKER, Id, ?DS_POLL_REQUEST_SHARING, Sharing).
 
+set_waitq_len(Id, Len) ->
+    emqx_metrics_worker:set(?WORKER, Id, ?DS_POLL_WAITING_QUEUE_LEN, Len).
+
+set_pendingq_len(Id, Len) ->
+    emqx_metrics_worker:set(?WORKER, Id, ?DS_POLL_PENDING_QUEUE_LEN, Len).
+
 inc_poll_requests(Id, NPolls) ->
     catch emqx_metrics_worker:inc(?WORKER, Id, ?DS_POLL_REQUESTS, NPolls).
 

+ 2 - 0
apps/emqx_durable_storage/src/emqx_ds_msg_serializer.erl

@@ -29,6 +29,8 @@
 
 -include_lib("emqx_utils/include/emqx_message.hrl").
 -include_lib("typerefl/include/types.hrl").
+
+-elvis([{elvis_style, atom_naming_convention, disable}]).
 -include("../gen_src/DurableMessage.hrl").
 
 -ifdef(TEST).

+ 1 - 1
apps/emqx_durable_storage/src/emqx_durable_storage.app.src

@@ -2,7 +2,7 @@
 {application, emqx_durable_storage, [
     {description, "Message persistence and subscription replays for EMQX"},
     % strict semver, bump manually!
-    {vsn, "0.4.0"},
+    {vsn, "0.4.1"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils, gen_rpc]},

+ 1 - 1
apps/emqx_machine/src/emqx_machine.app.src

@@ -3,7 +3,7 @@
     {id, "emqx_machine"},
     {description, "The EMQX Machine"},
     % strict semver, bump manually!
-    {vsn, "0.3.5"},
+    {vsn, "0.3.6"},
     {modules, []},
     {registered, []},
     {applications, [kernel, stdlib, emqx_ctl, redbug]},

+ 4 - 2
apps/emqx_machine/src/emqx_machine_boot.erl

@@ -191,10 +191,12 @@ runtime_deps() ->
         %% apps, we may apply the same tactic for `emqx_connector' and inject individual bridges
         %% as its dependencies.
         {emqx_connector, fun(App) -> lists:prefix("emqx_bridge_", atom_to_list(App)) end},
-        %% emqx_fdb is an EE app
-        {emqx_durable_storage, emqx_fdb},
+        %% emqx_fdb_ds is an EE app
+        {emqx_durable_storage, emqx_fdb_ds},
         %% emqx_ds_builtin is an EE app
         {emqx_ds_backends, emqx_ds_builtin_raft},
+        %% emqx_ds_fdb_backend is an EE app
+        {emqx_ds_backends, emqx_ds_fdb_backend},
         {emqx_dashboard, emqx_license}
     ].
 

+ 1 - 1
apps/emqx_prometheus/src/emqx_prometheus.app.src

@@ -2,7 +2,7 @@
 {application, emqx_prometheus, [
     {description, "Prometheus for EMQX"},
     % strict semver, bump manually!
-    {vsn, "5.2.5"},
+    {vsn, "5.2.6"},
     {modules, []},
     {registered, [emqx_prometheus_sup]},
     {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},

+ 3 - 1
apps/emqx_prometheus/src/emqx_prometheus.erl

@@ -530,7 +530,9 @@ emqx_collect(K = ?DS_POLL_REQUESTS, D) -> counter_metrics(?MG(K, D, []));
 emqx_collect(K = ?DS_POLL_REQUESTS_FULFILLED, D) -> counter_metrics(?MG(K, D, []));
 emqx_collect(K = ?DS_POLL_REQUESTS_DROPPED, D) -> counter_metrics(?MG(K, D, []));
 emqx_collect(K = ?DS_POLL_REQUESTS_EXPIRED, D) -> counter_metrics(?MG(K, D, []));
-emqx_collect(K = ?DS_POLL_REQUEST_SHARING, D) -> gauge_metrics(?MG(K, D, [])).
+emqx_collect(K = ?DS_POLL_REQUEST_SHARING, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_POLL_WAITING_QUEUE_LEN, D) -> gauge_metrics(?MG(K, D, []));
+emqx_collect(K = ?DS_POLL_PENDING_QUEUE_LEN, D) -> gauge_metrics(?MG(K, D, [])).
 
 %%--------------------------------------------------------------------
 %% Indicators

+ 6 - 0
apps/emqx_utils/src/emqx_metrics_worker.erl

@@ -33,6 +33,7 @@
     inc/4,
     observe/4,
     get/3,
+    set/4,
     get_gauge/3,
     set_gauge/5,
     shift_gauge/5,
@@ -242,6 +243,11 @@ inc(Name, Id, Metric) ->
 inc(Name, Id, Metric, Val) ->
     counters:add(get_ref(Name, Id), idx_metric(Name, Id, counter, Metric), Val).
 
+%% Set value of counter explicitly, so it can behave as a gauge.
+-spec set(handler_name(), metric_id(), metric_name(), integer()) -> ok.
+set(Name, Id, Metric, Val) ->
+    counters:put(get_ref(Name, Id), idx_metric(Name, Id, counter, Metric), Val).
+
 %% Add a sample to the slide.
 %%
 %% Slide is short for "sliding window average" type of metric.

+ 1 - 1
apps/emqx_utils/src/emqx_utils.app.src

@@ -2,7 +2,7 @@
 {application, emqx_utils, [
     {description, "Miscellaneous utilities for EMQX apps"},
     % strict semver, bump manually!
-    {vsn, "5.4.0"},
+    {vsn, "5.4.1"},
     {modules, [
         emqx_utils,
         emqx_utils_api,

+ 31 - 2
mix.exs

@@ -346,7 +346,7 @@ defmodule EMQXUmbrella.MixProject do
   end
 
   # need to remove those when listing `/apps/`...
-  defp enterprise_umbrella_apps(_release_type) do
+  defp enterprise_umbrella_apps(:standard) do
     MapSet.new([
       :emqx_connector_aggregator,
       :emqx_bridge_kafka,
@@ -405,6 +405,19 @@ defmodule EMQXUmbrella.MixProject do
     ])
   end
 
+  defp enterprise_umbrella_apps(:platform) do
+    MapSet.union(
+      enterprise_umbrella_apps(:standard),
+      MapSet.new([
+        :emqx_fdb_ds,
+        :emqx_fdb_cli,
+        :emqx_fdb_management,
+        :emqx_event_history,
+        :emqx_ds_fdb_backend
+      ])
+    )
+  end
+
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
       {:hstreamdb_erl,
@@ -696,7 +709,23 @@ defmodule EMQXUmbrella.MixProject do
     end)
   end
 
-  defp excluded_apps(_release_type) do
+  defp excluded_apps(:standard) do
+    %{
+      mnesia_rocksdb: enable_rocksdb?(),
+      quicer: enable_quicer?(),
+      jq: enable_jq?(),
+      observer: is_app?(:observer),
+      emqx_fdb_ds: false,
+      emqx_fdb_cli: false,
+      emqx_fdb_management: false,
+      emqx_event_history: false,
+      emqx_ds_fdb_backend: false
+    }
+    |> Enum.reject(&elem(&1, 1))
+    |> Enum.map(&elem(&1, 0))
+  end
+
+  defp excluded_apps(:platform) do
     %{
       mnesia_rocksdb: enable_rocksdb?(),
       quicer: enable_quicer?(),

+ 15 - 1
rebar.config.erl

@@ -130,6 +130,7 @@ is_community_umbrella_app("apps/emqx_cluster_link") -> false;
 is_community_umbrella_app("apps/emqx_ds_builtin_raft") -> false;
 is_community_umbrella_app("apps/emqx_auth_kerberos") -> false;
 is_community_umbrella_app("apps/emqx_auth_cinfo") -> false;
+is_community_umbrella_app("apps/emqx_ds_fdb_backend") -> false;
 is_community_umbrella_app(_) -> true.
 
 %% BUILD_WITHOUT_JQ
@@ -447,7 +448,20 @@ relx_apps(ReleaseType, Edition) ->
             [{App, load} || App <- BusinessApps, not lists:member(App, ExcludedApps)]),
     Apps.
 
-excluded_apps(_RelType) ->
+excluded_apps(standard) ->
+    OptionalApps = [
+        {quicer, is_quicer_supported()},
+        {jq, is_jq_supported()},
+        {observer, is_app(observer)},
+        {mnesia_rocksdb, is_rocksdb_supported()},
+        {emqx_fdb_ds, false},
+        {emqx_ds_fdb_backend, false},
+        {emqx_fdb_cli, false},
+        {emqx_fdb_management, false},
+        {emqx_event_history, false}
+    ],
+    [App || {App, false} <- OptionalApps];
+excluded_apps(platform) ->
     OptionalApps = [
         {quicer, is_quicer_supported()},
         {jq, is_jq_supported()},

+ 9 - 0
scripts/ct/run.sh

@@ -129,6 +129,15 @@ if [ -z "${PROFILE+x}" ]; then
         apps/emqx_rule_engine)
             export PROFILE='emqx-enterprise'
             ;;
+        apps/emqx_fdb*)
+            export PROFILE='emqx-platform'
+            ;;
+        apps/emqx_ds_fdb_backend)
+            export PROFILE='emqx-platform'
+            ;;
+        apps/emqx_event_history)
+            export PROFILE='emqx-platform'
+            ;;
         apps/*)
             if [[ -f "${WHICH_APP}/BSL.txt" ]]; then
                 export PROFILE='emqx-enterprise'