Browse Source

Merge pull request #11877 from zmstone/1102-rename-kafka-to-config

Rename connector channel related configs in bridge_v2 to 'parameters'
Zaiming (Stone) Shi 2 years ago
parent
commit
f19904d43c

+ 1 - 1
apps/emqx/rebar.config

@@ -30,7 +30,7 @@
     {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.7"}}},
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.16"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.2.1"}}},
-    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
+    {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.19"}}},
     {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

+ 17 - 1
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -900,7 +900,7 @@ format_resource(
         case emqx_bridge_v2:is_bridge_v2_type(Type) of
             true ->
                 %% The defaults are already filled in
-                RawConf;
+                downgrade_raw_conf(Type, RawConf);
             false ->
                 fill_defaults(Type, RawConf)
         end,
@@ -1164,3 +1164,19 @@ upgrade_type(Type) ->
 
 downgrade_type(Type) ->
     emqx_bridge_lib:downgrade_type(Type).
+
+%% TODO: move it to callback
+downgrade_raw_conf(kafka_producer, RawConf) ->
+    rename(<<"parameters">>, <<"kafka">>, RawConf);
+downgrade_raw_conf(azure_event_hub_producer, RawConf) ->
+    rename(<<"parameters">>, <<"kafka">>, RawConf);
+downgrade_raw_conf(_Type, RawConf) ->
+    RawConf.
+
+rename(OldKey, NewKey, Map) ->
+    case maps:find(OldKey, Map) of
+        {ok, Value} ->
+            maps:remove(OldKey, maps:put(NewKey, Value, Map));
+        error ->
+            Map
+    end.

+ 1 - 2
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -164,9 +164,8 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) ->
     end.
 
 common_field_names() ->
-    %% TODO: add 'config' to the list
     [
-        enable, description, local_topic, connector, resource_opts
+        enable, description, local_topic, connector, resource_opts, parameters
     ].
 
 -endif.

+ 13 - 4
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -82,7 +82,7 @@ fields("config_bridge_v2") ->
     fields(actions);
 fields("config_connector") ->
     Fields = override(
-        emqx_bridge_kafka:fields(kafka_connector),
+        emqx_bridge_kafka:fields("config_connector"),
         connector_overrides()
     ),
     override_documentations(Fields);
@@ -117,7 +117,7 @@ fields(kafka_message) ->
 fields(actions) ->
     Fields =
         override(
-            emqx_bridge_kafka:fields(producer_opts),
+            emqx_bridge_kafka:producer_opts(),
             bridge_v2_overrides()
         ) ++
             [
@@ -267,7 +267,7 @@ values(common_config) ->
     };
 values(producer) ->
     #{
-        kafka => #{
+        parameters => #{
             topic => <<"topic">>,
             message => #{
                 key => <<"${.clientid}">>,
@@ -378,18 +378,27 @@ producer_overrides() ->
                     )
                 }
             ),
+        %% NOTE: field 'kafka' is renamed to 'parameters' since e5.3.1
+        %% We will keep 'kafka' for backward compatibility.
+        %% TODO: delete this override when we upgrade bridge schema json to 0.2.0
+        %% See emqx_conf:bridge_schema_json/0
         kafka =>
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
             }),
+        parameters =>
+            mk(ref(producer_kafka_opts), #{
+                required => true,
+                validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1
+            }),
         ssl => mk(ref(ssl_client_opts), #{default => #{<<"enable">> => true}}),
         type => mk(azure_event_hub_producer, #{required => true})
     }.
 
 bridge_v2_overrides() ->
     #{
-        kafka =>
+        parameters =>
             mk(ref(producer_kafka_opts), #{
                 required => true,
                 validator => fun emqx_bridge_kafka:producer_strategy_key_validator/1

+ 89 - 83
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -28,10 +28,14 @@
     fields/1,
     desc/1,
     host_opts/0,
-    ssl_client_opts_fields/0
+    ssl_client_opts_fields/0,
+    producer_opts/0
 ]).
 
