Просмотр исходного кода

Merge pull request #11960 from thalesmg/fix-kafka-producer-channel-status-r53-20231116

fix(kafka_producer): make status `connecting` while the client fails to connect
Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
926078b82c

+ 18 - 0
apps/emqx/test/emqx_cth_suite.erl

@@ -74,6 +74,9 @@
 
 -export([merge_appspec/2]).
 
+%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs
+-export([schema_module/0, upgrade_raw_conf/1]).
+
 -export_type([appspec/0]).
 -export_type([appspec_opts/0]).
 
@@ -477,3 +480,18 @@ render_config(Config = #{}) ->
     unicode:characters_to_binary(hocon_pp:do(Config, #{}));
 render_config(Config) ->
     unicode:characters_to_binary(Config).
+
+%%
+
+%% "Unofficial" `emqx_config_handler' API
+schema_module() ->
+    ?MODULE.
+
+%% "Unofficial" `emqx_conf' API
+upgrade_raw_conf(Conf) ->
+    case emqx_release:edition() of
+        ee ->
+            emqx_enterprise_schema:upgrade_raw_conf(Conf);
+        ce ->
+            emqx_conf_schema:upgrade_raw_conf(Conf)
+    end.

+ 14 - 11
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -202,33 +202,36 @@ lookup(Type, Name) ->
             %% The connector should always exist
             %% ... but, in theory, there might be no channels associated to it when we try
             %% to delete the connector, and then this reference will become dangling...
-            InstanceData =
+            ConnectorData =
                 case emqx_resource:get_instance(ConnectorId) of
                     {ok, _, Data} ->
                         Data;
                     {error, not_found} ->
                         #{}
                 end,
-            %% Find the Bridge V2 status from the InstanceData
-            Channels = maps:get(added_channels, InstanceData, #{}),
+            %% Find the Bridge V2 status from the ConnectorData
+            ConnectorStatus = maps:get(status, ConnectorData, undefined),
+            Channels = maps:get(added_channels, ConnectorData, #{}),
             BridgeV2Id = id(Type, Name, BridgeConnector),
             ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
             {DisplayBridgeV2Status, ErrorMsg} =
-                case ChannelStatus of
-                    #{status := connected} ->
-                        {connected, <<"">>};
-                    #{status := Status, error := undefined} ->
+                case {ChannelStatus, ConnectorStatus} of
+                    {#{status := ?status_connected}, _} ->
+                        {?status_connected, <<"">>};
+                    {#{error := resource_not_operational}, ?status_connecting} ->
+                        {?status_connecting, <<"Not installed">>};
+                    {#{status := Status, error := undefined}, _} ->
                         {Status, <<"Unknown reason">>};
-                    #{status := Status, error := Error} ->
+                    {#{status := Status, error := Error}, _} ->
                         {Status, emqx_utils:readable_error_msg(Error)};
-                    undefined ->
-                        {disconnected, <<"Pending installation">>}
+                    {undefined, _} ->
+                        {?status_disconnected, <<"Not installed">>}
                 end,
             {ok, #{
                 type => bin(Type),
                 name => bin(Name),
                 raw_config => RawConf,
-                resource_data => InstanceData,
+                resource_data => ConnectorData,
                 status => DisplayBridgeV2Status,
                 error => ErrorMsg
             }}

+ 58 - 10
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -20,6 +20,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -43,7 +44,7 @@ con_schema() ->
         {
             con_type(),
             hoconsc:mk(
-                hoconsc:map(name, typerefl:map()),
+                hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)),
                 #{
                     desc => <<"Test Connector Config">>,
                     required => false
@@ -52,6 +53,15 @@ con_schema() ->
         }
     ].
 
+fields(connector_config) ->
+    [
+        {enable, hoconsc:mk(typerefl:boolean(), #{})},
+        {resource_opts, hoconsc:mk(typerefl:map(), #{})},
+        {on_start_fun, hoconsc:mk(typerefl:binary(), #{})},
+        {on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})},
+        {on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})}
+    ].
+
 con_config() ->
     #{
         <<"enable">> => true,
@@ -112,6 +122,7 @@ setup_mocks() ->
 
     catch meck:new(emqx_connector_schema, MeckOpts),
     meck:expect(emqx_connector_schema, fields, 1, con_schema()),
+    meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
 
     catch meck:new(emqx_connector_resource, MeckOpts),
     meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
@@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) ->
     ets:new(fun_table_name(), [named_table, public]),
     %% Create a fake connector
     {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
-    [
-        {mocked_mods, [
-            emqx_connector_schema,
-            emqx_connector_resource,
-
-            emqx_bridge_v2
-        ]}
-        | Config
-    ].
+    Config.
 
 end_per_testcase(_TestCase, _Config) ->
     ets:delete(fun_table_name()),
@@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) ->
     ),
     ok.
 
+t_lookup_status_when_connecting(_Config) ->
+    ResponseETS = ets:new(response_ets, [public]),
+    ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
+    OnGetStatusFun = wrap_fun(fun() ->
+        ets:lookup_element(ResponseETS, on_get_status_value, 2)
+    end),
+
+    ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
+        <<"on_get_status_fun">> => OnGetStatusFun,
+        <<"resource_opts">> => #{<<"start_timeout">> => 100}
+    }),
+    ConnectorName = ?FUNCTION_NAME,
+    ct:pal("connector config:\n  ~p", [ConnectorConfig]),
+    {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
+
+    ActionName = my_test_action,
+    ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
+    ActionConfig = (bridge_config())#{
+        <<"on_get_channel_status_fun">> => ChanStatusFun,
+        <<"connector">> => atom_to_binary(ConnectorName)
+    },
+    ct:pal("action config:\n  ~p", [ActionConfig]),
+    {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
+
+    %% Top-level status is connecting if the connector status is connecting, but the
+    %% channel is not yet installed.  `resource_data.added_channels.$channel_id.status'
+    %% contains true internal status.
+    {ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName),
+    ?assertMatch(
+        #{
+            %% This is the action's public status
+            status := ?status_connecting,
+            resource_data :=
+                #{
+                    %% This is the connector's status
+                    status := ?status_connecting
+                }
+        },
+        Res
+    ),
+    #{resource_data := #{added_channels := Channels}} = Res,
+    [{_Id, ChannelData}] = maps:to_list(Channels),
+    ?assertMatch(#{status := ?status_disconnected}, ChannelData),
+    ok.
+
 %% Helper Functions
 
 wait_until(Fun) ->

+ 2 - 2
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -587,7 +587,7 @@ t_broken_bridge_config(Config) ->
                 <<"type">> := ?BRIDGE_TYPE,
                 <<"connector">> := <<"does_not_exist">>,
                 <<"status">> := <<"disconnected">>,
-                <<"error">> := <<"Pending installation">>
+                <<"error">> := <<"Not installed">>
             }
         ]},
         request_json(get, uri([?ROOT]), Config)
@@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) ->
                 <<"type">> := ?BRIDGE_TYPE,
                 <<"connector">> := <<"does_not_exist">>,
                 <<"status">> := <<"disconnected">>,
-                <<"error">> := <<"Pending installation">>
+                <<"error">> := <<"Not installed">>
             }
         ]},
         request_json(get, uri([?ROOT]), Config)

+ 2 - 2
apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl

@@ -43,8 +43,8 @@ on_start(
 ) ->
     Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
     Fun(Conf);
-on_start(_InstId, _Config) ->
-    {ok, #{}}.
+on_start(_InstId, Config) ->
+    {ok, Config}.
 
 on_add_channel(
     _InstId,

+ 5 - 5
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -481,11 +481,11 @@ on_get_status(
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             case wolff_client:check_connectivity(Pid) of
-                ok -> connected;
-                {error, Error} -> {connecting, State, Error}
+                ok -> ?status_connected;
+                {error, Error} -> {?status_connecting, State, Error}
             end;
         {error, _Reason} ->
-            connecting
+            ?status_connecting
     end.
 
 on_get_channel_status(
@@ -499,10 +499,10 @@ on_get_channel_status(
     #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
     try
         ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
-        connected
+        ?status_connected
     catch
         throw:#{reason := restarting} ->
-            conneting
+            ?status_connecting
     end.
 
 check_topic_and_leader_connections(ClientId, KafkaTopic) ->

+ 154 - 102
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -23,8 +23,14 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(TYPE, kafka_producer).
 
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -51,6 +57,135 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(Apps),
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(60_000),
+    ok.
+
+%%-------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------
+
+check_send_message_with_bridge(BridgeName) ->
+    %% ######################################
+    %% Create Kafka message
+    %% ######################################
+    Time = erlang:unique_integer(),
+    BinTime = integer_to_binary(Time),
+    Payload = list_to_binary("payload" ++ integer_to_list(Time)),
+    Msg = #{
+        clientid => BinTime,
+        payload => Payload,
+        timestamp => Time
+    },
+    Offset = resolve_kafka_offset(),
+    %% ######################################
+    %% Send message
+    %% ######################################
+    emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
+    %% ######################################
+    %% Check if message is sent to Kafka
+    %% ######################################
+    check_kafka_message_payload(Offset, Payload).
+
+resolve_kafka_offset() ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    Partition = 0,
+    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
+        Hosts, KafkaTopic, Partition
+    ),
+    Offset0.
+
+check_kafka_message_payload(Offset, ExpectedPayload) ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    Partition = 0,
+    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
+    ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
+
+bridge_v2_config(ConnectorName) ->
+    #{
+        <<"connector">> => ConnectorName,
+        <<"enable">> => true,
+        <<"kafka">> => #{
+            <<"buffer">> => #{
+                <<"memory_overload_protection">> => false,
+                <<"mode">> => <<"memory">>,
+                <<"per_partition_limit">> => <<"2GB">>,
+                <<"segment_bytes">> => <<"100MB">>
+            },
+            <<"compression">> => <<"no_compression">>,
+            <<"kafka_header_value_encode_mode">> => <<"none">>,
+            <<"max_batch_bytes">> => <<"896KB">>,
+            <<"max_inflight">> => 10,
+            <<"message">> => #{
+                <<"key">> => <<"${.clientid}">>,
+                <<"timestamp">> => <<"${.timestamp}">>,
+                <<"value">> => <<"${.payload}">>
+            },
+            <<"partition_count_refresh_interval">> => <<"60s">>,
+            <<"partition_strategy">> => <<"random">>,
+            <<"query_mode">> => <<"sync">>,
+            <<"required_acks">> => <<"all_isr">>,
+            <<"sync_query_timeout">> => <<"5s">>,
+            <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
+        },
+        <<"local_topic">> => <<"kafka_t/#">>,
+        <<"resource_opts">> => #{
+            <<"health_check_interval">> => <<"15s">>
+        }
+    }.
+
+connector_config() ->
+    #{
+        <<"authentication">> => <<"none">>,
+        <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
+        <<"connect_timeout">> => <<"5s">>,
+        <<"enable">> => true,
+        <<"metadata_request_timeout">> => <<"5s">>,
+        <<"min_metadata_refresh_interval">> => <<"3s">>,
+        <<"socket_opts">> =>
+            #{
+                <<"recbuf">> => <<"1024KB">>,
+                <<"sndbuf">> => <<"1024KB">>,
+                <<"tcp_keepalive">> => <<"none">>
+            },
+        <<"ssl">> =>
+            #{
+                <<"ciphers">> => [],
+                <<"depth">> => 10,
+                <<"enable">> => false,
+                <<"hibernate_after">> => <<"5s">>,
+                <<"log_level">> => <<"notice">>,
+                <<"reuse_sessions">> => true,
+                <<"secure_renegotiate">> => true,
+                <<"verify">> => <<"verify_peer">>,
+                <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
+            }
+    }.
+
+kafka_hosts_string() ->
+    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
+    KafkaHost ++ ":" ++ KafkaPort.
+
+create_connector(Name, Config) ->
+    Res = emqx_connector:create(?TYPE, Name, Config),
+    on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
+    Res.
+
+create_action(Name, Config) ->
+    Res = emqx_bridge_v2:create(?TYPE, Name, Config),
+    on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
+    Res.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
 t_create_remove_list(_) ->
     [] = emqx_bridge_v2:list(),
     ConnectorConfig = connector_config(),
@@ -187,106 +322,23 @@ t_unknown_topic(_Config) ->
     ),
     ok.
 
-check_send_message_with_bridge(BridgeName) ->
-    %% ######################################
-    %% Create Kafka message
-    %% ######################################
-    Time = erlang:unique_integer(),
-    BinTime = integer_to_binary(Time),
-    Payload = list_to_binary("payload" ++ integer_to_list(Time)),
-    Msg = #{
-        clientid => BinTime,
-        payload => Payload,
-        timestamp => Time
-    },
-    Offset = resolve_kafka_offset(),
-    %% ######################################
-    %% Send message
-    %% ######################################
-    emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
-    %% ######################################
-    %% Check if message is sent to Kafka
-    %% ######################################
-    check_kafka_message_payload(Offset, Payload).
-
-resolve_kafka_offset() ->
-    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
-    Partition = 0,
-    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
-    {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
-        Hosts, KafkaTopic, Partition
+t_bad_url(_Config) ->
+    ConnectorName = <<"test_connector">>,
+    ActionName = <<"test_action">>,
+    ActionConfig = bridge_v2_config(<<"test_connector">>),
+    ConnectorConfig0 = connector_config(),
+    ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>},
+    ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
+    ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
+    ?assertMatch(
+        {ok, #{
+            resource_data :=
+                #{
+                    status := connecting,
+                    error := [#{reason := unresolvable_hostname}]
+                }
+        }},
+        emqx_connector:lookup(?TYPE, ConnectorName)
     ),
-    Offset0.
-
-check_kafka_message_payload(Offset, ExpectedPayload) ->
-    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
-    Partition = 0,
-    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
-    {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
-    ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
-
-bridge_v2_config(ConnectorName) ->
-    #{
-        <<"connector">> => ConnectorName,
-        <<"enable">> => true,
-        <<"kafka">> => #{
-            <<"buffer">> => #{
-                <<"memory_overload_protection">> => false,
-                <<"mode">> => <<"memory">>,
-                <<"per_partition_limit">> => <<"2GB">>,
-                <<"segment_bytes">> => <<"100MB">>
-            },
-            <<"compression">> => <<"no_compression">>,
-            <<"kafka_header_value_encode_mode">> => <<"none">>,
-            <<"max_batch_bytes">> => <<"896KB">>,
-            <<"max_inflight">> => 10,
-            <<"message">> => #{
-                <<"key">> => <<"${.clientid}">>,
-                <<"timestamp">> => <<"${.timestamp}">>,
-                <<"value">> => <<"${.payload}">>
-            },
-            <<"partition_count_refresh_interval">> => <<"60s">>,
-            <<"partition_strategy">> => <<"random">>,
-            <<"query_mode">> => <<"sync">>,
-            <<"required_acks">> => <<"all_isr">>,
-            <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
-        },
-        <<"local_topic">> => <<"kafka_t/#">>,
-        <<"resource_opts">> => #{
-            <<"health_check_interval">> => <<"15s">>
-        }
-    }.
-
-connector_config() ->
-    #{
-        <<"authentication">> => <<"none">>,
-        <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
-        <<"connect_timeout">> => <<"5s">>,
-        <<"enable">> => true,
-        <<"metadata_request_timeout">> => <<"5s">>,
-        <<"min_metadata_refresh_interval">> => <<"3s">>,
-        <<"socket_opts">> =>
-            #{
-                <<"recbuf">> => <<"1024KB">>,
-                <<"sndbuf">> => <<"1024KB">>,
-                <<"tcp_keepalive">> => <<"none">>
-            },
-        <<"ssl">> =>
-            #{
-                <<"ciphers">> => [],
-                <<"depth">> => 10,
-                <<"enable">> => false,
-                <<"hibernate_after">> => <<"5s">>,
-                <<"log_level">> => <<"notice">>,
-                <<"reuse_sessions">> => true,
-                <<"secure_renegotiate">> => true,
-                <<"verify">> => <<"verify_peer">>,
-                <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
-            }
-    }.
-
-kafka_hosts_string() ->
-    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
-    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
-    KafkaHost ++ ":" ++ KafkaPort.
+    ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
+    ok.

+ 16 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -13,6 +13,16 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
+
+%% bridge/connector/action status
+-define(status_connected, connected).
+-define(status_connecting, connecting).
+-define(status_disconnected, disconnected).
+%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'...  Modules
+%% implementing `emqx_resource' behavior should not return it.  The `rm_' prefix is to
+%% remind us of that.
+-define(rm_status_stopped, stopped).
+
 -type resource_type() :: module().
 -type resource_id() :: binary().
 -type channel_id() :: binary().
@@ -21,8 +31,12 @@
 -type resource_config() :: term().
 -type resource_spec() :: map().
 -type resource_state() :: term().
--type resource_status() :: connected | disconnected | connecting | stopped.
--type channel_status() :: connected | connecting | disconnected.
+%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'...  Modules
+%% implementing `emqx_resource' behavior should not return it.
+-type resource_status() ::
+    ?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped.
+-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting.
+-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected.
 -type callback_mode() :: always_sync | async_if_possible.
 -type query_mode() ::
     simple_sync