Просмотр исходного кода

refactor: graceful recreate resources

Shawn 3 лет назад
Родитель
Сommit
d6ef2f7502

+ 16 - 1
apps/emqx/src/emqx_map_lib.erl

@@ -31,7 +31,8 @@
     deep_convert/3,
     diff_maps/2,
     merge_with/3,
-    best_effort_recursive_sum/3
+    best_effort_recursive_sum/3,
+    if_only_to_toggle_enable/2
 ]).
 
 -export_type([config_key/0, config_key_path/0]).
@@ -316,3 +317,17 @@ deep_filter(M, F) when is_map(M) ->
             maps:to_list(M)
         )
     ).
+
+if_only_to_toggle_enable(OldConf, Conf) ->
+    #{added := Added, removed := Removed, changed := Updated} =
+        emqx_map_lib:diff_maps(OldConf, Conf),
+    case {Added, Removed, Updated} of
+        {Added, Removed, #{enable := _} = Updated} when
+            map_size(Added) =:= 0,
+            map_size(Removed) =:= 0,
+            map_size(Updated) =:= 1
+        ->
+            true;
+        {_, _, _} ->
+            false
+    end.

+ 1 - 15
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -111,7 +111,7 @@ update(Type, Name, {OldConf, Conf}) ->
     %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
     %% without restarting the bridge.
     %%
-    case if_only_to_toggle_enable(OldConf, Conf) of
+    case emqx_map_lib:if_only_to_toggle_enable(OldConf, Conf) of
         false ->
             ?SLOG(info, #{
                 msg => "update bridge",
@@ -198,20 +198,6 @@ maybe_disable_bridge(Type, Name, Conf) ->
         true -> ok
     end.
 
-if_only_to_toggle_enable(OldConf, Conf) ->
-    #{added := Added, removed := Removed, changed := Updated} =
-        emqx_map_lib:diff_maps(OldConf, Conf),
-    case {Added, Removed, Updated} of
-        {Added, Removed, #{enable := _} = Updated} when
-            map_size(Added) =:= 0,
-            map_size(Removed) =:= 0,
-            map_size(Updated) =:= 1
-        ->
-            true;
-        {_, _, _} ->
-            false
-    end.
-
 fill_dry_run_conf(Conf) ->
     Conf#{
         <<"egress">> =>

+ 3 - 2
apps/emqx_resource/include/emqx_resource.hrl

@@ -14,7 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 -type resource_type() :: module().
--type instance_id() :: binary().
+-type resource_id() :: binary().
+-type manager_id() :: binary().
 -type raw_resource_config() :: binary() | raw_term_resource_config().
 -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
 -type resource_config() :: term().
@@ -22,7 +23,7 @@
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting.
 -type resource_data() :: #{
-    id := instance_id(),
+    id := resource_id(),
     mod := module(),
     config := resource_config(),
     state := resource_state(),

+ 110 - 109
apps/emqx_resource/src/emqx_resource.erl

@@ -117,17 +117,17 @@
 ]).
 
 %% when calling emqx_resource:start/1
--callback on_start(instance_id(), resource_config()) ->
+-callback on_start(resource_id(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
 
 %% when calling emqx_resource:stop/1
--callback on_stop(instance_id(), resource_state()) -> term().
+-callback on_stop(resource_id(), resource_state()) -> term().
 
 %% when calling emqx_resource:query/3
--callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term().
+-callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term().
 
 %% when calling emqx_resource:health_check/2
--callback on_get_status(instance_id(), resource_state()) ->
+-callback on_get_status(resource_id(), resource_state()) ->
     resource_status()
     | {resource_status(), resource_state()}
     | {resource_status(), resource_state(), term()}.
@@ -167,32 +167,32 @@ apply_query_after_calls(Funcs) ->
 %% =================================================================================
 %% APIs for resource instances
 %% =================================================================================
--spec create(instance_id(), resource_group(), resource_type(), resource_config()) ->
+-spec create(resource_id(), resource_group(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create(InstId, Group, ResourceType, Config) ->
-    create(InstId, Group, ResourceType, Config, #{}).
+create(ResId, Group, ResourceType, Config) ->
+    create(ResId, Group, ResourceType, Config, #{}).
 
--spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
+-spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create(InstId, Group, ResourceType, Config, Opts) ->
-    emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts).
+create(ResId, Group, ResourceType, Config, Opts) ->
+    emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts).
 % --------------------------------------------
 
--spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) ->
+-spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create_local(InstId, Group, ResourceType, Config) ->
-    create_local(InstId, Group, ResourceType, Config, #{}).
+create_local(ResId, Group, ResourceType, Config) ->
+    create_local(ResId, Group, ResourceType, Config, #{}).
 
 -spec create_local(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     resource_config(),
     create_opts()
 ) ->
     {ok, resource_data()}.
-create_local(InstId, Group, ResourceType, Config, Opts) ->
-    emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts).
+create_local(ResId, Group, ResourceType, Config, Opts) ->
+    emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts).
 
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -204,61 +204,61 @@ create_dry_run(ResourceType, Config) ->
 create_dry_run_local(ResourceType, Config) ->
     emqx_resource_manager:create_dry_run(ResourceType, Config).
 
--spec recreate(instance_id(), resource_type(), resource_config()) ->
+-spec recreate(resource_id(), resource_type(), resource_config()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
-recreate(InstId, ResourceType, Config) ->
-    recreate(InstId, ResourceType, Config, #{}).
+recreate(ResId, ResourceType, Config) ->
+    recreate(ResId, ResourceType, Config, #{}).
 
--spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
+-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
-recreate(InstId, ResourceType, Config, Opts) ->
-    emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts).
+recreate(ResId, ResourceType, Config, Opts) ->
+    emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts).
 
--spec recreate_local(instance_id(), resource_type(), resource_config()) ->
+-spec recreate_local(resource_id(), resource_type(), resource_config()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
-recreate_local(InstId, ResourceType, Config) ->
-    recreate_local(InstId, ResourceType, Config, #{}).
+recreate_local(ResId, ResourceType, Config) ->
+    recreate_local(ResId, ResourceType, Config, #{}).
 
--spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
+-spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
-recreate_local(InstId, ResourceType, Config, Opts) ->
-    emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts).
+recreate_local(ResId, ResourceType, Config, Opts) ->
+    emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
 
--spec remove(instance_id()) -> ok | {error, Reason :: term()}.
-remove(InstId) ->
-    emqx_resource_proto_v1:remove(InstId).
+-spec remove(resource_id()) -> ok | {error, Reason :: term()}.
+remove(ResId) ->
+    emqx_resource_proto_v1:remove(ResId).
 
--spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
-remove_local(InstId) ->
-    emqx_resource_manager:remove(InstId).
+-spec remove_local(resource_id()) -> ok | {error, Reason :: term()}.
+remove_local(ResId) ->
+    emqx_resource_manager:remove(ResId).
 
--spec reset_metrics_local(instance_id()) -> ok.
-reset_metrics_local(InstId) ->
-    emqx_resource_manager:reset_metrics(InstId).
+-spec reset_metrics_local(resource_id()) -> ok.
+reset_metrics_local(ResId) ->
+    emqx_resource_manager:reset_metrics(ResId).
 
--spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
-reset_metrics(InstId) ->
-    emqx_resource_proto_v1:reset_metrics(InstId).
+-spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.
+reset_metrics(ResId) ->
+    emqx_resource_proto_v1:reset_metrics(ResId).
 
 %% =================================================================================
--spec query(instance_id(), Request :: term()) -> Result :: term().
-query(InstId, Request) ->
-    query(InstId, Request, inc_metrics_funcs(InstId)).
+-spec query(resource_id(), Request :: term()) -> Result :: term().
+query(ResId, Request) ->
+    query(ResId, Request, inc_metrics_funcs(ResId)).
 
 %% same to above, also defines what to do when the Module:on_query success or failed
 %% it is the duty of the Module to apply the `after_query()` functions.
--spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
-query(InstId, Request, AfterQuery) ->
-    case emqx_resource_manager:ets_lookup(InstId) of
+-spec query(resource_id(), Request :: term(), after_query()) -> Result :: term().
+query(ResId, Request, AfterQuery) ->
+    case emqx_resource_manager:ets_lookup(ResId) of
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
-            ok = emqx_metrics_worker:inc(resource_metrics, InstId, matched),
+            ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched),
             try
-                Mod:on_query(InstId, Request, AfterQuery, ResourceState)
+                Mod:on_query(ResId, Request, AfterQuery, ResourceState)
             catch
                 Err:Reason:ST ->
-                    emqx_metrics_worker:inc(resource_metrics, InstId, exception),
+                    emqx_metrics_worker:inc(resource_metrics, ResId, exception),
                     erlang:raise(Err, Reason, ST)
             end;
         {ok, _Group, _Data} ->
@@ -267,39 +267,39 @@ query(InstId, Request, AfterQuery) ->
             query_error(not_found, <<"resource not found">>)
     end.
 
--spec start(instance_id()) -> ok | {error, Reason :: term()}.
-start(InstId) ->
-    start(InstId, #{}).
+-spec start(resource_id()) -> ok | {error, Reason :: term()}.
+start(ResId) ->
+    start(ResId, #{}).
 
--spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
-start(InstId, Opts) ->
-    emqx_resource_manager:start(InstId, Opts).
+-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+start(ResId, Opts) ->
+    emqx_resource_manager:start(ResId, Opts).
 
--spec restart(instance_id()) -> ok | {error, Reason :: term()}.
-restart(InstId) ->
-    restart(InstId, #{}).
+-spec restart(resource_id()) -> ok | {error, Reason :: term()}.
+restart(ResId) ->
+    restart(ResId, #{}).
 
--spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
-restart(InstId, Opts) ->
-    emqx_resource_manager:restart(InstId, Opts).
+-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+restart(ResId, Opts) ->
+    emqx_resource_manager:restart(ResId, Opts).
 
--spec stop(instance_id()) -> ok | {error, Reason :: term()}.
-stop(InstId) ->
-    emqx_resource_manager:stop(InstId).
+-spec stop(resource_id()) -> ok | {error, Reason :: term()}.
+stop(ResId) ->
+    emqx_resource_manager:stop(ResId).
 
--spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
-health_check(InstId) ->
-    emqx_resource_manager:health_check(InstId).
+-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
+health_check(ResId) ->
+    emqx_resource_manager:health_check(ResId).
 
-set_resource_status_connecting(InstId) ->
-    emqx_resource_manager:set_resource_status_connecting(InstId).
+set_resource_status_connecting(ResId) ->
+    emqx_resource_manager:set_resource_status_connecting(ResId).
 
--spec get_instance(instance_id()) ->
+-spec get_instance(resource_id()) ->
     {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
-get_instance(InstId) ->
-    emqx_resource_manager:lookup(InstId).
+get_instance(ResId) ->
+    emqx_resource_manager:lookup(ResId).
 
--spec list_instances() -> [instance_id()].
+-spec list_instances() -> [resource_id()].
 list_instances() ->
     [Id || #{id := Id} <- list_instances_verbose()].
 
@@ -307,36 +307,37 @@ list_instances() ->
 list_instances_verbose() ->
     emqx_resource_manager:list_all().
 
--spec list_instances_by_type(module()) -> [instance_id()].
+-spec list_instances_by_type(module()) -> [resource_id()].
 list_instances_by_type(ResourceType) ->
     filter_instances(fun
         (_, RT) when RT =:= ResourceType -> true;
         (_, _) -> false
     end).
 
--spec generate_id(term()) -> instance_id().
+-spec generate_id(term()) -> resource_id().
 generate_id(Name) when is_binary(Name) ->
-    Id = integer_to_binary(erlang:unique_integer([positive])),
+    Id = integer_to_binary(erlang:unique_integer([monotonic, positive])),
     <<Name/binary, ":", Id/binary>>.
 
--spec list_group_instances(resource_group()) -> [instance_id()].
+-spec list_group_instances(resource_group()) -> [resource_id()].
 list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 
--spec call_start(instance_id(), module(), resource_config()) ->
+-spec call_start(manager_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
-call_start(InstId, Mod, Config) ->
-    ?SAFE_CALL(Mod:on_start(InstId, Config)).
+call_start(MgrId, Mod, Config) ->
+    ?SAFE_CALL(Mod:on_start(MgrId, Config)).
 
--spec call_health_check(instance_id(), module(), resource_state()) ->
+-spec call_health_check(manager_id(), module(), resource_state()) ->
     resource_status()
     | {resource_status(), resource_state()}
-    | {resource_status(), resource_state(), term()}.
-call_health_check(InstId, Mod, ResourceState) ->
-    ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)).
+    | {resource_status(), resource_state(), term()}
+    | {error, term()}.
+call_health_check(MgrId, Mod, ResourceState) ->
+    ?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)).
 
--spec call_stop(instance_id(), module(), resource_state()) -> term().
-call_stop(InstId, Mod, ResourceState) ->
-    ?SAFE_CALL(Mod:on_stop(InstId, ResourceState)).
+-spec call_stop(manager_id(), module(), resource_state()) -> term().
+call_stop(MgrId, Mod, ResourceState) ->
+    ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)).
 
 -spec check_config(resource_type(), raw_resource_config()) ->
     {ok, resource_config()} | {error, term()}.
@@ -344,85 +345,85 @@ check_config(ResourceType, Conf) ->
     emqx_hocon:check(ResourceType, Conf).
 
 -spec check_and_create(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     raw_resource_config()
 ) ->
     {ok, resource_data() | 'already_created'} | {error, term()}.
-check_and_create(InstId, Group, ResourceType, RawConfig) ->
-    check_and_create(InstId, Group, ResourceType, RawConfig, #{}).
+check_and_create(ResId, Group, ResourceType, RawConfig) ->
+    check_and_create(ResId, Group, ResourceType, RawConfig, #{}).
 
 -spec check_and_create(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     raw_resource_config(),
     create_opts()
 ) ->
     {ok, resource_data() | 'already_created'} | {error, term()}.
-check_and_create(InstId, Group, ResourceType, RawConfig, Opts) ->
+check_and_create(ResId, Group, ResourceType, RawConfig, Opts) ->
     check_and_do(
         ResourceType,
         RawConfig,
-        fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end
+        fun(ResConf) -> create(ResId, Group, ResourceType, ResConf, Opts) end
     ).
 
 -spec check_and_create_local(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     raw_resource_config()
 ) ->
     {ok, resource_data()} | {error, term()}.
-check_and_create_local(InstId, Group, ResourceType, RawConfig) ->
-    check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}).
+check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
+    check_and_create_local(ResId, Group, ResourceType, RawConfig, #{}).
 
 -spec check_and_create_local(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     raw_resource_config(),
     create_opts()
 ) -> {ok, resource_data()} | {error, term()}.
-check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) ->
+check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
     check_and_do(
         ResourceType,
         RawConfig,
-        fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end
+        fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end
     ).
 
 -spec check_and_recreate(
-    instance_id(),
+    resource_id(),
     resource_type(),
     raw_resource_config(),
     create_opts()
 ) ->
     {ok, resource_data()} | {error, term()}.
-check_and_recreate(InstId, ResourceType, RawConfig, Opts) ->
+check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
     check_and_do(
         ResourceType,
         RawConfig,
-        fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end
+        fun(ResConf) -> recreate(ResId, ResourceType, ResConf, Opts) end
     ).
 
 -spec check_and_recreate_local(
-    instance_id(),
+    resource_id(),
     resource_type(),
     raw_resource_config(),
     create_opts()
 ) ->
     {ok, resource_data()} | {error, term()}.
-check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) ->
+check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) ->
     check_and_do(
         ResourceType,
         RawConfig,
-        fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end
+        fun(ResConf) -> recreate_local(ResId, ResourceType, ResConf, Opts) end
     ).
 
 check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
     case check_config(ResourceType, RawConfig) of
-        {ok, InstConf} -> Do(InstConf);
+        {ok, ResConf} -> Do(ResConf);
         Error -> Error
     end.
 
@@ -431,9 +432,9 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
 filter_instances(Filter) ->
     [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
 
-inc_metrics_funcs(InstId) ->
-    OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, failed]}],
-    OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}],
+inc_metrics_funcs(ResId) ->
+    OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}],
+    OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}],
     {OnSucc, OnFailed}.
 
 safe_apply(Func, Args) ->

