소스 검색

chore: remove redundant redis action swagger example

zhongwencool 2 년 전
부모
커밋
3404f39fd2

+ 15 - 3
apps/emqx_bridge/src/emqx_action_info.erl

@@ -33,7 +33,8 @@
     has_custom_bridge_v1_config_to_connector_config/1,
     bridge_v1_config_to_action_config/3,
     has_custom_bridge_v1_config_to_action_config/1,
-    transform_bridge_v1_config_to_action_config/4
+    transform_bridge_v1_config_to_action_config/4,
+    action_convert_from_connector/3
 ]).
 
 -callback bridge_v1_type_name() ->
@@ -142,8 +143,10 @@ action_type_to_bridge_v1_type(ActionType, ActionConf) ->
 
 get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
     ConnectorType = action_type_to_connector_type(ActionType),
-    ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
-    {ConnectorConfig, ActionConfig};
+    case emqx_conf:get_raw([connectors, ConnectorType, ConnectorName], undefined) of
+        undefined -> undefined;
+        ConnectorConfig -> {ConnectorConfig, ActionConfig}
+    end;
 get_confs(_, _) ->
     undefined.
 
@@ -188,6 +191,15 @@ connector_action_config_to_bridge_v1_config(ActionOrBridgeType, ConnectorConfig,
             connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig)
     end.
 
+action_convert_from_connector(ActionOrBridgeType, ConnectorConfig, ActionConfig) ->
+    Module = get_action_info_module(ActionOrBridgeType),
+    case erlang:function_exported(Module, action_convert_from_connector, 2) of
+        true ->
+            Module:action_convert_from_connector(ConnectorConfig, ActionConfig);
+        false ->
+            ActionConfig
+    end.
+
 connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
     Merged = emqx_utils_maps:deep_merge(
         maps:without(

+ 172 - 152
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -73,6 +73,8 @@
     get_channels_for_connector/1
 ]).
 
+-export([diff_confs/2]).
+
 %% Exported for tests
 -export([
     id/2,
@@ -97,7 +99,8 @@
 
 -export([
     bridge_v2_type_to_connector_type/1,
-    is_bridge_v2_type/1
+    is_bridge_v2_type/1,
+    connector_type/1
 ]).
 
 %% Compatibility Layer API
@@ -148,12 +151,23 @@
 %%====================================================================
 
 load() ->
+    reinit_bridges(),
     load_bridges(),
     load_message_publish_hook(),
     ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2),
     ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2),
     ok.
 
