Просмотр исходного кода

Merge remote-tracking branch 'origin/release-53' into sync-r53-to-m-20231120

Thales Macedo Garitezi 2 лет назад
Родитель
Сommit
53e796bbd0

+ 1 - 1
apps/emqx/test/emqx_routing_SUITE.erl

@@ -100,7 +100,7 @@ mk_config_listeners(N) ->
 
 t_cluster_routing(Config) ->
     Cluster = ?config(cluster, Config),
-    Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster],
+    Clients = [C1, C2, C3] = lists:sort([start_client(N) || N <- Cluster]),
     Commands = [
         {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]},
         {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]},

+ 41 - 13
apps/emqx_bridge/src/emqx_bridge.erl

@@ -55,7 +55,6 @@
 ]).
 
 -export([config_key_path/0]).
--export([validate_bridge_name/1]).
 
 %% exported for `emqx_telemetry'
 -export([get_basic_usage_info/0]).
@@ -269,7 +268,12 @@ config_key_path() ->
 pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
     {ok, RawConf};
 pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
-    {ok, convert_certs(NewConf)}.
+    case multi_validate_bridge_names(NewConf) of
+        ok ->
+            {ok, convert_certs(NewConf)};
+        Error ->
+            Error
+    end.
 
 post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated} =
@@ -658,17 +662,13 @@ get_basic_usage_info() ->
             InitialAcc
     end.
 
-validate_bridge_name(BridgeName0) ->
-    BridgeName = to_bin(BridgeName0),
-    case re:run(BridgeName, ?MAP_KEY_RE, [{capture, none}]) of
-        match ->
-            ok;
-        nomatch ->
-            {error, #{
-                kind => validation_error,
-                reason => bad_bridge_name,
-                value => BridgeName
-            }}
+validate_bridge_name(BridgeName) ->
+    try
+        _ = emqx_resource:validate_name(to_bin(BridgeName)),
+        ok
+    catch
+        throw:Error ->
+            {error, Error}
     end.
 
 to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
@@ -676,3 +676,31 @@ to_bin(B) when is_binary(B) -> B.
 
 upgrade_type(Type) ->
     emqx_bridge_lib:upgrade_type(Type).
+
+multi_validate_bridge_names(Conf) ->
+    BridgeTypeAndNames =
+        [
+            {Type, Name}
+         || {Type, NameToConf} <- maps:to_list(Conf),
+            {Name, _Conf} <- maps:to_list(NameToConf)
+        ],
+    BadBridges =
+        lists:filtermap(
+            fun({Type, Name}) ->
+                case validate_bridge_name(Name) of
+                    ok -> false;
+                    _Error -> {true, #{type => Type, name => Name}}
+                end
+            end,
+            BridgeTypeAndNames
+        ),
+    case BadBridges of
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, #{
+                kind => validation_error,
+                reason => bad_bridge_names,
+                bad_bridges => BadBridges
+            }}
+    end.

+ 15 - 3
apps/emqx_bridge/src/emqx_bridge_app.erl

@@ -63,7 +63,7 @@ pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
     %% to save the 'enable' to the config files
     {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
 pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
-    case validate_bridge_name(Path) of
+    case validate_bridge_name_in_config(Path) of
         ok ->
             case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
                 {error, Reason} ->
@@ -104,11 +104,23 @@ post_config_update([bridges, BridgeType, BridgeName], _Req, NewConf, OldConf, _A
 operation_to_enable(disable) -> false;
 operation_to_enable(enable) -> true.
 
-validate_bridge_name(Path) ->
+validate_bridge_name_in_config(Path) ->
     [RootKey] = emqx_bridge:config_key_path(),
     case Path of
         [RootKey, _BridgeType, BridgeName] ->
-            emqx_bridge:validate_bridge_name(BridgeName);
+            validate_bridge_name(BridgeName);
         _ ->
             ok
     end.
+
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+to_bin(B) when is_binary(B) -> B.
+
+validate_bridge_name(BridgeName) ->
+    try
+        _ = emqx_resource:validate_name(to_bin(BridgeName)),
+        ok
+    catch
+        throw:Error ->
+            {error, Error}
+    end.

+ 14 - 11
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -202,33 +202,36 @@ lookup(Type, Name) ->
             %% The connector should always exist
             %% ... but, in theory, there might be no channels associated to it when we try
             %% to delete the connector, and then this reference will become dangling...
-            InstanceData =
+            ConnectorData =
                 case emqx_resource:get_instance(ConnectorId) of
                     {ok, _, Data} ->
                         Data;
                     {error, not_found} ->
                         #{}
                 end,
-            %% Find the Bridge V2 status from the InstanceData
-            Channels = maps:get(added_channels, InstanceData, #{}),
+            %% Find the Bridge V2 status from the ConnectorData
+            ConnectorStatus = maps:get(status, ConnectorData, undefined),
+            Channels = maps:get(added_channels, ConnectorData, #{}),
             BridgeV2Id = id(Type, Name, BridgeConnector),
             ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
             {DisplayBridgeV2Status, ErrorMsg} =
-                case ChannelStatus of
-                    #{status := connected} ->
-                        {connected, <<"">>};
-                    #{status := Status, error := undefined} ->
+                case {ChannelStatus, ConnectorStatus} of
+                    {#{status := ?status_connected}, _} ->
+                        {?status_connected, <<"">>};
+                    {#{error := resource_not_operational}, ?status_connecting} ->
+                        {?status_connecting, <<"Not installed">>};
+                    {#{status := Status, error := undefined}, _} ->
                         {Status, <<"Unknown reason">>};
-                    #{status := Status, error := Error} ->
+                    {#{status := Status, error := Error}, _} ->
                         {Status, emqx_utils:readable_error_msg(Error)};
-                    undefined ->
-                        {disconnected, <<"Pending installation">>}
+                    {undefined, _} ->
+                        {?status_disconnected, <<"Not installed">>}
                 end,
             {ok, #{
                 type => bin(Type),
                 name => bin(Name),
                 raw_config => RawConf,
-                resource_data => InstanceData,
+                resource_data => ConnectorData,
                 status => DisplayBridgeV2Status,
                 error => ErrorMsg
             }}

+ 1 - 3
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -82,9 +82,7 @@ schema_modules() ->
     ].
 
 examples(Method) ->
-    ActionExamples = emqx_bridge_v2_schema:examples(Method),
-    RegisteredExamples = registered_examples(Method),
-    maps:merge(ActionExamples, RegisteredExamples).
+    registered_examples(Method).
 
 registered_examples(Method) ->
     MergeFun =

+ 29 - 1
apps/emqx_bridge/test/emqx_bridge_SUITE.erl

@@ -199,13 +199,41 @@ t_create_with_bad_name(_Config) ->
     ?assertMatch(
         {error,
             {pre_config_update, emqx_bridge_app, #{
-                reason := bad_bridge_name,
+                reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>,
                 kind := validation_error
             }}},
         emqx:update_config(Path, Conf)
     ),
     ok.
 
+t_create_with_bad_name_root(_Config) ->
+    BadBridgeName = <<"test_哈哈">>,
+    BridgeConf = #{
+        <<"bridge_mode">> => false,
+        <<"clean_start">> => true,
+        <<"keepalive">> => <<"60s">>,
+        <<"proto_ver">> => <<"v4">>,
+        <<"server">> => <<"127.0.0.1:1883">>,
+        <<"ssl">> =>
+            #{
+                %% needed to trigger pre_config_update
+                <<"certfile">> => cert_file("certfile"),
+                <<"enable">> => true
+            }
+    },
+    Conf = #{<<"mqtt">> => #{BadBridgeName => BridgeConf}},
+    Path = [bridges],
+    ?assertMatch(
+        {error,
+            {pre_config_update, _ConfigHandlerMod, #{
+                kind := validation_error,
+                reason := bad_bridge_names,
+                bad_bridges := [#{type := <<"mqtt">>, name := BadBridgeName}]
+            }}},
+        emqx:update_config(Path, Conf)
+    ),
+    ok.
+
 data_file(Name) ->
     Dir = code:lib_dir(emqx_bridge, test),
     {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),

+ 7 - 1
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -1362,7 +1362,13 @@ t_create_with_bad_name(Config) ->
             Config
         ),
     Msg = emqx_utils_json:decode(Msg0, [return_maps]),
-    ?assertMatch(#{<<"reason">> := <<"bad_bridge_name">>}, Msg),
+    ?assertMatch(
+        #{
+            <<"kind">> := <<"validation_error">>,
+            <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+        },
+        Msg
+    ),
     ok.
 
 validate_resource_request_ttl(single, Timeout, Name) ->

+ 29 - 2
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -142,7 +142,8 @@ con_schema() ->
 fields("connector") ->
     [
         {enable, hoconsc:mk(any(), #{})},
-        {resource_opts, hoconsc:mk(map(), #{})}
+        {resource_opts, hoconsc:mk(map(), #{})},
+        {ssl, hoconsc:ref(ssl)}
     ];
 fields("api_post") ->
     [
@@ -151,7 +152,9 @@ fields("api_post") ->
         {type, hoconsc:mk(bridge_type(), #{})},
         {send_to, hoconsc:mk(atom(), #{})}
         | fields("connector")
-    ].
+    ];
+fields(ssl) ->
+    emqx_schema:client_ssl_opts_schema(#{required => false}).
 
 con_config() ->
     #{
@@ -798,3 +801,27 @@ t_scenario_2(Config) ->
     ?assert(is_rule_enabled(RuleId2)),
 
     ok.
+
+t_create_with_bad_name(_Config) ->
+    BadBridgeName = <<"test_哈哈">>,
+    %% Note: must contain SSL options to trigger bug.
+    Cacertfile = emqx_common_test_helpers:app_path(
+        emqx,
+        filename:join(["etc", "certs", "cacert.pem"])
+    ),
+    Opts = #{
+        name => BadBridgeName,
+        overrides => #{
+            <<"ssl">> =>
+                #{<<"cacertfile">> => Cacertfile}
+        }
+    },
+    {error,
+        {{_, 400, _}, _, #{
+            <<"code">> := <<"BAD_REQUEST">>,
+            <<"message">> := #{
+                <<"kind">> := <<"validation_error">>,
+                <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+            }
+        }}} = create_bridge_http_api_v1(Opts),
+    ok.

+ 58 - 10
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -20,6 +20,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -43,7 +44,7 @@ con_schema() ->
         {
             con_type(),
             hoconsc:mk(
-                hoconsc:map(name, typerefl:map()),
+                hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)),
                 #{
                     desc => <<"Test Connector Config">>,
                     required => false
@@ -52,6 +53,15 @@ con_schema() ->
         }
     ].
 
+fields(connector_config) ->
+    [
+        {enable, hoconsc:mk(typerefl:boolean(), #{})},
+        {resource_opts, hoconsc:mk(typerefl:map(), #{})},
+        {on_start_fun, hoconsc:mk(typerefl:binary(), #{})},
+        {on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})},
+        {on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})}
+    ].
+
 con_config() ->
     #{
         <<"enable">> => true,
@@ -112,6 +122,7 @@ setup_mocks() ->
 
     catch meck:new(emqx_connector_schema, MeckOpts),
     meck:expect(emqx_connector_schema, fields, 1, con_schema()),
+    meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
 
     catch meck:new(emqx_connector_resource, MeckOpts),
     meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
@@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) ->
     ets:new(fun_table_name(), [named_table, public]),
     %% Create a fake connector
     {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
-    [
-        {mocked_mods, [
-            emqx_connector_schema,
-            emqx_connector_resource,
-
-            emqx_bridge_v2
-        ]}
-        | Config
-    ].
+    Config.
 
 end_per_testcase(_TestCase, _Config) ->
     ets:delete(fun_table_name()),
@@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) ->
     ),
     ok.
 
+t_lookup_status_when_connecting(_Config) ->
+    ResponseETS = ets:new(response_ets, [public]),
+    ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}),
+    OnGetStatusFun = wrap_fun(fun() ->
+        ets:lookup_element(ResponseETS, on_get_status_value, 2)
+    end),
+
+    ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{
+        <<"on_get_status_fun">> => OnGetStatusFun,
+        <<"resource_opts">> => #{<<"start_timeout">> => 100}
+    }),
+    ConnectorName = ?FUNCTION_NAME,
+    ct:pal("connector config:\n  ~p", [ConnectorConfig]),
+    {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig),
+
+    ActionName = my_test_action,
+    ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end),
+    ActionConfig = (bridge_config())#{
+        <<"on_get_channel_status_fun">> => ChanStatusFun,
+        <<"connector">> => atom_to_binary(ConnectorName)
+    },
+    ct:pal("action config:\n  ~p", [ActionConfig]),
+    {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig),
+
+    %% Top-level status is connecting if the connector status is connecting, but the
+    %% channel is not yet installed.  `resource_data.added_channels.$channel_id.status'
+    %% contains true internal status.
+    {ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName),
+    ?assertMatch(
+        #{
+            %% This is the action's public status
+            status := ?status_connecting,
+            resource_data :=
+                #{
+                    %% This is the connector's status
+                    status := ?status_connecting
+                }
+        },
+        Res
+    ),
+    #{resource_data := #{added_channels := Channels}} = Res,
+    [{_Id, ChannelData}] = maps:to_list(Channels),
+    ?assertMatch(#{status := ?status_disconnected}, ChannelData),
+    ok.
+
 %% Helper Functions
 
 wait_until(Fun) ->

+ 2 - 2
apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl

@@ -587,7 +587,7 @@ t_broken_bridge_config(Config) ->
                 <<"type">> := ?BRIDGE_TYPE,
                 <<"connector">> := <<"does_not_exist">>,
                 <<"status">> := <<"disconnected">>,
-                <<"error">> := <<"Pending installation">>
+                <<"error">> := <<"Not installed">>
             }
         ]},
         request_json(get, uri([?ROOT]), Config)
