Sfoglia il codice sorgente

fix(rule_engine): resource update problem and add cli (#3991)

bignullnull 5 anni fa
parent
commit
67bfb4f10a

+ 53 - 19
apps/emqx_rule_engine/src/emqx_rule_engine.erl

@@ -39,10 +39,11 @@
         , get_resource_status/1
         , get_resource_params/1
         , delete_resource/1
-        , update_resource/1
+        , update_resource/2
         ]).
 
 -export([ init_resource/4
+        , init_resource/5
         , init_action/4
         , clear_resource/3
         , clear_rule/1
@@ -244,27 +245,60 @@ create_resource(#{type := Type, config := Config0} = Params) ->
             {error, {resource_type_not_found, Type}}
     end.
 
-update_resource(#{id := Id, type := Type, config := NewConfig,
-                  description := Description} = NewResource) ->
+-spec(update_resource(resource_id(), map()) -> ok | {error, Reason :: term()}).
+update_resource(ResId, NewParams) ->
+    try
+        lists:foreach(fun(#rule{id = RuleId, enabled = Enabled, actions = Actions}) ->
+                    lists:foreach(
+                        fun (#action_instance{args = #{<<"$resource">> := ResId1}})
+                            when ResId =:= ResId1, Enabled == true ->
+                                throw({dependency_exists, RuleId});
+                            (_) -> ok
+                        end, Actions)
+                    end, ets:tab2list(?RULE_TAB)),
+        do_update_resource_check(ResId, NewParams)
+    catch _ : Reason ->
+        {error, Reason}
+    end.
+
+do_update_resource_check(Id, NewParams) ->
+    case emqx_rule_registry:find_resource(Id) of
+        {ok, #resource{id = Id,
+                       type = Type,
+                       config = OldConfig,
+                       description = OldDescription} = _OldResource} ->
+                try
+                    do_update_resource(#{id => Id,
+                                        config => case maps:find(<<"config">>, NewParams) of
+                                                       {ok, NewConfig} -> NewConfig;
+                                                       error -> OldConfig
+                                                  end,
+                                        type => Type,
+                                        description => case maps:find(<<"description">>, NewParams) of
+                                                            {ok, NewDescription} -> NewDescription;
+                                                            error -> OldDescription
+                                                       end}),
+                    ok
+                catch _ : Reason ->
+                    {error, Reason}
+                end;
+        _Other ->
+            {error, not_found}
+    end.
+
+do_update_resource(#{id := Id, type := Type, description:= NewDescription, config:= NewConfig}) ->
     case emqx_rule_registry:find_resource_type(Type) of
         {ok, #resource_type{on_create = {Module, Create},
+                            on_destroy = {Module, Destroy},
                             params_spec = ParamSpec}} ->
             Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
-            case delete_resource(Id) of
-                {error, not_found} -> {error, not_found};
-                _ -> %% deletion might fail because of an associted rule.
-                    emqx_rule_registry:add_resource(
-                        #resource{
-                            id = Id,
-                            config = Config,
-                            type = Type,
-                            description = Description,
-                            created_at = erlang:system_time(millisecond)}),
-                    catch cluster_call(init_resource, [Module, Create, Id, Config, true]),
-                    {ok, NewResource}
-            end;
-        not_found ->
-            {error, {resource_type_not_found, Type}}
+            cluster_call(init_resource, [Module, Create, Id, Config]),
+            emqx_rule_registry:add_resource(#resource{id = Id,
+                                                      type = Type,
+                                                      config = Config,
+                                                      description = NewDescription,
+                                                      created_at = erlang:system_time(millisecond)}),
+            cluster_call(clear_resource, [Module, Destroy, Id])
     end.
 
 -spec(start_resource(resource_id()) -> ok | {error, Reason :: term()}).
