Parcourir la source

feat: add start_after_created option to resource:create/4

Shawn il y a 3 ans
Parent
commit
cc25f92273

+ 8 - 10
apps/emqx_authn/src/emqx_authn_utils.erl

@@ -59,16 +59,14 @@ create_resource(ResourceId, Module, Config) ->
     ).
 
 update_resource(Module, Config, ResourceId) ->
-    %% recreate before maybe stop
-    %% resource will auto start during recreate
-    Result = emqx_resource:recreate_local(ResourceId, Module, Config),
-    case Config of
-        #{enable := true} ->
-            Result;
-        #{enable := false} ->
-            ok = emqx_resource:stop(ResourceId),
-            Result
-    end.
+    Opts = #{start_after_created => false},
+    Result = emqx_resource:recreate_local(ResourceId, Module, Config, Opts),
+    _ =
+        case Config of
+            #{enable := true} -> emqx_resource:start(ResourceId);
+            #{enable := false} -> ok
+        end,
+    Result.
 
 check_password_from_selected_map(_Algorithm, _Selected, undefined) ->
     {error, bad_username_or_password};

+ 1 - 1
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -38,7 +38,7 @@
 
 -export([mongo_query/5, check_worker_health/1]).
 
--define(HEALTH_CHECK_TIMEOUT, 10000).
+-define(HEALTH_CHECK_TIMEOUT, 30000).
 
 %% mongo servers don't need parse
 -define(MONGO_HOST_OPTIONS, #{

+ 8 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -33,7 +33,15 @@
 -type create_opts() :: #{
     health_check_interval => integer(),
     health_check_timeout => integer(),
+    %% We can choose to block the return of emqx_resource:start until
+    %% the resource connected, wait max to `wait_for_resource_ready` ms.
     wait_for_resource_ready => integer(),
+    %% If `start_after_created` is set to true, the resource is started right
+    %% after it is created. But note that a `started` resource is not guaranteed
+    %% to be `connected`.
+    start_after_created => boolean(),
+    %% If the resource disconnected, we can set to retry starting the resource
+    %% periodically.
     auto_retry_interval => integer()
 }.
 -type after_query() ::

+ 11 - 2
apps/emqx_resource/src/emqx_resource.erl

@@ -70,8 +70,9 @@
 %% Calls to the callback module with current resource state
 %% They also save the state after the call finished (except query/2,3).
 