@@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) ->
                 <<"type">> := ?BRIDGE_TYPE,
                 <<"connector">> := <<"does_not_exist">>,
                 <<"status">> := <<"disconnected">>,
-                <<"error">> := <<"Pending installation">>
+                <<"error">> := <<"Not installed">>
             }
         ]},
         request_json(get, uri([?ROOT]), Config)

+ 2 - 2
apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl

@@ -43,8 +43,8 @@ on_start(
 ) ->
     Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef),
     Fun(Conf);
-on_start(_InstId, _Config) ->
-    {ok, #{}}.
+on_start(_InstId, Config) ->
+    {ok, Config}.
 
 on_add_channel(
     _InstId,

+ 5 - 5
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -481,11 +481,11 @@ on_get_status(
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             case wolff_client:check_connectivity(Pid) of
-                ok -> connected;
-                {error, Error} -> {connecting, State, Error}
+                ok -> ?status_connected;
+                {error, Error} -> {?status_connecting, State, Error}
             end;
         {error, _Reason} ->
-            connecting
+            ?status_connecting
     end.
 
 on_get_channel_status(
@@ -499,10 +499,10 @@ on_get_channel_status(
     #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
     try
         ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
-        connected
+        ?status_connected
     catch
         throw:#{reason := restarting} ->
-            conneting
+            ?status_connecting
     end.
 
 check_topic_and_leader_connections(ClientId, KafkaTopic) ->

+ 154 - 102
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -23,8 +23,14 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("brod/include/brod.hrl").
 
+-import(emqx_common_test_helpers, [on_exit/1]).
+
 -define(TYPE, kafka_producer).
 
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -51,6 +57,135 @@ end_per_suite(Config) ->
     emqx_cth_suite:stop(Apps),
     ok.
 
+init_per_testcase(_TestCase, Config) ->
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    emqx_common_test_helpers:call_janitor(60_000),
+    ok.
+
+%%-------------------------------------------------------------------------------------
+%% Helper fns
+%%-------------------------------------------------------------------------------------
+
+check_send_message_with_bridge(BridgeName) ->
+    %% ######################################
+    %% Create Kafka message
+    %% ######################################
+    Time = erlang:unique_integer(),
+    BinTime = integer_to_binary(Time),
+    Payload = list_to_binary("payload" ++ integer_to_list(Time)),
+    Msg = #{
+        clientid => BinTime,
+        payload => Payload,
+        timestamp => Time
+    },
+    Offset = resolve_kafka_offset(),
+    %% ######################################
+    %% Send message
+    %% ######################################
+    emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
+    %% ######################################
+    %% Check if message is sent to Kafka
+    %% ######################################
+    check_kafka_message_payload(Offset, Payload).
+
+resolve_kafka_offset() ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    Partition = 0,
+    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
+        Hosts, KafkaTopic, Partition
+    ),
+    Offset0.
+
+check_kafka_message_payload(Offset, ExpectedPayload) ->
+    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
+    Partition = 0,
+    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
+    {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
+    ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
+
+bridge_v2_config(ConnectorName) ->
+    #{
+        <<"connector">> => ConnectorName,
+        <<"enable">> => true,
+        <<"kafka">> => #{
+            <<"buffer">> => #{
+                <<"memory_overload_protection">> => false,
+                <<"mode">> => <<"memory">>,
+                <<"per_partition_limit">> => <<"2GB">>,
+                <<"segment_bytes">> => <<"100MB">>
+            },
+            <<"compression">> => <<"no_compression">>,
+            <<"kafka_header_value_encode_mode">> => <<"none">>,
+            <<"max_batch_bytes">> => <<"896KB">>,
+            <<"max_inflight">> => 10,
+            <<"message">> => #{
+                <<"key">> => <<"${.clientid}">>,
+                <<"timestamp">> => <<"${.timestamp}">>,
+                <<"value">> => <<"${.payload}">>
+            },
+            <<"partition_count_refresh_interval">> => <<"60s">>,
+            <<"partition_strategy">> => <<"random">>,
+            <<"query_mode">> => <<"sync">>,
+            <<"required_acks">> => <<"all_isr">>,
+            <<"sync_query_timeout">> => <<"5s">>,
+            <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
+        },
+        <<"local_topic">> => <<"kafka_t/#">>,
+        <<"resource_opts">> => #{
+            <<"health_check_interval">> => <<"15s">>
+        }
+    }.
+
+connector_config() ->
+    #{
+        <<"authentication">> => <<"none">>,
+        <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
+        <<"connect_timeout">> => <<"5s">>,
+        <<"enable">> => true,
+        <<"metadata_request_timeout">> => <<"5s">>,
+        <<"min_metadata_refresh_interval">> => <<"3s">>,
+        <<"socket_opts">> =>
+            #{
+                <<"recbuf">> => <<"1024KB">>,
+                <<"sndbuf">> => <<"1024KB">>,
+                <<"tcp_keepalive">> => <<"none">>
+            },
+        <<"ssl">> =>
+            #{
+                <<"ciphers">> => [],
+                <<"depth">> => 10,
+                <<"enable">> => false,
+                <<"hibernate_after">> => <<"5s">>,
+                <<"log_level">> => <<"notice">>,
+                <<"reuse_sessions">> => true,
+                <<"secure_renegotiate">> => true,
+                <<"verify">> => <<"verify_peer">>,
+                <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
+            }
+    }.
+
+kafka_hosts_string() ->
+    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
+    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
+    KafkaHost ++ ":" ++ KafkaPort.
+
+create_connector(Name, Config) ->
+    Res = emqx_connector:create(?TYPE, Name, Config),
+    on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
+    Res.
+
+create_action(Name, Config) ->
+    Res = emqx_bridge_v2:create(?TYPE, Name, Config),
+    on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
+    Res.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
 t_create_remove_list(_) ->
     [] = emqx_bridge_v2:list(),
     ConnectorConfig = connector_config(),