+reinit_bridges() ->
+    %% todo
+    Bridges = emqx:get_raw_config([?ROOT_KEY], #{}),
+    try
+        actions_convert_from_connectors(Bridges)
+    catch
+        _:_ -> ok
+    end,
+    ok.
+
 load_bridges() ->
     Bridges = emqx:get_config([?ROOT_KEY], #{}),
     lists:foreach(
@@ -248,7 +262,7 @@ list() ->
     {ok, emqx_config:update_result()} | {error, any()}.
 create(BridgeType, BridgeName, RawConf) ->
     ?SLOG(debug, #{
-        brige_action => create,
+        bridge_action => create,
         bridge_version => 2,
         bridge_type => BridgeType,
         bridge_name => BridgeName,
@@ -265,7 +279,7 @@ remove(BridgeType, BridgeName) ->
     %% NOTE: This function can cause broken references from rules but it is only
     %% called directly from test cases.
     ?SLOG(debug, #{
-        brige_action => remove,
+        bridge_action => remove,
         bridge_version => 2,
         bridge_type => BridgeType,
         bridge_name => BridgeName
@@ -356,7 +370,7 @@ install_bridge_v2_helper(
     _BridgeName,
     {error, Reason} = Error
 ) ->
-    ?SLOG(error, Reason),
+    ?SLOG(warning, Reason),
     Error;
 install_bridge_v2_helper(
     BridgeV2Type,
@@ -412,7 +426,7 @@ uninstall_bridge_v2(
     CreationOpts = emqx_resource:fetch_creation_opts(Config),
     ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
     ok = emqx_resource:clear_metrics(BridgeV2Id),
-    case validate_referenced_connectors(BridgeV2Type, ConnectorName, BridgeName) of
+    case referenced_connectors_exist(BridgeV2Type, ConnectorName, BridgeName) of
         {error, _} ->
             ok;
         ok ->
@@ -440,6 +454,7 @@ combine_connector_and_bridge_v2_config(
             BridgeV2Config#{resource_opts => CombinedCreationOpts}
     catch
         _:_ ->
+            alarm_connector_not_found(BridgeV2Type, BridgeName, ConnectorName),
             {error, #{
                 reason => "connector_not_found",
                 type => BridgeV2Type,
@@ -451,15 +466,14 @@ combine_connector_and_bridge_v2_config(
 %%====================================================================
 %% Operations
 %%====================================================================
+-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)).
 
 -spec disable_enable(disable | enable, bridge_v2_type(), bridge_v2_name()) ->
     {ok, any()} | {error, any()}.
-disable_enable(Action, BridgeType, BridgeName) when
-    Action =:= disable; Action =:= enable
-->
+disable_enable(Action, BridgeType, BridgeName) when ?ENABLE_OR_DISABLE(Action) ->
     emqx_conf:update(
         config_key_path() ++ [BridgeType, BridgeName],
-        {Action, BridgeType, BridgeName},
+        Action,
         #{override_to => cluster}
     ).
 
@@ -565,7 +579,7 @@ query(BridgeType, BridgeName, Message, QueryOpts0) ->
 do_query_with_enabled_config(
     _BridgeType, _BridgeName, _Message, _QueryOpts0, {error, Reason} = Error
 ) ->
-    ?SLOG(error, Reason),
+    ?SLOG(warning, Reason),
     Error;
 do_query_with_enabled_config(
     BridgeType, BridgeName, Message, QueryOpts0, Config
@@ -863,118 +877,89 @@ import_config(RawConf) ->
 %% Config Update Handler API
 %%====================================================================
 
-config_key_path() ->
-    [?ROOT_KEY].
-
-config_key_path_leaf() ->
-    [?ROOT_KEY, '?', '?'].
-
-pre_config_update(_, {force_update, Conf}, _OldConf) ->
-    {ok, Conf};
-%% NOTE: We depend on the `emqx_bridge:pre_config_update/3` to restart/stop the
-%%       underlying resources.
-pre_config_update(_, {_Oper, _, _}, undefined) ->
-    {error, bridge_not_found};
-pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
-    %% to save the 'enable' to the config files
-    {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
-pre_config_update(_Path, Conf, _OldConfig) when is_map(Conf) ->
-    {ok, Conf}.
-
-operation_to_enable(disable) -> false;
-operation_to_enable(enable) -> true.
-
-%% A public API that can trigger this is:
-%% bin/emqx ctl conf load data/configs/cluster.hocon
-post_config_update([?ROOT_KEY], {force_update, _Req}, NewConf, OldConf, _AppEnv) ->
-    do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => false});
-%% This top level handler will be triggered when the actions path is updated
-%% with calls to emqx_conf:update([actions], BridgesConf, #{}).
-%%
-post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
-    do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors => true});
-post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) ->
-    Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]),
-    ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf),
-    Bridges = emqx_utils_maps:deep_remove([BridgeType, BridgeName], emqx:get_config([?ROOT_KEY])),
-    reload_message_publish_hook(Bridges),
+config_key_path() -> [?ROOT_KEY].
+
+config_key_path_leaf() -> [?ROOT_KEY, '?', '?'].
+
+%% enable or disable action
+pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined) when ?ENABLE_OR_DISABLE(Oper) ->
+    {error, #{
+        bridge_name => Name,
+        bridge_type => Type,
+        reason => <<"bridge_not_found">>
+    }};
+pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldAction) when ?ENABLE_OR_DISABLE(Oper) ->
+    {ok, OldAction#{<<"enable">> => operation_to_enable(Oper)}};
+
+%% Updates a single action from a specific HTTP API.
+%% If the connector is not found, the update operation fails.
+pre_config_update([?ROOT_KEY, ActionType, _Name], Conf = #{}, _OldConf) ->
+    action_convert_from_connector(ActionType, Conf);
+%% Batch updates actions when importing a configuration or executing a CLI command.
+%% Update succeeded even if the connector is not found, alarm in post_config_update
+pre_config_update([?ROOT_KEY], Conf = #{}, _OldConfs) ->
+    {ok, actions_convert_from_connectors(Conf)}.
+
+%% Don't crash event the bridge is not found
+post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
+    AllBridges = emqx:get_config([?ROOT_KEY]),
+    case emqx_utils_maps:deep_get([Type, Name], AllBridges, undefined) of
+        undefined ->
+            ok;
+        Action ->
+            ok = uninstall_bridge_v2(Type, Name, Action),
+            Bridges = emqx_utils_maps:deep_remove([Type, Name], AllBridges),
+            reload_message_publish_hook(Bridges)
+    end,
     ?tp(bridge_post_config_update_done, #{}),
     ok;
+%% Create a single bridge failed if the connector is not found(already check in pre_config_update)
 post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) ->
-    %% N.B.: all bridges must use the same field name (`connector`) to define the
-    %% connector name.
-    ConnectorName = maps:get(connector, NewConf),
-    case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
-        ok ->
-            ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
-            Bridges = emqx_utils_maps:deep_put(
-                [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
-            ),
-            reload_message_publish_hook(Bridges),
-            ?tp(bridge_post_config_update_done, #{}),
-            ok;
-        {error, Error} ->
-            {error, Error}
-    end;
+    ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
+    ),
+    reload_message_publish_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
+%% update bridges failed if the connector is not found(already check in pre_config_update)
 post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) ->
-    ConnectorName = maps:get(connector, NewConf),
-    case validate_referenced_connectors(BridgeType, ConnectorName, BridgeName) of
-        ok ->
-            ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
-            ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
-            Bridges = emqx_utils_maps:deep_put(
-                [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
-            ),
-            reload_message_publish_hook(Bridges),
-            ?tp(bridge_post_config_update_done, #{}),
-            ok;
-        {error, Error} ->
-            {error, Error}
-    end.
+    ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf),
+    ok = install_bridge_v2(BridgeType, BridgeName, NewConf),
+    Bridges = emqx_utils_maps:deep_put(
+        [BridgeType, BridgeName], emqx:get_config([?ROOT_KEY]), NewConf
+    ),
+    reload_message_publish_hook(Bridges),
+    ?tp(bridge_post_config_update_done, #{}),
+    ok;
 
-do_post_config_update(NewConf, OldConf, #{validate_referenced_connectors := NeedValidate}) ->
+%% This top level handler will be triggered when the actions path is updated
+%% with calls to emqx_conf:update([actions], BridgesConf, #{}).
+%% such as import_config/1
+%% Notice ** do succeeded even if the connector is not found **
+%% Install a non-exist connector will alarm & log(warn) in install_bridge_v2.
+post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
     #{added := Added, removed := Removed, changed := Updated} =
         diff_confs(NewConf, OldConf),
-    UpdatedConfigs =
-        lists:map(
-            fun({{Type, BridgeName}, {_Old, New}}) ->
-                {Type, BridgeName, New}
-            end,
-            maps:to_list(Updated)
-        ),
-    AddedConfigs =
-        lists:map(
-            fun({{Type, BridgeName}, AddedConf}) ->
-                {Type, BridgeName, AddedConf}
-            end,
-            maps:to_list(Added)
-        ),
-    ToValidate = UpdatedConfigs ++ AddedConfigs,
-    case multi_validate_referenced_connectors(NeedValidate, ToValidate) of
-        ok ->
-            %% The config update will be failed if any task in `perform_bridge_changes` failed.
-            RemoveFun = fun uninstall_bridge_v2/3,
-            CreateFun = fun install_bridge_v2/3,
-            UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
-                uninstall_bridge_v2(Type, Name, OldBridgeConf),
-                install_bridge_v2(Type, Name, Conf)
-            end,
-            Result = perform_bridge_changes([
-                #{action => RemoveFun, data => Removed},
-                #{
-                    action => CreateFun,
-                    data => Added,
-                    on_exception_fn => fun emqx_bridge_resource:remove/4
-                },
-                #{action => UpdateFun, data => Updated}
-            ]),
-            ok = unload_message_publish_hook(),
-            ok = load_message_publish_hook(NewConf),
-            ?tp(bridge_post_config_update_done, #{}),
-            Result;
-        {error, Error} ->
-            {error, Error}
-    end.
+    %% The config update will be failed if any task in `perform_bridge_changes` failed.
+    RemoveFun = fun uninstall_bridge_v2/3,
+    CreateFun = fun install_bridge_v2/3,
+    UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) ->
+        uninstall_bridge_v2(Type, Name, OldBridgeConf),
+        install_bridge_v2(Type, Name, Conf)
+                end,
+    Result = perform_bridge_changes([
+        #{action => RemoveFun, data => Removed},
+        #{
+            action => CreateFun,
+            data => Added,
+            on_exception_fn => fun emqx_bridge_resource:remove/4
+        },
+        #{action => UpdateFun, data => Updated}
+    ]),
+    reload_message_publish_hook(NewConf),
+    ?tp(bridge_post_config_update_done, #{}),
+    Result.
 
 diff_confs(NewConfs, OldConfs) ->
     emqx_utils_maps:diff_maps(
@@ -1066,12 +1051,10 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
         #{connector := ConnectorName} ->
             ConnectorType = connector_type(BridgeV2Type),
             ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
-            {ok, Channels} = emqx_resource:get_channels(ConnectorResourceId),
-            case Channels of
-                [_Channel] ->
-                    true;
-                _ ->
-                    false
+            case emqx_resource:get_channels(ConnectorResourceId) of
+                {ok, [_Channel]} -> true;
+                %% not_found, [], [_|_]
+                _ -> false
             end
     end.
 
@@ -1148,7 +1131,7 @@ bridge_v1_lookup_and_transform_helper(
     BridgeV2Status = maps:get(status, Action, undefined),
     BridgeV2Error = maps:get(error, Action, undefined),
     ResourceData1 = maps:get(resource_data, BridgeV1, #{}),
-    %% Replace id in resouce data
+    %% Replace id in resource data
     BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
     ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1),
     ConnectorStatus = maps:get(status, ResourceData2, undefined),
@@ -1422,7 +1405,7 @@ bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
     ).
 
 %% Bridge v1 delegated-removal in 3 steps:
-%% 1. Delete rule actions if RmoveDeps has 'rule_actions'
+%% 1. Delete rule actions if RemoveDeps has 'rule_actions'
 %% 2. Delete self (the bridge v2), also delete its channel in the connector
 %% 3. Delete the connector if the connector has no more channel left and if 'connector' is in RemoveDeps
 bridge_v1_check_deps_and_remove(
@@ -1557,6 +1540,9 @@ bridge_v1_operation_helper(BridgeV1Type, Name, ConnectorOpFun, DoHealthCheck) ->
 %% Misc helper functions
 %%====================================================================
 
+operation_to_enable(disable) -> false;
+operation_to_enable(enable) -> true.
+
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Str) when is_list(Str) -> list_to_binary(Str);
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
@@ -1575,50 +1561,84 @@ to_existing_atom(X) ->
         {error, _} -> throw(bad_atom)
     end.
 
-validate_referenced_connectors(BridgeType, ConnectorNameBin, BridgeName) ->
+referenced_connectors_exist(BridgeType, ConnectorNameBin, BridgeName) ->
     %% N.B.: assumes that, for all bridgeV2 types, the name of the bridge type is
     %% identical to its matching connector type name.
-    try
-        {ConnectorName, ConnectorType} = to_connector(ConnectorNameBin, BridgeType),
-        case emqx_config:get([connectors, ConnectorType, ConnectorName], undefined) of
-            undefined ->
-                throw(not_found);
-            _ ->
-                ok
-        end
-    catch
-        throw:not_found ->
+    case get_connector_info(ConnectorNameBin, BridgeType) of
+        {error, not_found} ->
             {error, #{
                 reason => "connector_not_found_or_wrong_type",
                 connector_name => ConnectorNameBin,
                 bridge_name => BridgeName,
                 bridge_type => BridgeType
+            }};
+        {ok, _Connector} ->
+            ok
+    end.
+
+actions_convert_from_connectors(Conf) ->
+    maps:map(
+        fun({ActionType, Actions}) ->
+            maps:map(
+                fun(_Name, Action) ->
+                    case action_convert_from_connector(ActionType, Action) of
+                        {ok, NewAction} -> NewAction;
+                        {error, _} -> Action
+                    end
+                end,
+                Actions
+            )
+        end,
+        Conf
+    ).
+
+action_convert_from_connector(ActionType, Action = #{<<"connector">> := ConnectorName}) ->
+    case get_connector_info(ConnectorName, ActionType) of
+        {ok, Connector} ->
+            Action1 = emqx_action_info:action_convert_from_connector(ActionType, Connector, Action),
+            {ok, Action1};
+        {error, not_found} ->
+            {error, #{
+                reason => "connector_not_found",
+                type => ActionType,
+                connector_name => ConnectorName
             }}
     end.
 
+get_connector_info(ConnectorNameBin, BridgeType) ->
+    case to_connector(ConnectorNameBin, BridgeType) of
+        {error, not_found} ->
+            {error, not_found};
+        {ConnectorName, ConnectorType} ->
+            case emqx_config:get_raw([connectors, ConnectorType, ConnectorName], undefined) of
+                undefined -> {error, not_found};
+                Connector -> {ok, Connector}
+            end
+    end.
+
 to_connector(ConnectorNameBin, BridgeType) ->
     try
-        ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(to_existing_atom(BridgeType)),
         ConnectorName = to_existing_atom(ConnectorNameBin),
+        BridgeType1 = to_existing_atom(BridgeType),
+        ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType1),
         {ConnectorName, ConnectorType}
     catch
         _:_ ->
-            throw(not_found)
+            {error, not_found}
     end.
 
-multi_validate_referenced_connectors(false, _Configs) ->
-    ok;
-multi_validate_referenced_connectors(true, Configs) ->
-    Pipeline =
-        lists:map(
-            fun({Type, BridgeName, #{connector := ConnectorName}}) ->
-                fun(_) -> validate_referenced_connectors(Type, ConnectorName, BridgeName) end
-            end,
-            Configs
-        ),
-    case emqx_utils:pipeline(Pipeline, unused, unused) of
-        {ok, _, _} ->
-            ok;
-        {error, Reason, _State} ->
-            {error, Reason}
-    end.
+alarm_connector_not_found(ActionType, ActionName, ConnectorName) ->
+    ConnectorType = connector_type(to_existing_atom(ActionType)),
+    ResId = emqx_connector_resource:resource_id(
+        ConnectorType, ConnectorName
+    ),
+    _ = emqx_alarm:safe_activate(
+        ResId,
+        #{
+            connector_name => ConnectorName,
+            connector_type => ConnectorType,
+            action_type => ActionType,
+            action_name => ActionName
+        },
+        <<"connector not found">>
+    ).

+ 19 - 0
apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl

@@ -56,6 +56,8 @@
     project_to_actions_resource_opts/1
 ]).
 
+-export([actions_convert_from_connectors/1]).
+
 -export_type([action_type/0]).
 
 %% Should we explicitly list them here so dialyzer may be more helpful?
@@ -305,6 +307,23 @@ project_to_actions_resource_opts(OldResourceOpts) ->
     Subfields = common_resource_opts_subfields_bin(),
     maps:with(Subfields, OldResourceOpts).
 
+actions_convert_from_connectors(RawConf = #{<<"actions">> := Actions}) ->
+    Actions1 =
+        maps:map(fun(ActionType, ActionMap) ->
+            maps:map(fun(_ActionName, Action) ->
+            #{<<"connector">> := ConnName} = Action,
+            ConnType = atom_to_binary(emqx_bridge_v2:connector_type(ActionType)),
+            ConnPath = [<<"connectors">>, ConnType, ConnName],
+            case emqx_utils_maps:deep_find(ConnPath, RawConf) of
+                {ok, ConnConf} ->
+                    emqx_action_info:action_convert_from_connector(ActionType, ConnConf, Action);
+                {not_found, _KeyPath, _Data} -> Action
+            end
+                 end, ActionMap)
+             end, Actions),
+    maps:put(<<"actions">>, Actions1, RawConf);
+actions_convert_from_connectors(RawConf) -> RawConf.
+
 -ifdef(TEST).
 -include_lib("hocon/include/hocon_types.hrl").
 schema_homogeneous_test() ->

+ 1 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_redis, [
     {description, "EMQX Enterprise Redis Bridge"},
-    {vsn, "0.1.4"},
+    {vsn, "0.1.5"},
     {registered, []},
     {applications, [
         kernel,

+ 19 - 1
apps/emqx_bridge_redis/src/emqx_bridge_redis_action_info.erl

@@ -14,7 +14,8 @@
     connector_action_config_to_bridge_v1_config/2,
     bridge_v1_config_to_action_config/2,
     bridge_v1_config_to_connector_config/1,
-    bridge_v1_type_name_fun/1
+    bridge_v1_type_name_fun/1,
+    action_convert_from_connector/2
 ]).
 
 -import(emqx_utils_conv, [bin/1]).
@@ -48,6 +49,23 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
         end,
     maps:without([<<"description">>], Config1).
 
+action_convert_from_connector(ConnectorConfig, ActionConfig) ->
+    case ConnectorConfig of
+        #{<<"parameters">> := #{<<"redis_type">> := <<"redis_cluster">>}} ->
+            emqx_utils_maps:update_if_present(
+                <<"resource_opts">>,
+                fun(Opts) ->
+                    Opts#{
+                        <<"batch_size">> => 1,
+                        <<"batch_time">> => <<"0ms">>
+                    }
+                end,
+                ActionConfig
+            );
+        _ ->
+            ActionConfig
+    end.
+
 bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
     ActionTopLevelKeys = schema_keys(?SCHEMA_MODULE:fields(redis_action)),
     ActionParametersKeys = schema_keys(emqx_bridge_redis:fields(action_parameters)),

+ 2 - 14
apps/emqx_bridge_redis/src/emqx_bridge_redis_schema.erl

@@ -145,22 +145,10 @@ resource_opts_converter(Conf, _Opts) ->
 bridge_v2_examples(Method) ->
     [
         #{
-            <<"redis_single_producer">> => #{
-                summary => <<"Redis Single Producer Action">>,
+            <<"redis">> => #{
+                summary => <<"Redis Action">>,
                 value => action_example(single, Method)
             }
-        },
-        #{
-            <<"redis_sentinel_producer">> => #{
-                summary => <<"Redis Sentinel Producer Action">>,
-                value => action_example(sentinel, Method)
-            }
-        },
-        #{
-            <<"redis_cluster_producer">> => #{
-                summary => <<"Redis Cluster Producer Action">>,
-                value => action_example(cluster, Method)
-            }
         }
     ].
 