--export([kafka_producer_converter/2, producer_strategy_key_validator/1]).
+-export([
+    kafka_producer_converter/2,
+    producer_strategy_key_validator/1
+]).
 
 %% -------------------------------------------------------------------------------------------------
 %% api
@@ -251,15 +255,13 @@ fields("get_" ++ Type) ->
 fields("config_bridge_v2") ->
     fields(kafka_producer_action);
 fields("config_connector") ->
-    fields(kafka_connector);
+    connector_config_fields();
 fields("config_producer") ->
     fields(kafka_producer);
 fields("config_consumer") ->
     fields(kafka_consumer);
-fields(kafka_connector) ->
-    fields("config");
 fields(kafka_producer) ->
-    fields("config") ++ fields(producer_opts);
+    connector_config_fields() ++ producer_opts();
 fields(kafka_producer_action) ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@@ -268,49 +270,9 @@ fields(kafka_producer_action) ->
                 desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
             })},
         {description, emqx_schema:description_schema()}
-    ] ++ fields(producer_opts);
+    ] ++ producer_opts();
 fields(kafka_consumer) ->
-    fields("config") ++ fields(consumer_opts);
-fields("config") ->
-    [
-        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
-        {description, emqx_schema:description_schema()},
-        {bootstrap_hosts,
-            mk(
-                binary(),
-                #{
-                    required => true,
-                    desc => ?DESC(bootstrap_hosts),
-                    validator => emqx_schema:servers_validator(
-                        host_opts(), _Required = true
-                    )
-                }
-            )},
-        {connect_timeout,
-            mk(emqx_schema:timeout_duration_ms(), #{
-                default => <<"5s">>,
-                desc => ?DESC(connect_timeout)
-            })},
-        {min_metadata_refresh_interval,
-            mk(
-                emqx_schema:timeout_duration_ms(),
-                #{
-                    default => <<"3s">>,
-                    desc => ?DESC(min_metadata_refresh_interval)
-                }
-            )},
-        {metadata_request_timeout,
-            mk(emqx_schema:timeout_duration_ms(), #{
-                default => <<"5s">>,
-                desc => ?DESC(metadata_request_timeout)
-            })},
-        {authentication,
-            mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
-                default => none, desc => ?DESC("authentication")
-            })},
-        {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
-        {ssl, mk(ref(ssl_client_opts), #{})}
-    ];
+    connector_config_fields() ++ fields(consumer_opts);
 fields(ssl_client_opts) ->
     ssl_client_opts_fields();
 fields(auth_username_password) ->
@@ -369,20 +331,6 @@ fields(socket_opts) ->
                 validator => fun emqx_schema:validate_tcp_keepalive/1
             })}
     ];