@@ -187,106 +322,23 @@ t_unknown_topic(_Config) ->
     ),
     ok.
 
-check_send_message_with_bridge(BridgeName) ->
-    %% ######################################
-    %% Create Kafka message
-    %% ######################################
-    Time = erlang:unique_integer(),
-    BinTime = integer_to_binary(Time),
-    Payload = list_to_binary("payload" ++ integer_to_list(Time)),
-    Msg = #{
-        clientid => BinTime,
-        payload => Payload,
-        timestamp => Time
-    },
-    Offset = resolve_kafka_offset(),
-    %% ######################################
-    %% Send message
-    %% ######################################
-    emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
-    %% ######################################
-    %% Check if message is sent to Kafka
-    %% ######################################
-    check_kafka_message_payload(Offset, Payload).
-
-resolve_kafka_offset() ->
-    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
-    Partition = 0,
-    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
-    {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
-        Hosts, KafkaTopic, Partition
+t_bad_url(_Config) ->
+    ConnectorName = <<"test_connector">>,
+    ActionName = <<"test_action">>,
+    ActionConfig = bridge_v2_config(<<"test_connector">>),
+    ConnectorConfig0 = connector_config(),
+    ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>},
+    ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
+    ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
+    ?assertMatch(
+        {ok, #{
+            resource_data :=
+                #{
+                    status := connecting,
+                    error := [#{reason := unresolvable_hostname}]
+                }
+        }},
+        emqx_connector:lookup(?TYPE, ConnectorName)
     ),