+ 178 - 110
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -23,7 +23,6 @@
 % API
 -export([
     ensure_resource/5,
-    create/5,
     recreate/4,
     remove/1,
     create_dry_run/2,
@@ -44,13 +43,13 @@
 ]).
 
 % Server
--export([start_link/5]).
+-export([start_link/6]).
 
 % Behaviour
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
 % State record
--record(data, {id, group, mod, config, opts, status, state, error}).
+-record(data, {id, manager_id, group, mod, config, opts, status, state, error}).
 
 -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
 -define(HEALTHCHECK_INTERVAL, 15000).
@@ -70,34 +69,53 @@
 %% Triggers the emqx_resource_manager_sup supervisor to actually create
 %% and link the process itself if not already started.
 -spec ensure_resource(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     resource_config(),
     create_opts()
 ) -> {ok, resource_data()}.
-ensure_resource(InstId, Group, ResourceType, Config, Opts) ->
-    case lookup(InstId) of
+ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
+    case lookup(ResId) of
         {ok, _Group, Data} ->
             {ok, Data};
         {error, not_found} ->
-            create(InstId, Group, ResourceType, Config, Opts),
-            {ok, _Group, Data} = lookup(InstId),
-            {ok, Data}
+            MgrId = set_new_owner(ResId),
+            create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts)
     end.
 