-fields(producer_opts) ->
-    [
-        %% Note: there's an implicit convention in `emqx_bridge' that,
-        %% for egress bridges with this config, the published messages
-        %% will be forwarded to such bridges.
-        {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
-        {kafka,
-            mk(ref(producer_kafka_opts), #{
-                required => true,
-                desc => ?DESC(producer_kafka_opts),
-                validator => fun producer_strategy_key_validator/1
-            })},
-        {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
-    ];
 fields(producer_kafka_opts) ->
     [
         {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
@@ -580,7 +528,7 @@ fields(resource_opts) ->
     CreationOpts = emqx_resource_schema:create_opts(_Overrides = []),
     lists:filter(fun({Field, _}) -> lists:member(Field, SupportedFields) end, CreationOpts).
 
-desc("config") ->
+desc("config_connector") ->
     ?DESC("desc_config");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
@@ -599,34 +547,86 @@ desc("post_" ++ Type) when
 desc(kafka_producer_action) ->
     ?DESC("kafka_producer_action");
 desc(Name) ->
-    lists:member(Name, struct_names()) orelse throw({missing_desc, Name}),
     ?DESC(Name).
 
-struct_names() ->
+connector_config_fields() ->
     [
-        auth_gssapi_kerberos,
-        auth_username_password,
-        kafka_message,
-        kafka_producer,
-        kafka_consumer,
-        producer_buffer,
-        producer_kafka_opts,
-        socket_opts,
-        producer_opts,
-        consumer_opts,
-        consumer_kafka_opts,
-        consumer_topic_mapping,
-        producer_kafka_ext_headers,
-        ssl_client_opts
+        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+        {description, emqx_schema:description_schema()},
+        {bootstrap_hosts,
+            mk(
+                binary(),
+                #{
+                    required => true,
+                    desc => ?DESC(bootstrap_hosts),
+                    validator => emqx_schema:servers_validator(
+                        host_opts(), _Required = true
+                    )
+                }
+            )},
+        {connect_timeout,
+            mk(emqx_schema:timeout_duration_ms(), #{
+                default => <<"5s">>,
+                desc => ?DESC(connect_timeout)
+            })},
+        {min_metadata_refresh_interval,
+            mk(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => <<"3s">>,
+                    desc => ?DESC(min_metadata_refresh_interval)
+                }
+            )},
+        {metadata_request_timeout,
+            mk(emqx_schema:timeout_duration_ms(), #{
+                default => <<"5s">>,
+                desc => ?DESC(metadata_request_timeout)
+            })},
+        {authentication,
+            mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
+                default => none, desc => ?DESC("authentication")
+            })},
+        {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
+        {ssl, mk(ref(ssl_client_opts), #{})}
     ].
 
+producer_opts() ->
+    [
+        %% Note: there's an implicit convention in `emqx_bridge' that,
+        %% for egress bridges with this config, the published messages
+        %% will be forwarded to such bridges.
+        {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
+        parameters_field(),
+        {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
+    ].
+
+%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
+%% However, we need to keep it backward compatible for generated schema json (version 0.1.0)
+%% since schema is data for the 'schemas' API.
+parameters_field() ->
+    {Name, Alias} =
+        case get(emqx_bridge_schema_version) of
+            <<"0.1.0">> ->
+                {kafka, parameters};
+            _ ->
+                {parameters, kafka}
+        end,
+    {Name,
+        mk(ref(producer_kafka_opts), #{
+            required => true,
+            aliases => [Alias],
+            desc => ?DESC(producer_kafka_opts),
+            validator => fun producer_strategy_key_validator/1
+        })}.
+
 %% -------------------------------------------------------------------------------------------------
 %% internal
 type_field(BridgeV2Type) when BridgeV2Type =:= "connector"; BridgeV2Type =:= "bridge_v2" ->
     {type, mk(enum([kafka_producer]), #{required => true, desc => ?DESC("desc_type")})};
 type_field(_) ->
     {type,
-        mk(enum([kafka_consumer, kafka, kafka_producer]), #{
+        %% 'kafka' is kept for backward compatibility
+        mk(enum([kafka, kafka_producer, kafka_consumer]), #{
             required => true, desc => ?DESC("desc_type")
         })}.
 
@@ -641,17 +641,23 @@ kafka_producer_converter(undefined, _HoconOpts) ->
 kafka_producer_converter(
     #{<<"producer">> := OldOpts0, <<"bootstrap_hosts">> := _} = Config0, _HoconOpts
 ) ->
-    %% old schema
+    %% prior to e5.0.2
     MQTTOpts = maps:get(<<"mqtt">>, OldOpts0, #{}),
     LocalTopic = maps:get(<<"topic">>, MQTTOpts, undefined),
     KafkaOpts = maps:get(<<"kafka">>, OldOpts0),
     Config = maps:without([<<"producer">>], Config0),
     case LocalTopic =:= undefined of
         true ->
-            Config#{<<"kafka">> => KafkaOpts};
+            Config#{<<"parameters">> => KafkaOpts};
         false ->
-            Config#{<<"kafka">> => KafkaOpts, <<"local_topic">> => LocalTopic}
+            Config#{<<"parameters">> => KafkaOpts, <<"local_topic">> => LocalTopic}
     end;
+kafka_producer_converter(
+    #{<<"kafka">> := _} = Config0, _HoconOpts
+) ->
+    %% from e5.0.2 to e5.3.0
+    {KafkaOpts, Config} = maps:take(<<"kafka">>, Config0),
+    Config#{<<"parameters">> => KafkaOpts};
 kafka_producer_converter(Config, _HoconOpts) ->
     %% new schema
     Config.

+ 2 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -35,7 +35,7 @@
 -define(kafka_client_id, kafka_client_id).
 -define(kafka_producers, kafka_producers).
 
-query_mode(#{kafka := #{query_mode := sync}}) ->
+query_mode(#{parameters := #{query_mode := sync}}) ->
     simple_sync_internal_buffer;
 query_mode(_) ->
     simple_async_internal_buffer.
@@ -111,7 +111,7 @@ create_producers_for_bridge_v2(
     ClientId,
     #{
         bridge_type := BridgeType,
-        kafka := KafkaConfig
+        parameters := KafkaConfig
     }
 ) ->
     #{

+ 19 - 9
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

@@ -6,6 +6,10 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+-export([atoms/0]).
+%% ensure atoms exist
+atoms() -> [myproducer, my_consumer].
+
 %%===========================================================================
 %% Test cases
 %%===========================================================================
@@ -14,7 +18,6 @@ kafka_producer_test() ->
     Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
     Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
     Conf3 = parse(kafka_producer_new_hocon()),
-
     ?assertMatch(
         #{
             <<"bridges">> :=
@@ -22,7 +25,7 @@ kafka_producer_test() ->
                     <<"kafka_producer">> :=
                         #{
                             <<"myproducer">> :=
-                                #{<<"kafka">> := #{}}
+                                #{<<"parameters">> := #{}}
                         }
                 }
         },
@@ -49,7 +52,7 @@ kafka_producer_test() ->
                         #{
                             <<"myproducer">> :=
                                 #{
-                                    <<"kafka">> := #{},
+                                    <<"parameters">> := #{},
                                     <<"local_topic">> := <<"mqtt/local">>
                                 }
                         }
@@ -65,7 +68,7 @@ kafka_producer_test() ->
                         #{
                             <<"myproducer">> :=
                                 #{
-                                    <<"kafka">> := #{},
+                                    <<"parameters">> := #{},
                                     <<"local_topic">> := <<"mqtt/local">>
                                 }
                         }
@@ -156,12 +159,14 @@ message_key_dispatch_validations_test() ->
                     <<"message">> := #{<<"key">> := <<>>}
                 }
         },
