Преглед изворни кода

Merge pull request #11836 from sstrigler/kjell/break_out_connection_settings_from_bridge/EMQX-10805/EMQX-10770

AEH fixes, rename type and fix legacy v1 test suite
Ivan Dyachkov пре 2 година
родитељ
комит
5d516bfde9

+ 22 - 18
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -737,8 +737,8 @@ bridge_v2_type_to_connector_type(kafka) ->
     kafka_producer;
 bridge_v2_type_to_connector_type(kafka_producer) ->
     kafka_producer;
-bridge_v2_type_to_connector_type(azure_event_hub) ->
-    azure_event_hub.
+bridge_v2_type_to_connector_type(azure_event_hub_producer) ->
+    azure_event_hub_producer.
 
 %%====================================================================
 %% Data backup API
@@ -964,8 +964,8 @@ bridge_v1_type_to_bridge_v2_type(kafka) ->
     kafka_producer;
 bridge_v1_type_to_bridge_v2_type(kafka_producer) ->
     kafka_producer;
-bridge_v1_type_to_bridge_v2_type(azure_event_hub) ->
-    azure_event_hub.
+bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) ->
+    azure_event_hub_producer.
 
 %% This function should return true for all inputs that are bridge V1 types for
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
@@ -976,7 +976,7 @@ is_bridge_v2_type(<<"kafka_producer">>) ->
     true;
 is_bridge_v2_type(<<"kafka">>) ->
     true;
-is_bridge_v2_type(<<"azure_event_hub">>) ->
+is_bridge_v2_type(<<"azure_event_hub_producer">>) ->
     true;
 is_bridge_v2_type(_) ->
     false.
@@ -1385,33 +1385,37 @@ to_existing_atom(X) ->
         {error, _} -> throw(bad_atom)
     end.
 
-validate_referenced_connectors(Type0, ConnectorName0, BridgeName) ->
+validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) ->
     %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
     %% identical to its matching connector type name.
     try
-        Type = to_existing_atom(Type0),
-        ConnectorName = to_existing_atom(ConnectorName0),
-        case emqx_config:get([connectors, Type, ConnectorName], undefined) of
+        {ConnectorName, ConnectorType} = to_connector(ConnectorNameBin, BridgeType),
+        case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
             undefined ->
-                {error, #{
-                    reason => "connector_not_found_or_wrong_type",
-                    type => Type,
-                    bridge_name => BridgeName,
-                    connector_name => ConnectorName
-                }};
+                throw(not_found);
             _ ->
                 ok
         end
     catch
-        throw:bad_atom ->
+        throw:not_found ->
             {error, #{
                 reason => "connector_not_found_or_wrong_type",
-                type => Type0,
+                connector_name => ConnectorNameBin,
                 bridge_name => BridgeName,
-                connector_name => ConnectorName0
+                bridge_type => BridgeType
             }}
     end.
 
+to_connector(ConnectorNameBin, BridgeType) ->
+    try
+        ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)),
+        ConnectorName = to_existing_atom(ConnectorNameBin),
+        {ConnectorName, ConnectorType}
+    catch
+        _:_ ->
+            throw(not_found)
+    end.
+
 multi_validate_referenced_connectors(Configs) ->
     Pipeline =
         lists:map(

+ 2 - 2
apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl

@@ -44,7 +44,7 @@ bridge_v2_structs() ->
                     required => false
                 }
             )},
-        {azure_event_hub,
+        {azure_event_hub_producer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_azure_event_hub, bridge_v2)),
                 #{
@@ -57,7 +57,7 @@ bridge_v2_structs() ->
 api_schemas(Method) ->
     [
         api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"),
-        api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_bridge_v2")
+        api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2")
     ].
 
 api_ref(Module, Type, Method) ->

+ 19 - 6
apps/emqx_bridge/test/emqx_bridge_testlib.erl

@@ -249,32 +249,42 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
             Error
     end.
 
+make_message(Config, MakeMessageFun) ->
+    BridgeType = ?config(bridge_type, Config),
+    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
+        true ->
+            BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
+            {BridgeId, MakeMessageFun()};
+        false ->
+            {send_message, MakeMessageFun()}
+    end.
+
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
 t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
-    ResourceId = resource_id(Config),
     ?check_trace(
         begin
             ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ResourceId = resource_id(Config),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            Message = {send_message, MakeMessageFun()},
+            Message = make_message(Config, MakeMessageFun),
             IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
             ok
         end,
         fun(Trace) ->
+            ResourceId = resource_id(Config),
             ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
         end
     ),
     ok.
 
 t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
-    ResourceId = resource_id(Config),
     ReplyFun =
         fun(Pid, Result) ->
             Pid ! {result, Result}
