Explorar el Código

feat(mgmt): allow to manage any DS database through API / CLI

Andrew Mayorov hace 1 año
padre
commit
e747604420

+ 6 - 1
apps/emqx_ds_builtin_raft/src/emqx_ds_replication_layer_meta.erl

@@ -33,7 +33,8 @@
     open_db/2,
     db_config/1,
     update_db_config/2,
-    drop_db/1
+    drop_db/1,
+    dbs/0
 ]).
 
 %% Site / shard allocation:
@@ -373,6 +374,10 @@ update_db_config(DB, DefaultOpts) ->
 drop_db(DB) ->
     transaction(fun ?MODULE:drop_db_trans/1, [DB]).
 
+-spec dbs() -> [emqx_ds:db()].
+dbs() ->
+    mnesia:dirty_all_keys(?META_TAB).
+
 %%===============================================================================
 %% Site / shard allocation API
 %%===============================================================================

+ 62 - 67
apps/emqx_management/src/emqx_mgmt_api_ds.erl

@@ -46,6 +46,11 @@
     fields/1
 ]).
 
+-export([
+    check_enabled/2,
+    check_db_exists/2
+]).
+
 -include_lib("emqx/include/logger.hrl").
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
@@ -66,7 +71,13 @@ namespace() ->
     undefined.
 
 api_spec() ->