-    Offset0.
-
-check_kafka_message_payload(Offset, ExpectedPayload) ->
-    KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
-    Partition = 0,
-    Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
-    {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
-    ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
-
-bridge_v2_config(ConnectorName) ->
-    #{
-        <<"connector">> => ConnectorName,
-        <<"enable">> => true,
-        <<"kafka">> => #{
-            <<"buffer">> => #{
-                <<"memory_overload_protection">> => false,
-                <<"mode">> => <<"memory">>,
-                <<"per_partition_limit">> => <<"2GB">>,
-                <<"segment_bytes">> => <<"100MB">>
-            },
-            <<"compression">> => <<"no_compression">>,
-            <<"kafka_header_value_encode_mode">> => <<"none">>,
-            <<"max_batch_bytes">> => <<"896KB">>,
-            <<"max_inflight">> => 10,
-            <<"message">> => #{
-                <<"key">> => <<"${.clientid}">>,
-                <<"timestamp">> => <<"${.timestamp}">>,
-                <<"value">> => <<"${.payload}">>
-            },
-            <<"partition_count_refresh_interval">> => <<"60s">>,
-            <<"partition_strategy">> => <<"random">>,
-            <<"query_mode">> => <<"sync">>,
-            <<"required_acks">> => <<"all_isr">>,
-            <<"sync_query_timeout">> => <<"5s">>,
-            <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
-        },
-        <<"local_topic">> => <<"kafka_t/#">>,
-        <<"resource_opts">> => #{
-            <<"health_check_interval">> => <<"15s">>
-        }
-    }.
-
-connector_config() ->
-    #{
-        <<"authentication">> => <<"none">>,
-        <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
-        <<"connect_timeout">> => <<"5s">>,
-        <<"enable">> => true,
-        <<"metadata_request_timeout">> => <<"5s">>,
-        <<"min_metadata_refresh_interval">> => <<"3s">>,
-        <<"socket_opts">> =>
-            #{
-                <<"recbuf">> => <<"1024KB">>,
-                <<"sndbuf">> => <<"1024KB">>,
-                <<"tcp_keepalive">> => <<"none">>
-            },
-        <<"ssl">> =>
-            #{
-                <<"ciphers">> => [],
-                <<"depth">> => 10,
-                <<"enable">> => false,
-                <<"hibernate_after">> => <<"5s">>,
-                <<"log_level">> => <<"notice">>,
-                <<"reuse_sessions">> => true,
-                <<"secure_renegotiate">> => true,
-                <<"verify">> => <<"verify_peer">>,
-                <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
-            }
-    }.
-
-kafka_hosts_string() ->
-    KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
-    KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
-    KafkaHost ++ ":" ++ KafkaPort.
+    ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
+    ok.

