Просмотр исходного кода

feat(emqx_bridge): action_info with dynamic lookup

This allows a n:1 relation between v1 bridge_types to action/connector types as
it's the case with mongodb for instance, where we had `mongodb_single`
`mongodb_sharded` etc and the new implementation will just have `mongodb`.
Stefan Strigler 2 лет назад
Родитель
Сommit
84ff7b0b38

+ 60 - 29
apps/emqx_bridge/src/emqx_action_info.erl

@@ -22,7 +22,7 @@
 
 -export([
     action_type_to_connector_type/1,
-    action_type_to_bridge_v1_type/1,
+    action_type_to_bridge_v1_type/2,
     bridge_v1_type_to_action_type/1,
     is_action_type/1,
     registered_schema_modules/0,
@@ -30,7 +30,12 @@
     bridge_v1_to_action_fixup/2
 ]).
 
--callback bridge_v1_type_name() -> atom().
+-callback bridge_v1_type_name() ->
+    atom()
+    | {
+        fun(({ActionConfig :: map(), ConnectorConfig :: map()}) -> Type :: atom()),
+        TypeList :: [atom()]
+    }.
 -callback action_type_name() -> atom().
 -callback connector_type_name() -> atom().
 -callback schema_module() -> atom().
@@ -93,16 +98,22 @@ bridge_v1_type_to_action_type(Type) ->
         ActionType -> ActionType
     end.
 
-action_type_to_bridge_v1_type(Bin) when is_binary(Bin) ->
-    action_type_to_bridge_v1_type(binary_to_existing_atom(Bin));
-action_type_to_bridge_v1_type(Type) ->
+action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
+    action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
+action_type_to_bridge_v1_type(ActionType, Conf) ->
     ActionInfoMap = info_map(),
     ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
-    case maps:get(Type, ActionTypeToBridgeV1Type, undefined) of
-        undefined -> Type;
+    case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
+        undefined -> ActionType;
+        BridgeV1TypeFun when is_function(BridgeV1TypeFun) -> BridgeV1TypeFun(get_confs(Conf));
         BridgeV1Type -> BridgeV1Type
     end.
 
+get_confs(#{connector := ConnectorName, type := ActionType} = ActionConfig) ->
+    ConnectorType = action_type_to_connector_type(ActionType),
+    ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
+    {ActionConfig, ConnectorConfig}.
+
 %% This function should return true for all inputs that are bridge V1 types for
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2
 %% types. For everything else the function should return false.
@@ -226,36 +237,56 @@ get_info_map(Module) ->
     %% Force the module to get loaded
     _ = code:ensure_loaded(Module),
     ActionType = Module:action_type_name(),
-    BridgeV1Type =
+    {BridgeV1TypeOrFun, BridgeV1Types} =
         case erlang:function_exported(Module, bridge_v1_type_name, 0) of
             true ->
-                Module:bridge_v1_type_name();
+                case Module:bridge_v1_type_name() of
+                    {_BridgeV1TypeFun, _BridgeV1Types} = BridgeV1TypeTuple ->
+                        BridgeV1TypeTuple;
+                    BridgeV1Type0 ->
+                        {BridgeV1Type0, [BridgeV1Type0]}
+                end;
             false ->
-                Module:action_type_name()
+                {ActionType, [ActionType]}
         end,
     #{
-        action_type_names => #{
-            ActionType => true,
-            BridgeV1Type => true
-        },
-        bridge_v1_type_to_action_type => #{
-            BridgeV1Type => ActionType,
-            %% Alias the bridge V1 type to the action type
-            ActionType => ActionType
-        },
+        action_type_names =>
+            lists:foldl(
+                fun(BridgeV1Type, M) ->
+                    M#{BridgeV1Type => true}
+                end,
+                #{ActionType => true},
+                BridgeV1Types
+            ),
+        bridge_v1_type_to_action_type =>
+            lists:foldl(
+                fun(BridgeV1Type, M) ->
+                    %% Alias the bridge V1 type to the action type
+                    M#{BridgeV1Type => ActionType}
+                end,
+                #{ActionType => ActionType},
+                BridgeV1Types
+            ),
         action_type_to_bridge_v1_type => #{
-            ActionType => BridgeV1Type
-        },
-        action_type_to_connector_type => #{
-            ActionType => Module:connector_type_name(),
-            %% Alias the bridge V1 type to the action type
-            BridgeV1Type => Module:connector_type_name()
+            ActionType => BridgeV1TypeOrFun
         },
+        action_type_to_connector_type =>
+            lists:foldl(
+                fun(BridgeV1Type, M) ->
+                    M#{BridgeV1Type => Module:connector_type_name()}
+                end,
+                #{ActionType => Module:connector_type_name()},
+                BridgeV1Types
+            ),
         action_type_to_schema_module => #{
             ActionType => Module:schema_module()
         },
-        action_type_to_info_module => #{
-            ActionType => Module,
-            BridgeV1Type => Module
-        }
+        action_type_to_info_module =>
+            lists:foldl(
+                fun(BridgeV1Type, M) ->
+                    M#{BridgeV1Type => Module}
+                end,
+                #{ActionType => Module},
+                BridgeV1Types
+            )
     }.

