Jelajahi Sumber

Merge pull request #12833 from ieQu1/dev/ds-cluster-api

feat(ds): Add REST API for durable storage
ieQu1 1 tahun lalu
induk
melakukan
3f7b14c861

+ 31 - 8
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -29,7 +29,9 @@
 -export([
     shards/1,
     my_shards/1,
+    shard_info/2,
     allocate_shards/1,
+    replica_set/2,
     sites/0,
     node/1,
     this_site/0,
@@ -52,7 +54,6 @@
     replica_set_transitions/2,
     update_replica_set/3,
     db_sites/1,
-    replica_set/2,
     target_set/2
 ]).
 
@@ -72,7 +73,7 @@
     n_shards/1
 ]).
 
--export_type([site/0]).
+-export_type([site/0, update_cluster_result/0]).
 
 -include_lib("stdlib/include/qlc.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -118,6 +119,12 @@
 %% Membership transition of shard's replica set:
 -type transition() :: {add | del, site()}.
 
+-type update_cluster_result() ::
+    ok
+    | {error, {nonexistent_db, emqx_ds:db()}}
+    | {error, {nonexistent_sites, [site()]}}
+    | {error, _}.
+
 %% Peristent term key:
 -define(emqx_ds_builtin_site, emqx_ds_builtin_site).
 
@@ -182,6 +189,25 @@ shards(DB) ->
     Recs = mnesia:dirty_match_object(?SHARD_TAB, ?SHARD_PAT({DB, '_'})),
     [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
 
+-spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
+    #{replica_set := #{site() => #{status => up | joining}}}
+    | undefined.
+shard_info(DB, Shard) ->
+    case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
+        [] ->
+            undefined;
+        [#?SHARD_TAB{replica_set = Replicas}] ->
+            ReplicaSet = maps:from_list([
+                begin
+                    %% TODO:
+                    ReplInfo = #{status => up},
+                    {I, ReplInfo}
+                end
+             || I <- Replicas
+            ]),
+            #{replica_set => ReplicaSet}
+    end.
+
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
@@ -243,20 +269,17 @@ drop_db(DB) ->
 %%===============================================================================
 
 %% @doc Join a site to the set of sites the DB is replicated across.
--spec join_db_site(emqx_ds:db(), site()) ->
-    ok | {error, nonexistent_db | nonexistent_sites}.
+-spec join_db_site(emqx_ds:db(), site()) -> update_cluster_result().
 join_db_site(DB, Site) ->
     transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{add, Site}]]).
 
 %% @doc Make a site leave the set of sites the DB is replicated across.
--spec leave_db_site(emqx_ds:db(), site()) ->
-    ok | {error, nonexistent_db | nonexistent_sites}.
+-spec leave_db_site(emqx_ds:db(), site()) -> update_cluster_result().
 leave_db_site(DB, Site) ->
     transaction(fun ?MODULE:modify_db_sites_trans/2, [DB, [{del, Site}]]).
 
 %% @doc Assign a set of sites to the DB for replication.
--spec assign_db_sites(emqx_ds:db(), [site()]) ->
-    ok | {error, nonexistent_db | nonexistent_sites}.
+-spec assign_db_sites(emqx_ds:db(), [site()]) -> update_cluster_result().
 assign_db_sites(DB, Sites) ->
     transaction(fun ?MODULE:assign_db_sites_trans/2, [DB, Sites]).
 

+ 481 - 0
apps/emqx_management/src/emqx_mgmt_api_ds.erl