+%% @doc Called from emqx_resource when recreating a resource which may or may not exist
+-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
+recreate(ResId, ResourceType, NewConfig, Opts) ->
+    case lookup(ResId) of
+        {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
+            _ = remove(ResId, false),
+            MgrId = set_new_owner(ResId),
+            create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts);
+        {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
+            {error, updating_to_incorrect_resource_type};
+        {error, not_found} ->
+            {error, not_found}
+    end.
+
+create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) ->
+    create(MgrId, ResId, Group, ResourceType, Config, Opts),
+    {ok, _Group, Data} = lookup(ResId),
+    {ok, Data}.
+
 %% @doc Create a resource_manager and wait until it is running
-create(InstId, Group, ResourceType, Config, Opts) ->
+create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     % The state machine will make the actual call to the callback/resource module after init
-    ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
+    ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts),
     ok = emqx_metrics_worker:create_metrics(
         resource_metrics,
-        InstId,
+        ResId,
         [matched, success, failed, exception],
         [matched]
     ),
     case maps:get(start_after_created, Opts, true) of
-        true -> wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000));
+        true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
         false -> ok
     end,
     ok.
@@ -109,68 +127,55 @@ create(InstId, Group, ResourceType, Config, Opts) ->
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run(ResourceType, Config) ->
-    InstId = make_test_id(),
-    ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
-    case wait_for_resource_ready(InstId, 15000) of
+    ResId = make_test_id(),
+    MgrId = set_new_owner(ResId),
+    ok = emqx_resource_manager_sup:ensure_child(
+        MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
+    ),
+    case wait_for_resource_ready(ResId, 15000) of
         ok ->
