ソースを参照

feat(resource): add option 'force_create' to emqx_resource:create/4

Shawn 4 年 前
コミット
a879ec0f3a

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.erl

@@ -192,7 +192,7 @@ create(Type, Name, Conf) ->
     ?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
         config => Conf}),
     case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
-            parse_confs(Type, Name, Conf)) of
+            parse_confs(Type, Name, Conf), #{force_create => true}) of
         {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
         {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
         {error, Reason} -> {error, Reason}

+ 6 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -29,6 +29,12 @@
     metrics := emqx_plugin_libs_metrics:metrics()
 }.
 -type resource_group() :: binary().
+-type create_opts() :: #{
+        %% The emqx_resource:create/4 will return OK event if the Mod:on_start/2 fails,
+        %% the 'status' of the resource will be 'stopped' in this case.
+        %% Defaults to 'false'
+        force_create => boolean()
+    }.
 -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} |
     undefined.
 

+ 28 - 4
apps/emqx_resource/src/emqx_resource.erl

@@ -33,7 +33,9 @@
 
 -export([ check_config/2
         , check_and_create/3
+        , check_and_create/4
         , check_and_create_local/3
+        , check_and_create_local/4
         , check_and_recreate/4
         , check_and_recreate_local/4
         ]).
@@ -42,7 +44,9 @@
 %% provisional solution: rpc:multical to all the nodes for creating/updating/removing
 %% todo: replicate operations
 -export([ create/3 %% store the config and start the instance
+        , create/4
         , create_local/3
+        , create_local/4
         , create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially
         , create_dry_run_local/2
         , recreate/4 %% this will do create_dry_run, stop the old instance and start a new one
@@ -141,12 +145,22 @@ apply_query_after_calls(Funcs) ->
 -spec create(instance_id(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create(InstId, ResourceType, Config) ->
-    cluster_call(create_local, [InstId, ResourceType, Config]).
+    create(InstId, ResourceType, Config, #{}).
+
+-spec create(instance_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
+create(InstId, ResourceType, Config, Opts) ->
+    cluster_call(create_local, [InstId, ResourceType, Config, Opts]).
 
 -spec create_local(instance_id(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
 create_local(InstId, ResourceType, Config) ->
-    call_instance(InstId, {create, InstId, ResourceType, Config}).
+    create_local(InstId, ResourceType, Config, #{}).
+
+-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
+create_local(InstId, ResourceType, Config, Opts) ->
+    call_instance(InstId, {create, InstId, ResourceType, Config, Opts}).
 
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -294,14 +308,24 @@ check_config(ResourceType, RawConfigTerm) ->
 -spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, term()}.
 check_and_create(InstId, ResourceType, RawConfig) ->
+    check_and_create(InstId, ResourceType, RawConfig, #{}).
+
+-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
+    {ok, resource_data() | 'already_created'} | {error, term()}.
+check_and_create(InstId, ResourceType, RawConfig, Opts) ->
     check_and_do(ResourceType, RawConfig,
-        fun(InstConf) -> create(InstId, ResourceType, InstConf) end).
+        fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end).
 
 -spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) ->
     {ok, resource_data()} | {error, term()}.
 check_and_create_local(InstId, ResourceType, RawConfig) ->
+    check_and_create_local(InstId, ResourceType, RawConfig, #{}).
+
+-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(),
+    create_opts()) -> {ok, resource_data()} | {error, term()}.
+check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
     check_and_do(ResourceType, RawConfig,
-        fun(InstConf) -> create_local(InstId, ResourceType, InstConf) end).
+        fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
 
 -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), term()) ->
     {ok, resource_data()} | {error, term()}.

+ 12 - 18
apps/emqx_resource/src/emqx_resource_instance.erl

@@ -26,7 +26,6 @@
 -export([ lookup/1
         , get_metrics/1
         , list_all/0
-        , create_local/3
         ]).
 
 -export([ hash_call/2
@@ -85,15 +84,6 @@ list_all() ->
         error:badarg -> []
     end.
 
-
--spec create_local(instance_id(), resource_type(), resource_config()) ->
-    {ok, resource_data()} | {error, term()}.
-create_local(InstId, ResourceType, InstConf) ->
-    case hash_call(InstId, {create, InstId, ResourceType, InstConf}, 15000) of
-        {ok, Data} -> {ok, Data};
-        Error -> Error
-    end.
-
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
 %%------------------------------------------------------------------------------
@@ -105,8 +95,8 @@ init({Pool, Id}) ->
     true = gproc_pool:connect_worker(Pool, {Pool, Id}),
     {ok, #state{worker_pool = Pool, worker_id = Id}}.
 
-handle_call({create, InstId, ResourceType, Config}, _From, State) ->
-    {reply, do_create(InstId, ResourceType, Config), State};
+handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
+    {reply, do_create(InstId, ResourceType, Config, Opts), State};
 
 handle_call({create_dry_run, InstId, ResourceType, Config}, _From, State) ->
     {reply, do_create_dry_run(InstId, ResourceType, Config), State};
@@ -146,7 +136,7 @@ code_change(_OldVsn, State, _Extra) ->
 
 %% suppress the race condition check, as these functions are protected in gproc workers
 -dialyzer({nowarn_function, [do_recreate/4,
-                             do_create/3,
+                             do_create/4,
                              do_restart/1,
                              do_stop/1,
                              do_health_check/1]}).
@@ -160,7 +150,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
             case do_create_dry_run(TestInstId, ResourceType, Config) of
                 ok ->
                     do_remove(ResourceType, InstId, ResourceState),
-                    do_create(InstId, ResourceType, Config);
+                    do_create(InstId, ResourceType, Config, #{force_create => true});
                 Error ->
                     Error
             end;
@@ -170,7 +160,8 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
             {error, not_found}
     end.
 
-do_create(InstId, ResourceType, Config) ->
+do_create(InstId, ResourceType, Config, Opts) ->
+    ForceCreate = maps:get(force_create, Opts, false),
     case lookup(InstId) of
         {ok, _} -> {ok, already_created};
         _ ->
@@ -183,11 +174,14 @@ do_create(InstId, ResourceType, Config) ->
                     %% status and then do ets:insert/2
                     _ = do_health_check(Res0#{state => ResourceState}),
                     {ok, force_lookup(InstId)};
-                {error, Reason} ->
-                    logger:error("start ~ts resource ~ts failed: ~p",
+                {error, Reason} when ForceCreate == true ->
+                    logger:error("start ~ts resource ~ts failed: ~p, "
+                                 "force_create it as a stopped resource",
                                  [ResourceType, InstId, Reason]),
                     ets:insert(emqx_resource_instance, {InstId, Res0}),
-                    {ok, Res0}
+                    {ok, Res0};
+                {error, Reason} when ForceCreate == false ->
+                    {error, Reason}
             end
     end.
 

+ 1 - 4
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -142,10 +142,7 @@ t_stop_start(_) ->
 
     ?assertNot(is_process_alive(Pid0)),
 
-    ?assertException(
-       error,
-       {?ID, stopped},
-       emqx_resource:query(?ID, get_state)),
+    ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)),
 
     ok = emqx_resource:restart(?ID),