Explorar o código

Merge pull request #7783 from emqx/EMQX-4199-introduce-a-new-emqx-resource-manager-module

feat: isolate resource manager processes
Chris Hicks hai 3 anos
pai
achega
841acb7828
Modificáronse 26 ficheiros con 644 adicións e 303 borrados
  1. 14 26
      apps/emqx_authn/src/simple_authn/emqx_authn_http.erl
  2. 1 1
      apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl
  3. 11 23
      apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl
  4. 11 21
      apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl
  5. 11 23
      apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl
  6. 8 16
      apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl
  7. 2 4
      apps/emqx_authz/src/emqx_authz_http.erl
  8. 8 12
      apps/emqx_authz/src/emqx_authz_mongodb.erl
  9. 2 6
      apps/emqx_authz/src/emqx_authz_mysql.erl
  10. 14 20
      apps/emqx_authz/src/emqx_authz_postgresql.erl
  11. 5 9
      apps/emqx_authz/src/emqx_authz_redis.erl
  12. 8 13
      apps/emqx_authz/src/emqx_authz_utils.erl
  13. 8 13
      apps/emqx_bridge/src/emqx_bridge.erl
  14. 1 1
      apps/emqx_bridge/test/emqx_bridge_SUITE.erl
  15. 2 0
      apps/emqx_connector/src/emqx_connector.erl
  16. 1 1
      apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl
  17. 1 1
      apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl
  18. 1 1
      apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl
  19. 1 1
      apps/emqx_connector/test/emqx_connector_redis_SUITE.erl
  20. 14 28
      apps/emqx_resource/src/emqx_resource.erl
  21. 429 0
      apps/emqx_resource/src/emqx_resource_manager.erl
  22. 49 0
      apps/emqx_resource/src/emqx_resource_manager_sup.erl
  23. 6 38
      apps/emqx_resource/src/emqx_resource_sup.erl
  24. 17 17
      apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl
  25. 11 12
      apps/emqx_resource/test/emqx_resource_SUITE.erl
  26. 8 16
      apps/emqx_retainer/src/emqx_retainer.erl

+ 14 - 26
apps/emqx_authn/src/simple_authn/emqx_authn_http.erl

@@ -181,34 +181,22 @@ create(
         request_timeout => RequestTimeout,
         resource_id => ResourceId
     },
-    case
-        emqx_resource:create_local(
-            ResourceId,
-            ?RESOURCE_GROUP,
-            emqx_connector_http,
-            Config#{
-                base_url => BaseUrl,
-                pool_type => random
-            },
-            #{}
-        )
-    of
-        {ok, already_created} ->
-            {ok, State};
-        {ok, _} ->
-            {ok, State};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceId,
+        ?RESOURCE_GROUP,
+        emqx_connector_http,
+        Config#{
+            base_url => BaseUrl,
+            pool_type => random
+        },
+        #{}
+    ),
+    {ok, State}.
 
 update(Config, State) ->
-    case create(Config) of
-        {ok, NewState} ->
-            ok = destroy(State),
-            {ok, NewState};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, NewState} = create(Config),
+    ok = destroy(State),
+    {ok, NewState}.
 
 authenticate(#{auth_method := _}, _) ->
     ignore;

+ 1 - 1
apps/emqx_authn/src/simple_authn/emqx_authn_jwt.erl

@@ -302,7 +302,7 @@ create2(
     } = Config
 ) ->
     ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
-    {ok, _} = emqx_resource:create_local(
+    {ok, _Data} = emqx_resource:create_local(
         ResourceId,
         ?RESOURCE_GROUP,
         emqx_authn_jwks_connector,

+ 11 - 23
apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl

@@ -146,31 +146,19 @@ create(#{filter := Filter} = Config) ->
         filter_template => FilterTemplate,
         resource_id => ResourceId
     },
-    case
-        emqx_resource:create_local(
-            ResourceId,
-            ?RESOURCE_GROUP,
-            emqx_connector_mongo,
-            Config,
-            #{}
-        )
-    of
-        {ok, already_created} ->
-            {ok, NState};
-        {ok, _} ->
-            {ok, NState};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceId,
+        ?RESOURCE_GROUP,
+        emqx_connector_mongo,
+        Config,
+        #{}
+    ),
+    {ok, NState}.
 
 update(Config, State) ->
-    case create(Config) of
-        {ok, NewState} ->
-            ok = destroy(State),
-            {ok, NewState};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, NewState} = create(Config),
+    ok = destroy(State),
+    {ok, NewState}.
 
 authenticate(#{auth_method := _}, _) ->
     ignore;

+ 11 - 21
apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl

@@ -99,29 +99,19 @@ create(
         query_timeout => QueryTimeout,
         resource_id => ResourceId
     },
-    case
-        emqx_resource:create_local(
-            ResourceId,
-            ?RESOURCE_GROUP,
-            emqx_connector_mysql,
-            Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
-            #{}
-        )
-    of
-        {ok, _} ->
-            {ok, State};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceId,
+        ?RESOURCE_GROUP,
+        emqx_connector_mysql,
+        Config#{prepare_statement => #{?PREPARE_KEY => PrepareSql}},
+        #{}
+    ),
+    {ok, State}.
 
 update(Config, State) ->
-    case create(Config) of
-        {ok, NewState} ->
-            ok = destroy(State),
-            {ok, NewState};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, NewState} = create(Config),
+    ok = destroy(State),
+    {ok, NewState}.
 
 authenticate(#{auth_method := _}, _) ->
     ignore;

+ 11 - 23
apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl

@@ -96,31 +96,19 @@ create(
         password_hash_algorithm => Algorithm,
         resource_id => ResourceId
     },
-    case
-        emqx_resource:create_local(
-            ResourceId,
-            ?RESOURCE_GROUP,
-            emqx_connector_pgsql,
-            Config#{prepare_statement => #{ResourceId => Query}},
-            #{}
-        )
-    of
-        {ok, already_created} ->
-            {ok, State};
-        {ok, _} ->
-            {ok, State};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceId,
+        ?RESOURCE_GROUP,
+        emqx_connector_pgsql,
+        Config#{prepare_statement => #{ResourceId => Query}},
+        #{}
+    ),
+    {ok, State}.
 
 update(Config, State) ->