+ 64 - 6
apps/emqx_connector/src/emqx_connector.erl

@@ -108,18 +108,28 @@ config_key_path() ->
 pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
     {ok, RawConf};
 pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
-    {ok, convert_certs(NewConf)};
+    case multi_validate_connector_names(NewConf) of
+        ok ->
+            {ok, convert_certs(NewConf)};
+        Error ->
+            Error
+    end;
 pre_config_update(_, {_Oper, _, _}, undefined) ->
     {error, connector_not_found};
 pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
     %% to save the 'enable' to the config files
     {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
 pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
-    case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
-        {error, Reason} ->
-            {error, Reason};
-        {ok, ConfNew} ->
-            {ok, ConfNew}
+    case validate_connector_name_in_config(Path) of
+        ok ->
+            case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
+                {error, Reason} ->
+                    {error, Reason};
+                {ok, ConfNew} ->
+                    {ok, ConfNew}
+            end;
+        Error ->
+            Error
     end.
 
 operation_to_enable(disable) -> false;
@@ -458,3 +468,51 @@ ensure_no_channels(Configs) ->
         {error, Reason, _State} ->
             {error, Reason}
     end.
+
+to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+to_bin(B) when is_binary(B) -> B.
+
+validate_connector_name(ConnectorName) ->
+    try
+        _ = emqx_resource:validate_name(to_bin(ConnectorName)),
+        ok
+    catch
+        throw:Error ->
+            {error, Error}
+    end.
+
+validate_connector_name_in_config(Path) ->
+    case Path of
+        [?ROOT_KEY, _ConnectorType, ConnectorName] ->
+            validate_connector_name(ConnectorName);
+        _ ->
+            ok
+    end.
+
+multi_validate_connector_names(Conf) ->
+    ConnectorTypeAndNames =
+        [
+            {Type, Name}
+         || {Type, NameToConf} <- maps:to_list(Conf),
+            {Name, _Conf} <- maps:to_list(NameToConf)
+        ],
+    BadConnectors =
+        lists:filtermap(
+            fun({Type, Name}) ->
+                case validate_connector_name(Name) of
+                    ok -> false;
+                    _Error -> {true, #{type => Type, name => Name}}
+                end
+            end,
+            ConnectorTypeAndNames
+        ),
+    case BadConnectors of
+        [] ->
+            ok;
+        [_ | _] ->
+            {error, #{
+                kind => validation_error,
+                reason => bad_connector_names,
+                bad_connectors => BadConnectors
+            }}
+    end.

+ 65 - 0
apps/emqx_connector/test/emqx_connector_SUITE.erl

@@ -204,6 +204,71 @@ t_remove_fail(_Config) ->
     ),
     ok.
 
