فهرست منبع

Merge pull request #12205 from thalesmg/fix-list-v2-actions-in-v1-bridges-r54-20231220

fix(bridges_v1): don't list v2-only bridges in API
Thales Macedo Garitezi 2 سال پیش
والد
کامیت
2b03ae8daf

+ 15 - 0
apps/emqx_bridge/src/emqx_action_info.erl

@@ -24,6 +24,7 @@
     action_type_to_connector_type/1,
     action_type_to_connector_type/1,
     action_type_to_bridge_v1_type/2,
     action_type_to_bridge_v1_type/2,
     bridge_v1_type_to_action_type/1,
     bridge_v1_type_to_action_type/1,
+    bridge_v1_type_name/1,
     is_action_type/1,
     is_action_type/1,
     registered_schema_modules/0,
     registered_schema_modules/0,
     connector_action_config_to_bridge_v1_config/2,
     connector_action_config_to_bridge_v1_config/2,
@@ -144,6 +145,20 @@ get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
 get_confs(_, _) ->
 get_confs(_, _) ->
     undefined.
     undefined.
 
 
+%% We need this hack because of the bugs introduced by associating v2/action/source types
+%% with v1 types unconditionally, like `mongodb' being a "valid" V1 bridge type, or
+%% `confluent_producer', which has no v1 equivalent....
+bridge_v1_type_name(ActionTypeBin) when is_binary(ActionTypeBin) ->
+    bridge_v1_type_name(binary_to_existing_atom(ActionTypeBin));
+bridge_v1_type_name(ActionType) ->
+    Module = get_action_info_module(ActionType),
+    case erlang:function_exported(Module, bridge_v1_type_name, 0) of
+        true ->
+            {ok, Module:bridge_v1_type_name()};
+        false ->
+            {error, no_v1_equivalent}
+    end.
+
 %% This function should return true for all inputs that are bridge V1 types for
 %% This function should return true for all inputs that are bridge V1 types for
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
 %% types. For everything else the function should return false.
 %% types. For everything else the function should return false.

+ 1 - 0
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -621,6 +621,7 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
     end.
     end.
 
 
 lookup_from_local_node(ActionType, ActionName) ->
 lookup_from_local_node(ActionType, ActionName) ->
+    %% TODO: BUG: shouldn't accept an action type here, only V1 types....
     case emqx_bridge:lookup(ActionType, ActionName) of
     case emqx_bridge:lookup(ActionType, ActionName) of
         {ok, Res} -> {ok, format_resource(Res, node())};
         {ok, Res} -> {ok, format_resource(Res, node())};
         Error -> Error
         Error -> Error

+ 8 - 1
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1086,7 +1086,8 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
     case lookup(ActionType, Name) of
     case lookup(ActionType, Name) of
         {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
         {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
             BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
             BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
-            case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
+            HasBridgeV1Equivalent = has_bridge_v1_equivalent(ActionType),
+            case HasBridgeV1Equivalent andalso ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
                 true ->
                 true ->
                     ConnectorType = connector_type(ActionType),
                     ConnectorType = connector_type(ActionType),
                     case emqx_connector:lookup(ConnectorType, ConnectorName) of
                     case emqx_connector:lookup(ConnectorType, ConnectorName) of
@@ -1112,6 +1113,12 @@ bridge_v1_lookup_and_transform(ActionType, Name) ->
 not_bridge_v1_compatible_error() ->
 not_bridge_v1_compatible_error() ->
     {error, not_bridge_v1_compatible}.
     {error, not_bridge_v1_compatible}.
 
 
+has_bridge_v1_equivalent(ActionType) ->
+    case emqx_action_info:bridge_v1_type_name(ActionType) of
+        {ok, _} -> true;
+        {error, no_v1_equivalent} -> false
+    end.
+
 connector_raw_config(Connector, ConnectorType) ->
 connector_raw_config(Connector, ConnectorType) ->
     get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).
     get_raw_with_defaults(Connector, ConnectorType, <<"connectors">>, emqx_connector_schema).
 
 

+ 3 - 0
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -136,6 +136,9 @@ setup_mocks() ->
         end
         end
     ),
     ),
 
 