-    case create(Config) of
-        {ok, NewState} ->
-            ok = destroy(State),
-            {ok, NewState};
-        {error, Reason} ->
-            {error, Reason}
-    end.
+    {ok, NewState} = create(Config),
+    ok = destroy(State),
+    {ok, NewState}.
 
 authenticate(#{auth_method := _}, _) ->
     ignore;

+ 8 - 16
apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl

@@ -115,22 +115,14 @@ create(
             cmd => NCmd,
             resource_id => ResourceId
         },
-        case
-            emqx_resource:create_local(
-                ResourceId,
-                ?RESOURCE_GROUP,
-                emqx_connector_redis,
-                Config,
-                #{}
-            )
-        of
-            {ok, already_created} ->
-                {ok, NState};
-            {ok, _} ->
-                {ok, NState};
-            {error, Reason} ->
-                {error, Reason}
-        end
+        {ok, _Data} = emqx_resource:create_local(
+            ResourceId,
+            ?RESOURCE_GROUP,
+            emqx_connector_redis,
+            Config,
+            #{}
+        ),
+        {ok, NState}
     catch
         error:{unsupported_cmd, _Cmd} ->
             {error, {unsupported_cmd, Cmd}};

+ 2 - 4
apps/emqx_authz/src/emqx_authz_http.erl

@@ -52,10 +52,8 @@ description() ->
 
 init(Config) ->
     NConfig = parse_config(Config),
-    case emqx_authz_utils:create_resource(emqx_connector_http, NConfig) of
-        {error, Reason} -> error({load_config_error, Reason});
-        {ok, Id} -> NConfig#{annotations => #{id => Id}}
-    end.
+    {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_http, NConfig),
+    NConfig#{annotations => #{id => Id}}.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

+ 8 - 12
apps/emqx_authz/src/emqx_authz_mongodb.erl

@@ -46,18 +46,14 @@ description() ->
     "AuthZ with MongoDB".
 
 init(#{filter := Filter} = Source) ->
-    case emqx_authz_utils:create_resource(emqx_connector_mongo, Source) of
-        {error, Reason} ->
-            error({load_config_error, Reason});
-        {ok, Id} ->
-            Source#{
-                annotations => #{id => Id},
-                filter_template => emqx_authz_utils:parse_deep(
-                    Filter,
-                    ?PLACEHOLDERS
-                )
-            }
-    end.
+    {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mongo, Source),
+    Source#{
+        annotations => #{id => Id},
+        filter_template => emqx_authz_utils:parse_deep(
+            Filter,
+            ?PLACEHOLDERS
+        )
+    }.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

+ 2 - 6
apps/emqx_authz/src/emqx_authz_mysql.erl

@@ -52,12 +52,8 @@ description() ->
 init(#{query := SQL} = Source0) ->
     {PrepareSQL, TmplToken} = emqx_authz_utils:parse_sql(SQL, '?', ?PLACEHOLDERS),
     Source = Source0#{prepare_statement => #{?PREPARE_KEY => PrepareSQL}},
-    case emqx_authz_utils:create_resource(emqx_connector_mysql, Source) of
-        {error, Reason} ->
-            error({load_config_error, Reason});
-        {ok, Id} ->
-            Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}
-    end.
+    {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_mysql, Source),
+    Source#{annotations => #{id => Id, tmpl_oken => TmplToken}}.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

+ 14 - 20
apps/emqx_authz/src/emqx_authz_postgresql.erl

@@ -54,26 +54,20 @@ init(#{query := SQL0} = Source) ->
         ?PLACEHOLDERS
     ),
     ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
-    case
-        emqx_resource:create_local(
-            ResourceID,
-            ?RESOURCE_GROUP,
-            emqx_connector_pgsql,
-            Source#{prepare_statement => #{ResourceID => SQL}},
-            #{}
-        )
-    of
-        {ok, _} ->
-            Source#{
-                annotations =>
-                    #{
-                        id => ResourceID,
-                        placeholders => PlaceHolders
-                    }
-            };
-        {error, Reason} ->
-            error({load_config_error, Reason})
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceID,
+        ?RESOURCE_GROUP,
+        emqx_connector_pgsql,
+        Source#{prepare_statement => #{ResourceID => SQL}},
+        #{}
+    ),
+    Source#{
+        annotations =>
+            #{
+                id => ResourceID,
+                placeholders => PlaceHolders
+            }
+    }.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

+ 5 - 9
apps/emqx_authz/src/emqx_authz_redis.erl

@@ -50,15 +50,11 @@ description() ->
 init(#{cmd := CmdStr} = Source) ->
     Cmd = tokens(CmdStr),
     CmdTemplate = emqx_authz_utils:parse_deep(Cmd, ?PLACEHOLDERS),
-    case emqx_authz_utils:create_resource(emqx_connector_redis, Source) of
-        {error, Reason} ->
-            error({load_config_error, Reason});
-        {ok, Id} ->
-            Source#{
-                annotations => #{id => Id},
-                cmd_template => CmdTemplate
-            }
-    end.
+    {ok, Id} = emqx_authz_utils:create_resource(emqx_connector_redis, Source),
+    Source#{
+        annotations => #{id => Id},
+        cmd_template => CmdTemplate
+    }.
 
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove_local(Id).

+ 8 - 13
apps/emqx_authz/src/emqx_authz_utils.erl

@@ -38,19 +38,14 @@
 
 create_resource(Module, Config) ->
     ResourceID = make_resource_id(Module),
-    case
-        emqx_resource:create_local(
-            ResourceID,
-            ?RESOURCE_GROUP,
-            Module,
-            Config,
-            #{}
-        )
-    of
-        {ok, already_created} -> {ok, ResourceID};
-        {ok, _} -> {ok, ResourceID};
-        {error, Reason} -> {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        ResourceID,
+        ?RESOURCE_GROUP,
+        Module,
+        Config,
+        #{}
+    ),
+    {ok, ResourceID}.
 
 cleanup_resources() ->
     lists:foreach(

+ 8 - 13
apps/emqx_bridge/src/emqx_bridge.erl

@@ -273,19 +273,14 @@ create(Type, Name, Conf) ->
         name => Name,
         config => Conf
     }),