-    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+    emqx_dashboard_swagger:spec(?MODULE, #{
+        check_schema => true,
+        filter => emqx_dashboard_swagger:compose_filters(
+            fun ?MODULE:check_enabled/2,
+            fun ?MODULE:check_db_exists/2
+        )
+    }).
 
 paths() ->
     [
@@ -300,16 +311,25 @@ fields(db_site) ->
 %% Internal exports
 %%================================================================================
 
-list_sites(get, _Params) ->
+check_enabled(Request, _ReqMeta) ->
     case is_enabled() of
-        true ->
-            {200, emqx_ds_replication_layer_meta:sites()};
-        false ->
-            err_disabled()
+        true -> {ok, Request};
+        false -> ?NOT_FOUND(<<"Durable storage is disabled">>)
     end.
 
+check_db_exists(Request = #{bindings := #{ds := DB}}, _ReqMeta) ->
+    case db_config(DB) of
+        #{} -> {ok, Request};
+        undefined -> ?NOT_FOUND(emqx_utils:format("DB '~p' does not exist", [DB]))
+    end;
+check_db_exists(Request, _ReqMeta) ->
+    {ok, Request}.
+
+list_sites(get, _Params) ->
+    {200, emqx_ds_replication_layer_meta:sites()}.
+
 get_site(get, #{bindings := #{site := Site}}) ->
-    case is_enabled() andalso lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
+    case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
         false ->
             ?NOT_FOUND(<<"Site not found: ", Site/binary>>);
         true ->
@@ -324,70 +344,40 @@ get_site(get, #{bindings := #{site := Site}}) ->
     end.
 
 list_dbs(get, _Params) ->
-    case is_enabled() of
-        true ->
-            ?OK(dbs());
-        false ->
-            err_disabled()
-    end.
+    ?OK(dbs()).
 
 get_db(get, #{bindings := #{ds := DB}}) ->
-    case is_enabled() of
-        true ->
-            ?OK(#{
-                name => DB,
-                shards => list_shards(DB)
-            });
-        false ->
-            err_disabled()
-    end.
+    ?OK(#{
+        name => DB,
+        shards => list_shards(DB)
+    }).
 
 db_replicas(get, #{bindings := #{ds := DB}}) ->
-    case is_enabled() of
-        true ->
-            Replicas = emqx_ds_replication_layer_meta:db_sites(DB),
-            ?OK(Replicas);
-        false ->
-            err_disabled()
-    end;
+    Replicas = emqx_ds_replication_layer_meta:db_sites(DB),
+    ?OK(Replicas);
 db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
-    case is_enabled() of
-        true ->
-            case update_db_sites(DB, Sites, rest) of
-                {ok, _} ->
-                    {202, <<"OK">>};
-                {error, Description} ->
-                    ?BAD_REQUEST(400, Description)
-            end;
-        false ->
-            err_disabled()
+    case update_db_sites(DB, Sites, rest) of
+        {ok, _} ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
     end.
 
 db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
-    case is_enabled() of
-        true ->
-            case join(DB, Site, rest) of
-                {ok, _} ->
-                    {202, <<"OK">>};
-                {error, Description} ->
-                    ?BAD_REQUEST(400, Description)
-            end;
-        false ->
-            err_disabled()
+    case join(DB, Site, rest) of
+        {ok, _} ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
     end;
 db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
-    case is_enabled() of
-        true ->
-            case leave(DB, Site, rest) of
-                {ok, Sites} when is_list(Sites) ->
-                    {202, <<"OK">>};
-                {ok, unchanged} ->
-                    ?NOT_FOUND(<<"Site is not part of replica set">>);
-                {error, Description} ->
-                    ?BAD_REQUEST(400, Description)
-            end;
-        false ->
-            err_disabled()
+    case leave(DB, Site, rest) of
+        {ok, Sites} when is_list(Sites) ->
+            {202, <<"OK">>};
+        {ok, unchanged} ->
+            ?NOT_FOUND(<<"Site is not part of replica set">>);
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
     end.
 
 -spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
@@ -456,7 +446,7 @@ param_storage_id() ->
         desc => <<"Durable storage ID">>,
         example => ?PERSISTENT_MESSAGE_DB
     },
-    {ds, mk(enum(dbs()), Info)}.
+    {ds, mk(atom(), Info)}.
 
 example_site() ->
     try
@@ -467,7 +457,15 @@ example_site() ->
     end.
 
 dbs() ->
-    [?PERSISTENT_MESSAGE_DB].
+    [DB || DB <- emqx_ds_replication_layer_meta:dbs(), db_config(DB) =/= undefined].
+
+db_config(DB) ->
+    case emqx_ds_replication_layer_meta:db_config(DB) of
+        Config = #{backend := builtin_raft} ->
+            Config;
+        _ ->
+            undefined
+    end.
 
 shards_of_site(Site) ->
     lists:flatmap(
@@ -536,9 +534,6 @@ meta_result_to_binary({error, Err}) ->
     {error, iolist_to_binary(IOList)}.
 
 is_enabled() ->
-    emqx_persistent_message:is_persistence_enabled().
-
-err_disabled() ->
-    ?NOT_FOUND(<<"Durable storage is disabled">>).
+    [] =/= dbs().
 
 -endif.

+ 8 - 8
apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl

@@ -97,11 +97,11 @@ t_get_site(_) ->
     ).
 
 t_get_db(_) ->
-    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
-    Path400 = api_path(["ds", "storages", "unknown_ds"]),
+    %% Unknown DBs must result in error 404:
+    Path404 = api_path(["ds", "storages", "unknown_ds"]),
     ?assertMatch(
-        {error, {_, 400, _}},
-        request_api(get, Path400)
+        {error, {_, 404, _}},
+        request_api(get, Path404)
     ),
     %% Valid path:
     Path = api_path(["ds", "storages", "messages"]),
@@ -130,11 +130,11 @@ t_get_db(_) ->
     ).
 
 t_get_replicas(_) ->
-    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
-    Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]),
+    %% Unknown DBs must result in error 404:
+    Path404 = api_path(["ds", "storages", "unknown_ds", "replicas"]),
     ?assertMatch(
-        {error, {_, 400, _}},
-        request_api(get, Path400)
+        {error, {_, 404, _}},
+        request_api(get, Path404)
     ),
     %% Valid path:
     Path = api_path(["ds", "storages", "messages", "replicas"]),