-            remove(InstId);
+            remove(ResId);
         timeout ->
-            _ = remove(InstId),
+            _ = remove(ResId),
             {error, timeout}
     end.
 
-%% @doc Called from emqx_resource when recreating a resource which may or may not exist
--spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
-    {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
-recreate(InstId, ResourceType, NewConfig, Opts) ->
-    case lookup(InstId) of
-        {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
-            _ = remove(InstId, false),
-            create(InstId, Group, ResourceType, NewConfig, Opts),
-            {ok, _Group, Data} = lookup(InstId),
-            {ok, Data};
-        {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
-            {error, updating_to_incorrect_resource_type};
-        {error, not_found} ->
-            {error, not_found}
-    end.
-
 %% @doc Stops a running resource_manager and clears the metrics for the resource
--spec remove(instance_id()) -> ok | {error, Reason :: term()}.
-remove(InstId) when is_binary(InstId) ->
-    remove(InstId, true).
+-spec remove(resource_id()) -> ok | {error, Reason :: term()}.
+remove(ResId) when is_binary(ResId) ->
+    remove(ResId, true).
 
 %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
--spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
-remove(InstId, ClearMetrics) when is_binary(InstId) ->
-    safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION).
+-spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
+remove(ResId, ClearMetrics) when is_binary(ResId) ->
+    safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
 
 %% @doc Stops and then starts an instance that was already running
--spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
-restart(InstId, Opts) when is_binary(InstId) ->
-    case safe_call(InstId, restart, ?T_OPERATION) of
+-spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+restart(ResId, Opts) when is_binary(ResId) ->
+    case safe_call(ResId, restart, ?T_OPERATION) of
         ok ->
-            wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+            wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
             ok;
         {error, _Reason} = Error ->
             Error
     end.
 
 %% @doc Start the resource
--spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
-start(InstId, Opts) ->
-    case safe_call(InstId, start, ?T_OPERATION) of
+-spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
+start(ResId, Opts) ->
+    case safe_call(ResId, start, ?T_OPERATION) of
         ok ->
-            wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
+            wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
             ok;
         {error, _Reason} = Error ->
             Error
     end.
 
 %% @doc Stop the resource
--spec stop(instance_id()) -> ok | {error, Reason :: term()}.
-stop(InstId) ->
-    case safe_call(InstId, stop, ?T_OPERATION) of
+-spec stop(resource_id()) -> ok | {error, Reason :: term()}.
+stop(ResId) ->
+    case safe_call(ResId, stop, ?T_OPERATION) of
         ok ->
             ok;
         {error, _Reason} = Error ->
@@ -178,36 +183,36 @@ stop(InstId) ->
     end.
 
 %% @doc Test helper
--spec set_resource_status_connecting(instance_id()) -> ok.
-set_resource_status_connecting(InstId) ->
-    safe_call(InstId, set_resource_status_connecting, infinity).
+-spec set_resource_status_connecting(resource_id()) -> ok.
+set_resource_status_connecting(ResId) ->
+    safe_call(ResId, set_resource_status_connecting, infinity).
 
 %% @doc Lookup the group and data of a resource
--spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
-lookup(InstId) ->
-    case safe_call(InstId, lookup, ?T_LOOKUP) of
-        {error, timeout} -> ets_lookup(InstId);
+-spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
+lookup(ResId) ->
+    case safe_call(ResId, lookup, ?T_LOOKUP) of
+        {error, timeout} -> ets_lookup(ResId);
         Result -> Result
     end.
 
 %% @doc Lookup the group and data of a resource
--spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
-ets_lookup(InstId) ->
-    case ets:lookup(?ETS_TABLE, InstId) of
-        [{_Id, Group, Data}] ->
+-spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
+ets_lookup(ResId) ->
+    case read_cache(ResId) of
+        {Group, Data} ->
             {ok, Group, data_record_to_external_map_with_metrics(Data)};
-        [] ->
+        not_found ->
             {error, not_found}
     end.
 
 %% @doc Get the metrics for the specified resource
-get_metrics(InstId) ->
-    emqx_metrics_worker:get_metrics(resource_metrics, InstId).
+get_metrics(ResId) ->
+    emqx_metrics_worker:get_metrics(resource_metrics, ResId).
 
 %% @doc Reset the metrics for the specified resource
--spec reset_metrics(instance_id()) -> ok.
-reset_metrics(InstId) ->
-    emqx_metrics_worker:reset_metrics(resource_metrics, InstId).
+-spec reset_metrics(resource_id()) -> ok.
+reset_metrics(ResId) ->
+    emqx_metrics_worker:reset_metrics(resource_metrics, ResId).
 
 %% @doc Returns the data for all resorces
 -spec list_all() -> [resource_data()] | [].
@@ -222,21 +227,22 @@ list_all() ->
     end.
 
 %% @doc Returns a list of ids for all the resources in a group
--spec list_group(resource_group()) -> [instance_id()].
+-spec list_group(resource_group()) -> [resource_id()].
 list_group(Group) ->
     List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
     lists:flatten(List).
 
--spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
-health_check(InstId) ->
-    safe_call(InstId, health_check, ?T_OPERATION).
+-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
+health_check(ResId) ->
+    safe_call(ResId, health_check, ?T_OPERATION).
 
 %% Server start/stop callbacks
 
 %% @doc Function called from the supervisor to actually start the server
-start_link(InstId, Group, ResourceType, Config, Opts) ->
+start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     Data = #data{
-        id = InstId,
+        id = ResId,
+        manager_id = MgrId,
         group = Group,
         mod = ResourceType,
         config = Config,
@@ -245,12 +251,14 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
         state = undefined,
         error = undefined
     },
-    gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, {Data, Opts}, []).
+    Module = atom_to_binary(?MODULE),
+    ProcName = binary_to_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
+    gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []).
 
 init({Data, Opts}) ->
     process_flag(trap_exit, true),
     %% init the cache so that lookup/1 will always return something
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
+    insert_cache(Data#data.id, Data#data.group, Data),
     case maps:get(start_after_created, Opts, true) of
         true -> {ok, connecting, Data, {next_event, internal, start_resource}};
         false -> {ok, stopped, Data}
@@ -258,7 +266,7 @@ init({Data, Opts}) ->
 
 terminate(_Reason, _State, Data) ->
     _ = maybe_clear_alarm(Data#data.id),
-    ets:delete(?ETS_TABLE, Data#data.id),
+    delete_cache(Data#data.id, Data#data.manager_id),
     ok.
 
 %% Behavior callback
@@ -279,6 +287,12 @@ handle_event({call, From}, start, stopped, Data) ->
     start_resource(Data, From);
 handle_event({call, From}, start, _State, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
+% Called when the resource received a `quit` message
+handle_event(info, quit, stopped, _Data) ->
+    {stop, {shutdown, quit}};
+handle_event(info, quit, _State, Data) ->
+    _ = stop_resource(Data),
+    {stop, {shutdown, quit}};
 % Called when the resource is to be stopped
 handle_event({call, From}, stop, stopped, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
@@ -301,7 +315,7 @@ handle_event({call, From}, health_check, _State, Data) ->
     handle_manually_health_check(From, Data);
 % State: CONNECTING
 handle_event(enter, _OldState, connecting, Data) ->
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
+    insert_cache(Data#data.id, Data#data.group, Data),
     Actions = [{state_timeout, 0, health_check}],
     {keep_state_and_data, Actions};
 handle_event(internal, start_resource, connecting, Data) ->
@@ -312,7 +326,7 @@ handle_event(state_timeout, health_check, connecting, Data) ->
 %% The connected state is entered after a successful on_start/2 of the callback mod
 %% and successful health_checks
 handle_event(enter, _OldState, connected, Data) ->
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
+    insert_cache(Data#data.id, Data#data.group, Data),
     _ = emqx_alarm:deactivate(Data#data.id),
     Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
     {next_state, connected, Data, Actions};
@@ -320,7 +334,7 @@ handle_event(state_timeout, health_check, connected, Data) ->
     handle_connected_health_check(Data);
 %% State: DISCONNECTED
 handle_event(enter, _OldState, disconnected, Data) ->
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
+    insert_cache(Data#data.id, Data#data.group, Data),
     handle_disconnected_state_enter(Data);
 handle_event(state_timeout, auto_retry, disconnected, Data) ->
     start_resource(Data, undefined);
@@ -328,14 +342,14 @@ handle_event(state_timeout, auto_retry, disconnected, Data) ->
 %% The stopped state is entered after the resource has been explicitly stopped
 handle_event(enter, _OldState, stopped, Data) ->
     UpdatedData = Data#data{status = disconnected},
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}),
+    insert_cache(Data#data.id, Data#data.group, UpdatedData),
     {next_state, stopped, UpdatedData};
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
         error,
         #{
-            msg => "ignore all other events",
+            msg => ignore_all_other_events,
             event_type => EventType,
             event_data => EventData,
             state => State,
@@ -347,6 +361,47 @@ handle_event(EventType, EventData, State, Data) ->
 %%------------------------------------------------------------------------------
 %% internal functions
 %%------------------------------------------------------------------------------
+insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) ->
+    case get_owner(ResId) of
+        not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
+        MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
+        _ -> self() ! quit
+    end.
+
+read_cache(ResId) ->
+    case ets:lookup(?ETS_TABLE, ResId) of
+        [{_Id, Group, Data}] -> {Group, Data};
+        [] -> not_found
+    end.
+
+delete_cache(ResId, MgrId) ->
+    case get_owner(ResId) of
+        MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId ->
+            do_delete_cache(ResId);
+        _ ->
+            ok
+    end.
+
+do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
+    ets:delete(?ETS_TABLE, {owner, ResId}),
+    ets:delete(?ETS_TABLE, ResId);
+do_delete_cache(ResId) ->
+    ets:delete(?ETS_TABLE, ResId).
+
+set_new_owner(ResId) ->
+    MgrId = make_manager_id(ResId),
+    ok = set_owner(ResId, MgrId),
+    MgrId.
+
+set_owner(ResId, MgrId) ->
+    ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}),
+    ok.
+
+get_owner(ResId) ->
+    case ets:lookup(?ETS_TABLE, {owner, ResId}) of
+        [{_, MgrId}] -> MgrId;
+        [] -> not_found
+    end.
 
 handle_disconnected_state_enter(Data) ->
     case maps:get(auto_retry_interval, Data#data.opts, undefined) of
@@ -367,8 +422,8 @@ handle_remove_event(From, ClearMetrics, Data) ->
 
 start_resource(Data, From) ->
     %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
-    ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
-    case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of
+    insert_cache(Data#data.id, Data#data.group, Data),
+    case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of
         {ok, ResourceState} ->
             UpdatedData = Data#data{state = ResourceState, status = connecting},
             %% Perform an initial health_check immediately before transitioning into a connected state
@@ -390,14 +445,16 @@ stop_resource(Data) ->
     %% We don't care the return value of the Mod:on_stop/2.
     %% The callback mod should make sure the resource is stopped after on_stop/2
     %% is returned.
-    _ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state),
+    _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state),
     _ = maybe_clear_alarm(Data#data.id),
     ok.
 
-proc_name(Id) ->
-    Module = atom_to_binary(?MODULE),
-    Connector = <<"_">>,
-    binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
+make_manager_id(ResId) ->
+    emqx_resource:generate_id(ResId).
+
+make_test_id() ->
+    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
+    <<?TEST_ID_PREFIX, RandId/binary>>.
 
 handle_manually_health_check(From, Data) ->
     with_health_check(Data, fun(Status, UpdatedData) ->
@@ -434,13 +491,13 @@ handle_connected_health_check(Data) ->
 
 with_health_check(Data, Func) ->
     ResId = Data#data.id,
-    HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
-    {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
+    HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
+    {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
     _ = maybe_alarm(Status, ResId),
     UpdatedData = Data#data{
         state = NewState, status = Status, error = Err
     },
-    ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}),
+    insert_cache(ResId, UpdatedData#data.group, UpdatedData),
     Func(Status, UpdatedData).
 
 maybe_alarm(connected, _ResId) ->
@@ -459,12 +516,22 @@ maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
 maybe_clear_alarm(ResId) ->
     emqx_alarm:deactivate(ResId).
 
-parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) ->
-    {Status, OldState, undefined};
-parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) ->
+parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
+    {Status, Data#data.state, undefined};
+parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
     {Status, NewState, undefined};
-parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
-    {Status, NewState, Error}.
+parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
+    {Status, NewState, Error};
+parse_health_check_result({error, Error}, Data) ->
+    ?SLOG(
+        error,
+        #{
+            msg => health_check_exception,
+            resource_id => Data#data.id,
+            reason => Error
+        }
+    ),
+    {disconnected, Data#data.state, Error}.
 
 maybe_reply(Actions, undefined, _Reply) ->
     Actions;
@@ -481,29 +548,30 @@ data_record_to_external_map_with_metrics(Data) ->
         metrics => get_metrics(Data#data.id)
     }.
 
-make_test_id() ->
-    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
-    <<?TEST_ID_PREFIX, RandId/binary>>.
-
--spec wait_for_resource_ready(instance_id(), integer()) -> ok | timeout.
-wait_for_resource_ready(InstId, WaitTime) ->
-    do_wait_for_resource_ready(InstId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
+-spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout.
+wait_for_resource_ready(ResId, WaitTime) ->
+    do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
 
-do_wait_for_resource_ready(_InstId, 0) ->
+do_wait_for_resource_ready(_ResId, 0) ->
     timeout;
-do_wait_for_resource_ready(InstId, Retry) ->
-    case ets_lookup(InstId) of
+do_wait_for_resource_ready(ResId, Retry) ->
+    case ets_lookup(ResId) of
         {ok, _Group, #{status := connected}} ->
             ok;
         _ ->
             timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
-            do_wait_for_resource_ready(InstId, Retry - 1)
+            do_wait_for_resource_ready(ResId, Retry - 1)
     end.
 
-safe_call(InstId, Message, Timeout) ->
+safe_call(ResId, Message, Timeout) ->
     try
-        gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout})
+        Module = atom_to_binary(?MODULE),
+        MgrId = get_owner(ResId),
+        ProcName = binary_to_existing_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
+        gen_statem:call(ProcName, Message, {clean_timeout, Timeout})
     catch
+        error:badarg ->
+            {error, not_found};
         exit:{R, _} when R == noproc; R == normal; R == shutdown ->
             {error, not_found};
         exit:{timeout, _} ->

+ 3 - 3
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -17,14 +17,14 @@
 
 -behaviour(supervisor).
 
--export([ensure_child/5]).
+-export([ensure_child/6]).
 
 -export([start_link/0]).
 
 -export([init/1]).
 
-ensure_child(InstId, Group, ResourceType, Config, Opts) ->
-    _ = supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]),
+ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) ->
+    _ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]),
     ok.
 
 start_link() ->

+ 12 - 12
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -34,16 +34,16 @@ introduced_in() ->
     "5.0.0".
 
 -spec create(
-    instance_id(),
+    resource_id(),
     resource_group(),
     resource_type(),
     resource_config(),
     create_opts()
 ) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create(InstId, Group, ResourceType, Config, Opts) ->
+create(ResId, Group, ResourceType, Config, Opts) ->
     emqx_cluster_rpc:multicall(emqx_resource, create_local, [
-        InstId, Group, ResourceType, Config, Opts
+        ResId, Group, ResourceType, Config, Opts
     ]).
 
 -spec create_dry_run(
@@ -55,19 +55,19 @@ create_dry_run(ResourceType, Config) ->
     emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
 
 -spec recreate(
-    instance_id(),
+    resource_id(),
     resource_type(),
     resource_config(),
     create_opts()
 ) ->
     {ok, resource_data()} | {error, Reason :: term()}.
-recreate(InstId, ResourceType, Config, Opts) ->
-    emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]).
+recreate(ResId, ResourceType, Config, Opts) ->
+    emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [ResId, ResourceType, Config, Opts]).
 