-    case
-        emqx_resource:create_local(
-            resource_id(Type, Name),
-            <<"emqx_bridge">>,
-            emqx_bridge:resource_type(Type),
-            parse_confs(Type, Name, Conf),
-            #{}
-        )
-    of
-        {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
-        {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
-        {error, Reason} -> {error, Reason}
-    end.
+    {ok, _Data} = emqx_resource:create_local(
+        resource_id(Type, Name),
+        <<"emqx_bridge">>,
+        emqx_bridge:resource_type(Type),
+        parse_confs(Type, Name, Conf),
+        #{}
+    ),
+    maybe_disable_bridge(Type, Name, Conf).
 
 update(BridgeId, {OldConf, Conf}) ->
     {BridgeType, BridgeName} = parse_bridge_id(BridgeId),

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_SUITE.erl

@@ -134,7 +134,7 @@ setup_fake_telemetry_data() ->
     Predicate = fun(#{?snk_kind := K}) -> K =:= emqx_bridge_monitor_loaded_bridge end,
     NEvents = 3,
     BackInTime = 0,
-    Timeout = 1_000,
+    Timeout = 11_000,
     {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, NEvents, Timeout, BackInTime),
     ok = emqx_bridge:load(),
     {ok, _} = snabbkaffe_collector:receive_events(Sub),

+ 2 - 0
apps/emqx_connector/src/emqx_connector.erl

@@ -120,6 +120,8 @@ lookup_raw(Type, Name) ->
             end
     end.
 
+-spec create_dry_run(module(), binary() | #{binary() => term()} | [#{binary() => term()}]) ->
+    ok | {error, Reason :: term()}.
 create_dry_run(Type, Conf) ->
     emqx_bridge:create_dry_run(Type, Conf).
 

+ 1 - 1
apps/emqx_connector/test/emqx_connector_mongo_SUITE.erl

@@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     }} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual(StoppedStatus, disconnected),
-    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
     % Can call stop/1 again on an already stopped instance

+ 1 - 1
apps/emqx_connector/test/emqx_connector_mysql_SUITE.erl

@@ -102,7 +102,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     }} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual(StoppedStatus, disconnected),
-    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
     % Can call stop/1 again on an already stopped instance

+ 1 - 1
apps/emqx_connector/test/emqx_connector_pgsql_SUITE.erl

@@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
     }} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual(StoppedStatus, disconnected),
-    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
     % Can call stop/1 again on an already stopped instance

+ 1 - 1
apps/emqx_connector/test/emqx_connector_redis_SUITE.erl

@@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
     }} =
         emqx_resource:get_instance(PoolName),
     ?assertEqual(StoppedStatus, disconnected),
-    ?assertEqual(ok, emqx_resource:health_check(PoolName)),
+    ?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
     % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
     ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
     % Can call stop/1 again on an already stopped instance

+ 14 - 28
apps/emqx_resource/src/emqx_resource.erl

@@ -186,9 +186,9 @@ create_local(InstId, Group, ResourceType, Config) ->
     resource_config(),
     create_opts()
 ) ->
-    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
+    {ok, resource_data()}.
 create_local(InstId, Group, ResourceType, Config, Opts) ->
-    call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
+    emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts).
 
 -spec create_dry_run(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
@@ -198,8 +198,7 @@ create_dry_run(ResourceType, Config) ->
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run_local(ResourceType, Config) ->
-    RandId = iolist_to_binary(emqx_misc:gen_id(16)),
-    call_instance(RandId, {create_dry_run, ResourceType, Config}).
+    emqx_resource_manager:create_dry_run(ResourceType, Config).
 
 -spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
@@ -209,7 +208,7 @@ recreate(InstId, ResourceType, Config, Opts) ->
 -spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate_local(InstId, ResourceType, Config, Opts) ->
-    call_instance(InstId, {recreate, InstId, ResourceType, Config, Opts}).
+    emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts).
 
 -spec remove(instance_id()) -> ok | {error, Reason :: term()}.
 remove(InstId) ->
@@ -217,11 +216,11 @@ remove(InstId) ->
 
 -spec remove_local(instance_id()) -> ok | {error, Reason :: term()}.
 remove_local(InstId) ->
-    call_instance(InstId, {remove, InstId}).
+    emqx_resource_manager:remove(InstId).
 
 -spec reset_metrics_local(instance_id()) -> ok.
 reset_metrics_local(InstId) ->
-    call_instance(InstId, {reset_metrics, InstId}).
+    emqx_resource_manager:reset_metrics(InstId).
 
 -spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}.
 reset_metrics(InstId) ->
@@ -236,17 +235,7 @@ query(InstId, Request) ->
 %% it is the duty of the Module to apply the `after_query()` functions.
 -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
 query(InstId, Request, AfterQuery) ->