@@ -0,0 +1,481 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_ds).
+
+-behaviour(minirest_api).
+
+-include_lib("emqx/include/logger.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_utils/include/emqx_utils_api.hrl").
+
+-import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
+
+%% API:
+-export([
+    list_sites/2,
+    get_site/2,
+    list_dbs/2,
+    get_db/2,
+    db_replicas/2,
+    db_replica/2,
+
+    update_db_sites/3,
+    join/3,
+    leave/3
+]).
+
+%% behavior callbacks:
+-export([
+    namespace/0,
+    api_spec/0,
+    schema/1,
+    paths/0,
+    fields/1
+]).
+
+%% internal exports:
+-export([]).
+
+-export_type([]).
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+-define(TAGS, [<<"Durable storage">>]).
+
+%%================================================================================
+%% behavior callbacks
+%%================================================================================
+
+namespace() ->
+    undefined.
+
+api_spec() ->
+    emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
+
+paths() ->
+    [
+        "/ds/sites",
+        "/ds/sites/:site",
+        "/ds/storages",
+        "/ds/storages/:ds",
+        "/ds/storages/:ds/replicas",
+        "/ds/storages/:ds/replicas/:site"
+    ].
+
+schema("/ds/sites") ->
+    #{
+        'operationId' => list_sites,
+        get =>
+            #{
+                description => <<"List sites">>,
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(array(binary()), #{desc => <<"List sites">>})
+                    }
+            }
+    };
+schema("/ds/sites/:site") ->
+    #{
+        'operationId' => get_site,
+        get =>
+            #{
+                description => <<"Get sites">>,
+                parameters => [param_site_id()],
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(ref(site), #{desc => <<"Get information about the site">>}),
+                        404 => not_found(<<"Site">>)
+                    }
+            }
+    };
+schema("/ds/storages") ->
+    #{
+        'operationId' => list_dbs,
+        get =>
+            #{
+                description => <<"List durable storages">>,
+                tags => ?TAGS,
+                responses =>
+                    #{
+                        200 => mk(array(atom()), #{desc => <<"List durable storages">>})
+                    }
+            }
+    };
+schema("/ds/storages/:ds") ->
+    #{
+        'operationId' => get_db,
+        get =>
+            #{
+                description => <<"Get durable storage">>,
+                tags => ?TAGS,
+                parameters => [param_storage_id()],
+                responses =>
+                    #{
+                        200 => mk(ref(db), #{desc => <<"Get information about a durable storage">>}),
+                        400 => not_found(<<"Durable storage">>)
+                    }
+            }
+    };
+schema("/ds/storages/:ds/replicas") ->
+    Parameters = [param_storage_id()],
+    #{
+        'operationId' => db_replicas,
+        get =>
+            #{
+                description => <<"List replicas of the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        200 => mk(array(binary()), #{
+                            desc => <<"List sites that contain replicas of the durable storage">>
+                        }),
+                        400 => not_found(<<"Durable storage">>)
+                    }
+            },
+        put =>
+            #{
+                description => <<"Update replicas of the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => mk(array(binary()), #{}),
+                        400 => bad_request()
+                    },
+                'requestBody' => mk(array(binary()), #{desc => <<"New list of sites">>})
+            }
+    };
+schema("/ds/storages/:ds/replicas/:site") ->
+    Parameters = [param_storage_id(), param_site_id()],
+    #{
+        'operationId' => db_replica,
+        put =>
+            #{
+                description => <<"Add site as a replica for the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => <<"OK">>,
+                        400 => bad_request(),
+                        404 => not_found(<<"Object">>)
+                    }
+            },
+        delete =>
+            #{
+                description => <<"Remove site as a replica for the durable storage">>,
+                tags => ?TAGS,
+                parameters => Parameters,
+                responses =>
+                    #{
+                        202 => <<"OK">>,
+                        400 => bad_request(),
+                        404 => not_found(<<"Object">>)
+                    }
+            }
+    }.
+
+fields(site) ->
+    [
+        {node,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Name of the EMQX handling the site">>,
+                    example => <<"'emqx@example.com'">>
+                }
+            )},
+        {up,
+            mk(
+                boolean(),
+                #{desc => <<"Site is up and running">>}
+            )},
+        {shards,
+            mk(
+                array(ref(sites_shard)),
+                #{desc => <<"Durable storages that have replicas at the site">>}
+            )}
+    ];
+fields(sites_shard) ->
+    [
+        {storage,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Durable storage ID">>,
+                    example => 'emqx_persistent_message'
+                }
+            )},
+        {id,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Shard ID">>,
+                    example => <<"1">>
+                }
+            )},
+        {status,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Shard status">>,
+                    example => up
+                }
+            )}
+    ];
+fields(db) ->
+    [
+        {name,
+            mk(
+                atom(),
+                #{
+                    desc => <<"Name of the durable storage">>,
+                    example => 'emqx_persistent_message'
+                }
+            )},
+        {shards,
+            mk(
+                array(ref(db_shard)),
+                #{desc => <<"List of storage shards">>}
+            )}
+    ];
+fields(db_shard) ->
+    [
+        {id,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Shard ID">>,
+                    example => <<"1">>
+                }
+            )},
+        {replicas,
+            mk(
+                hoconsc:array(ref(db_site)),
+                #{desc => <<"List of sites containing replicas of the storage">>}
+            )}
+    ];
+fields(db_site) ->
+    [
+        {site,
+            mk(
+                binary(),
+                #{
+                    desc => <<"Site ID">>,
+                    example => example_site()
+                }
+            )},
+        {status,
+            mk(
+                enum([up, joining]),
+                #{desc => <<"Status of the replica">>}
+            )}
+    ].
+
+%%================================================================================
+%% Internal exports
+%%================================================================================
+
+list_sites(get, _Params) ->
+    {200, emqx_ds_replication_layer_meta:sites()}.
+
+get_site(get, #{bindings := #{site := Site}}) ->
+    case lists:member(Site, emqx_ds_replication_layer_meta:sites()) of
+        false ->
+            ?NOT_FOUND(<<"Site not found: ", Site/binary>>);
+        true ->
+            Node = emqx_ds_replication_layer_meta:node(Site),
+            IsUp = lists:member(Node, [node() | nodes()]),
+            Shards = shards_of_site(Site),
+            ?OK(#{
+                node => Node,
+                up => IsUp,
+                shards => Shards
+            })
+    end.
+
+list_dbs(get, _Params) ->
+    ?OK(dbs()).
+
+get_db(get, #{bindings := #{ds := DB}}) ->
+    ?OK(#{
+        name => DB,
+        shards => list_shards(DB)
+    }).
+
+db_replicas(get, #{bindings := #{ds := DB}}) ->
+    Replicas = lists:flatmap(
+        fun(Shard) ->
+            #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
+            maps:keys(RS)
+        end,
+        emqx_ds_replication_layer_meta:shards(DB)
+    ),
+    ?OK(lists:usort(Replicas));
+db_replicas(put, #{bindings := #{ds := DB}, body := Sites}) ->
+    case update_db_sites(DB, Sites, rest) of
+        ok ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
+    end.
+
+db_replica(put, #{bindings := #{ds := DB, site := Site}}) ->
+    case join(DB, Site, rest) of
+        ok ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
+    end;
+db_replica(delete, #{bindings := #{ds := DB, site := Site}}) ->
+    case leave(DB, Site, rest) of
+        ok ->
+            {202, <<"OK">>};
+        {error, Description} ->
+            ?BAD_REQUEST(400, Description)
+    end.
+
+-spec update_db_sites(emqx_ds:db(), [emqx_ds_replication_layer_meta:site()], rest | cli) ->
+    ok | {error, binary()}.
+update_db_sites(DB, Sites, Via) when is_list(Sites) ->
+    ?SLOG(warning, #{
+        msg => "durable_storage_rebalance_request", ds => DB, sites => Sites, via => Via
+    }),
+    meta_result_to_binary(emqx_ds_replication_layer_meta:assign_db_sites(DB, Sites));
+update_db_sites(_, _, _) ->
+    {error, <<"Bad type">>}.
+
+-spec join(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
+join(DB, Site, Via) ->
+    ?SLOG(warning, #{
+        msg => "durable_storage_join_request", ds => DB, site => Site, via => Via
+    }),
+    meta_result_to_binary(emqx_ds_replication_layer_meta:join_db_site(DB, Site)).
+
+-spec leave(emqx_ds:db(), emqx_ds_replication_layer_meta:site(), rest | cli) -> ok | {error, _}.
+leave(DB, Site, Via) ->
+    ?SLOG(warning, #{
+        msg => "durable_storage_leave_request", ds => DB, site => Site, via => Via
+    }),
+    meta_result_to_binary(emqx_ds_replication_layer_meta:leave_db_site(DB, Site)).
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+%% site_info(Site) ->
+%%     #{}.
+
+not_found(What) ->
+    emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<What/binary, " not found">>).
+
+bad_request() ->
+    emqx_dashboard_swagger:error_codes(['BAD_REQUEST'], <<"Bad request">>).
+
+param_site_id() ->
+    Info = #{
+        required => true,
+        in => path,
+        desc => <<"Site ID">>,
+        example => example_site()
+    },
+    {site, mk(binary(), Info)}.
+
+param_storage_id() ->
+    Info = #{
+        required => true,
+        in => path,
+        desc => <<"Durable storage ID">>,
+        example => emqx_persistent_message
+    },
+    {ds, mk(enum(dbs()), Info)}.
+
+example_site() ->
+    try
+        emqx_ds_replication_layer_meta:this_site()
+    catch
+        _:_ ->
+            <<"AFA18CB1C22F0157">>
+    end.
+
+dbs() ->
+    [emqx_persistent_message].
+
+shards_of_site(Site) ->
+    lists:flatmap(
+        fun({DB, Shard}) ->
+            case emqx_ds_replication_layer_meta:shard_info(DB, Shard) of
+                #{replica_set := #{Site := Info}} ->
+                    [
+                        #{
+                            storage => DB,
+                            id => Shard,
+                            status => maps:get(status, Info)
+                        }
+                    ];
+                _ ->
+                    []
+            end
+        end,
+        [
+            {DB, Shard}
+         || DB <- dbs(),
+            Shard <- emqx_ds_replication_layer_meta:shards(DB)
+        ]
+    ).
+
+list_shards(DB) ->
+    [
+        begin
+            #{replica_set := RS} = emqx_ds_replication_layer_meta:shard_info(DB, Shard),
+            Replicas = maps:fold(
+                fun(Site, #{status := Status}, Acc) ->
+                    [
+                        #{
+                            site => Site,
+                            status => Status
+                        }
+                        | Acc
+                    ]
+                end,
+                [],
+                RS
+            ),
+            #{
+                id => Shard,
+                replicas => Replicas
+            }
+        end
+     || Shard <- emqx_ds_replication_layer_meta:shards(DB)
+    ].
+
+meta_result_to_binary(ok) ->
+    ok;
+meta_result_to_binary({error, {nonexistent_sites, UnknownSites}}) ->
+    Msg = ["Unknown sites: " | lists:join(", ", UnknownSites)],
+    {error, iolist_to_binary(Msg)};
+meta_result_to_binary({error, {nonexistent_db, DB}}) ->
+    IOList = io_lib:format("Unknown storage: ~p", [DB]),
+    {error, iolist_to_binary(IOList)};
+meta_result_to_binary({error, Err}) ->
+    IOList = io_lib:format("Error: ~p", [Err]),
+    {error, iolist_to_binary(IOList)}.

