Explorar o código

Merge pull request #8902 from terry-xiaoyu/check_dependent_actions

Check dependent actions before removing a data-bridge
Xinyu Liu %!s(int64=3) %!d(string=hai) anos
pai
achega
a67484422d

+ 19 - 0
apps/emqx_bridge/src/emqx_bridge.erl

@@ -37,6 +37,7 @@
     create/3,
     create/3,
     disable_enable/3,
     disable_enable/3,
     remove/2,
     remove/2,
+    check_deps_and_remove/3,
     list/0
     list/0
 ]).
 ]).
 
 
@@ -247,6 +248,24 @@ remove(BridgeType, BridgeName) ->
         #{override_to => cluster}
         #{override_to => cluster}
     ).
     ).
 
 
+check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
+    Id = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
+    %% NOTE: This violates the design: Rule depends on data-bridge but not vice versa.
+    case emqx_rule_engine:get_rule_ids_by_action(Id) of
+        [] ->
+            remove(BridgeType, BridgeName);
+        Rules when RemoveDeps =:= false ->
+            {error, {rules_deps_on_this_bridge, Rules}};
+        Rules when RemoveDeps =:= true ->
+            lists:foreach(
+                fun(R) ->
+                    emqx_rule_engine:ensure_action_removed(R, Id)
+                end,
+                Rules
+            ),
+            remove(BridgeType, BridgeName)
+    end.
+
 %%========================================================================================
 %%========================================================================================
 %% Helper functions
 %% Helper functions
 %%========================================================================================
 %%========================================================================================

+ 21 - 5
apps/emqx_bridge/src/emqx_bridge_api.erl

@@ -331,6 +331,7 @@ schema("/bridges/:id") ->
             responses => #{
             responses => #{
                 204 => <<"Bridge deleted">>,
                 204 => <<"Bridge deleted">>,
                 400 => error_schema(['INVALID_ID'], "Update bridge failed"),
                 400 => error_schema(['INVALID_ID'], "Update bridge failed"),
+                403 => error_schema('FORBIDDEN_REQUEST', "forbidden operation"),
                 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
                 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
             }
             }
         }
         }
@@ -424,13 +425,28 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
                 {404, error_msg('NOT_FOUND', <<"bridge not found">>)}
                 {404, error_msg('NOT_FOUND', <<"bridge not found">>)}
         end
         end
     );
     );
-'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
+'/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
+    AlsoDeleteActs =
+        case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of
+            <<"true">> -> true;
+            true -> true;
+            _ -> false
+        end,
     ?TRY_PARSE_ID(
     ?TRY_PARSE_ID(
         Id,
         Id,
-        case emqx_bridge:remove(BridgeType, BridgeName) of
-            {ok, _} -> {204};
-            {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
-            {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)}
+        case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDeleteActs) of
+            {ok, _} ->
+                204;
+            {error, {rules_deps_on_this_bridge, RuleIds}} ->
+                {403,
+                    error_msg(
+                        'FORBIDDEN_REQUEST',
+                        {<<"There're some rules dependent on this bridge">>, RuleIds}
+                    )};
+            {error, timeout} ->
+                {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
+            {error, Reason} ->
+                {500, error_msg('INTERNAL_ERROR', Reason)}
         end
         end
     ).
     ).
 
 

+ 80 - 2
apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

@@ -61,14 +61,18 @@ init_per_suite(Config) ->
     _ = application:stop(emqx_resource),
     _ = application:stop(emqx_resource),
     _ = application:stop(emqx_connector),
     _ = application:stop(emqx_connector),
     ok = emqx_common_test_helpers:start_apps(
     ok = emqx_common_test_helpers:start_apps(
-        [emqx_bridge, emqx_dashboard],
+        [emqx_rule_engine, emqx_bridge, emqx_dashboard],
         fun set_special_configs/1
         fun set_special_configs/1
     ),
     ),
+    ok = emqx_common_test_helpers:load_config(
+        emqx_rule_engine_schema,
+        <<"rule_engine {rules {}}">>
+    ),
     ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT),
     ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT),
     Config.
     Config.
 
 
 end_per_suite(_Config) ->
 end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_dashboard]),
+    emqx_common_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_dashboard]),
     ok.
     ok.
 
 
 set_special_configs(emqx_dashboard) ->
 set_special_configs(emqx_dashboard) ->
