Procházet zdrojové kódy

fix(bridges_v1): avoid create dangling connectors when updating bridges via api v1

Fixes https://emqx.atlassian.net/browse/EMQX-11291
Thales Macedo Garitezi před 2 roky
rodič
revize
45a39d97c6

+ 37 - 34
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1068,21 +1068,29 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
     case lookup_conf(BridgeV2Type, BridgeName) of
         {error, _} ->
             %% If the bridge v2 does not exist, it is a valid bridge v1
-            split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
+            PreviousRawConf = undefined,
+            split_bridge_v1_config_and_create_helper(
+                BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+            );
         _Conf ->
             case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of
                 true ->
                     %% Using remove + create as update, hence do not delete deps.
                     RemoveDeps = [],
+                    PreviousRawConf = emqx:get_raw_config(
+                        [?ROOT_KEY, BridgeV2Type, BridgeName], undefined
+                    ),
                     bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps),
-                    split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
+                    split_bridge_v1_config_and_create_helper(
+                        BridgeV1Type, BridgeName, RawConf, PreviousRawConf
+                    );
                 false ->
                     %% If the bridge v2 exists, it is not a valid bridge v1
                     {error, non_compatible_bridge_v2_exists}
             end
     end.
 
-split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
+split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
     #{
         connector_type := ConnectorType,
         connector_name := NewConnectorName,
@@ -1091,7 +1099,7 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
         bridge_v2_name := BridgeName,
         bridge_v2_conf := NewBridgeV2RawConf
     } =
-        split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf),
+        split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf),
     case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
         {ok, _} ->
             case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
@@ -1116,14 +1124,14 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
             Error
     end.
 
-split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
+split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
     %% Create fake global config for the transformation and then call