+ 42 - 1
apps/emqx_management/src/emqx_mgmt_cli.erl

@@ -810,9 +810,50 @@ ds(CMD) ->
 
 do_ds(["info"]) ->
     emqx_ds_replication_layer_meta:print_status();
+do_ds(["set_replicas", DBStr | SitesStr]) ->
+    case emqx_utils:safe_to_existing_atom(DBStr) of
+        {ok, DB} ->
+            Sites = lists:map(fun list_to_binary/1, SitesStr),
+            case emqx_mgmt_api_ds:update_db_sites(DB, Sites, cli) of
+                ok ->
+                    emqx_ctl:print("ok~n");
+                {error, Description} ->
+                    emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
+            end;
+        {error, _} ->
+            emqx_ctl:print("Unknown durable storage")
+    end;
+do_ds(["join", DBStr, Site]) ->
+    case emqx_utils:safe_to_existing_atom(DBStr) of
+        {ok, DB} ->
+            case emqx_mgmt_api_ds:join(DB, list_to_binary(Site), cli) of
+                ok ->
+                    emqx_ctl:print("ok~n");
+                {error, Description} ->
+                    emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
+            end;
+        {error, _} ->
+            emqx_ctl:print("Unknown durable storage~n")
+    end;
+do_ds(["leave", DBStr, Site]) ->
+    case emqx_utils:safe_to_existing_atom(DBStr) of
+        {ok, DB} ->
+            case emqx_mgmt_api_ds:leave(DB, list_to_binary(Site), cli) of
+                ok ->
+                    emqx_ctl:print("ok~n");
+                {error, Description} ->
+                    emqx_ctl:print("Unable to update replicas: ~s~n", [Description])
+            end;
+        {error, _} ->
+            emqx_ctl:print("Unknown durable storage~n")
+    end;
 do_ds(_) ->
     emqx_ctl:usage([
-        {"ds info", "Show overview of the embedded durable storage state"}
+        {"ds info", "Show overview of the embedded durable storage state"},
+        {"ds set_replicas <storage> <site1> <site2> ...",
+            "Change the replica set of the durable storage"},
+        {"ds join <storage> <site>", "Add site to the replica set of the storage"},
+        {"ds leave <storage> <site>", "Remove site from the replica set of the storage"}
     ]).
 
 %%--------------------------------------------------------------------