-%% restart the instance.
 -export([
+    start/1,
+    start/2,
     restart/1,
     restart/2,
     %% verify if the resource is working normally
@@ -261,11 +262,19 @@ query(InstId, Request, AfterQuery) ->
                     erlang:raise(Err, Reason, ST)
             end;
         {ok, _Group, _Data} ->
-            query_error(not_found, <<"resource not connected">>);
+            query_error(not_connected, <<"resource not connected">>);
         {error, not_found} ->
             query_error(not_found, <<"resource not found">>)
     end.
 
+-spec start(instance_id()) -> ok | {error, Reason :: term()}.
+start(InstId) ->
+    start(InstId, #{}).
+
+-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
+start(InstId, Opts) ->
+    emqx_resource_manager:start(InstId, Opts).
+
 -spec restart(instance_id()) -> ok | {error, Reason :: term()}.
 restart(InstId) ->
     restart(InstId, #{}).

+ 18 - 10
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -96,7 +96,10 @@ create(InstId, Group, ResourceType, Config, Opts) ->
         [matched, success, failed, exception],
         [matched]
     ),
-    wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+    case maps:get(start_after_created, Opts, true) of
+        true -> wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000));
+        false -> ok
+    end,
     ok.
 
 %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
@@ -108,9 +111,9 @@ create(InstId, Group, ResourceType, Config, Opts) ->
 create_dry_run(ResourceType, Config) ->
     InstId = make_test_id(),
     ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
-    case wait_for_resource_ready(InstId, 5000) of
+    case wait_for_resource_ready(InstId, 15000) of
         ok ->
-            _ = remove(InstId);
+            remove(InstId);
         timeout ->
             _ = remove(InstId),
             {error, timeout}
@@ -123,7 +126,9 @@ recreate(InstId, ResourceType, NewConfig, Opts) ->
     case lookup(InstId) of
         {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
             _ = remove(InstId, false),
-            ensure_resource(InstId, Group, ResourceType, NewConfig, Opts);
+            create(InstId, Group, ResourceType, NewConfig, Opts),
+            {ok, _Group, Data} = lookup(InstId),
+            {ok, Data};
         {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
             {error, updating_to_incorrect_resource_type};
         {error, not_found} ->
@@ -151,7 +156,7 @@ restart(InstId, Opts) when is_binary(InstId) ->
             Error
     end.
 
-%% @doc Stop the resource
+%% @doc Start the resource
 -spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
 start(InstId, Opts) ->
     case safe_call(InstId, start, ?T_OPERATION) of
@@ -162,7 +167,7 @@ start(InstId, Opts) ->
             Error
     end.
 
-%% @doc Start the resource
+%% @doc Stop the resource
 -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
 stop(InstId) ->
     case safe_call(InstId, stop, ?T_OPERATION) of
@@ -240,13 +245,16 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
         state = undefined,
         error = undefined
     },
-    gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
+    gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, {Data, Opts}, []).
 
-init(Data) ->
+init({Data, Opts}) ->
     process_flag(trap_exit, true),
     %% init the cache so that lookup/1 will always return something
     ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
-    {ok, connecting, Data, {next_event, internal, try_connect}}.
+    case maps:get(start_after_created, Opts, true) of
+        true -> {ok, connecting, Data, {next_event, internal, start_resource}};
+        false -> {ok, stopped, Data}
+    end.
 
 terminate(_Reason, _State, Data) ->
     _ = maybe_clear_alarm(Data#data.id),
@@ -296,7 +304,7 @@ handle_event(enter, _OldState, connecting, Data) ->
     ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
     Actions = [{state_timeout, 0, health_check}],
     {keep_state_and_data, Actions};
-handle_event(internal, try_connect, connecting, Data) ->
+handle_event(internal, start_resource, connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, connecting, Data) ->
     handle_connecting_health_check(Data);

+ 40 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -126,8 +126,46 @@ t_create_remove_local(_) ->
     ok = emqx_resource:remove_local(?ID),
     {error, _} = emqx_resource:remove_local(?ID),
 
+    ?assertMatch(
+        {error, {emqx_resource, #{reason := not_found}}},
+        emqx_resource:query(?ID, get_state)
+    ),
     ?assertNot(is_process_alive(Pid)).
 
+t_do_not_start_after_created(_) ->
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource},
+        #{start_after_created => false}
+    ),
+    %% the resource should remain `disconnected` after created
+    timer:sleep(200),
+    ?assertMatch(
+        {error, {emqx_resource, #{reason := not_connected}}},
+        emqx_resource:query(?ID, get_state)
+    ),
+    ?assertMatch(
+        {ok, _, #{status := disconnected}},
+        emqx_resource:get_instance(?ID)
+    ),
+
+    %% start the resource manually..
+    ok = emqx_resource:start(?ID),
+    #{pid := Pid} = emqx_resource:query(?ID, get_state),
+    ?assert(is_process_alive(Pid)),
+
+    %% restart the resource
+    ok = emqx_resource:restart(?ID),
+    ?assertNot(is_process_alive(Pid)),
+    #{pid := Pid2} = emqx_resource:query(?ID, get_state),
+    ?assert(is_process_alive(Pid2)),
+
+    ok = emqx_resource:remove_local(?ID),
+
+    ?assertNot(is_process_alive(Pid2)).
+
 t_query(_) ->
     {ok, _} = emqx_resource:create_local(
         ?ID,
@@ -231,7 +269,7 @@ t_stop_start(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        {error, {emqx_resource, #{reason := not_connected}}},
         emqx_resource:query(?ID, get_state)
     ),
 
@@ -273,7 +311,7 @@ t_stop_start_local(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := not_found}}},
+        {error, {emqx_resource, #{reason := not_connected}}},
         emqx_resource:query(?ID, get_state)
     ),