--spec remove(instance_id()) -> ok | {error, Reason :: term()}.
-remove(InstId) ->
-    emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).
+-spec remove(resource_id()) -> ok | {error, Reason :: term()}.
+remove(ResId) ->
+    emqx_cluster_rpc:multicall(emqx_resource, remove_local, [ResId]).
 
--spec reset_metrics(instance_id()) -> ok | {error, any()}.
-reset_metrics(InstId) ->
-    emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]).
+-spec reset_metrics(resource_id()) -> ok | {error, any()}.
+reset_metrics(ResId) ->
+    emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [ResId]).

+ 10 - 1
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -348,6 +348,16 @@ t_list_filter(_) ->
     ).
 
 t_create_dry_run_local(_) ->
+    ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
+    lists:foreach(
+        fun(_) ->
+            create_dry_run_local_succ()
+        end,
+        lists:seq(1, 10)
+    ),
+    [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
+
+create_dry_run_local_succ() ->
     ?assertEqual(
         ok,
         emqx_resource:create_dry_run_local(
@@ -355,7 +365,6 @@ t_create_dry_run_local(_) ->
             #{name => test_resource, register => true}
         )
     ),
-    timer:sleep(100),
     ?assertEqual(undefined, whereis(test_resource)).
 
 t_create_dry_run_local_failed(_) ->