Просмотр исходного кода

fix(connector_api): return status reason when status is inconsistent

Fixes https://emqx.atlassian.net/browse/EMQX-11581
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
4a71aa58ce

+ 12 - 2
apps/emqx_connector/src/emqx_connector_api.erl

@@ -589,14 +589,24 @@ pick_connectors_by_id(Type, Name, ConnectorsAllNodes) ->
 format_connector_info([FirstConnector | _] = Connectors) ->
     Res = maps:remove(node, FirstConnector),
     NodeStatus = node_status(Connectors),
-    redact(Res#{
+    StatusReason = first_status_reason(Connectors),
+    Info0 = Res#{
         status => aggregate_status(NodeStatus),
         node_status => NodeStatus
-    }).
+    },
+    Info = emqx_utils_maps:put_if(Info0, status_reason, StatusReason, StatusReason =/= undefined),
+    redact(Info).
 
 node_status(Connectors) ->
     [maps:with([node, status, status_reason], B) || B <- Connectors].
 
+first_status_reason(Connectors) ->
+    StatusReasons = [Reason || #{status_reason := Reason} <- Connectors, Reason =/= undefined],
+    case StatusReasons of
+        [Reason | _] -> Reason;
+        _ -> undefined
+    end.
+
 aggregate_status(AllStatus) ->
     Head = fun([A | _]) -> A end,
     HeadVal = maps:get(status, Head(AllStatus), connecting),

+ 63 - 45
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -23,6 +23,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/test_macros.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))).
 -define(RESOURCE(NAME, TYPE), #{
@@ -103,48 +104,6 @@
 }).
 -define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)).
 
-%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>).
-%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{
-%%     <<"server">> => SERVER,
-%%     <<"username">> => <<"user1">>,
-%%     <<"password">> => <<"">>,
-%%     <<"proto_ver">> => <<"v5">>,
-%%     <<"egress">> => #{
-%%         <<"remote">> => #{
-%%             <<"topic">> => <<"emqx/${topic}">>,
-%%             <<"qos">> => <<"${qos}">>,
-%%             <<"retain">> => false
-%%         }
-%%     }
-%% }).
-%% -define(MQTT_CONNECTOR(SERVER), ?MQTT_CONNECTOR(SERVER, <<"mqtt_egress_test_connector">>)).
-
-%% -define(CONNECTOR_TYPE_HTTP, <<"kafka_producer">>).
-%% -define(HTTP_CONNECTOR(URL, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_HTTP)#{
-%%     <<"url">> => URL,
-%%     <<"local_topic">> => <<"emqx_webhook/#">>,
-%%     <<"method">> => <<"post">>,
-%%     <<"body">> => <<"${payload}">>,
-%%     <<"headers">> => #{
-%%         % NOTE
-%%         % The Pascal-Case is important here.
-%%         % The reason is kinda ridiculous: `emqx_connector_resource:create_dry_run/2` converts
-%%         % connector config keys into atoms, and the atom 'Content-Type' exists in the ERTS
-%%         % when this happens (while the 'content-type' does not).
-%%         <<"Content-Type">> => <<"application/json">>
-%%     }
-%% }).
-%% -define(HTTP_CONNECTOR(URL), ?HTTP_CONNECTOR(URL, ?CONNECTOR_NAME)).
-
-%% -define(URL(PORT, PATH),
-%%         list_to_binary(
-%%           io_lib:format(
-%%             "http://localhost:~s/~s",
-%%             [integer_to_list(PORT), PATH]
-%%            )
-%%          )
-%%        ).
-
 -define(APPSPECS, [
     emqx_conf,
     emqx,
@@ -178,11 +137,14 @@ groups() ->
         t_fail_delete_with_action,
         t_actions_field
     ],
+    ClusterOnlyTests = [
+        t_inconsistent_state
+    ],
     ClusterLaterJoinOnlyTCs = [
         % t_cluster_later_join_metrics
     ],
     [
-        {single, [], AllTCs -- ClusterLaterJoinOnlyTCs},
+        {single, [], (AllTCs -- ClusterLaterJoinOnlyTCs) -- ClusterOnlyTests},
         {cluster_later_join, [], ClusterLaterJoinOnlyTCs},
         {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs}
     ].
@@ -268,6 +230,8 @@ init_mocks(_TestCase) ->
         fun
             (<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) ->
                 {ok, bad_connector_state};
+            (_I, #{bootstrap_hosts := <<"nope:9092">>}) ->
+                {ok, worst_connector_state};
             (_I, _C) ->
                 {ok, connector_state}
         end
@@ -277,8 +241,17 @@ init_mocks(_TestCase) ->
         ?CONNECTOR_IMPL,
         on_get_status,
         fun
-            (_, bad_connector_state) -> connecting;
-            (_, _) -> connected
+            (_, bad_connector_state) ->
+                connecting;
+            (_, worst_connector_state) ->
+                {?status_disconnected, worst_connector_state, [
+                    #{
+                        host => <<"nope:9092">>,
+                        reason => unresolvable_hostname
+                    }
+                ]};
+            (_, _) ->
+                connected
         end
     ),
     meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}),
@@ -858,6 +831,51 @@ t_raw_config_response_defaults(Config) ->
     ),
     ok.
 
+t_inconsistent_state(Config) ->
+    [_, Node2] = ?config(cluster_nodes, Config),
+    Params = ?KAFKA_CONNECTOR(?CONNECTOR_NAME),
+    ?assertMatch(
+        {ok, 201, #{<<"enable">> := true, <<"resource_opts">> := #{}}},
+        request_json(
+            post,
+            uri(["connectors"]),
+            Params,
+            Config
+        )
+    ),
+    BadParams = maps:without(
+        [<<"name">>, <<"type">>],
+        Params#{<<"bootstrap_hosts">> := <<"nope:9092">>}
+    ),
+    {ok, _} = erpc:call(
+        Node2,
+        emqx,
+        update_config,
+        [[connectors, ?CONNECTOR_TYPE, ?CONNECTOR_NAME], BadParams, #{}]
+    ),
+
+    ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
+    ?assertMatch(
+        {ok, 200, #{
+            <<"status">> := <<"inconsistent">>,
+            <<"node_status">> := [
+                #{<<"status">> := <<"connected">>},
+                #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"status_reason">> := _
+                }
+            ],
+            <<"status_reason">> := _
+        }},
+        request_json(
+            get,
+            uri(["connectors", ConnectorID]),
+            Config
+        )
+    ),
+
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],