+ 4 - 2
apps/emqx_conf/src/emqx_conf_schema.erl

@@ -76,8 +76,10 @@
 -define(DEFAULT_MAX_PORTS, 1024 * 1024).
 
 %% Callback to upgrade config after loaded from config file but before validation.
-upgrade_raw_conf(RawConf) ->
-    emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf).
+upgrade_raw_conf(Raw0) ->
+    Raw1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(Raw0),
+    emqx_bridge_v2_schema:actions_convert_from_connectors(Raw1).
+
 
 namespace() -> emqx.
 

+ 23 - 34
apps/emqx_connector/src/emqx_connector.erl

@@ -50,6 +50,7 @@
 ]).
 
 -define(ROOT_KEY, connectors).
+-define(ENABLE_OR_DISABLE(A), (A =:= disable orelse A =:= enable)).
 
 load() ->
     Connectors = emqx:get_config([?ROOT_KEY], #{}),
@@ -107,22 +108,24 @@ config_key_path() ->
 
 pre_config_update([?ROOT_KEY], RawConf, RawConf) ->
     {ok, RawConf};
-pre_config_update([?ROOT_KEY], {force_update, NewConf}, RawConf) ->
-    pre_config_update([?ROOT_KEY], NewConf, RawConf);
 pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
     case multi_validate_connector_names(NewConf) of
-        ok ->
-            {ok, convert_certs(NewConf)};
-        Error ->
-            Error
+        ok -> {ok, convert_certs(NewConf)};
+        Error -> Error
     end;
-pre_config_update(_, {_Oper, _, _}, undefined) ->
-    {error, connector_not_found};
-pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
+pre_config_update([?ROOT_KEY, Type, Name], Oper, undefined)
+    when ?ENABLE_OR_DISABLE(Oper) ->
+    {error,  #{
+        reason => <<"connector_not_found">>,
+        connector_name => Name,
+        connector_type => Type
+    }};
+pre_config_update([?ROOT_KEY, _Type, _Name], Oper, OldConfig)
+    when ?ENABLE_OR_DISABLE(Oper) ->
     %% to save the 'enable' to the config files
     {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
-pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
-    case validate_connector_name_in_config(Path) of
+pre_config_update([?ROOT_KEY, _Type, Name] = Path, Conf = #{}, _OldConfig) ->
+    case validate_connector_name(Name) of
         ok ->
             case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of
                 {error, Reason} ->
@@ -137,18 +140,11 @@ pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) ->
 operation_to_enable(disable) -> false;
 operation_to_enable(enable) -> true.
 
-post_config_update([?ROOT_KEY], {force_update, _}, NewConf, OldConf, _AppEnv) ->
-    #{added := Added, removed := Removed, changed := Updated} =
-        diff_confs(NewConf, OldConf),
-    perform_connector_changes(Removed, Added, Updated);
 post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
-    #{added := Added, removed := Removed, changed := Updated} =
-        diff_confs(NewConf, OldConf),
+    #{added := Added, removed := Removed, changed := Updated} = diff_confs(NewConf, OldConf),
     case ensure_no_channels(Removed) of
-        ok ->
-            perform_connector_changes(Removed, Added, Updated);
-        {error, Error} ->
-            {error, Error}
+        ok -> perform_connector_changes(Removed, Added, Updated);
+        {error, Error} -> {error, Error}
     end;
 post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
     case emqx_connector_resource:get_channels(Type, Name) of
@@ -159,11 +155,13 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
         {ok, Channels} ->
             {error, {active_channels, Channels}}
     end;
+%% create a new connector
 post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, undefined, _AppEnvs) ->
     ResOpts = emqx_resource:fetch_creation_opts(NewConf),
     ok = emqx_connector_resource:create(Type, Name, NewConf, ResOpts),
     ?tp(connector_post_config_update_done, #{}),
     ok;
+%% update an existing connector
 post_config_update([?ROOT_KEY, Type, Name], _Req, NewConf, OldConf, _AppEnvs) ->
     ResOpts = emqx_resource:fetch_creation_opts(NewConf),
     ok = emqx_connector_resource:update(Type, Name, {OldConf, NewConf}, ResOpts),
@@ -226,12 +224,10 @@ lookup(Type, Name, RawConf) ->
 get_metrics(Type, Name) ->
     emqx_resource:get_metrics(emqx_connector_resource:resource_id(Type, Name)).
 
-disable_enable(Action, ConnectorType, ConnectorName) when
-    Action =:= disable; Action =:= enable
-->
+disable_enable(Action, ConnectorType, ConnectorName) when ?ENABLE_OR_DISABLE(Action) ->
     emqx_conf:update(
         config_key_path() ++ [ConnectorType, ConnectorName],
-        {Action, ConnectorType, ConnectorName},
+        Action,
         #{override_to => cluster}
     ).
 
@@ -250,7 +246,7 @@ create(ConnectorType, ConnectorName, RawConf) ->
 
 remove(ConnectorType, ConnectorName) ->
     ?SLOG(debug, #{
-        brige_action => remove,
+        bridge_action => remove,
         connector_type => ConnectorType,
         connector_name => ConnectorName
     }),