+t_create_with_bad_name_direct_path({init, Config}) ->
+    meck:new(emqx_connector_ee_schema, [passthrough]),
+    meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
+    meck:new(?CONNECTOR, [non_strict]),
+    meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
+    meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_stop, 2, ok),
+    meck:expect(?CONNECTOR, on_get_status, 2, connected),
+    Config;
+t_create_with_bad_name_direct_path({'end', _Config}) ->
+    meck:unload(),
+    ok;
+t_create_with_bad_name_direct_path(_Config) ->
+    Path = [connectors, kafka_producer, 'test_哈哈'],
+    ConnConfig0 = connector_config(),
+    %% Note: must contain SSL options to trigger original bug.
+    Cacertfile = emqx_common_test_helpers:app_path(
+        emqx,
+        filename:join(["etc", "certs", "cacert.pem"])
+    ),
+    ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
+    ?assertMatch(
+        {error,
+            {pre_config_update, _ConfigHandlerMod, #{
+                kind := validation_error,
+                reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>
+            }}},
+        emqx:update_config(Path, ConnConfig)
+    ),
+    ok.
+
+t_create_with_bad_name_root_path({init, Config}) ->
+    meck:new(emqx_connector_ee_schema, [passthrough]),
+    meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR),
+    meck:new(?CONNECTOR, [non_strict]),
+    meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible),
+    meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}),
+    meck:expect(?CONNECTOR, on_stop, 2, ok),
+    meck:expect(?CONNECTOR, on_get_status, 2, connected),
+    Config;
+t_create_with_bad_name_root_path({'end', _Config}) ->
+    meck:unload(),
+    ok;
+t_create_with_bad_name_root_path(_Config) ->
+    Path = [connectors],
+    BadConnectorName = <<"test_哈哈">>,
+    ConnConfig0 = connector_config(),
+    %% Note: must contain SSL options to trigger original bug.
+    Cacertfile = emqx_common_test_helpers:app_path(
+        emqx,
+        filename:join(["etc", "certs", "cacert.pem"])
+    ),
+    ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
+    Conf = #{<<"kafka_producer">> => #{BadConnectorName => ConnConfig}},
+    ?assertMatch(
+        {error,
+            {pre_config_update, _ConfigHandlerMod, #{
+                kind := validation_error,
+                reason := bad_connector_names,
+                bad_connectors := [#{type := <<"kafka_producer">>, name := BadConnectorName}]
+            }}},
+        emqx:update_config(Path, Conf)
+    ),
+    ok.
+
 %% helpers
 
 connector_config() ->

