Kaynağa Gözat

Merge pull request #12655 from thalesmg/fix-kconsu-status-r56-20240306

fix(kafka_consumer): check client connectivity
Thales Macedo Garitezi 1 yıl önce
ebeveyn
işleme
79d7821222

+ 18 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -454,6 +454,24 @@ probe_bridge_api(Kind, BridgeType, BridgeName, BridgeConfig) ->
     ct:pal("bridge probe (~s, http) result:\n  ~p", [Kind, Res]),
     Res.
 
+probe_connector_api(Config) ->
+    probe_connector_api(Config, _Overrides = #{}).
+
+probe_connector_api(Config, Overrides) ->
+    #{
+        connector_type := Type,
+        connector_name := Name
+    } = get_common_values(Config),
+    ConnectorConfig0 = get_value(connector_config, Config),
+    ConnectorConfig1 = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides),
+    Params = ConnectorConfig1#{<<"type">> => Type, <<"name">> => Name},
+    Path = emqx_mgmt_api_test_util:api_path(["connectors_probe"]),
+    ct:pal("probing connector (~s, http):\n  ~p", [Type, Params]),
+    Method = post,
+    Res = request(Method, Path, Params),
+    ct:pal("probing connector (~s, http) result:\n  ~p", [Type, Res]),
+    Res.
+
 list_bridges_http_api_v1() ->
     Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
     ct:pal("list bridges (http v1)"),

+ 44 - 4
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl

@@ -220,10 +220,17 @@ on_stop(ConnectorResId, State) ->
 
 -spec on_get_status(connector_resource_id(), connector_state()) ->
     ?status_connected | ?status_disconnected.
-on_get_status(_ConnectorResId, _State = #{kafka_client_id := ClientID}) ->
-    case brod_sup:find_client(ClientID) of
-        [_Pid] -> ?status_connected;
-        _ -> ?status_disconnected
+on_get_status(_ConnectorResId, State = #{kafka_client_id := ClientID}) ->
+    case whereis(ClientID) of
+        Pid when is_pid(Pid) ->
+            case check_client_connectivity(Pid) of
+                {Status, Reason} ->
+                    {Status, State, Reason};
+                Status ->
+                    Status
+            end;
+        _ ->
+            ?status_disconnected
     end;
 on_get_status(_ConnectorResId, _State) ->
     ?status_disconnected.
@@ -631,6 +638,39 @@ is_dry_run(ConnectorResId) ->
             string:equal(TestIdStart, ConnectorResId)
     end.
 
+-spec check_client_connectivity(pid()) ->
+    ?status_connected
+    | ?status_disconnected
+    | {?status_disconnected, term()}.
+check_client_connectivity(ClientPid) ->
+    %% We use a fake group id just to probe the connection, as `get_group_coordinator'
+    %% will ensure a connection to the broker.
+    FakeGroupId = <<"____emqx_consumer_probe">>,
+    case brod_client:get_group_coordinator(ClientPid, FakeGroupId) of
+        {error, client_down} ->
+            ?status_disconnected;
+        {error, {client_down, Reason}} ->
+            %% `brod' should have already logged the client being down.
+            {?status_disconnected, maybe_clean_error(Reason)};
+        {error, Reason} ->
+            %% `brod' should have already logged the client being down.
+            {?status_disconnected, maybe_clean_error(Reason)};
+        {ok, _Metadata} ->
+            ?status_connected
+    end.
+
+%% Attempt to make the returned error a bit more friendly.
+maybe_clean_error(Reason) ->
+    case Reason of
+        [{{Host, Port}, {nxdomain, _Stacktrace}} | _] when is_integer(Port) ->
+            HostPort = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
+            {HostPort, nxdomain};
+        [{error_code, Code}, {error_msg, Msg} | _] ->
+            {Code, Msg};
+        _ ->
+            Reason
+    end.
+
 -spec make_client_id(connector_resource_id(), binary(), atom() | binary()) -> atom().
 make_client_id(ConnectorResId, BridgeType, BridgeName) ->
     case is_dry_run(ConnectorResId) of

+ 1 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl

@@ -2071,6 +2071,7 @@ t_begin_offset_earliest(Config) ->
             {ok, _} = create_bridge(Config, #{
                 <<"kafka">> => #{<<"offset_reset_policy">> => <<"earliest">>}
             }),
+            ?retry(500, 20, ?assertEqual({ok, connected}, health_check(Config))),
 
             #{num_published => NumMessages}
         end,

+ 12 - 0
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl

@@ -339,3 +339,15 @@ t_update_topic(Config) ->
         emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
     ),
     ok.
+
+t_bad_bootstrap_host(Config) ->
+    ?assertMatch(
+        {error, {{_, 400, _}, _, _}},
+        emqx_bridge_v2_testlib:probe_connector_api(
+            Config,
+            #{
+                <<"bootstrap_hosts">> => <<"bad_host:9999">>
+            }
+        )
+    ),
+    ok.