+ 180 - 0
apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl

@@ -0,0 +1,180 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_mgmt_api_ds_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]).
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx, "session_persistence.enable = true"},
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
+    [{suite_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+    ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, Config) ->
+    Config.
+
+t_get_sites(_) ->
+    Path = api_path(["ds", "sites"]),
+    {ok, Response} = request_api(get, Path),
+    ?assertEqual(
+        [emqx_ds_replication_layer_meta:this_site()],
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_storages(_) ->
+    Path = api_path(["ds", "storages"]),
+    {ok, Response} = request_api(get, Path),
+    ?assertEqual(
+        [<<"emqx_persistent_message">>],
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_site(_) ->
+    %% Unknown sites must result in error 404:
+    Path404 = api_path(["ds", "sites", "unknown_site"]),
+    ?assertMatch(
+        {error, {_, 404, _}},
+        request_api(get, Path404)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "sites", emqx_ds_replication_layer_meta:this_site()]),
+    {ok, Response} = request_api(get, Path),
+    ThisNode = atom_to_binary(node()),
+    ?assertMatch(
+        #{
+            <<"node">> := ThisNode,
+            <<"up">> := true,
+            <<"shards">> :=
+                [
+                    #{
+                        <<"storage">> := <<"emqx_persistent_message">>,
+                        <<"id">> := _,
+                        <<"status">> := <<"up">>
+                    }
+                    | _
+                ]
+        },
+        emqx_utils_json:decode(Response, [return_maps])
+    ).
+
+t_get_db(_) ->
+    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
+    Path400 = api_path(["ds", "storages", "unknown_ds"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        request_api(get, Path400)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "storages", "emqx_persistent_message"]),
+    {ok, Response} = request_api(get, Path),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    ?assertMatch(
+        #{
+            <<"name">> := <<"emqx_persistent_message">>,
+            <<"shards">> :=
+                [
+                    #{
+                        <<"id">> := _,
+                        <<"replicas">> :=
+                            [
+                                #{
+                                    <<"site">> := ThisSite,
+                                    <<"status">> := <<"up">>
+                                }
+                                | _
+                            ]
+                    }
+                    | _
+                ]
+        },
+        emqx_utils_json:decode(Response)
+    ).
+
+t_get_replicas(_) ->
+    %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
+    Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        request_api(get, Path400)
+    ),
+    %% Valid path:
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
+    {ok, Response} = request_api(get, Path),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    ?assertEqual(
+        [ThisSite],
+        emqx_utils_json:decode(Response)
+    ).
+
+t_put_replicas(_) ->
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas"]),
+    %% Error cases:
+    ?assertMatch(
+        {ok, 400, #{<<"message">> := <<"Unknown sites: invalid_site">>}},
+        parse_error(request_api_with_body(put, Path, [<<"invalid_site">>]))
+    ),
+    %% Success case:
+    ?assertMatch(
+        {ok, 202, <<"OK">>},
+        request_api_with_body(put, Path, [emqx_ds_replication_layer_meta:this_site()])
+    ).
+
+t_join(_) ->
+    Path400 = api_path(["ds", "storages", "emqx_persistent_message", "replicas", "unknown_site"]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        parse_error(request_api(put, Path400))
+    ),
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
+    ?assertMatch(
+        {ok, "OK"},
+        request_api(put, Path)
+    ).
+
+t_leave(_) ->
+    ThisSite = emqx_ds_replication_layer_meta:this_site(),
+    Path = api_path(["ds", "storages", "emqx_persistent_message", "replicas", ThisSite]),
+    ?assertMatch(
+        {error, {_, 400, _}},
+        request_api(delete, Path)
+    ).
+
+parse_error({ok, Code, JSON}) ->
+    {ok, Code, emqx_utils_json:decode(JSON)};
+parse_error(Err) ->
+    Err.