-        emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf)
+        emqx_utils_maps:deep_get(
+            [<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf
+        )
     ),
     ?assertThrow(
         {_, [
             #{
-                path := "bridges.kafka_producer.myproducer.kafka",
+                path := "bridges.kafka_producer.myproducer.parameters",
                 reason := "Message key cannot be empty when `key_dispatch` strategy is used"
             }
         ]},
@@ -170,7 +175,7 @@ message_key_dispatch_validations_test() ->
     ?assertThrow(
         {_, [
             #{
-                path := "bridges.kafka_producer.myproducer.kafka",
+                path := "bridges.kafka_producer.myproducer.parameters",
                 reason := "Message key cannot be empty when `key_dispatch` strategy is used"
             }
         ]},
@@ -181,8 +186,6 @@ message_key_dispatch_validations_test() ->
 tcp_keepalive_validation_test_() ->
     ProducerConf = parse(kafka_producer_new_hocon()),
     ConsumerConf = parse(kafka_consumer_hocon()),
-    %% ensure atoms exist
-    _ = [my_producer, my_consumer],
     test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
         test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
 
@@ -358,3 +361,10 @@ bridges.kafka_consumer.my_consumer {
   }
 }
 """.
+
+%% assert compatibility
+bridge_schema_json_test() ->
+    JSON = iolist_to_binary(emqx_conf:bridge_schema_json()),
+    Map = emqx_utils_json:decode(JSON),
+    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
+    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).

+ 8 - 2
apps/emqx_conf/src/emqx_conf.erl

@@ -188,8 +188,14 @@ hotconf_schema_json() ->
 
 %% TODO: move this function to emqx_dashboard when we stop generating this JSON at build time.
 bridge_schema_json() ->
-    SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => <<"0.1.0">>},
-    gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo).
+    Version = <<"0.1.0">>,
+    SchemaInfo = #{title => <<"EMQX Data Bridge API Schema">>, version => Version},
+    put(emqx_bridge_schema_version, Version),
+    try
+        gen_api_schema_json_iodata(emqx_bridge_api, SchemaInfo)
+    after
+        erase(emqx_bridge_schema_version)
+    end.
 
 %% TODO: remove it and also remove hocon_md.erl and friends.
 %% markdown generation from schema is a failure and we are moving to an interactive

+ 21 - 0
apps/emqx_connector/src/emqx_connector_api.erl

@@ -453,9 +453,30 @@ update_connector(ConnectorType, ConnectorName, Conf) ->
     create_or_update_connector(ConnectorType, ConnectorName, Conf, 200).
 
 create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) ->
+    Check =
+        try
+            is_binary(ConnectorType) andalso emqx_resource:validate_type(ConnectorType),
+            ok = emqx_resource:validate_name(ConnectorName)
+        catch
+            throw:Error ->
+                ?BAD_REQUEST(map_to_json(Error))
+        end,
+    case Check of
+        ok ->
+            do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode);
+        BadRequest ->
+            BadRequest
+    end.
+
+do_create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) ->
     case emqx_connector:create(ConnectorType, ConnectorName, Conf) of
         {ok, _} ->
             lookup_from_all_nodes(ConnectorType, ConnectorName, HttpStatusCode);
+        {error, {PreOrPostConfigUpdate, _HandlerMod, Reason}} when
+            PreOrPostConfigUpdate =:= pre_config_update;
+            PreOrPostConfigUpdate =:= post_config_update
+        ->
+            ?BAD_REQUEST(map_to_json(redact(Reason)));
         {error, Reason} when is_map(Reason) ->
             ?BAD_REQUEST(map_to_json(redact(Reason)))
     end.

+ 1 - 1
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -43,7 +43,7 @@ connector_structs() ->
     [
         {kafka_producer,
             mk(
-                hoconsc:map(name, ref(emqx_bridge_kafka, "config")),
+                hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
                 #{
                     desc => <<"Kafka Connector Config">>,
                     required => false

+ 1 - 1
mix.exs

@@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
       # in conflict by emqtt and hocon
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
-      {:hocon, github: "emqx/hocon", tag: "0.39.16", override: true},
+      {:hocon, github: "emqx/hocon", tag: "0.39.19", override: true},
       {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

+ 1 - 1
rebar.config

@@ -75,7 +75,7 @@
     , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.19"}}}
     , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

+ 2 - 9
rel/i18n/emqx_bridge_kafka.hocon

@@ -283,13 +283,6 @@ config_enable.desc:
 config_enable.label:
 """Enable or Disable"""
 
-
-config_connector.desc:
-"""Reference to connector"""
-
-config_connector.label:
-"""Connector"""
-
 consumer_mqtt_payload.desc:
 """The template for transforming the incoming Kafka message.  By default, it will use JSON format to serialize inputs from the Kafka message.  Such fields are:
 <code>headers</code>: an object containing string key-value pairs.
@@ -316,10 +309,10 @@ kafka_consumer.label:
 """Kafka Consumer"""
 
 desc_config.desc:
-"""Configuration for a Kafka bridge."""
+"""Configuration for a Kafka Producer Client."""
 
 desc_config.label:
-"""Kafka Bridge Configuration"""
+"""Kafka Producer Client Configuration"""
 
 consumer_value_encoding_mode.desc:
 """Defines how the value from the Kafka message is encoded before being forwarded via MQTT.