@@ -499,7 +533,7 @@ init_resource(Module, OnCreate, ResId, Config, Restart) ->
     Params = ?RAISE(
         Module:OnCreate(ResId, Config),
         Restart andalso
-            timer:apply_after(timer:seconds(60), ?MODULE, do_init_resource,
+            timer:apply_after(timer:seconds(60), ?MODULE, init_resource,
                               [Module, OnCreate, ResId, Config, Restart]),
         {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
     ResParams = #resource_params{id = ResId,

+ 34 - 20
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl

@@ -327,27 +327,34 @@ start_resource(#{id := Id}, _Params) ->
             return({error, 400, ?ERR_BADARGS(Reason)})
     end.
 
-update_resource(#{id := Id}, Params) ->
-    case parse_resource_params(Params) of
-        {ok, ParsedParams} ->
-            case emqx_rule_registry:find_resource(Id) of
-                {ok, #resource{id = Id, type = Type} = _OldResource} ->
-                    Config = maps:get(config, ParsedParams),
-                    Description = maps:get(description, ParsedParams),
-                    _ = emqx_rule_engine:update_resource(
-                        #{id => Id,
-                        config => Config,
-                        type => Type,
-                        description => Description,
-                        created_at => erlang:system_time(millisecond)}),
-                    return(ok);
-                _Other ->
-                    return({error, 400, ?ERR_NO_RESOURCE(Id)})
-            end;
+update_resource(#{id := Id}, NewParams) ->
+    P1 = case proplists:get_value(<<"description">>, NewParams) of
+            undefined -> #{};
+            Value -> #{<<"description">> => Value}
+    end,
+    P2 = case proplists:get_value(<<"config">>, NewParams) of
+            undefined -> #{};
+            <<"{}">> -> #{};
+            Map -> #{<<"config">> => ?RAISE(maps:from_list(Map), {invalid_config, Map})}
+    end,
+    case emqx_rule_engine:update_resource(Id, maps:merge(P1, P2)) of
+        ok ->
+            return(ok);
+        {error, not_found} ->
+            ?LOG(error, "resource not found: ~0p", [Id]),
+            return({error, 400, list_to_binary("resource not found:" ++ binary_to_list(Id))});
+        {error, {init_resource_failure, _}} ->
+            ?LOG(error, "init resource failure: ~0p", [Id]),
+            return({error, 500, list_to_binary("init resource failure:" ++ binary_to_list(Id))});
+        {error, {dependency_exists, RuleId}} ->
+            ?LOG(error, "dependency exists: ~0p", [RuleId]),
+            return({error, 500, list_to_binary("resource dependency by rule:" ++ binary_to_list(RuleId))});
         {error, Reason} ->
-            return({error, 400, ?ERR_BADARGS(Reason)})
+            ?LOG(error, "update resource failed: ~0p", [Reason]),
+            return({error, 500, <<"update resource failed,error info have been written to logfile!">>})
     end.
 
+
 delete_resource(#{id := Id}, _Params) ->
     case emqx_rule_engine:delete_resource(Id) of
         ok -> return(ok);
@@ -524,7 +531,14 @@ parse_resource_params([_ | Params], Res) ->
     parse_resource_params(Params, Res).
 
 json_term_to_map(List) ->
-    emqx_json:decode(emqx_json:encode(List), [return_maps]).
+    Data = lists:map(fun({K, V}) ->
+                    case V of
+                        {} ->{K, [{}]};
+                        _ -> {K, V}
+                    end
+                 end,
+            List),
+    emqx_json:decode(emqx_json:encode(Data), [return_maps]).
 
 sort_by_title(action, Actions) ->
     sort_by(#action.title, Actions);
@@ -544,4 +558,4 @@ get_rule_metrics(Id) ->
 
 get_action_metrics(Id) ->
     [maps:put(node, Node, rpc:call(Node, emqx_rule_metrics, get_action_metrics, [Id]))
-     || Node <- ekka_mnesia:running_nodes()].
+     || Node <- ekka_mnesia:running_nodes()].

+ 33 - 2
apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl

@@ -44,6 +44,12 @@
         , {descr, $d, "descr", {binary, <<"">>}, "Description"}
         ]).
 
+-define(OPTSPEC_RESOURCES_UPDATE,
+        [ {id, undefined, undefined, binary, "The resource id"}
+        , {config, $c, "config", {binary, undefined}, "Config"}
+        , {description, $d, "descr", {binary, undefined}, "Description"}
+        ]).
+
 -define(OPTSPEC_RULES_CREATE,
         [ {sql, undefined, undefined, binary, "Filter Condition SQL"}
         , {actions, undefined, undefined, binary, "Action List in JSON format: [{\"name\": <action_name>, \"params\": {<key>: <value>}}]"}
@@ -61,7 +67,6 @@
         , {on_action_failed, $g, "on_action_failed", {atom, undefined}, "'continue' or 'stop' when an action in the rule fails"}
         , {descr, $d, "descr", {binary, undefined}, "Description"}
         ]).
-
 %%-----------------------------------------------------------------------------
 %% Load/Unload Commands
 %%-----------------------------------------------------------------------------
@@ -148,6 +153,7 @@ actions(_Usage) ->
 %%------------------------------------------------------------------------------
 %% 'resources' command
 %%------------------------------------------------------------------------------
+
 resources(["create" | Params]) ->
     with_opts(fun({Opts, _}) ->
                 case emqx_rule_engine:create_resource(make_resource(Opts)) of
@@ -158,6 +164,19 @@ resources(["create" | Params]) ->
                 end
               end, Params, ?OPTSPEC_RESOURCES_CREATE, {?FUNCTION_NAME, create});
 
+
+resources(["update" | Params]) ->
+    with_opts(fun({Opts, _}) ->
+        Id = maps:get(id, maps:from_list(Opts)),
+        Maps = make_updated_resource(Opts),
+        case emqx_rule_engine:update_resource(Id, Maps) of
+            ok ->
+                emqx_ctl:print("Resource update successfully~n");
+            {error, Reason} ->
+                emqx_ctl:print("update resource failed, reason: ~p!~n", [Reason])
+            end
+        end, Params, ?OPTSPEC_RESOURCES_UPDATE, {?FUNCTION_NAME, update});
+
 resources(["test" | Params]) ->
     with_opts(fun({Opts, _}) ->
                 case emqx_rule_engine:test_resource(make_resource(Opts)) of
@@ -192,7 +211,8 @@ resources(_Usage) ->
     emqx_ctl:usage([{"resources create", "Create a resource"},
                     {"resources list [-t <ResourceType>]", "List resources"},
                     {"resources show <ResourceId>", "Show a resource"},
-                    {"resources delete <ResourceId>", "Delete a resource"}
+                    {"resources delete <ResourceId>", "Delete a resource"},
+                    {"resources update <ResourceId> [-c <config>] [-d <description>]", "Update a resource"}
                    ]).
 
 %%------------------------------------------------------------------------------
@@ -302,6 +322,17 @@ make_resource(Opts) ->
           config => ?RAISE(emqx_json:decode(Config, [return_maps]), {invalid_config, Config}),
           description => get_value(descr, Opts)}, id, <<"">>, Opts).
 
+make_updated_resource(Opts) ->
+    P1 = case proplists:get_value(description, Opts) of
+            undefined -> #{};
+            Value -> #{<<"description">> => Value}
+    end,
+    P2 = case proplists:get_value(config, Opts) of
+            undefined -> #{};
+            Map -> #{<<"config">> => ?RAISE((emqx_json:decode(Map, [return_maps])), {invalid_config, Map})}
+    end,
+    maps:merge(P1, P2).
+
 printable_actions(Actions) when is_list(Actions) ->
     emqx_json:encode([#{id => Id, name => Name, params => Args,
                         metrics => get_action_metrics(Id),

+ 36 - 6
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -260,6 +260,7 @@ init_per_testcase(_TestCase, Config) ->
     %ct:pal("============ ~p", [ets:tab2list(emqx_resource_type)]),
     Config.
 
+
 end_per_testcase(t_events, Config) ->
     ets:delete(events_record_tab),
     ok = emqx_rule_registry:remove_rule(?config(hook_points_rules, Config)),
@@ -443,24 +444,53 @@ t_crud_resources_api(_Config) ->
     ResId = maps:get(id, Resources1),
     {ok, #{code := 0, data := Resources}} = emqx_rule_engine_api:list_resources(#{},[]),
     ?assert(length(Resources) > 0),
-
     {ok, #{code := 0, data := Resources2}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
     ?assertEqual(ResId, maps:get(id, Resources2)),
-
+    %
     {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},
-                                                              [{<<"id">>, ResId},
-                                                               {<<"type">>, <<"built_in">>},
-                                                               {<<"config">>, [{<<"a">>, 2}]},
+                                                              [{<<"config">>, [{<<"a">>, 2}]},
                                                                {<<"description">>, <<"2">>}]),
     {ok, #{code := 0, data := Resources3}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
     ?assertEqual(ResId, maps:get(id, Resources3)),
     ?assertEqual(#{<<"a">> => 2}, maps:get(config, Resources3)),
     ?assertEqual(<<"2">>, maps:get(description, Resources3)),
-
+    {ok, #{code := 0, data := Resources3}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
+    ?assertEqual(ResId, maps:get(id, Resources3)),
+    %
+    {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},
+                                                              [{<<"config">>, [{<<"a">>, 3}]}]),
+    {ok, #{code := 0, data := Resources4}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
+    ?assertEqual(ResId, maps:get(id, Resources4)),
+    ?assertEqual(#{<<"a">> => 3}, maps:get(config, Resources4)),
+    ?assertEqual(<<"2">>, maps:get(description, Resources4)),
+    % Only config
+    {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},
+                                                              [{<<"config">>, [{<<"a">>, 1},
+                                                                               {<<"b">>, 2},
+                                                                               {<<"c">>, 3}]}]),
+    {ok, #{code := 0, data := Resources5}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
+    ?assertEqual(ResId, maps:get(id, Resources5)),
+    ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources5)),
+    ?assertEqual(<<"2">>, maps:get(description, Resources5)),
+    % Only description
+    {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},
+                                                              [{<<"description">>, <<"new5">>}]),
+    {ok, #{code := 0, data := Resources6}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
+    ?assertEqual(ResId, maps:get(id, Resources6)),
+    ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources6)),
+    ?assertEqual(<<"new5">>, maps:get(description, Resources6)),
+    % None
+    {ok, #{code := 0}} = emqx_rule_engine_api:update_resource(#{id => ResId},[]),
+    {ok, #{code := 0, data := Resources7}} = emqx_rule_engine_api:show_resource(#{id => ResId},[]),
+    ?assertEqual(ResId, maps:get(id, Resources7)),
+    ?assertEqual(#{<<"a">> => 1, <<"b">> => 2, <<"c">> => 3}, maps:get(config, Resources7)),
+    ?assertEqual(<<"new5">>, maps:get(description, Resources7)),
+    %
     ?assertMatch({ok, #{code := 0}}, emqx_rule_engine_api:delete_resource(#{id => ResId},#{})),
     ?assertMatch({ok, #{code := 404}}, emqx_rule_engine_api:show_resource(#{id => ResId},[])),
     ok.
 
+
 t_list_resource_types_api(_Config) ->
     {ok, #{code := 0, data := ResourceTypes}} = emqx_rule_engine_api:list_resource_types(#{},[]),
     ?assert(length(ResourceTypes) > 0),