+    catch meck:new(emqx_action_info, MeckOpts),
+    meck:expect(emqx_action_info, bridge_v1_type_name, 1, {ok, bridge_type()}),
+
     ok.
     ok.
 
 
 con_mod() ->
 con_mod() ->

+ 36 - 0
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -343,6 +343,42 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
     ct:pal("bridge probe result: ~p", [Res]),
     ct:pal("bridge probe result: ~p", [Res]),
     Res.
     Res.
 
 
+list_bridges_http_api_v1() ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    ct:pal("list bridges (http v1)"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list bridges (http v1) result:\n  ~p", [Res]),
+    Res.
+
+list_actions_http_api() ->
+    Path = emqx_mgmt_api_test_util:api_path(["actions"]),
+    ct:pal("list actions (http v2)"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list actions (http v2) result:\n  ~p", [Res]),
+    Res.
+
+list_connectors_http_api() ->
+    Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
+    ct:pal("list connectors"),
+    Res = request(get, Path, _Params = []),
+    ct:pal("list connectors result:\n  ~p", [Res]),
+    Res.
+
+update_rule_http(RuleId, Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId]),
+    ct:pal("update rule ~p:\n  ~p", [RuleId, Params]),
+    Res = request(put, Path, Params),
+    ct:pal("update rule ~p result:\n  ~p", [RuleId, Res]),
+    Res.
+
+enable_rule_http(RuleId) ->
+    Params = #{<<"enable">> => true},
+    update_rule_http(RuleId, Params).
+
+is_rule_enabled(RuleId) ->
+    {ok, #{enable := Enable}} = emqx_rule_engine:get_rule(RuleId),
+    Enable.
+
 try_decode_error(Body0) ->
 try_decode_error(Body0) ->
     case emqx_utils_json:safe_decode(Body0, [return_maps]) of
     case emqx_utils_json:safe_decode(Body0, [return_maps]) of
         {ok, #{<<"message">> := Msg0} = Body1} ->
         {ok, #{<<"message">> := Msg0} = Body1} ->

+ 44 - 4
apps/emqx_bridge_confluent/test/emqx_bridge_confluent_producer_SUITE.erl

@@ -10,8 +10,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 
--define(BRIDGE_TYPE, confluent_producer).
--define(BRIDGE_TYPE_BIN, <<"confluent_producer">>).
+-define(ACTION_TYPE, confluent_producer).
+-define(ACTION_TYPE_BIN, <<"confluent_producer">>).
 -define(CONNECTOR_TYPE, confluent_producer).
 -define(CONNECTOR_TYPE, confluent_producer).
 -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
 -define(CONNECTOR_TYPE_BIN, <<"confluent_producer">>).
 -define(KAFKA_BRIDGE_TYPE, kafka_producer).
 -define(KAFKA_BRIDGE_TYPE, kafka_producer).
@@ -93,7 +93,7 @@ common_init_per_testcase(TestCase, Config) ->
             {connector_type, ?CONNECTOR_TYPE},
             {connector_type, ?CONNECTOR_TYPE},
             {connector_name, Name},
             {connector_name, Name},
             {connector_config, ConnectorConfig},
             {connector_config, ConnectorConfig},
-            {bridge_type, ?BRIDGE_TYPE},
+            {bridge_type, ?ACTION_TYPE},
             {bridge_name, Name},
             {bridge_name, Name},
             {bridge_config, BridgeConfig}
             {bridge_config, BridgeConfig}
             | Config
             | Config
@@ -212,7 +212,7 @@ serde_roundtrip(InnerConfigMap0) ->
     InnerConfigMap.
     InnerConfigMap.
 
 
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
 parse_and_check_bridge_config(InnerConfigMap, Name) ->
-    TypeBin = ?BRIDGE_TYPE_BIN,
+    TypeBin = ?ACTION_TYPE_BIN,
     RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
     RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
     hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
     hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
     InnerConfigMap.
     InnerConfigMap.
@@ -341,3 +341,43 @@ t_same_name_confluent_kafka_bridges(Config) ->
         end
         end
     ),
     ),
     ok.
     ok.
+
+t_list_v1_bridges(Config) ->
+    ?check_trace(
+        begin
+            {ok, _} = emqx_bridge_v2_testlib:create_bridge_api(Config),
+
+            ?assertMatch(
+                {error, no_v1_equivalent},
+                emqx_action_info:bridge_v1_type_name(confluent_producer)
+            ),
+
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, []}}, emqx_bridge_v2_testlib:list_bridges_http_api_v1()
+            ),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_actions_http_api()
+            ),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, [_]}}, emqx_bridge_v2_testlib:list_connectors_http_api()
+            ),
+
+            RuleTopic = <<"t/c">>,
+            {ok, #{<<"id">> := RuleId0}} =
+                emqx_bridge_v2_testlib:create_rule_and_action_http(
+                    ?ACTION_TYPE_BIN,
+                    RuleTopic,
+                    Config,
+                    #{overrides => #{enable => true}}
+                ),
+            ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
+            ?assertMatch(
+                {ok, {{_, 200, _}, _, _}}, emqx_bridge_v2_testlib:enable_rule_http(RuleId0)
+            ),
+            ?assert(emqx_bridge_v2_testlib:is_rule_enabled(RuleId0)),
+
+            ok
+        end,
+        []
+    ),
+    ok.