+ 22 - 0
apps/emqx_connector/test/emqx_connector_api_SUITE.erl

@@ -652,6 +652,28 @@ t_connectors_probe(Config) ->
     ),
     ok.
 
+t_create_with_bad_name(Config) ->
+    ConnectorName = <<"test_哈哈">>,
+    Conf0 = ?KAFKA_CONNECTOR(ConnectorName),
+    %% Note: must contain SSL options to trigger original bug.
+    Cacertfile = emqx_common_test_helpers:app_path(
+        emqx,
+        filename:join(["etc", "certs", "cacert.pem"])
+    ),
+    Conf = Conf0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}},
+    {ok, 400, #{
+        <<"code">> := <<"BAD_REQUEST">>,
+        <<"message">> := Msg0
+    }} = request_json(
+        post,
+        uri(["connectors"]),
+        Conf,
+        Config
+    ),
+    Msg = emqx_utils_json:decode(Msg0, [return_maps]),
+    ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],

+ 16 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -13,6 +13,16 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%--------------------------------------------------------------------
+
+%% bridge/connector/action status
+-define(status_connected, connected).
+-define(status_connecting, connecting).
+-define(status_disconnected, disconnected).
+%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'...  Modules
+%% implementing `emqx_resource' behavior should not return it.  The `rm_' prefix is to
+%% remind us of that.
+-define(rm_status_stopped, stopped).
+
 -type resource_type() :: module().
 -type resource_id() :: binary().
 -type channel_id() :: binary().