@@ -282,12 +292,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
     ?check_trace(
         begin
             ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ResourceId = resource_id(Config),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            Message = {send_message, MakeMessageFun()},
+            Message = make_message(Config, MakeMessageFun),
             ?assertMatch(
                 {ok, {ok, _}},
                 ?wait_async_action(
@@ -301,6 +312,7 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
             ok
         end,
         fun(Trace) ->
+            ResourceId = resource_id(Config),
             ?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
         end
     ),
@@ -342,7 +354,6 @@ t_start_stop(Config, StopTracePoint) ->
     t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
 
 t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
-    ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
     ?check_trace(
         begin
             %% Check that the bridge probe API doesn't leak atoms.
@@ -365,6 +376,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
             ?assertEqual(AtomsBefore, AtomsAfter),
 
             ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
+            ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
 
             %% Since the connection process is async, we give it some time to
             %% stabilize and avoid flakiness.
@@ -428,6 +440,7 @@ t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
             ok
         end,
         fun(Trace) ->
+            ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
             %% one for each probe, two for real
             ?assertMatch(
                 [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
@@ -445,9 +458,9 @@ t_on_get_status(Config, Opts) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
-    ResourceId = resource_id(Config),
     FailureStatus = maps:get(failure_status, Opts, disconnected),
     ?assertMatch({ok, _}, create_bridge(Config)),
+    ResourceId = resource_id(Config),
     %% Since the connection process is async, we give it some time to
     %% stabilize and avoid flakiness.
     ?retry(

+ 16 - 16
apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl

@@ -120,16 +120,16 @@ setup_mocks() ->
     meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
 
     catch meck:new(emqx_bridge_v2, MeckOpts),
-    meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
+    BridgeType = bridge_type(),
+    BridgeTypeBin = atom_to_binary(BridgeType),
+    meck:expect(
+        emqx_bridge_v2,
+        bridge_v2_type_to_connector_type,
+        fun(Type) when Type =:= BridgeType; Type =:= BridgeTypeBin -> con_type() end
+    ),
     meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
-    IsBridgeV2TypeFun = fun(Type) ->
-        BridgeV2Type = bridge_type(),
-        case Type of
-            BridgeV2Type -> true;
-            _ -> false
-        end
-    end,
-    meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun),
+
+    meck:expect(emqx_bridge_v2, is_bridge_v2_type, fun(Type) -> Type =:= BridgeType end),
     ok.
 
 init_per_suite(Config) ->
@@ -519,8 +519,8 @@ t_load_no_matching_connector(_Config) ->
         {error,
             {post_config_update, _HandlerMod, #{
                 bridge_name := my_test_bridge_update,
-                connector_name := unknown,
-                type := _,
+                connector_name := <<"unknown">>,
+                bridge_type := _,
                 reason := "connector_not_found_or_wrong_type"
             }}},
         update_root_config(RootConf0)
@@ -536,8 +536,8 @@ t_load_no_matching_connector(_Config) ->
         {error,
             {post_config_update, _HandlerMod, #{
                 bridge_name := my_test_bridge_new,
-                connector_name := unknown,
-                type := _,
+                connector_name := <<"unknown">>,
+                bridge_type := _,
                 reason := "connector_not_found_or_wrong_type"
             }}},
         update_root_config(RootConf1)
@@ -608,7 +608,7 @@ t_create_no_matching_connector(_Config) ->
             {post_config_update, _HandlerMod, #{
                 bridge_name := _,
                 connector_name := _,
-                type := _,
+                bridge_type := _,
                 reason := "connector_not_found_or_wrong_type"
             }}},
         emqx_bridge_v2:create(bridge_type(), my_test_bridge, Conf)
@@ -628,7 +628,7 @@ t_create_wrong_connector_type(_Config) ->
             {post_config_update, _HandlerMod, #{
                 bridge_name := _,
                 connector_name := _,
-                type := wrong_type,
+                bridge_type := wrong_type,
                 reason := "connector_not_found_or_wrong_type"
             }}},
         emqx_bridge_v2:create(wrong_type, my_test_bridge, Conf)
@@ -644,7 +644,7 @@ t_update_connector_not_found(_Config) ->
             {post_config_update, _HandlerMod, #{
                 bridge_name := _,
                 connector_name := _,
-                type := _,
+                bridge_type := _,
                 reason := "connector_not_found_or_wrong_type"
             }}},
         emqx_bridge_v2:create(bridge_type(), my_test_bridge, BadConf)

+ 2 - 2
apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl

@@ -31,8 +31,8 @@
 
 -import(hoconsc, [mk/2, enum/1, ref/2]).
 