-    %% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1
+    %% `emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1'
     BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
     ConnectorType = connector_type(BridgeV2Type),
-    %% Needed so name confligts will ba avoided
+    %% Needed to avoid name conflicts
     CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}),
-    FakeGlobalConfig = #{
+    FakeGlobalConfig0 = #{
         <<"connectors">> => CurrentConnectorsConfig,
         <<"bridges">> => #{
             bin(BridgeV1Type) => #{
@@ -1131,6 +1139,13 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
             }
         }
     },
+    FakeGlobalConfig =
+        emqx_utils_maps:put_if(
+            FakeGlobalConfig0,
+            bin(?ROOT_KEY),
+            #{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}},
+            PreviousRawConf =/= undefined
+        ),
     Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
         FakeGlobalConfig
     ),
@@ -1143,34 +1158,21 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
             ],
             Output
         ),
-    ConnectorsBefore =
-        maps:keys(
-            emqx_utils_maps:deep_get(
-                [
-                    <<"connectors">>,
-                    bin(ConnectorType)
-                ],
-                FakeGlobalConfig,
-                #{}
-            )
-        ),
-    ConnectorsAfter =
-        maps:keys(
-            emqx_utils_maps:deep_get(
-                [
-                    <<"connectors">>,
-                    bin(ConnectorType)
-                ],
-                Output
-            )
-        ),
-    [NewConnectorName] = ConnectorsAfter -- ConnectorsBefore,
+    ConnectorName = emqx_utils_maps:deep_get(
+        [
+            bin(?ROOT_KEY),
+            bin(BridgeV2Type),
+            bin(BridgeName),
+            <<"connector">>
+        ],
+        Output
+    ),
     NewConnectorRawConf =
         emqx_utils_maps:deep_get(
             [
                 <<"connectors">>,
                 bin(ConnectorType),
-                bin(NewConnectorName)
+                bin(ConnectorName)
             ],
             Output
         ),
@@ -1178,7 +1180,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
     NewFakeGlobalConfig = #{
         <<"connectors">> => #{
             bin(ConnectorType) => #{
-                bin(NewConnectorName) => NewConnectorRawConf
+                bin(ConnectorName) => NewConnectorRawConf
             }
         },
         <<"bridges_v2">> => #{
@@ -1197,7 +1199,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
         _ ->
             #{
                 connector_type => ConnectorType,
-                connector_name => NewConnectorName,
+                connector_name => ConnectorName,
                 connector_conf => NewConnectorRawConf,
                 bridge_v2_type => BridgeV2Type,
                 bridge_v2_name => BridgeName,
@@ -1212,6 +1214,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
 bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
     RawConf = maps:without([<<"name">>], RawConfig0),
     TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
+    PreviousRawConf = undefined,
     #{
         connector_type := _ConnectorType,
         connector_name := _NewConnectorName,
@@ -1219,7 +1222,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
         bridge_v2_type := BridgeV2Type,
         bridge_v2_name := _BridgeName,
         bridge_v2_conf := BridgeV2RawConf
-    } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
+    } = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
     create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
 
 bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->

+ 25 - 8
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -100,12 +100,6 @@ setup_mocks() ->
     IsBridgeV2TypeFun = fun(Type) ->
         BridgeV2Type = bridge_type(),
         BridgeV2TypeBin = bridge_type_bin(),
-        ct:pal("is_bridge_v2_type mock: ~p", [
-            #{
-                input_type => Type,
-                expected => [BridgeV2Type, BridgeV2TypeBin]
-            }
-        ]),
         case Type of
             BridgeV2Type -> true;
             BridgeV2TypeBin -> true;
@@ -333,7 +327,8 @@ get_connector_http(Name) ->
 create_bridge_http_api_v1(Opts) ->
     Name = maps:get(name, Opts),
     Overrides = maps:get(overrides, Opts, #{}),
-    BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
+    BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
+    BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
     Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
     Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
     ct:pal("creating bridge (http v1): ~p", [Params]),
@@ -352,6 +347,19 @@ create_bridge_http_api_v2(Opts) ->
     ct:pal("bridge create (http v2) result:\n  ~p", [Res]),
     Res.
 
+update_bridge_http_api_v1(Opts) ->
+    Name = maps:get(name, Opts),
+    BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
+    Overrides = maps:get(overrides, Opts, #{}),
+    BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
+    BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
+    Params = BridgeConfig,
+    Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
+    ct:pal("updating bridge (http v1): ~p", [Params]),
+    Res = request(put, Path, Params),
+    ct:pal("bridge update (http v1) result:\n  ~p", [Res]),
+    Res.
+
 delete_bridge_http_api_v1(Opts) ->
     Name = maps:get(name, Opts),
     BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
@@ -515,8 +523,17 @@ t_scenario_1(_Config) ->
     ),
 
     %% ===================================================================================
-    %% TODO: Update the bridge using v1 API.
+    %% Update the bridge using v1 API.
     %% ===================================================================================
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, _}},
+        update_bridge_http_api_v1(#{name => NameA})
+    ),
+    ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()),
+    ?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()),
+    ?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()),
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)),
+    ?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)),
 
     %% ===================================================================================
     %% Now create a new bridge_v2 pointing to the same connector.  It should no longer be

+ 2 - 7
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -222,13 +222,8 @@ encode_payload(State, Selected) ->
     OrderingKey = render_key(OrderingKeyTemplate, Selected),
     Attributes = proc_attributes(AttributesTemplate, Selected),
     Payload0 = #{data => base64:encode(Data)},
-    Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
-    put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
-
-put_if(Acc, K, V, true) ->
-    Acc#{K => V};
-put_if(Acc, _K, _V, false) ->
-    Acc.
+    Payload1 = emqx_utils_maps:put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
+    emqx_utils_maps:put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
 
 -spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary().
 render_payload([] = _Template, Selected) ->

+ 28 - 17
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -71,21 +71,31 @@ has_connector_field(BridgeConf, ConnectorFields) ->
         ConnectorFields
     ).
 
-bridge_configs_to_transform(_BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields) ->
+bridge_configs_to_transform(
+    _BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields, _RawConfig
+) ->
     [];
-bridge_configs_to_transform(BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields) ->
+bridge_configs_to_transform(
+    BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields, RawConfig
+) ->
     case has_connector_field(BridgeConf, ConnectorFields) of
         true ->
+            PreviousRawConfig =
+                emqx_utils_maps:deep_get(
+                    [<<"bridges_v2">>, to_bin(BridgeType), to_bin(BridgeName)],
+                    RawConfig,
+                    undefined
+                ),
             [
-                {BridgeType, BridgeName, BridgeConf, ConnectorFields}
-                | bridge_configs_to_transform(BridgeType, Rest, ConnectorFields)
+                {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}
+                | bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig)
             ];
         false ->
-            bridge_configs_to_transform(BridgeType, Rest, ConnectorFields)
+            bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig)
     end.
 
 split_bridge_to_connector_and_action(
-    {ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields}}
+    {ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}}
 ) ->
     %% Get connector fields from bridge config
     ConnectorMap = lists:foldl(
@@ -122,8 +132,12 @@ split_bridge_to_connector_and_action(
         BridgeConf,
         ConnectorFields
     ),
-    %% Generate a connector name
-    ConnectorName = generate_connector_name(ConnectorsMap, BridgeName, 0),
+    %% Generate a connector name, if needed.  Avoid doing so if there was a previous config.
+    ConnectorName =
+        case PreviousRawConfig of
+            #{<<"connector">> := ConnectorName0} -> ConnectorName0;
+            _ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
+        end,
     %% Add connector field to action map
     ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0),
     {BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
@@ -152,20 +166,17 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
     BridgeTypes = ?MODULE:connector_type_to_bridge_types(ConnectorType),
     BridgesConfMap = maps:get(<<"bridges">>, RawConfig, #{}),
     ConnectorsConfMap = maps:get(<<"connectors">>, RawConfig, #{}),
-    BridgeConfigsToTransform1 =
-        lists:foldl(
-            fun(BridgeType, ToTranformSoFar) ->
+    BridgeConfigsToTransform =
+        lists:flatmap(
+            fun(BridgeType) ->
                 BridgeNameToBridgeMap = maps:get(to_bin(BridgeType), BridgesConfMap, #{}),
                 BridgeNameBridgeConfList = maps:to_list(BridgeNameToBridgeMap),
-                NewToTransform = bridge_configs_to_transform(
-                    BridgeType, BridgeNameBridgeConfList, ConnectorFields
-                ),
-                [NewToTransform, ToTranformSoFar]
+                bridge_configs_to_transform(
+                    BridgeType, BridgeNameBridgeConfList, ConnectorFields, RawConfig
+                )
             end,
-            [],
             BridgeTypes
         ),
-    BridgeConfigsToTransform = lists:flatten(BridgeConfigsToTransform1),
     ConnectorsWithTypeMap = maps:get(to_bin(ConnectorType), ConnectorsConfMap, #{}),
     BridgeConfigsToTransformWithConnectorConf = lists:zip(
         lists:duplicate(length(BridgeConfigsToTransform), ConnectorsWithTypeMap),

+ 7 - 1
apps/emqx_utils/src/emqx_utils_maps.erl

@@ -33,7 +33,8 @@
     diff_maps/2,
     best_effort_recursive_sum/3,
     if_only_to_toggle_enable/2,
-    update_if_present/3
+    update_if_present/3,
+    put_if/4
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -303,3 +304,8 @@ update_if_present(Key, Fun, Map) ->
         _ ->
             Map
     end.
+
+put_if(Acc, K, V, true) ->
+    Acc#{K => V};
+put_if(Acc, _K, _V, false) ->
+    Acc.