@@ -301,6 +305,80 @@ t_http_crud_apis(Config) ->
     ),
     ),
     ok.
     ok.
 
 
+t_check_dependent_actions_on_delete(Config) ->
+    Port = ?config(port, Config),
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a webhook bridge, using POST
+    %% POST /bridges/ will create a bridge
+    URL1 = ?URL(Port, "path1"),
+    Name = <<"t_http_crud_apis">>,
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+    {ok, 201, _} = request(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
+    ),
+    {ok, 201, Rule} = request(
+        post,
+        uri(["rules"]),
+        #{
+            <<"name">> => <<"t_http_crud_apis">>,
+            <<"enable">> => true,
+            <<"actions">> => [BridgeID],
+            <<"sql">> => <<"SELECT * from \"t\"">>
+        }
+    ),
+    #{<<"id">> := RuleId} = jsx:decode(Rule),
+    %% delete the bridge should fail because there is a rule depenents on it
+    {ok, 403, _} = request(delete, uri(["bridges", BridgeID]), []),
+    %% delete the rule first
+    {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
+    %% then delete the bridge is OK
+    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+    ok.
+
+t_cascade_delete_actions(Config) ->
+    Port = ?config(port, Config),
+    %% assert we there's no bridges at first
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
+    %% then we add a webhook bridge, using POST
+    %% POST /bridges/ will create a bridge
+    URL1 = ?URL(Port, "path1"),
+    Name = <<"t_http_crud_apis">>,
+    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
+    {ok, 201, _} = request(
+        post,
+        uri(["bridges"]),
+        ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
+    ),
+    {ok, 201, Rule} = request(
+        post,
+        uri(["rules"]),
+        #{
+            <<"name">> => <<"t_http_crud_apis">>,
+            <<"enable">> => true,
+            <<"actions">> => [BridgeID],
+            <<"sql">> => <<"SELECT * from \"t\"">>
+        }
+    ),
+    #{<<"id">> := RuleId} = jsx:decode(Rule),
+    %% delete the bridge will also delete the actions from the rules
+    {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []),
+    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+    {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
+    ?assertMatch(
+        #{
+            <<"actions">> := []
+        },
+        jsx:decode(Rule1)
+    ),
+    {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
+    ok.
+
 t_start_stop_bridges_node(Config) ->
 t_start_stop_bridges_node(Config) ->
     do_start_stop_bridges(node, Config).
     do_start_stop_bridges(node, Config).
 
 

+ 1 - 1
apps/emqx_dashboard/test/emqx_dashboard_api_test_helpers.erl

@@ -92,7 +92,7 @@ request(Username, Method, Url, Body) ->
 uri() -> uri([]).
 uri() -> uri([]).
 uri(Parts) when is_list(Parts) ->
 uri(Parts) when is_list(Parts) ->
     NParts = [E || E <- Parts],
     NParts = [E || E <- Parts],
-    ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
+    ?HOST ++ to_list(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
 
 
 auth_header(Username) ->
 auth_header(Username) ->
     Password = <<"public">>,
     Password = <<"public">>,

+ 61 - 0
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -46,6 +46,8 @@
     get_rules/0,
     get_rules/0,
     get_rules_for_topic/1,
     get_rules_for_topic/1,
     get_rules_with_same_event/1,
     get_rules_with_same_event/1,
+    get_rule_ids_by_action/1,
+    ensure_action_removed/2,
     get_rules_ordered_by_ts/0
     get_rules_ordered_by_ts/0
 ]).
 ]).
 
 
@@ -99,6 +101,8 @@
 
 
 -define(RATE_METRICS, ['matched']).
 -define(RATE_METRICS, ['matched']).
 
 
+-type action_name() :: binary() | #{function := binary()}.
+
 config_key_path() ->
 config_key_path() ->
     [rule_engine, rules].
     [rule_engine, rules].
 
 
@@ -208,6 +212,46 @@ get_rules_with_same_event(Topic) ->
         lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
         lists:any(fun(T) -> is_of_event_name(EventName, T) end, From)
     ].
     ].
 
 