-    case get_instance(InstId) of
-        {ok, _Group, #{status := connecting}} ->
-            query_error(connecting, <<
-                "cannot serve query when the resource "
-                "instance is still connecting"
-            >>);
-        {ok, _Group, #{status := disconnected}} ->
-            query_error(disconnected, <<
-                "cannot serve query when the resource "
-                "instance is disconnected"
-            >>);
+    case emqx_resource_manager:ets_lookup(InstId) of
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
             %% the resource state is readonly to Module:on_query/4
             %% and the `after_query()` functions should be thread safe
@@ -268,23 +257,23 @@ restart(InstId) ->
 
 -spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
 restart(InstId, Opts) ->
-    call_instance(InstId, {restart, InstId, Opts}).
+    emqx_resource_manager:restart(InstId, Opts).
 
 -spec stop(instance_id()) -> ok | {error, Reason :: term()}.
 stop(InstId) ->
-    call_instance(InstId, {stop, InstId}).
+    emqx_resource_manager:stop(InstId).
 
 -spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
 health_check(InstId) ->
-    call_instance(InstId, {health_check, InstId}).
+    emqx_resource_manager:health_check(InstId).
 
 set_resource_status_connecting(InstId) ->
-    call_instance(InstId, {set_resource_status_connecting, InstId}).
+    emqx_resource_manager:set_resource_status_connecting(InstId).
 
 -spec get_instance(instance_id()) ->
     {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
 get_instance(InstId) ->
-    emqx_resource_instance:lookup(InstId).
+    emqx_resource_manager:lookup(InstId).
 
 -spec list_instances() -> [instance_id()].
 list_instances() ->
@@ -292,7 +281,7 @@ list_instances() ->
 
 -spec list_instances_verbose() -> [resource_data()].
 list_instances_verbose() ->
-    emqx_resource_instance:list_all().
+    emqx_resource_manager:list_all().
 
 -spec list_instances_by_type(module()) -> [instance_id()].
 list_instances_by_type(ResourceType) ->
@@ -307,7 +296,7 @@ generate_id(Name) when is_binary(Name) ->
     <<Name/binary, ":", Id/binary>>.
 
 -spec list_group_instances(resource_group()) -> [instance_id()].
-list_group_instances(Group) -> emqx_resource_instance:list_group(Group).
+list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
 
 -spec call_start(instance_id(), module(), resource_config()) ->
     {ok, resource_state()} | {error, Reason :: term()}.
@@ -422,9 +411,6 @@ inc_metrics_funcs(InstId) ->
     OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}],
     {OnSucc, OnFailed}.
 
-call_instance(InstId, Query) ->
-    emqx_resource_instance:hash_call(InstId, Query).
-
 safe_apply(Func, Args) ->
     ?SAFE_CALL(erlang:apply(Func, Args)).
 

+ 429 - 0
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -0,0 +1,429 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_resource_manager).
+-behaviour(gen_statem).
+
+-include("emqx_resource.hrl").
+-include("emqx_resource_utils.hrl").
+
+% API
+-export([
+    ensure_resource/5,
+    create_dry_run/2,
+    ets_lookup/1,
+    get_metrics/1,
+    health_check/1,
+    list_all/0,
+    list_group/1,
+    lookup/1,
+    recreate/4,
+    remove/1,
+    reset_metrics/1,
+    restart/2,
+    set_resource_status_connecting/1,
+    stop/1
+]).
+
+% Server
+-export([start_link/5]).
+
+% Behaviour
+-export([init/1, callback_mode/0, handle_event/4, terminate/3]).
+
+% State record
+-record(data, {id, group, mod, config, opts, status, state, error}).
+
+-define(SHORT_HEALTHCHECK_INTERVAL, 1000).
+-define(HEALTHCHECK_INTERVAL, 15000).
+-define(ETS_TABLE, emqx_resource_manager).
+-define(WAIT_FOR_RESOURCE_DELAY, 100).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+%% @doc Called from emqx_resource when starting a resource instance.
+%%
+%% Triggers the emqx_resource_manager_sup supervisor to actually create
+%% and link the process itself if not already started.
+-spec ensure_resource(
+    instance_id(),
+    resource_group(),
+    resource_type(),
+    resource_config(),
+    create_opts()
+) -> {ok, resource_data()}.
+ensure_resource(InstId, Group, ResourceType, Config, Opts) ->
+    case lookup(InstId) of
+        {ok, _Group, Data} ->
+            {ok, Data};
+        {error, not_found} ->
+            do_start(InstId, Group, ResourceType, Config, Opts),
+            {ok, _Group, Data} = lookup(InstId),
+            {ok, Data}
+    end.
+
+%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
+%%
+%% Triggers the `emqx_resource_manager_sup` supervisor to actually create
+%% and link the process itself if not already started, and then immediately stops it.
+-spec create_dry_run(resource_type(), resource_config()) ->
+    ok | {error, Reason :: term()}.
+create_dry_run(ResourceType, Config) ->
+    InstId = make_test_id(),
+    ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}),
+    case wait_for_resource_ready(InstId, 5000) of
+        ok ->
+            _ = stop(InstId);
+        timeout ->
+            _ = stop(InstId),
+            {error, timeout}
+    end.
+
+%% @doc Called from emqx_resource when recreating a resource which may or may not exist
+-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
+    {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
+recreate(InstId, ResourceType, NewConfig, Opts) ->
+    case lookup(InstId) of
+        {ok, Group, #{mod := ResourceType, status := _} = _Data} ->
+            _ = remove(InstId, false),
+            ensure_resource(InstId, Group, ResourceType, NewConfig, Opts);
+        {ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
+            {error, updating_to_incorrect_resource_type};
+        {error, not_found} ->
+            {error, not_found}
+    end.
+
+%% @doc Stops a running resource_manager and clears the metrics for the resource
+-spec remove(instance_id()) -> ok | {error, Reason :: term()}.
+remove(InstId) when is_binary(InstId) ->
+    remove(InstId, true).
+
+%% @doc Stops a running resource_manager and optionally clears the metrics for the resource
+-spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}.
+remove(InstId, ClearMetrics) when is_binary(InstId) ->
+    safe_call(InstId, {remove, ClearMetrics}).
+
+%% @doc Stops and then starts an instance that was already running
+-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
+restart(InstId, Opts) when is_binary(InstId) ->
+    case lookup(InstId) of
+        {ok, Group, #{mod := ResourceType, config := Config} = _Data} ->
+            _ = remove(InstId),
+            do_start(InstId, Group, ResourceType, Config, Opts);
+        Error ->
+            Error
+    end.
+
+%% @doc Stop the resource manager process
+-spec stop(instance_id()) -> ok | {error, Reason :: term()}.
+stop(InstId) ->
+    case safe_call(InstId, stop) of
+        ok ->
+            ok;
+        {error, not_found} ->
+            ok;
+        {error, _Reason} = Error ->
+            Error
+    end.
+
+%% @doc Test helper
+-spec set_resource_status_connecting(instance_id()) -> ok.
+set_resource_status_connecting(InstId) ->
+    safe_call(InstId, set_resource_status_connecting).
+
+%% @doc Lookup the group and data of a resource
+-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
+lookup(InstId) ->
+    safe_call(InstId, lookup).
+
+%% @doc Look up the group and data of a resource directly in the ETS table
+-spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
+ets_lookup(InstId) ->
+    case ets:lookup(?ETS_TABLE, InstId) of
+        [{_Id, Group, Data}] ->
+            {ok, Group, data_record_to_external_map_with_metrics(Data)};
+        [] ->
+            {error, not_found}
+    end.
+
+%% @doc Get the metrics for the specified resource
+get_metrics(InstId) ->
+    emqx_metrics_worker:get_metrics(resource_metrics, InstId).
+
+%% @doc Reset the metrics for the specified resource
+-spec reset_metrics(instance_id()) -> ok.
+reset_metrics(InstId) ->
+    emqx_metrics_worker:reset_metrics(resource_metrics, InstId).
+
+%% @doc Returns the data for all resources
+-spec list_all() -> [resource_data()] | [].
+list_all() ->
+    try
+        [
+            data_record_to_external_map_with_metrics(Data)
+         || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE)
+        ]
+    catch
+        error:badarg -> []
+    end.
+
+%% @doc Returns a list of ids for all the resources in a group
+-spec list_group(resource_group()) -> [instance_id()].
+list_group(Group) ->
+    List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
+    lists:flatten(List).
+
+-spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
+health_check(InstId) ->
+    safe_call(InstId, health_check).
+
+%% Server start/stop callbacks
+
+%% @doc Function called from the supervisor to actually start the server
+start_link(InstId, Group, ResourceType, Config, Opts) ->
+    Data = #data{
+        id = InstId,
+        group = Group,
+        mod = ResourceType,
+        config = Config,
+        opts = Opts,
+        status = undefined,
+        state = undefined,
+        error = undefined
+    },
+    gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
+
+init(Data) ->
+    {ok, connecting, Data}.
+
+terminate(_Reason, _State, Data) ->
+    ets:delete(?ETS_TABLE, Data#data.id),
+    ok.
+
+%% Behavior callback
+
+callback_mode() -> [handle_event_function, state_enter].
+
+%% Common event function.
+%%
+%% Single gen_statem callback handling every event: synchronous calls
+%% (`{call, From}'), state enter calls (`enter') and `state_timeout' events.
+%% The states used below are `connecting', `connected', `disconnected' and
+%% `stopped'. Clause order matters: more specific patterns (e.g. a particular
+%% `status' in the data record) must stay ahead of the generic ones.
+
+% Called during testing to force the `connecting' state; also sets the stored status.
+handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
+    {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
+% Called when the resource is to be stopped.
+% If the stored status is already `disconnected', do_stop/1 is not called and
+% the state machine moves straight to `stopped'.
+handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) ->
+    {next_state, stopped, Data, [{reply, From, ok}]};
+% Otherwise stop the resource, mark it `disconnected' and reply with the result of do_stop/1.
+handle_event({call, From}, stop, _State, Data) ->
+    Result = do_stop(Data),
+    UpdatedData = Data#data{status = disconnected},
+    {next_state, stopped, UpdatedData, [{reply, From, Result}]};
+% Nothing happens once the stopped state is entered. It is a 'holding' state waiting for external actions.
+handle_event(enter, _OldState, stopped, Data) ->
+    {next_state, stopped, Data};
+% Called when a resource is to be stopped and removed.
+% `ClearMetrics' selects whether the resource metrics are cleared as well.
+handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
+    handle_remove_event(From, ClearMetrics, Data);
+% Called when the state of the resource is being looked up.
+% Replies with `{ok, Group, Map}' and leaves state and data untouched.
+handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
+    Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
+    {keep_state_and_data, [{reply, From, Reply}]};
+% An external health check call. Disconnected usually means an error has happened,
+% so the stored error is returned without running a new check.
+handle_event({call, From}, health_check, disconnected, Data) ->
+    Actions = [{reply, From, {error, Data#data.error}}],
+    {keep_state_and_data, Actions};
+% Resource has been explicitly stopped, so return that as the error reason.
+handle_event({call, From}, health_check, stopped, _Data) ->
+    Actions = [{reply, From, {error, stopped}}],
+    {keep_state_and_data, Actions};
+% In any other state an external health check call runs the check right away.
+handle_event({call, From}, health_check, _State, Data) ->
+    handle_health_check_event(From, Data);
+% Connecting state enter.
+% Old state and new state are both `connecting': start the resource.
+% NOTE(review): presumably this is the initial enter call after init/1 - confirm.
+handle_event(enter, connecting, connecting, Data) ->
+    handle_connecting_state_enter_event(Data);
+% Entering `connecting' from any other state: schedule an immediate health check.
+handle_event(enter, _OldState, connecting, Data) ->
+    Actions = [{state_timeout, 0, health_check}],
+    {next_state, connecting, Data, Actions};
+% Connecting state health_check timeouts.
+% First clause supports desired behavior on initial connection: when the stored
+% status is `disconnected' (set by handle_connecting_state_enter_event/1 when
+% starting the resource failed) no check is run and the machine moves to `disconnected'.
+handle_event(state_timeout, health_check, connecting, #data{status = disconnected} = Data) ->
+    {next_state, disconnected, Data};
+handle_event(state_timeout, health_check, connecting, Data) ->
+    connecting_health_check(Data);
+%% The connected state is entered after a successful start of the callback mod
+%% and successful health_checks. On entry the periodic health check is scheduled.
+handle_event(enter, _OldState, connected, Data) ->
+    Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+    {next_state, connected, Data, Actions};
+handle_event(state_timeout, health_check, connected, Data) ->
+    perform_connected_health_check(Data);
+% Disconnected state enter: record the `disconnected' status and drop the
+% resource's entry from the ETS table.
+handle_event(enter, _OldState, disconnected, #data{id = InstId} = Data) ->
+    UpdatedData = Data#data{status = disconnected},
+    ets:delete(?ETS_TABLE, InstId),
+    {next_state, disconnected, UpdatedData}.
+
+%%------------------------------------------------------------------------------
+%% internal functions
+%%------------------------------------------------------------------------------
+
+%% Starts the resource through its callback module and records the outcome in
+%% the data record. Called from the `handle_event(enter, connecting, connecting, _)'
+%% clause. In both outcomes the machine stays in `connecting' and an immediate
+%% `health_check' state timeout is scheduled; the timeout handler decides what
+%% to do next based on the stored status.
+handle_connecting_state_enter_event(Data) ->
+    StartResult = emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config),
+    NewData =
+        case StartResult of
+            {ok, ResourceState} ->
+                Data#data{state = ResourceState, status = connecting};
+            {error, Reason} ->
+                %% Remember why starting failed so the reason can be handed
+                %% back when the resource is verified later on.
+                Data#data{status = disconnected, error = Reason}
+        end,
+    {next_state, connecting, NewData, [{state_timeout, 0, health_check}]}.
+
+%% Stops the resource, removes its ETS entry, optionally clears its metrics,
+%% then replies `ok' to the caller and terminates the state machine normally.
+handle_remove_event(From, ClearMetrics, Data) ->
+    InstId = Data#data.id,
+    do_stop(Data),
+    ets:delete(?ETS_TABLE, InstId),
+    case ClearMetrics of
+        false ->
+            ok;
+        true ->
+            ok = emqx_metrics_worker:clear_metrics(resource_metrics, InstId)
+    end,
+    Reply = {reply, From, ok},
+    {stop_and_reply, normal, [Reply]}.
+
+%% Ensures a manager process exists for the instance, creates its metrics and
+%% then waits (up to the `wait_for_resource_ready' option, default 5000) for
+%% the resource to report `connected'. The outcome of the wait is ignored and
+%% `ok' is always returned.
+do_start(InstId, Group, ResourceType, Config, Opts) ->
+    %% The state machine itself calls into the callback/resource module after init.
+    ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts),
+    MetricNames = [matched, success, failed, exception],
+    ok = emqx_metrics_worker:create_metrics(resource_metrics, InstId, MetricNames, [matched]),
+    WaitTime = maps:get(wait_for_resource_ready, Opts, 5000),
+    _ = wait_for_resource_ready(InstId, WaitTime),
+    ok.
+
+%% Stops the underlying resource. When no resource state has been recorded
+%% there is nothing to stop; otherwise the callback module is asked to stop,
+%% the ETS entry is removed and the stop result is returned.
+do_stop(#data{state = undefined}) ->
+    ok;
+do_stop(Data) ->
+    InstId = Data#data.id,
+    StopResult = emqx_resource:call_stop(InstId, Data#data.mod, Data#data.state),
+    ets:delete(?ETS_TABLE, InstId),
+    StopResult.
+
+%% Builds the registered process name for a resource id: `<module>_<id>'.
+%% NOTE(review): this creates one atom per distinct id and atoms are never
+%% garbage collected - confirm that ids cannot come from untrusted input.
+proc_name(Id) ->
+    Prefix = atom_to_binary(?MODULE),
+    binary_to_atom(<<Prefix/binary, "_", Id/binary>>).
+
+%% Runs a health check requested through a synchronous call and replies to
+%% `From'. A healthy result stores the updated record in ETS and moves to
+%% `connected'; any other result is logged, recorded as the error, the ETS
+%% entry is removed and the machine moves to `connecting'.
+handle_health_check_event(From, Data) ->
+    InstId = Data#data.id,
+    case emqx_resource:call_health_check(InstId, Data#data.mod, Data#data.state) of
+        connected ->
+            Healthy = Data#data{status = connected, error = undefined},
+            update_resource(InstId, Data#data.group, Healthy),
+            {next_state, connected, Healthy, [{reply, From, ok}]};
+        {connected, NewResourceState} ->
+            %% The callback module handed back a refreshed resource state.
+            Healthy = Data#data{
+                state = NewResourceState, status = connected, error = undefined
+            },
+            update_resource(InstId, Data#data.group, Healthy),
+            {next_state, connected, Healthy, [{reply, From, ok}]};
+        ConnectStatus ->
+            logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus]),
+            Unhealthy = Data#data{status = connecting, error = ConnectStatus},
+            ets:delete(?ETS_TABLE, InstId),
+            {next_state, connecting, Unhealthy, [{reply, From, {error, ConnectStatus}}]}
+    end.
+
+%% Health check run from the state timeout while in `connecting'. A healthy
+%% result publishes the record to ETS and moves to `connected'; otherwise the
+%% failure is logged and recorded, and the check is retried after
+%% ?SHORT_HEALTHCHECK_INTERVAL while staying in the current state.
+connecting_health_check(Data) ->
+    InstId = Data#data.id,
+    case emqx_resource:call_health_check(InstId, Data#data.mod, Data#data.state) of
+        connected ->
+            Healthy = Data#data{status = connected, error = undefined},
+            update_resource(InstId, Data#data.group, Healthy),
+            {next_state, connected, Healthy};
+        {connected, NewResourceState} ->
+            Healthy = Data#data{
+                state = NewResourceState, status = connected, error = undefined
+            },
+            update_resource(InstId, Data#data.group, Healthy),
+            {next_state, connected, Healthy};
+        ConnectStatus ->
+            logger:error("health check for ~p failed: ~p", [InstId, ConnectStatus]),
+            Retry = {state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check},
+            {keep_state, Data#data{error = ConnectStatus}, [Retry]}
+    end.
+
+%% Periodic health check run from the state timeout while in `connected'.
+%%
+%% * `connected': nothing changed; re-arm the timer.
+%% * `{connected, NewResourceState}': store the refreshed resource state in
+%%   the data record, publish it to ETS and re-arm the timer.
+%% * anything else: log and record the failure, drop the ETS entry and move
+%%   to `connecting', where health checks are retried.
+perform_connected_health_check(Data) ->
+    case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
+        connected ->
+            Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+            {keep_state_and_data, Actions};
+        {connected, NewResourceState} ->
+            UpdatedData = Data#data{
+                state = NewResourceState, status = connected, error = undefined
+            },
+            update_resource(Data#data.id, Data#data.group, UpdatedData),
+            Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
+            %% The state machine data must remain the #data{} record. Returning
+            %% the bare resource state here would replace the record and break
+            %% every later Data#data.* access.
+            {keep_state, UpdatedData, Actions};
+        ConnectStatus ->
+            logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
+            %% Keep the stored status in step with the state being entered, as
+            %% handle_health_check_event/2 does on failure.
+            UpdatedData = Data#data{status = connecting, error = ConnectStatus},
+            ets:delete(?ETS_TABLE, Data#data.id),
+            {next_state, connecting, UpdatedData}
+    end.
+
+%% Converts the internal data record into the map handed to external callers,
+%% adding the instance's current metrics under the `metrics' key.
+data_record_to_external_map_with_metrics(Data) ->
+    InstId = Data#data.id,
+    Metrics = get_metrics(InstId),
+    #{
+        id => InstId,
+        mod => Data#data.mod,
+        config => Data#data.config,
+        status => Data#data.status,
+        state => Data#data.state,
+        metrics => Metrics
+    }.
+
+%% Generates a resource id made of ?TEST_ID_PREFIX followed by a generated suffix.
+make_test_id() ->
+    Suffix = iolist_to_binary(emqx_misc:gen_id(16)),
+    <<?TEST_ID_PREFIX, Suffix/binary>>.
+
+%% Polls until the resource reports `connected' or the wait time is used up.
+%% The wait time is converted into a number of polling attempts spaced
+%% ?WAIT_FOR_RESOURCE_DELAY apart.
+-spec wait_for_resource_ready(instance_id(), integer()) -> ok | timeout.
+wait_for_resource_ready(InstId, WaitTime) ->
+    Attempts = WaitTime div ?WAIT_FOR_RESOURCE_DELAY,
+    do_wait_for_resource_ready(InstId, Attempts).
+
+%% Polling loop behind wait_for_resource_ready/2.
+%% Returns `ok' as soon as the lookup reports status `connected', or `timeout'
+%% once no attempts are left. The base case uses `=< 0' rather than matching
+%% `0' exactly so that a negative attempt count (e.g. from a negative wait
+%% time) ends the loop instead of recursing forever.
+do_wait_for_resource_ready(_InstId, Retry) when Retry =< 0 ->
+    timeout;
+do_wait_for_resource_ready(InstId, Retry) ->
+    case ets_lookup(InstId) of
+        {ok, _Group, #{status := connected}} ->
+            ok;
+        _ ->
+            timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
+            do_wait_for_resource_ready(InstId, Retry - 1)
+    end.
+
+%% Sends a synchronous request to the manager process of `InstId'.
+%% Any exit raised by the call (for instance because no such process is
+%% registered) is reported as `{error, not_found}'.
+safe_call(InstId, Message) ->
+    ServerRef = proc_name(InstId),
+    try gen_statem:call(ServerRef, Message) of
+        Reply -> Reply
+    catch
+        exit:_ -> {error, not_found}
+    end.
+
+%% Stores (or overwrites) the entry for `InstId' in the resource ETS table.
+update_resource(InstId, Group, Data) ->
+    Entry = {InstId, Group, Data},
+    ets:insert(?ETS_TABLE, Entry).

+ 49 - 0
apps/emqx_resource/src/emqx_resource_manager_sup.erl

@@ -0,0 +1,49 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_resource_manager_sup).
+
+-behaviour(supervisor).
+
+-export([ensure_child/5]).
+
+-export([start_link/0]).
+
+-export([init/1]).
+
+%% Makes sure a resource manager child exists for `InstId'.
+%% Always returns `ok' (callers match on it). An already running child is the
+%% expected "ensure" case; any other start failure is logged instead of being
+%% silently discarded, so that a resource that never becomes ready can be
+%% diagnosed.
+ensure_child(InstId, Group, ResourceType, Config, Opts) ->
+    case supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]) of
+        {ok, _Child} ->
+            ok;
+        {ok, _Child, _Info} ->
+            ok;
+        {error, {already_started, _Child}} ->
+            ok;
+        {error, Reason} ->
+            logger:error("failed to start resource manager for ~p: ~p", [InstId, Reason]),
+            ok
+    end.
+
+%% Starts the supervisor, registered locally under the module name.
+start_link() ->
+    SupName = {local, ?MODULE},
+    supervisor:start_link(SupName, ?MODULE, []).
+
+%% Supervisor callback. Creates the public named ETS table
+%% `emqx_resource_manager' (owned by this supervisor process) and declares a
+%% simple_one_for_one supervisor whose children are emqx_resource_manager workers.
+init([]) ->
+    _ = ets:new(
+        emqx_resource_manager,
+        [named_table, set, public, {read_concurrency, true}]
+    ),
+    ManagerSpec = #{
+        id => emqx_resource_manager,
+        start => {emqx_resource_manager, start_link, []},
+        restart => transient,
+        shutdown => brutal_kill,
+        type => worker,
+        modules => [emqx_resource_manager]
+    },
+    Flags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
+    {ok, {Flags, [ManagerSpec]}}.

+ 6 - 38
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -35,45 +35,13 @@ init([]) ->
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     Metrics = emqx_metrics_worker:child_spec(resource_metrics),
 
-    Pool = ?RESOURCE_INST_MOD,
-    Mod = ?RESOURCE_INST_MOD,
-    ensure_pool(Pool, hash, [{size, ?POOL_SIZE}]),
-    ResourceInsts = [
-        begin
-            ensure_pool_worker(Pool, {Pool, Idx}, Idx),
-            #{
-                id => {Mod, Idx},
-                start => {Mod, start_link, [Pool, Idx]},
-                restart => transient,
-                shutdown => 5000,
-                type => worker,
-                modules => [Mod]
-            }
-        end
-     || Idx <- lists:seq(1, ?POOL_SIZE)
-    ],
-    HealthCheck =
+    ResourceManager =
         #{
-            id => emqx_resource_health_check_sup,
-            start => {emqx_resource_health_check_sup, start_link, []},
-            restart => transient,
+            id => emqx_resource_manager_sup,
+            start => {emqx_resource_manager_sup, start_link, []},
+            restart => permanent,
             shutdown => infinity,
             type => supervisor,
-            modules => [emqx_resource_health_check_sup]
+            modules => [emqx_resource_manager_sup]
         },
-    {ok, {SupFlags, [HealthCheck, Metrics | ResourceInsts]}}.
-
-%% internal functions
-ensure_pool(Pool, Type, Opts) ->
-    try
-        gproc_pool:new(Pool, Type, Opts)
-    catch
-        error:exists -> ok
-    end.
-
-ensure_pool_worker(Pool, Name, Slot) ->
-    try
-        gproc_pool:add_worker(Pool, Name, Slot)
-    catch
-        error:exists -> ok
-    end.
+    {ok, {SupFlags, [Metrics, ResourceManager]}}.

+ 17 - 17
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -20,7 +20,6 @@
 
 -export([
     introduced_in/0,
-
     create/5,
     create_dry_run/2,
     recreate/4,
@@ -29,47 +28,48 @@
 ]).
 
 -include_lib("emqx/include/bpapi.hrl").
+-include("emqx_resource.hrl").
 
 introduced_in() ->
     "5.0.0".
 
 -spec create(
-    emqx_resource:instance_id(),
-    emqx_resource:resource_group(),
-    emqx_resource:resource_type(),
-    emqx_resource:resource_config(),
-    emqx_resource:create_opts()
+    instance_id(),
+    resource_group(),
+    resource_type(),
+    resource_config(),
+    create_opts()
 ) ->
-    emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+    emqx_cluster_rpc:multicall_return(resource_data()).
 create(InstId, Group, ResourceType, Config, Opts) ->
     emqx_cluster_rpc:multicall(emqx_resource, create_local, [
         InstId, Group, ResourceType, Config, Opts
     ]).
 
 -spec create_dry_run(
-    emqx_resource:resource_type(),
-    emqx_resource:resource_config()
+    resource_type(),
+    resource_config()
 ) ->
-    emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+    emqx_cluster_rpc:multicall_return(resource_data()).
 create_dry_run(ResourceType, Config) ->
     emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
 
 -spec recreate(
-    emqx_resource:instance_id(),
-    emqx_resource:resource_type(),
-    emqx_resource:resource_config(),
-    emqx_resource:create_opts()
+    instance_id(),
+    resource_type(),
+    resource_config(),
+    create_opts()
 ) ->
-    emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
+    emqx_cluster_rpc:multicall_return(resource_data()).
 recreate(InstId, ResourceType, Config, Opts) ->
     emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]).
 
--spec remove(emqx_resource:instance_id()) ->
+-spec remove(instance_id()) ->
     emqx_cluster_rpc:multicall_return(ok).
 remove(InstId) ->
     emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]).
 
--spec reset_metrics(emqx_resource:instance_id()) ->
+-spec reset_metrics(instance_id()) ->
     emqx_cluster_rpc:multicall_return(ok).
 reset_metrics(InstId) ->
     emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]).

+ 11 - 12
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -73,7 +73,7 @@ t_create_remove(_) ->
         #{name => test_resource}
     ),
 
-    emqx_resource:recreate(
+    {ok, _} = emqx_resource:recreate(
         ?ID,
         ?TEST_RESOURCE,
         #{name => test_resource},
@@ -178,7 +178,6 @@ t_healthy(_) ->
     ),
     timer:sleep(400),
 
-    emqx_resource_health_check:create_checker(?ID, 15000, 10000),
     #{pid := Pid} = emqx_resource:query(?ID, get_state),
     timer:sleep(300),
     emqx_resource:set_resource_status_connecting(?ID),
@@ -192,10 +191,10 @@ t_healthy(_) ->
 
     erlang:exit(Pid, shutdown),
 
-    ?assertEqual(ok, emqx_resource:health_check(?ID)),
+    ?assertEqual({error, connecting}, emqx_resource:health_check(?ID)),
 
     ?assertMatch(
-        [#{status := connecting}],
+        [],
         emqx_resource:list_instances_verbose()
     ),
 
@@ -232,7 +231,7 @@ t_stop_start(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := disconnected}}},
+        {error, {emqx_resource, #{reason := not_found}}},
         emqx_resource:query(?ID, get_state)
     ),
 
@@ -275,7 +274,7 @@ t_stop_start_local(_) ->
     ?assertNot(is_process_alive(Pid0)),
 
     ?assertMatch(
-        {error, {emqx_resource, #{reason := disconnected}}},
+        {error, {emqx_resource, #{reason := not_found}}},
         emqx_resource:query(?ID, get_state)
     ),
 
@@ -323,23 +322,23 @@ t_create_dry_run_local(_) ->
     ?assertEqual(undefined, whereis(test_resource)).
 
 t_create_dry_run_local_failed(_) ->
-    {Res, _} = emqx_resource:create_dry_run_local(
+    {Res1, _} = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{cteate_error => true}
     ),
-    ?assertEqual(error, Res),
+    ?assertEqual(error, Res1),
 
-    {Res, _} = emqx_resource:create_dry_run_local(
+    {Res2, _} = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, health_check_error => true}
     ),
-    ?assertEqual(error, Res),
+    ?assertEqual(error, Res2),
 
-    {Res, _} = emqx_resource:create_dry_run_local(
+    {Res3, _} = emqx_resource:create_dry_run_local(
         ?TEST_RESOURCE,
         #{name => test_resource, stop_error => true}
     ),
-    ?assertEqual(error, Res).
+    ?assertEqual(error, Res3).
 
 t_test_func(_) ->
     ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),

+ 8 - 16
apps/emqx_retainer/src/emqx_retainer.erl

@@ -385,22 +385,14 @@ create_resource(Context, #{type := built_in_database} = Cfg) ->
     Context;
 create_resource(Context, #{type := DB} = Config) ->
     ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]),
-    case
-        emqx_resource:create(
-            ResourceID,
-            <<"emqx_retainer">>,
-            list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
-            Config,
-            #{}
-        )
-    of
-        {ok, already_created} ->
-            Context#{resource_id => ResourceID};
-        {ok, _} ->
-            Context#{resource_id => ResourceID};
-        {error, Reason} ->
-            error({load_config_error, Reason})
-    end.
+    _ = emqx_resource:create(
+        ResourceID,
+        <<"emqx_retainer">>,
+        list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
+        Config,
+        #{}
+    ),
+    Context#{resource_id => ResourceID}.
 
 -spec close_resource(context()) -> ok | {error, term()}.
 close_resource(#{resource_id := ResourceId}) ->