Просмотр исходного кода

fix(dsreplmeta): check site status when fetching shard info

Fixes https://emqx.atlassian.net/browse/EMQX-12356
Thales Macedo Garitezi 1 год назад
Родитель
Сommit
8ce16fd7d9

+ 8 - 3
apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl

@@ -273,7 +273,7 @@ shards(DB) ->
     [Shard || #?SHARD_TAB{shard = {_, Shard}} <- Recs].
 
 -spec shard_info(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
-    #{replica_set := #{site() => #{status => up | joining}}}
+    #{replica_set := #{site() => #{status => up | down}}}
     | undefined.
 shard_info(DB, Shard) ->
     case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
@@ -282,8 +282,13 @@ shard_info(DB, Shard) ->
         [#?SHARD_TAB{replica_set = Replicas}] ->
             ReplicaSet = maps:from_list([
                 begin
-                    %% TODO:
-                    ReplInfo = #{status => up},
+                    Status =
+                        case mria:cluster_status(?MODULE:node(I)) of
+                            running -> up;
+                            stopped -> down;
+                            false -> down
+                        end,
+                    ReplInfo = #{status => Status},
                     {I, ReplInfo}
                 end
              || I <- Replicas

+ 112 - 1
apps/emqx_management/test/emqx_mgmt_api_ds_SUITE.erl

@@ -23,8 +23,22 @@
 
 -import(emqx_mgmt_api_test_util, [api_path/1, request_api/2, request_api_with_body/3]).
 
+-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
+
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    AllTCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {group, cluster},
+        AllTCs -- cluster_tests()
+    ].
+
+groups() ->
+    [{cluster, [], cluster_tests()}].
+
+cluster_tests() ->
+    [
+        t_get_db_node_down
+    ].
 
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start(
@@ -41,6 +55,38 @@ init_per_suite(Config) ->
 end_per_suite(Config) ->
     ok = emqx_cth_suite:stop(?config(suite_apps, Config)).
 
+init_per_group(cluster = Group, Config) ->
+    AppSpecs = [
+        {emqx, "durable_sessions.enable = true\n"},
+        emqx_management
+    ],
+    Dashboard = emqx_mgmt_api_test_util:emqx_dashboard(
+        %% Using different port so it doesn't clash with the master node's port.
+        "dashboard.listeners.http { enable = true, bind = 18084 }"
+    ),
+    Cluster = [
+        {mgmt_api_ds_SUITE1, #{role => core, apps => AppSpecs ++ [Dashboard]}},
+        {mgmt_api_ds_SUITE2, #{role => core, apps => AppSpecs}}
+    ],
+    ClusterOpts = #{work_dir => emqx_cth_suite:work_dir(Group, Config)},
+    NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts),
+    Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts),
+    [
+        {nodes, Nodes},
+        {node_specs, NodeSpecs},
+        {cluster_opts, ClusterOpts}
+        | Config
+    ];
+init_per_group(_Group, Config) ->
+    Config.
+
+end_per_group(cluster, Config) ->
+    Nodes = ?config(nodes, Config),
+    emqx_cth_cluster:stop(Nodes),
+    ok;
+end_per_group(_Group, _Config) ->
+    ok.
+
 init_per_testcase(_, Config) ->
     Config.
 
@@ -124,6 +170,62 @@ t_get_db(_) ->
         emqx_utils_json:decode(Response)
     ).
 
+%% Smoke test that checks the status of down replica nodes.
+t_get_db_node_down(Config) ->
+    [_N1, N2] = ?config(nodes, Config),
+    [_N1Spec, N2Spec] = ?config(node_specs, Config),
+    Path = api_path_cluster(["ds", "storages", "messages"], Config),
+    Params = "",
+
+    {Status1, Res1} = simple_request(get, Path, Params),
+    ?assertEqual(200, Status1),
+    #{<<"shards">> := Shards1} = Res1,
+    Replicas1 = [Replica || #{<<"replicas">> := Replicas} <- Shards1, Replica <- Replicas],
+    ?assert(
+        lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas1),
+        #{replicas => Replicas1}
+    ),
+
+    %% Now make one of the nodes down.
+    StoppedSite = ?ON(N2, emqx_ds_replication_layer_meta:this_site()),
+    ok = emqx_cth_cluster:stop_node(N2),
+    ct:sleep(1_000),
+
+    {Status2, Res2} = simple_request(get, Path, Params),
+    ?assertEqual(200, Status2),
+    #{<<"shards">> := Shards2} = Res2,
+    Replicas2 = [Replica || #{<<"replicas">> := Replicas} <- Shards2, Replica <- Replicas],
+    ?assert(
+        lists:all(
+            fun(#{<<"site">> := Site, <<"status">> := Status}) ->
+                case Site =:= StoppedSite of
+                    true -> Status =:= <<"down">>;
+                    false -> Status =:= <<"up">>
+                end
+            end,
+            Replicas2
+        ),
+        #{
+            replicas => Replicas2,
+            stopped_site => StoppedSite
+        }
+    ),
+
+    %% Back online
+    emqx_cth_cluster:restart(N2Spec),
+    ct:sleep(1_000),
+
+    {Status3, Res3} = simple_request(get, Path, Params),
+    ?assertEqual(200, Status3),
+    #{<<"shards">> := Shards3} = Res3,
+    Replicas3 = [Replica || #{<<"replicas">> := Replicas} <- Shards3, Replica <- Replicas],
+    ?assert(
+        lists:all(fun(#{<<"status">> := Status}) -> Status =:= <<"up">> end, Replicas3),
+        #{replicas => Replicas3}
+    ),
+
+    ok.
+
 t_get_replicas(_) ->
     %% Unknown DBs must result in error 400 (since the DS parameter is an enum):
     Path400 = api_path(["ds", "storages", "unknown_ds", "replicas"]),
@@ -186,3 +288,12 @@ parse_error({ok, Code, JSON}) ->
     {ok, Code, emqx_utils_json:decode(JSON)};
 parse_error(Err) ->
     Err.
+
+simple_request(Method, Path, Params) ->
+    emqx_mgmt_api_test_util:simple_request(Method, Path, Params).
+
+api_path_cluster(Parts, Config) ->
+    [Node1 | _] = ?config(nodes, Config),
+    Port = erpc:call(Node1, emqx_config, get, [[dashboard, listeners, http, bind]]),
+    Host = "http://127.0.0.1:" ++ integer_to_list(Port),
+    emqx_mgmt_api_test_util:api_path(Host, Parts).

+ 34 - 0
apps/emqx_management/test/emqx_mgmt_api_test_util.erl

@@ -285,3 +285,37 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) ->
         FileNames
     ),
     erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]).
+
+simple_request(Method, Path, Params) ->
+    simple_request(Method, Path, Params, _Opts = #{}).
+
+simple_request(Method, Path, Params, _Opts) ->
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    case
+        emqx_mgmt_api_test_util:request_api(
+            Method, Path, _QueryParams = "", AuthHeader, Params, Opts
+        )
+    of
+        {ok, {{_, Status, _}, _Headers, Body0}} ->
+            Body = maybe_json_decode(Body0),
+            {Status, Body};
+        {error, {{_, Status, _}, _Headers, Body0}} ->
+            Body =
+                case emqx_utils_json:safe_decode(Body0, [return_maps]) of
+                    {ok, Decoded0 = #{<<"message">> := Msg0}} ->
+                        Msg = maybe_json_decode(Msg0),
+                        Decoded0#{<<"message">> := Msg};
+                    {ok, Decoded0} ->
+                        Decoded0;
+                    {error, _} ->
+                        Body0
+                end,
+            {Status, Body}
+    end.
+
+maybe_json_decode(X) ->
+    case emqx_utils_json:safe_decode(X, [return_maps]) of
+        {ok, Decoded} -> Decoded;
+        {error, _} -> X
+    end.

+ 1 - 0
changes/ce/fix-13291.en.md

@@ -0,0 +1 @@
+Fixed an issue where durable storage sites that were down being reported as up.