+-spec get_rule_ids_by_action(action_name()) -> [rule_id()].
+get_rule_ids_by_action(ActionName) when is_binary(ActionName) ->
+    [
+        Id
+     || #{actions := Acts, id := Id} <- get_rules(),
+        lists:any(fun(A) -> A =:= ActionName end, Acts)
+    ];
+get_rule_ids_by_action(#{function := FuncName}) when is_binary(FuncName) ->
+    {Mod, Fun} =
+        case string:split(FuncName, ":", leading) of
+            [M, F] -> {binary_to_module(M), F};
+            [F] -> {emqx_rule_actions, F}
+        end,
+    [
+        Id
+     || #{actions := Acts, id := Id} <- get_rules(),
+        contains_actions(Acts, Mod, Fun)
+    ].
+
+-spec ensure_action_removed(rule_id(), action_name()) -> ok.
+ensure_action_removed(RuleId, ActionName) ->
+    FilterFunc =
+        fun
+            (Func, Func) -> false;
+            (#{<<"function">> := Func}, #{function := Func}) -> false;
+            (_, _) -> true
+        end,
+    case emqx:get_raw_config([rule_engine, rules, RuleId], not_found) of
+        not_found ->
+            ok;
+        #{<<"actions">> := Acts} ->
+            NewActs = [AName || AName <- Acts, FilterFunc(AName, ActionName)],
+            {ok, _} = emqx_conf:update(
+                emqx_rule_engine:config_key_path() ++ [RuleId, actions],
+                NewActs,
+                #{override_to => cluster}
+            ),
+            ok
+    end.
+
 is_of_event_name(EventName, Topic) ->
 is_of_event_name(EventName, Topic) ->
     EventName =:= emqx_rule_events:event_name(Topic).
     EventName =:= emqx_rule_events:event_name(Topic).
 
 
@@ -413,3 +457,20 @@ now_ms() ->
 
 
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B.
 bin(B) when is_binary(B) -> B.
+
+binary_to_module(ModName) ->
+    try
+        binary_to_existing_atom(ModName, utf8)
+    catch
+        error:badarg ->
+            not_exist_mod
+    end.
+
+contains_actions(Actions, Mod0, Func0) ->
+    lists:any(
+        fun
+            (#{mod := Mod, func := Func}) when Mod =:= Mod0; Func =:= Func0 -> true;
+            (_) -> false
+        end,
+        Actions
+    ).

+ 2 - 6
apps/emqx_rule_engine/src/emqx_rule_sqltester.erl

@@ -19,7 +19,6 @@
 
 
 -export([
 -export([
     test/1,
     test/1,
-    echo_action/2,
     get_selected_data/3
     get_selected_data/3
 ]).
 ]).
 
 
@@ -70,7 +69,8 @@ test_rule(Sql, Select, Context, EventTopics) ->
         ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
         ok = emqx_rule_engine:clear_metrics_for_rule(RuleId)
     end.
     end.
 
 
-get_selected_data(Selected, _Envs, _Args) ->
+get_selected_data(Selected, Envs, Args) ->
+    ?TRACE("RULE", "testing_rule_sql_ok", #{selected => Selected, envs => Envs, args => Args}),
     {ok, Selected}.
     {ok, Selected}.
 
 
 is_publish_topic(<<"$events/", _/binary>>) -> false;
 is_publish_topic(<<"$events/", _/binary>>) -> false;
@@ -84,10 +84,6 @@ flatten([{ok, D}]) ->
 flatten([D | L]) when is_list(D) ->
 flatten([D | L]) when is_list(D) ->
     [D0 || {ok, D0} <- D] ++ flatten(L).
     [D0 || {ok, D0} <- D] ++ flatten(L).
 
 
-echo_action(Data, Envs) ->
-    ?TRACE("RULE", "testing_rule_sql_ok", #{data => Data, envs => Envs}),
-    {ok, Data}.
-
 fill_default_values(Event, Context) ->
 fill_default_values(Event, Context) ->
     maps:merge(envs_examp(Event), Context).
     maps:merge(envs_examp(Event), Context).
 
 

+ 102 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -52,7 +52,9 @@ groups() ->
             t_create_existing_rule,
             t_create_existing_rule,
             t_get_rules_for_topic,
             t_get_rules_for_topic,
             t_get_rules_for_topic_2,
             t_get_rules_for_topic_2,
-            t_get_rules_with_same_event
+            t_get_rules_with_same_event,
+            t_get_rule_ids_by_action,
+            t_ensure_action_removed
         ]},
         ]},
         {runtime, [], [
         {runtime, [], [
             t_match_atom_and_binary,
             t_match_atom_and_binary,
@@ -431,6 +433,105 @@ t_get_rules_with_same_event(_Config) ->
     ]),
     ]),
     ok.
     ok.
 
 
+t_get_rule_ids_by_action(_) ->
+    ID = <<"t_get_rule_ids_by_action">>,
+    Rule1 = #{
+        enable => false,
+        id => ID,
+        sql => <<"SELECT * FROM \"t\"">>,
+        from => [<<"t">>],
+        fields => [<<"*">>],
+        is_foreach => false,
+        conditions => {},
+        actions => [
+            #{mod => emqx_rule_actions, func => console, args => #{}},
+            #{mod => emqx_rule_actions, func => republish, args => #{}},
+            <<"mqtt:my_mqtt_bridge">>,
+            <<"mysql:foo">>
+        ],
+        description => ID,
+        created_at => erlang:system_time(millisecond)
+    },
+    ok = insert_rules([Rule1]),
+    ?assertMatch(
+        [ID],
+        emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:console">>})
+    ),
+    ?assertMatch(
+        [ID],
+        emqx_rule_engine:get_rule_ids_by_action(#{function => <<"emqx_rule_actions:republish">>})
+    ),
+    ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(#{function => <<"some_mod:fun">>})),
+    ?assertMatch([ID], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:foo">>)),
+    ?assertEqual([], emqx_rule_engine:get_rule_ids_by_action(<<"mysql:not_exists">>)),
+    ok = delete_rules_by_ids([<<"t_get_rule_ids_by_action">>]).
+
+t_ensure_action_removed(_) ->
+    Id = <<"t_ensure_action_removed">>,
+    GetSelectedData = <<"emqx_rule_sqltester:get_selected_data">>,
+    emqx:update_config(
+        [rule_engine, rules],
+        #{
+            Id => #{
+                <<"actions">> => [
+                    #{<<"function">> => GetSelectedData},
+                    #{<<"function">> => <<"console">>},
+                    #{<<"function">> => <<"republish">>},
+                    <<"mysql:foo">>,
+                    <<"mqtt:bar">>
+                ],
+                <<"description">> => <<"">>,
+                <<"sql">> => <<"SELECT * FROM \"t/#\"">>
+            }
+        }
+    ),
+    ?assertMatch(
+        #{
+            <<"actions">> := [
+                #{<<"function">> := GetSelectedData},
+                #{<<"function">> := <<"console">>},
+                #{<<"function">> := <<"republish">>},
+                <<"mysql:foo">>,
+                <<"mqtt:bar">>
+            ]
+        },
+        emqx:get_raw_config([rule_engine, rules, Id])
+    ),
+    ok = emqx_rule_engine:ensure_action_removed(Id, #{function => <<"console">>}),
+    ?assertMatch(
+        #{
+            <<"actions">> := [
+                #{<<"function">> := GetSelectedData},
+                #{<<"function">> := <<"republish">>},
+                <<"mysql:foo">>,
+                <<"mqtt:bar">>
+            ]
+        },
+        emqx:get_raw_config([rule_engine, rules, Id])
+    ),
+    ok = emqx_rule_engine:ensure_action_removed(Id, <<"mysql:foo">>),
+    ?assertMatch(
+        #{
+            <<"actions">> := [
+                #{<<"function">> := GetSelectedData},
+                #{<<"function">> := <<"republish">>},
+                <<"mqtt:bar">>
+            ]
+        },
+        emqx:get_raw_config([rule_engine, rules, Id])
+    ),
+    ok = emqx_rule_engine:ensure_action_removed(Id, #{function => GetSelectedData}),
+    ?assertMatch(
+        #{
+            <<"actions">> := [
+                #{<<"function">> := <<"republish">>},
+                <<"mqtt:bar">>
+            ]
+        },
+        emqx:get_raw_config([rule_engine, rules, Id])
+    ),
+    emqx:remove_config([rule_engine, rules, Id]).
+
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------
 %% Test cases for rule runtime
 %% Test cases for rule runtime
 %%------------------------------------------------------------------------------
 %%------------------------------------------------------------------------------