瀏覽代碼

fix: cant replace source conf

zhongwencool 1 年之前
父節點
當前提交
d7c01a4fe0

+ 55 - 0
apps/emqx_bridge_rabbitmq/test/emqx_bridge_rabbitmq_v2_SUITE.erl

@@ -12,6 +12,7 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("amqp_client/include/amqp_client.hrl").
+-import(emqx_config_SUITE, [prepare_conf_file/3]).
 
 -import(emqx_bridge_rabbitmq_test_utils, [
     rabbit_mq_exchange/0,
@@ -317,6 +318,60 @@ t_action_not_exist_exchange(_Config) ->
     ?assertNot(lists:any(Any, ActionsAfterDelete), ActionsAfterDelete),
     ok.
 
+t_replace_action_source(Config) ->
+    Action = #{<<"rabbitmq">> => #{<<"my_action">> => rabbitmq_action()}},
+    Source = #{<<"rabbitmq">> => #{<<"my_source">> => rabbitmq_source()}},
+    ConnectorName = atom_to_binary(?MODULE),
+    Connector = #{<<"rabbitmq">> => #{ConnectorName => rabbitmq_connector(get_rabbitmq(Config))}},
+    Rabbitmq = #{
+        <<"actions">> => Action,
+        <<"sources">> => Source,
+        <<"connectors">> => Connector
+    },
+    ConfBin0 = hocon_pp:do(Rabbitmq, #{}),
+    ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
+    ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile0])),
+    ?assertMatch(
+        #{<<"rabbitmq">> := #{<<"my_action">> := _}},
+        emqx_config:get_raw([<<"actions">>]),
+        Action
+    ),
+    ?assertMatch(
+        #{<<"rabbitmq">> := #{<<"my_source">> := _}},
+        emqx_config:get_raw([<<"sources">>]),
+        Source
+    ),
+    ?assertMatch(
+        #{<<"rabbitmq">> := #{ConnectorName := _}},
+        emqx_config:get_raw([<<"connectors">>]),
+        Connector
+    ),
+
+    Empty = #{
+        <<"actions">> => #{},
+        <<"sources">> => #{},
+        <<"connectors">> => #{}
+    },
+    ConfBin1 = hocon_pp:do(Empty, #{}),
+    ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
+    ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile1])),
+
+    ?assertEqual(#{}, emqx_config:get_raw([<<"actions">>])),
+    ?assertEqual(#{}, emqx_config:get_raw([<<"sources">>])),
+    ?assertMatch(#{}, emqx_config:get_raw([<<"connectors">>])),
+
+    %% restore connectors
+    Rabbitmq2 = #{<<"connectors">> => Connector},
+    ConfBin2 = hocon_pp:do(Rabbitmq2, #{}),
+    ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config),
+    ?assertMatch(ok, emqx_conf_cli:conf(["load", "--replace", ConfFile2])),
+    ?assertMatch(
+        #{<<"rabbitmq">> := #{ConnectorName := _}},
+        emqx_config:get_raw([<<"connectors">>]),
+        Connector
+    ),
+    ok.
+
 waiting_for_disconnected_alarms(InstanceId) ->
     waiting_for_disconnected_alarms(InstanceId, 0).
 

+ 33 - 25
apps/emqx_conf/src/emqx_conf_cli.erl

@@ -245,10 +245,11 @@ load_config_from_raw(RawConf0, Opts) ->
     case check_config(RawConf1) of
         {ok, RawConf} ->
             %% It has been ensured that the connector is always the first configuration to be updated.
-            %% However, when deleting the connector, we need to clean up the dependent actions first;
+            %% However, when deleting the connector, we need to clean up the dependent actions/sources first;
             %% otherwise, the deletion will fail.
-            %% notice: we can't create a action before connector.
-            uninstall_actions(RawConf, Opts),
+            %% notice: we can't create a action/sources before connector.
+            uninstall(<<"actions">>, RawConf, Opts),
+            uninstall(<<"sources">>, RawConf, Opts),
             Error =
                 lists:filtermap(
                     fun({K, V}) ->
@@ -288,27 +289,33 @@ load_config_from_raw(RawConf0, Opts) ->
             {error, Errors}
     end.
 
-uninstall_actions(#{<<"actions">> := New}, #{mode := replace}) ->
-    Old = emqx_conf:get_raw([<<"actions">>], #{}),
-    #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
-    maps:foreach(
-        fun({Type, Name}, _) ->
-            case emqx_bridge_v2:remove(Type, Name) of
-                ok ->
-                    ok;
-                {error, Reason} ->
-                    ?SLOG(error, #{
-                        msg => "failed_to_remove_action",
-                        type => Type,
-                        name => Name,
-                        error => Reason
-                    })
-            end
-        end,
-        Removed
-    );
-%% we don't delete things when in merge mode or without actions key.
-uninstall_actions(_RawConf, _) ->
+uninstall(ActionOrSource, Conf, #{mode := replace}) ->
+    case maps:find(ActionOrSource, Conf) of
+        {ok, New} ->
+            Old = emqx_conf:get_raw([ActionOrSource], #{}),
+            ActionOrSourceAtom = binary_to_existing_atom(ActionOrSource),
+            #{removed := Removed} = emqx_bridge_v2:diff_confs(New, Old),
+            maps:foreach(
+                fun({Type, Name}, _) ->
+                    case emqx_bridge_v2:remove(ActionOrSourceAtom, Type, Name) of
+                        ok ->
+                            ok;
+                        {error, Reason} ->
+                            ?SLOG(error, #{
+                                msg => "failed_to_remove",
+                                type => Type,
+                                name => Name,
+                                error => Reason
+                            })
+                    end
+                end,
+                Removed
+            );
+        error ->
+            ok
+    end;
+%% we don't delete things when in merge mode or without actions/sources key.
+uninstall(_, _RawConf, _) ->
     ok.
 
 update_config_cluster(
@@ -481,7 +488,8 @@ filter_readonly_config(Raw) ->
     end.
 
 reload_config(AllConf, Opts) ->
-    uninstall_actions(AllConf, Opts),
+    uninstall(<<"actions">>, AllConf, Opts),
+    uninstall(<<"sources">>, AllConf, Opts),
     Fold = fun({Key, Conf}, Acc) ->
         case update_config_local(Key, Conf, Opts) of
             ok ->

+ 2 - 0
apps/emqx_connector/src/emqx_connector.erl

@@ -473,6 +473,8 @@ ensure_no_channels(Configs) ->
             fun({Type, ConnectorName}) ->
                 fun(_) ->
                     case emqx_connector_resource:get_channels(Type, ConnectorName) of
+                        {error, not_found} ->
+                            ok;
                         {ok, []} ->
                             ok;
                         {ok, Channels} ->

+ 1 - 0
changes/ce/fix-12715.en.md

@@ -0,0 +1 @@
+Fix replacing sources crash if connector has active channels