소스 검색

feat(ds): Add REST API for durable storage

ieQu1 1 년 전
부모
커밋
a62db08676

+ 21 - 1
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -29,7 +29,9 @@
 -export([
     shards/1,
     my_shards/1,
+    shard_info/2,
     allocate_shards/1,
+    replica_set/2,
     sites/0,
     node/1,
     this_site/0,
@@ -52,7 +54,6 @@
     replica_set_transitions/2,
     update_replica_set/3,
     db_sites/1,
-    replica_set/2,
     target_set/2
 ]).
 
@@ -188,6 +189,25 @@ shards(DB) ->
     Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
     [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
 
+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    #{replica_set := #{site() => #{status => up | joining}}}
+    | undefined.
+shard_info(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [] ->
+            undefined;
+        [#?SHARD_TAB{replica_set = Replicas}] ->
+            ReplicaSet = maps:from_list([
+                begin
+                    %% TODO:
+                    ReplInfo = #{status => up},
+                    {I, ReplInfo}
+                end
+             || I <- Replicas
+            ]),
+            #{replica_set => ReplicaSet}
+    end.
+
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),

+ 490 - 0
apps/emqx_management/src/emqx_mgmt_api_ds.erl

@@ -0,0 +1,490 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_ds).
+
+-behaviour(minirest_api).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
+
+-import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
+
+%% API:
+-export([
+    list_sites/2,
+    get_site/2,
+    list_dbs/2,
+    get_db/2,
+    db_replicas/2,
+    db_replica/2,
+
+    update_db_sites/3,
+    join/3,
+    leave/3
+]).
+
+%% behavior callbacks:
+-export([
+    namespace/0,
+    api_spec/0,
+    schema/1,
+    paths/0,
+    fields/1
+]).
+
+%% internal exports:
+-export([]).
+
+-export_type([]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(TAGS, [<<"Durable storage">>]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+namespace() ->
+    undefined.
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/ds/sites",
+        "/ds/sites/:site",
+        "/ds/storages",
+        "/ds/storages/:ds",
+        "/ds/storages/:ds/replicas",
+        "/ds/storages/:ds/replicas/:site"
+    ].
+
+schema("/ds/sites") ->
+    #{
+        'operationId' => list_sites,
+        get =>
+            #{
+                description => <<"List sites">>,
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(array(binary()), #{desc => <<"List sites">>})
+                    }
+            }
+    };
+schema("/ds/sites/:site") ->
+    #{
+        'operationId' => get_site,
+        get =>
+            #{
+                description => <<"Get sites">>,
+                parameters => [param_site_id()],
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(ref(site), #{desc => <<"Get information about the site">>}),
+                        404 => not_found(<<"Site">>)
+                    }
+            }
+    };
+schema("/ds/storages") ->
+    #{
+        'operationId' => list_dbs,
+        get =>
+            #{
+                description => <<"List durable storages">>,
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(array(atom()), #{desc => <<"List durable storages">>})
+                    }
+            }
+    };
+schema("/ds/storages/:ds") ->
+    #{
+        'operationId' => get_db,
+        get =>
+            #{
+                description => <<"Get durable storage">>,
+                tags => ?TAGS,
+                parameters => [param_storage_id()],
+                responses =>
+                    #{
+                        200 => mk(ref(db), #{desc => <<"Get information about a durable storage">>}),
+                        400 => not_found(<<"Durable storage">>)
+                    }
+            }
+    };
+schema("/ds/storages/:ds/replicas") ->
+    Parameters = [param_storage_id()],
+    #{
+        'operationId' => db_replicas,
+        get =>
+            #{
+                description => <<"List replicas of the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        200 => mk(array(binary()), #{
+                            desc => <<"List sites that contain replicas of the durable storage">>
+                        }),
+                        400 => not_found(<<"Durable storage">>)
+                    }
+            },
+        put =>
+            #{
+                description => <<"Update replicas of the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => mk(array(binary()), #{}),
+                        400 => bad_request()
+                    },
+                'requestBody' => mk(array(binary()), #{desc => <<"New list of sites">>})
+            }
+    };
+schema("/ds/storages/:ds/replicas/:site") ->
+    Parameters = [param_storage_id(), param_site_id()],
+    #{
+        'operationId' => db_replica,
+        put =>
+            #{
+                description => <<"Add site as a replica for the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => <<"OK">>,
+                        400 => bad_request(),
+                        404 => not_found(<<"Object">>)
+                    }
+            },
+        delete =>
+            #{
+                description => <<"Remove site as a replica for the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => <<"OK">>,
+                        400 => bad_request(),
+                        404 => not_found(<<"Object">>)
+                    }
+            }
+    }.
+
+fields(site) ->
+    [
+        {node,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Name of the EMQX handling the site">>,
+                    example => <<"'emqx@example.com'">>
+                }
+            )},
+        {up,
+            mk(
+                boolean(),
+                #{desc => <<"Site is up and running">>}
+            )},
+        {shards,
+            mk(
+                array(ref(sites_shard)),
+                #{desc => <<"Durable storages that have replicas at the site">>}
+            )}
+    ];
+fields(sites_shard) ->
+    [
+        {storage,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Durable storage ID">>,
+                    example => 'emqx_persistent_message'
+                }
+            )},
+        {id,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Shard ID">>,
+                    example => <<"1">>
+                }
+            )},
+        {status,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Shard status">>,
+                    example => up
+                }
+            )}
+    ];
+fields(db) ->
+    [
+        {name,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Name of the durable storage">>,
+                    example => 'emqx_persistent_message'
+                }
+            )},
+        {shards,
+            mk(
+                array(ref(db_shard)),
+                #{desc => <<"List of storage shards">>}
+            )}
+    ];
+fields(db_shard) ->
+    [
+        {id,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Shard ID">>,
+                    example => <<"1">>
+                }
+            )},
+        {replicas,
+            mk(
+                hoconsc:array(ref(db_site)),
+                #{desc => <<"List of sites containing replicas of the storage">>}
+            )}
+    ];
+fields(db_site) ->
+    [
+        {site,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Site ID">>,
+                    example => example_site()
+                }
+            )},
+        {status,
+            mk(
+                enum([up, joining]),
+                #{desc => <<"Status of the replica">>}
+            )}
+    ].
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+list_sites(get, _Params) ->
+    {200, emqx_ds_replication_layer_meta:sites()}.
+
+get_site(get, #{bindings := #{site := Site}}) ->
+    case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
+        false ->
+            ?NOT_FOUND(<<"Site not found: ", Site/binary>>);
+        true ->
+            Node = emqx_ds_replication_layer_meta:node(Site),
+            IsUp = lists:member(Node, [node() | nodes()]),
+            Shards = shards_of_site(Site),
+            ?OK(#{
+                node => Node,
+                up => IsUp,
+                shards => Shards
+            })
+    end.
+
+list_dbs(get, _Params) ->
+    ?OK(dbs()).
+
+get_db(get, #{bindings := #{ds := DB}}) ->
+    ?OK(#{
+        name => DB,
+        shards => list_shards(DB)
+    }).
+
+db_replicas(get, #{bindings := #{ds := DB}}) ->
+    Replicas = lists:flatmap(
+        fun(Shard) ->
+            #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
+            maps:keys(RS)
+        end,
+        emqx_ds_replication_layer_meta:shards(DB)
+    ),
+    ?OK(lists:usort(Replicas));
+db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
+    case update_db_sites(DB, Sites, rest) of
+        ok ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
+    end.
+
+db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
+    case join(DB, Site, rest) of
+        ok ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
+    end;
+db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
+    case leave(DB, Site, rest) of
+        ok ->
+            {202, <<"OK">>}
+        %% {error, Description} ->
+        %%     ?BAD_REQUEST(400, Description)
+    end.
+
+-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
+    ok | {error, binary()}.
+update_db_sites(DB, Sites, Via) when is_list(Sites) ->
+    UnknownSites = Sites -- emqx_ds_replication_layer_meta:sites(),
+    case {UnknownSites, Sites} of
+        {[], [_ | _]} ->
+            ?SLOG(warning, #{
+                msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
+            }),
+            %% TODO: Do stuff
+            ok;
+        {_, []} ->
+            {error, <<"Empty replica list">>};
+        {UnknownSites, _} ->
+            Message = io_lib:format(
+                "Unknown sites: ~p",
+                [lists:map(fun binary_to_list/1, UnknownSites)]
+            ),
+            {error, iolist_to_binary(Message)}
+    end;
+update_db_sites(_, _, _) ->
+    {error, <<"Bad type">>}.
+
+-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
+join(DB, Site, Via) ->
+    case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
+        true ->
+            ?SLOG(warning, #{
+                msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
+            }),
+            %% TODO: Do stuff
+            ok;
+        false ->
+            Message = io_lib:format("Unknown site: ~s", [Site]),
+            {error, iolist_to_binary(Message)}
+    end.
+
+-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
+leave(DB, Site, Via) ->
+    %% TODO: Do stuff
+    ?SLOG(warning, #{
+        msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
+    }),
+    ok.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% site_info(Site) ->
+%%     #{}.
+
+not_found(What) ->
+    emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<What/binary, " not found">>).
+
+bad_request() ->
+    emqx_dashboard_swagger:error_codes(['BAD_REQUEST'], <<"Bad request">>).
+
+param_site_id() ->
+    Info = #{
+        required => true,
+        in => path,
+        desc => <<"Site ID">>,
+        example => example_site()
+    },
+    {site, mk(binary(), Info)}.
+
+param_storage_id() ->
+    Info = #{
+        required => true,
+        in => path,
+        desc => <<"Durable storage ID">>,
+        example => emqx_persistent_message
+    },
+    {ds, mk(enum(dbs()), Info)}.
+
+example_site() ->
+    try
+        emqx_ds_replication_layer_meta:this_site()
+    catch
+        _:_ ->
+            <<"AFA18CB1C22F0157">>
+    end.
+
+dbs() ->
+    [emqx_persistent_message].
+
+shards_of_site(Site) ->
+    lists:flatmap(
+        fun({DB, Shard}) ->
+            case emqx_ds_replication_layer_meta:shard_info(DB, Shard) of
+                #{replica_set := #{Site := Info}} ->
+                    [
+                        #{
+                            storage => DB,
+                            id => Shard,
+                            status => maps:get(status, Info)
+                        }
+                    ];
+                _ ->
+                    []
+            end
+        end,
+        [
+            {DB, Shard}
+         || DB <- dbs(),
+            Shard <- emqx_ds_replication_layer_meta:shards(DB)
+        ]
+    ).
+
+list_shards(DB) ->
+    [
+        begin
+            #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
+            Replicas = maps:fold(
+                fun(Site, #{status := Status}, Acc) ->
+                    [
+                        #{
+                            site => Site,
+                            status => Status
+                        }
+                        | Acc
+                    ]
+                end,
+                [],
+                RS
+            ),
+            #{
+                id => Shard,
+                replicas => Replicas
+            }
+        end
+     || Shard <- emqx_ds_replication_layer_meta:shards(DB)
+    ].

