Преглед изворни кода

Merge pull request #12082 from thalesmg/test-ensure-res-opts-conn-m-20231201

add missing `resource_opts` fields to connectors schemas
Thales Macedo Garitezi пре 2 година
родитељ
комит
984e2ccc74

+ 2 - 3
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -260,11 +260,10 @@ create(BridgeType, BridgeName, RawConf) ->
         #{override_to => cluster}
     ).
 
-%% NOTE: This function can cause broken references from rules but it is only
-%% called directly from test cases.
-
 -spec remove(bridge_v2_type(), bridge_v2_name()) -> ok | {error, any()}.
 remove(BridgeType, BridgeName) ->
+    %% NOTE: This function can cause broken references from rules but it is only
+    %% called directly from test cases.
     ?SLOG(debug, #{
         brige_action => remove,
         bridge_version => 2,

+ 70 - 2
apps/emqx_bridge/test/emqx_bridge_v2_tests.erl

@@ -16,6 +16,32 @@
 -module(emqx_bridge_v2_tests).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+non_deprecated_fields(Fields) ->
+    [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)].
+
+find_resource_opts_fields(SchemaMod, FieldName) ->
+    Fields = hocon_schema:fields(SchemaMod, FieldName),
+    case lists:keyfind(resource_opts, 1, Fields) of
+        false ->
+            undefined;
+        {resource_opts, ROSc} ->
+            get_resource_opts_subfields(ROSc)
+    end.
+
+get_resource_opts_subfields(Sc) ->
+    ?R_REF(SchemaModRO, FieldNameRO) = hocon_schema:field_schema(Sc, type),
+    ROFields = non_deprecated_fields(hocon_schema:fields(SchemaModRO, FieldNameRO)),
+    proplists:get_keys(ROFields).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
 
 resource_opts_union_connector_actions_test() ->
     %% The purpose of this test is to ensure we have split `resource_opts' fields
@@ -37,5 +63,47 @@ resource_opts_union_connector_actions_test() ->
     ),
     ok.
 
-non_deprecated_fields(Fields) ->
-    [K || {K, Schema} <- Fields, not hocon_schema:is_deprecated(Schema)].
+connector_resource_opts_test() ->
+    %% The purpose of this test is to ensure that all connectors have the `resource_opts'
+    %% field with at least some sub-fields that should always be present.
+    %% These are used by `emqx_resource_manager' itself to manage the resource lifecycle.
+    MinimumROFields = [
+        health_check_interval,
+        query_mode,
+        start_after_created,
+        start_timeout
+    ],
+    ConnectorSchemasRefs =
+        lists:map(
+            fun({Type, #{type := ?MAP(_, ?R_REF(SchemaMod, FieldName))}}) ->
+                {Type, find_resource_opts_fields(SchemaMod, FieldName)}
+            end,
+            emqx_connector_schema:fields(connectors)
+        ),
+    ConnectorsMissingRO = [Type || {Type, undefined} <- ConnectorSchemasRefs],
+    ConnectorsMissingROSubfields =
+        lists:filtermap(
+            fun
+                ({_Type, undefined}) ->
+                    false;
+                ({Type, Fs}) ->
+                    case MinimumROFields -- Fs of
+                        [] ->
+                            false;
+                        MissingFields ->
+                            {true, {Type, MissingFields}}
+                    end
+            end,
+            ConnectorSchemasRefs
+        ),
+    ?assertEqual(
+        #{
+            missing_resource_opts_field => #{},
+            missing_subfields => #{}
+        },
+        #{
+            missing_resource_opts_field => maps:from_keys(ConnectorsMissingRO, true),
+            missing_subfields => maps:from_list(ConnectorsMissingROSubfields)
+        }
+    ),
+    ok.

+ 1 - 1
apps/emqx_bridge_http/src/emqx_bridge_http.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_connector, emqx_resource, ehttpc]},
     {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},

+ 2 - 3
apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl

@@ -24,7 +24,6 @@
 
 -export([
     bridge_v2_examples/1,
-    %%conn_bridge_examples/1,
     connector_examples/1
 ]).
 
@@ -169,7 +168,7 @@ basic_config() ->
                 }
             )},
         {description, emqx_schema:description_schema()}
-    ] ++ http_resource_opts() ++ connector_opts().
+    ] ++ connector_opts().
 
 request_config() ->
     [
@@ -321,7 +320,7 @@ http_resource_opts() ->
 connector_opts() ->
     mark_request_field_deperecated(
         proplists:delete(max_retries, emqx_bridge_http_connector:fields(config))
-    ).
+    ) ++ http_resource_opts().
 
 mark_request_field_deperecated(Fields) ->
     lists:map(

+ 5 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl

@@ -548,6 +548,8 @@ fields(consumer_kafka_opts) ->
                 #{default => <<"5s">>, desc => ?DESC(consumer_offset_commit_interval_seconds)}
             )}
     ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields(resource_opts) ->
     SupportedFields = [health_check_interval],
     CreationOpts = emqx_bridge_v2_schema:resource_opts_fields(),
@@ -568,6 +570,8 @@ desc("config_connector") ->
     ?DESC("desc_config");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc("get_" ++ Type) when
     Type =:= "consumer"; Type =:= "producer"; Type =:= "connector"; Type =:= "bridge_v2"
 ->
@@ -626,7 +630,7 @@ kafka_connector_config_fields() ->
             })},
         {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})},
         {ssl, mk(ref(ssl_client_opts), #{})}
-    ] ++ [resource_opts()].
+    ] ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts).
 
 producer_opts(ActionOrBridgeV1) ->
     [

+ 6 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl

@@ -53,7 +53,8 @@ fields("config") ->
     ];
 fields("config_connector") ->
     emqx_connector_schema:common_fields() ++
-        fields("connection_fields");
+        fields("connection_fields") ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
 fields("connection_fields") ->
     [
         {parameters,
@@ -93,6 +94,8 @@ fields(action_parameters) ->
         {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
         {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
     ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields(resource_opts) ->
     fields("creation_opts");
 fields(mongodb_rs) ->
@@ -202,6 +205,8 @@ desc("creation_opts") ->
     ?DESC(emqx_resource_schema, "creation_opts");
 desc(resource_opts) ->
     ?DESC(emqx_resource_schema, "resource_opts");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(mongodb_rs) ->
     ?DESC(mongodb_rs_conf);
 desc(mongodb_sharded) ->

+ 9 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl

@@ -54,7 +54,15 @@ bridge_v1_config_to_connector_config(BridgeV1Config) ->
     ConnectorTopLevelKeys = schema_keys("config_connector"),
     ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
     ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
-    make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
+    ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
+    emqx_utils_maps:update_if_present(
+        <<"resource_opts">>,
+        fun(ResourceOpts) ->
+            CommonROSubfields = emqx_connector_schema:common_resource_opts_subfields_bin(),
+            maps:with(CommonROSubfields, ResourceOpts)
+        end,
+        ConnConfig0
+    ).
 
 make_config_map(PickKeys, IndentKeys, Config) ->
     Conf0 = maps:with(PickKeys, Config),

+ 1 - 0
apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl

@@ -51,6 +51,7 @@ fields("config_connector") ->
                 )}
         ] ++
         emqx_redis:redis_fields() ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts) ++
         emqx_connector_schema_lib:ssl_fields();
 fields(action) ->
     {?TYPE,

+ 7 - 1
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl

@@ -93,7 +93,9 @@ roots() ->
     [{config, #{type => hoconsc:ref(?MODULE, config)}}].
 
 fields(config) ->
-    emqx_connector_schema:common_fields() ++ fields("connection_fields");
+    emqx_connector_schema:common_fields() ++
+        fields("connection_fields") ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
 fields("connection_fields") ->
     [
         {server, server()},
@@ -114,6 +116,8 @@ fields("connection_fields") ->
                 emqx_connector_schema_lib:pool_size(Other)
         end}
     ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields(Field) when
     Field == "get";
     Field == "post";
@@ -125,6 +129,8 @@ fields(Field) when
 
 desc(config) ->
     ?DESC("desc_config");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 7 - 1
apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl

@@ -77,7 +77,9 @@ namespace() -> "connector_syskeeper_proxy".
 roots() -> [].
 
 fields(config) ->
-    emqx_connector_schema:common_fields() ++ fields("connection_fields");
+    emqx_connector_schema:common_fields() ++
+        fields("connection_fields") ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
 fields("connection_fields") ->
     [
         {listen, listen()},
@@ -92,6 +94,8 @@ fields("connection_fields") ->
                 #{desc => ?DESC(handshake_timeout), default => <<"10s">>}
             )}
     ];
+fields(connector_resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields(Field) when
     Field == "get";
     Field == "post";
@@ -103,6 +107,8 @@ fields(Field) when
 
 desc(config) ->
     ?DESC("desc_config");
+desc(connector_resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."];
 desc(_) ->

+ 45 - 14
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -40,7 +40,13 @@
     type_and_name_fields/1
 ]).
 
--export([resource_opts_fields/0, resource_opts_fields/1]).
+-export([
+    common_resource_opts_subfields/0,
+    common_resource_opts_subfields_bin/0,
+    resource_opts_fields/0,
+    resource_opts_fields/1,
+    resource_opts_ref/2
+]).
 
 -export([examples/1]).
 
@@ -178,14 +184,19 @@ split_bridge_to_connector_and_action(
                 %% Get connector fields from bridge config
                 lists:foldl(
                     fun({ConnectorFieldName, _Spec}, ToTransformSoFar) ->
-                        case maps:is_key(to_bin(ConnectorFieldName), BridgeV1Conf) of
+                        ConnectorFieldNameBin = to_bin(ConnectorFieldName),
+                        case maps:is_key(ConnectorFieldNameBin, BridgeV1Conf) of
                             true ->
-                                NewToTransform = maps:put(
-                                    to_bin(ConnectorFieldName),
-                                    maps:get(to_bin(ConnectorFieldName), BridgeV1Conf),
+                                PrevFieldConfig =
+                                    project_to_connector_resource_opts(
+                                        ConnectorFieldNameBin,
+                                        maps:get(ConnectorFieldNameBin, BridgeV1Conf)
+                                    ),
+                                maps:put(
+                                    ConnectorFieldNameBin,
+                                    PrevFieldConfig,
                                     ToTransformSoFar
-                                ),
-                                NewToTransform;
+                                );
                             false ->
                                 ToTransformSoFar
                         end
@@ -213,6 +224,12 @@ split_bridge_to_connector_and_action(
         end,
     {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
 
+project_to_connector_resource_opts(<<"resource_opts">>, OldResourceOpts) ->
+    Subfields = common_resource_opts_subfields_bin(),
+    maps:with(Subfields, OldResourceOpts);
+project_to_connector_resource_opts(_, OldConfig) ->
+    OldConfig.
+
 transform_bridge_v1_config_to_action_config(
     BridgeV1Conf, ConnectorName, ConnectorConfSchemaMod, ConnectorConfSchemaName
 ) ->
@@ -497,19 +514,33 @@ status_and_actions_fields() ->
             )}
     ].
 
+resource_opts_ref(Module, RefName) ->
+    [
+        {resource_opts,
+            mk(
+                ref(Module, RefName),
+                emqx_resource_schema:resource_opts_meta()
+            )}
+    ].
+
+common_resource_opts_subfields() ->
+    [
+        health_check_interval,
+        query_mode,
+        start_after_created,
+        start_timeout
+    ].
+
+common_resource_opts_subfields_bin() ->
+    lists:map(fun atom_to_binary/1, common_resource_opts_subfields()).
+
 resource_opts_fields() ->
     resource_opts_fields(_Overrides = []).
 
 resource_opts_fields(Overrides) ->
     %% Note: these don't include buffer-related configurations because buffer workers are
     %% tied to the action.
-    ConnectorROFields = [
-        health_check_interval,
-        query_mode,
-        request_ttl,
-        start_after_created,
-        start_timeout
-    ],
+    ConnectorROFields = common_resource_opts_subfields(),
     lists:filter(
         fun({Key, _Sc}) -> lists:member(Key, ConnectorROFields) end,
         emqx_resource_schema:create_opts(Overrides)

+ 7 - 1
apps/emqx_postgresql/src/schema/emqx_postgresql_connector_schema.erl

@@ -49,7 +49,11 @@ fields("connection_fields") ->
         adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
         emqx_connector_schema_lib:ssl_fields();
 fields("config_connector") ->
-    fields("connection_fields") ++ emqx_connector_schema:common_fields();
+    fields("connection_fields") ++
+        emqx_connector_schema:common_fields() ++
+        emqx_connector_schema:resource_opts_ref(?MODULE, resource_opts);
+fields(resource_opts) ->
+    emqx_connector_schema:resource_opts_fields();
 fields(config) ->
     fields("config_connector") ++
         fields(action);
@@ -159,5 +163,7 @@ values(common) ->
 
 desc("config_connector") ->
     ?DESC("config_connector");
+desc(resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(_) ->
     undefined.

+ 1 - 1
apps/emqx_resource/src/schema/emqx_resource_schema.erl

@@ -23,7 +23,7 @@
 
 -export([namespace/0, roots/0, fields/1, desc/1]).
 
--export([create_opts/1]).
+-export([create_opts/1, resource_opts_meta/0]).
 
 %% range interval in ms
 -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1).