--define(AEH_CONNECTOR_TYPE, azure_event_hub).
--define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub">>).
+-define(AEH_CONNECTOR_TYPE, azure_event_hub_producer).
+-define(AEH_CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
 
 %%-------------------------------------------------------------------------------------------------
 %% `hocon_schema' API

+ 6 - 7
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_producer_SUITE.erl

@@ -13,7 +13,6 @@
 -define(BRIDGE_TYPE, azure_event_hub_producer).
 -define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
 -define(KAFKA_BRIDGE_TYPE, kafka).
--define(APPS, [emqx_resource, emqx_bridge, emqx_rule_engine]).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -22,9 +21,7 @@
 %%------------------------------------------------------------------------------
 
 all() ->
-    %TODO: fix tests
-    %emqx_common_test_helpers:all(?MODULE).
-    [].
+    emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"),
@@ -43,6 +40,7 @@ init_per_suite(Config) ->
                     emqx_resource,
                     emqx_bridge_azure_event_hub,
                     emqx_bridge,
+                    emqx_rule_engine,
                     {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
                 ],
                 #{work_dir => ?config(priv_dir, Config)}
@@ -283,8 +281,6 @@ t_sync_query(Config) ->
 t_same_name_azure_kafka_bridges(AehConfig) ->
     ConfigKafka = lists:keyreplace(bridge_type, 1, AehConfig, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
     BridgeName = ?config(bridge_name, AehConfig),
-    AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
-    KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
     TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
     %% creates the AEH bridge and check it's working
     ok = emqx_bridge_testlib:t_sync_query(
@@ -295,6 +291,8 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
     ),
     %% than creates a Kafka bridge with same name and delete it after creation
     ok = emqx_bridge_testlib:t_create_via_http(ConfigKafka),
+    AehResourceId = emqx_bridge_testlib:resource_id(AehConfig),
+    KafkaResourceId = emqx_bridge_testlib:resource_id(ConfigKafka),
     %% check that both bridges are healthy
     ?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)),
     ?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
@@ -309,7 +307,8 @@ t_same_name_azure_kafka_bridges(AehConfig) ->
     % check that AEH bridge is still working
     ?check_trace(
         begin
-            Message = {send_message, make_message()},
+            BridgeId = emqx_bridge_v2_testlib:bridge_id(AehConfig),
+            Message = {BridgeId, make_message()},
             ?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)),
             ok
         end,

+ 7 - 5
apps/emqx_bridge_azure_event_hub/test/emqx_bridge_azure_event_hub_v2_SUITE.erl

@@ -10,10 +10,11 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(BRIDGE_TYPE, azure_event_hub).
--define(BRIDGE_TYPE_BIN, <<"azure_event_hub">>).
+-define(BRIDGE_TYPE, azure_event_hub_producer).
+-define(BRIDGE_TYPE_BIN, <<"azure_event_hub_producer">>).
+-define(CONNECTOR_TYPE, azure_event_hub_producer).
+-define(CONNECTOR_TYPE_BIN, <<"azure_event_hub_producer">>).
 -define(KAFKA_BRIDGE_TYPE, kafka_producer).
--define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]).
 
 -import(emqx_common_test_helpers, [on_exit/1]).
 
@@ -41,6 +42,7 @@ init_per_suite(Config) ->
                     emqx_resource,
                     emqx_bridge_azure_event_hub,
                     emqx_bridge,
+                    emqx_rule_engine,
                     {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
                 ],
                 #{work_dir => ?config(priv_dir, Config)}
@@ -88,7 +90,7 @@ common_init_per_testcase(TestCase, Config) ->
     ok = snabbkaffe:start_trace(),
     ExtraConfig ++
         [
-            {connector_type, ?BRIDGE_TYPE},
+            {connector_type, ?CONNECTOR_TYPE},
             {connector_name, Name},
             {connector_config, ConnectorConfig},
             {bridge_type, ?BRIDGE_TYPE},
@@ -156,7 +158,7 @@ connector_config(Name, KafkaHost, KafkaPort) ->
     parse_and_check_connector_config(InnerConfigMap, Name).
 
 parse_and_check_connector_config(InnerConfigMap, Name) ->
-    TypeBin = ?BRIDGE_TYPE_BIN,
+    TypeBin = ?CONNECTOR_TYPE_BIN,
     RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
     #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
         hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{

+ 4 - 4
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -23,7 +23,7 @@ resource_type(Type) when is_binary(Type) ->
 resource_type(kafka_producer) ->
     emqx_bridge_kafka_impl_producer;
 %% We use AEH's Kafka interface.
-resource_type(azure_event_hub) ->
+resource_type(azure_event_hub_producer) ->
     emqx_bridge_kafka_impl_producer;
 resource_type(Type) ->
     error({unknown_connector_type, Type}).
@@ -31,7 +31,7 @@ resource_type(Type) ->
 %% For connectors that need to override connector configurations.
 connector_impl_module(ConnectorType) when is_binary(ConnectorType) ->
     connector_impl_module(binary_to_atom(ConnectorType, utf8));
-connector_impl_module(azure_event_hub) ->
+connector_impl_module(azure_event_hub_producer) ->
     emqx_bridge_azure_event_hub;
 connector_impl_module(_ConnectorType) ->
     undefined.
@@ -49,7 +49,7 @@ connector_structs() ->
                     required => false
                 }
             )},
-        {azure_event_hub,
+        {azure_event_hub_producer,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_azure_event_hub, "config_connector")),
                 #{
@@ -82,7 +82,7 @@ api_schemas(Method) ->
         %% We need to map the `type' field of a request (binary) to a
         %% connector schema module.
         api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
-        api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub">>, Method ++ "_connector")
+        api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector")
     ].
 
 api_ref(Module, Type, Method) ->

+ 1 - 1
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -57,7 +57,7 @@ enterprise_fields_connectors() -> [].
 -endif.
 
 connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
-connector_type_to_bridge_types(azure_event_hub) -> [azure_event_hub].
+connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer].
 
 actions_config_name() -> <<"bridges_v2">>.