+ 180 - 0
apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl

@@ -0,0 +1,180 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_ds_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, "session_persistence.enable = true"},
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
+    [{suite_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, Config) ->
+    Config.
+
+t_get_sites(_) ->
+    Path = api_path(["ds", "sites"]),
+    {ok, Response} = request_api(get, Path),
+    ?assertEqual(
+        [emqx_ds_replication_layer_meta:this_site()],
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_storages(_) ->
+    Path = api_path(["ds", "storages"]),
+    {ok, Response} = request_api(get, Path),
+    ?assertEqual(
+        [<<"emqx_persistent_message">>],
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_site(_) ->
+    %% Unknown sites must result in error 404:
+    Path404 = api_path(["ds", "sites", "unknown_site"]),
+    ?assertMatch(
+        {error, {_, 404, _}},
+        request_api(get, Path404)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "sites", emqx_ds_replication_layer_meta:this_site()]),
+    {ok, Response} = request_api(get, Path),
+    ThisNode = atom_to_binary(node()),
+    ?assertMatch(
+        #{
+            <<"node">> := ThisNode,
+            <<"up">> := true,
+            <<"shards">> :=
+                [
+                    #{
+                        <<"storage">> := <<"emqx_persistent_message">>,
+                        <<"id">> := _,
+                        <<"status">> := <<"up">>
+                    }
+                    | _
+                ]
+        },
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_db(_) ->
+    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
+    Path400 = api_path(["ds", "storages", "unknown_ds"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        request_api(get, Path400)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "storages", "emqx_persistent_message"]),
+    {ok, Response} = request_api(get, Path),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    ?assertMatch(
+        #{
+            <<"name">> := <<"emqx_persistent_message">>,
+            <<"shards">> :=
+                [
+                    #{
+                        <<"id">> := _,
+                        <<"replicas">> :=
+                            [
+                                #{
+                                    <<"site">> := ThisSite,
+                                    <<"status">> := <<"up">>
+                                }
+                                | _
+                            ]
+                    }
+                    | _
+                ]
+        },
+        emqx_utils_json:decode(Response)
+    ).
+
+t_get_replicas(_) ->
+    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
+    Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        request_api(get, Path400)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
+    {ok, Response} = request_api(get, Path),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    ?assertEqual(
+        [ThisSite],
+        emqx_utils_json:decode(Response)
+    ).
+
+t_put_replicas(_) ->
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
+    %% Error cases:
+    ?assertMatch(
+        {ok, 400, #{<<"message">> := <<"Unknown sites: [\"invalid_site\"]">>}},
+        parse_error(request_api_with_body(put, Path, [<<"invalid_site">>]))
+    ),
+    %% Success case:
+    ?assertMatch(
+        {ok, 202, <<"OK">>},
+        request_api_with_body(put, Path, [emqx_ds_replication_layer_meta:this_site()])
+    ).
+
+t_join(_) ->
+    Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        parse_error(request_api(put, Path400))
+    ),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
+    ?assertMatch(
+        {ok, "OK"},
+        request_api(put, Path)
+    ).
+
+t_leave(_) ->
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
+    ?assertMatch(
+        {ok, "OK"},
+        request_api(delete, Path)
+    ).
+
+parse_error({ok, Code, JSON}) ->
+    {ok, Code, emqx_utils_json:decode(JSON)};
+parse_error(Err) ->
+    Err.