@@ -21,8 +31,12 @@
 -type resource_config() :: term().
 -type resource_spec() :: map().
 -type resource_state() :: term().
--type resource_status() :: connected | disconnected | connecting | stopped.
--type channel_status() :: connected | connecting | disconnected.
+%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'...  Modules
+%% implementing `emqx_resource' behavior should not return it.
+-type resource_status() ::
+    ?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped.
+-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting.
+-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected.
 -type callback_mode() :: always_sync | async_if_possible.
 -type query_mode() ::
     simple_sync

+ 10 - 18
apps/emqx_resource/src/emqx_resource.erl

@@ -815,29 +815,21 @@ validate_name(<<>>, _Opts) ->
     invalid_data("name cannot be empty string");
 validate_name(Name, _Opts) when size(Name) >= 255 ->
     invalid_data("name length must be less than 255");
-validate_name(Name0, Opts) ->
-    Name = unicode:characters_to_list(Name0, utf8),
-    case lists:all(fun is_id_char/1, Name) of
-        true ->
+validate_name(Name, Opts) ->
+    case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of
+        match ->
             case maps:get(atom_name, Opts, true) of
-                % NOTE
-                % Rule may be created before bridge, thus not `list_to_existing_atom/1`,
-                % also it is infrequent user input anyway.
-                true -> list_to_atom(Name);
-                false -> Name0
+                %% NOTE
+                %% Rule may be created before bridge, thus not `list_to_existing_atom/1`,
+                %% also it is infrequent user input anyway.
+                true -> binary_to_atom(Name, utf8);
+                false -> Name
             end;
-        false ->
+        nomatch ->
             invalid_data(
-                <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name0/binary>>
+                <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>>
             )
     end.
 
 -spec invalid_data(binary()) -> no_return().
 invalid_data(Reason) -> throw(#{kind => validation_error, reason => Reason}).
-
-is_id_char(C) when C >= $0 andalso C =< $9 -> true;
-is_id_char(C) when C >= $a andalso C =< $z -> true;
-is_id_char(C) when C >= $A andalso C =< $Z -> true;
-is_id_char($_) -> true;
-is_id_char($-) -> true;
-is_id_char(_) -> false.