@@ -293,6 +289,7 @@ import_config(RawConf) ->
     ConnectorsConf = maps:get(<<"connectors">>, RawConf, #{}),
     OldConnectorsConf = emqx:get_raw_config(RootKeyPath, #{}),
     MergedConf = merge_confs(OldConnectorsConf, ConnectorsConf),
+    %% using merge strategy, deletions should not be performed within the post_config_update/5.
     case emqx_conf:update(RootKeyPath, MergedConf, #{override_to => cluster}) of
         {ok, #{raw_config := NewRawConf}} ->
             {ok, #{root_key => ?ROOT_KEY, changed => changed_paths(OldConnectorsConf, NewRawConf)}};
@@ -490,14 +487,6 @@ validate_connector_name(ConnectorName) ->
             {error, Error}
     end.
 
-validate_connector_name_in_config(Path) ->
-    case Path of
-        [?ROOT_KEY, _ConnectorType, ConnectorName] ->
-            validate_connector_name(ConnectorName);
-        _ ->
-            ok
-    end.
-
 multi_validate_connector_names(Conf) ->
     ConnectorTypeAndNames =
         [

+ 3 - 1
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -136,14 +136,16 @@ create(Type, Name, Conf0, Opts) ->
         config => emqx_utils:redact(Conf0)
     }),
     TypeBin = bin(Type),
+    ResourceId = resource_id(Type, Name),
     Conf = Conf0#{connector_type => TypeBin, connector_name => Name},
     {ok, _Data} = emqx_resource:create_local(
-        resource_id(Type, Name),
+        ResourceId,
         <<"emqx_connector">>,
         ?MODULE:connector_to_resource_type(Type),
         parse_confs(TypeBin, Name, Conf),
         parse_opts(Conf, Opts)
     ),
+    _ = emqx_alarm:ensure_deactivated(ResourceId),
     ok.
 
 update(ConnectorId, {OldConf, Conf}) ->