+ 3 - 3
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -907,7 +907,7 @@ format_resource(
     redact(
         maps:merge(
             RawConfFull#{
-                type => downgrade_type(Type),
+                type => downgrade_type(Type, RawConf),
                 name => maps:get(<<"name">>, RawConf, BridgeName),
                 node => Node
             },
@@ -1162,5 +1162,5 @@ non_compat_bridge_msg() ->
 upgrade_type(Type) ->
     emqx_bridge_lib:upgrade_type(Type).
 
-downgrade_type(Type) ->
-    emqx_bridge_lib:downgrade_type(Type).
+downgrade_type(Type, Conf) ->
+    emqx_bridge_lib:downgrade_type(Type, Conf).

+ 14 - 8
apps/emqx_bridge/src/emqx_bridge_lib.erl

@@ -18,7 +18,7 @@
 -export([
     maybe_withdraw_rule_action/3,
     upgrade_type/1,
-    downgrade_type/1
+    downgrade_type/2
 ]).
 
 %% @doc A bridge can be used as a rule action.
@@ -61,17 +61,17 @@ upgrade_type(Type) when is_list(Type) ->
     atom_to_list(emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(list_to_binary(Type))).
 
 %% @doc Kafka producer bridge type renamed from 'kafka' to 'kafka_bridge' since 5.3.1
-downgrade_type(Type) when is_atom(Type) ->
-    emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type);
-downgrade_type(Type) when is_binary(Type) ->
-    atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type));
-downgrade_type(Type) when is_list(Type) ->
-    atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type))).
+downgrade_type(Type, Conf) when is_atom(Type) ->
+    emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf);
+downgrade_type(Type, Conf) when is_binary(Type) ->
+    atom_to_binary(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(Type, Conf));
+downgrade_type(Type, Conf) when is_list(Type) ->
+    atom_to_list(emqx_bridge_v2:bridge_v2_type_to_bridge_v1_type(list_to_binary(Type), Conf)).
 
 %% A rule might be referencing an old version bridge type name
 %% i.e. 'kafka' instead of 'kafka_producer' so we need to try both
 external_ids(Type, Name) ->
-    case downgrade_type(Type) of
+    case downgrade_type(Type, get_conf(Type, Name)) of
         Type ->
             [external_id(Type, Name)];
         Type0 ->
@@ -87,3 +87,9 @@ external_id(BridgeType, BridgeName) ->
 
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
+
+get_conf(BridgeType, BridgeName) ->
+    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
+        true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]);
+        false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName])
+    end.

+ 3 - 3
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -111,7 +111,7 @@
     bridge_v1_create_dry_run/2,
     bridge_v1_type_to_bridge_v2_type/1,
     %% Exception from the naming convention:
-    bridge_v2_type_to_bridge_v1_type/1,
+    bridge_v2_type_to_bridge_v1_type/2,
     bridge_v1_id_to_connector_resource_id/1,
     bridge_v1_enable_disable/3,
     bridge_v1_restart/2,
@@ -1050,8 +1050,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
 bridge_v1_type_to_bridge_v2_type(Type) ->
     emqx_action_info:bridge_v1_type_to_action_type(Type).
 
-bridge_v2_type_to_bridge_v1_type(Type) ->
-    emqx_action_info:action_type_to_bridge_v1_type(Type).
+bridge_v2_type_to_bridge_v1_type(Type, Conf) ->
+    emqx_action_info:action_type_to_bridge_v1_type(Type, Conf).
 
 is_bridge_v2_type(Type) ->
     emqx_action_info:is_action_type(Type).

+ 6 - 1
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -621,8 +621,13 @@ partitioner(random) -> random;
 partitioner(key_dispatch) -> first_key_dispatch.
 
 replayq_dir(BridgeType, BridgeName) ->
+    RawConf = emqx_conf:get_raw([actions, BridgeType, BridgeName]),
     DirName = iolist_to_binary([
-        emqx_bridge_lib:downgrade_type(BridgeType), ":", BridgeName, ":", atom_to_list(node())
+        emqx_bridge_lib:downgrade_type(BridgeType, RawConf),
+        ":",
+        BridgeName,
+        ":",
+        atom_to_list(node())
     ]),
     filename:join([emqx:data_dir(), "kafka", DirName]).
 

+ 22 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -521,8 +521,9 @@ format_action(Actions) ->
 
 do_format_action({bridge, BridgeType, BridgeName, _ResId}) ->
     emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
-do_format_action({bridge_v2, BridgeType, BridgeName}) ->
-    emqx_bridge_resource:bridge_id(emqx_bridge_lib:downgrade_type(BridgeType), BridgeName);
+do_format_action({bridge_v2, BridgeType0, BridgeName}) ->
+    BridgeType = try_downgrade(BridgeType0, BridgeName),
+    emqx_bridge_resource:bridge_id(BridgeType, BridgeName);
 do_format_action(#{mod := Mod, func := Func, args := Args}) ->
     #{
         function => printable_function_name(Mod, Func),
@@ -533,6 +534,25 @@ do_format_action(#{mod := Mod, func := Func}) ->
         function => printable_function_name(Mod, Func)
     }.
 
+try_downgrade(BridgeType, BridgeName) ->
+    Conf = try_get_conf(BridgeType, BridgeName),
+    try emqx_bridge_lib:downgrade_type(BridgeType, Conf) of
+        DowngradedBridgeType ->
+            DowngradedBridgeType
+    catch
+        error:{config_not_found, _} ->
+            BridgeType
+    end.
+
+try_get_conf(BridgeType, BridgeName) ->
+    try emqx_conf:get_raw([actions, BridgeType, BridgeName]) of
+        RawConf ->
+            RawConf
+    catch
+        error:{config_not_found, _} ->
+            #{}
+    end.
+
 printable_function_name(emqx_rule_actions, Func) ->
     Func;
 printable_function_name(Mod, Func) ->