+ 17 - 5
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -621,24 +621,36 @@ validate_bridge_existence_in_actions(#{actions := Actions, from := Froms} = _Rul
     BridgeIDs0 =
     BridgeIDs0 =
         lists:map(
         lists:map(
             fun(BridgeID) ->
             fun(BridgeID) ->
-                emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false})
+                %% FIXME: this supposedly returns an upgraded type, but it's fuzzy: it
+                %% returns v1 types when attempting to "upgrade".....
+                {Type, Name} =
+                    emqx_bridge_resource:parse_bridge_id(BridgeID, #{atom_name => false}),
+                case emqx_action_info:is_action_type(Type) of
+                    true -> {action, Type, Name};
+                    false -> {bridge_v1, Type, Name}
+                end
             end,
             end,
             get_referenced_hookpoints(Froms)
             get_referenced_hookpoints(Froms)
         ),
         ),
     BridgeIDs1 =
     BridgeIDs1 =
         lists:filtermap(
         lists:filtermap(
             fun
             fun
-                ({bridge_v2, Type, Name}) -> {true, {Type, Name}};
-                ({bridge, Type, Name, _ResId}) -> {true, {Type, Name}};
+                ({bridge_v2, Type, Name}) -> {true, {action, Type, Name}};
+                ({bridge, Type, Name, _ResId}) -> {true, {bridge_v1, Type, Name}};
                 (_) -> false
                 (_) -> false
             end,
             end,
             Actions
             Actions
         ),
         ),
     NonExistentBridgeIDs =
     NonExistentBridgeIDs =
         lists:filter(
         lists:filter(
-            fun({Type, Name}) ->
+            fun({Kind, Type, Name}) ->
+                LookupFn =
+                    case Kind of
+                        action -> fun emqx_bridge_v2:lookup/2;
+                        bridge_v1 -> fun emqx_bridge:lookup/2
+                    end,
                 try
                 try
-                    case emqx_bridge:lookup(Type, Name) of
+                    case LookupFn(Type, Name) of
                         {ok, _} -> false;
                         {ok, _} -> false;
                         {error, _} -> true
                         {error, _} -> true
                     end
                     end