Sfoglia il codice sorgente

Merge pull request #12544 from zmstone/0220-delete-non-prod-code

0220 delete non prod code in emqx_resource.erl
Zaiming (Stone) Shi 2 anni fa
parent
commit
415d27a4c9

+ 1 - 0
apps/emqx/priv/bpapi.versions

@@ -62,6 +62,7 @@
 {emqx_prometheus,1}.
 {emqx_prometheus,2}.
 {emqx_resource,1}.
+{emqx_resource,2}.
 {emqx_retainer,1}.
 {emqx_retainer,2}.
 {emqx_rule_engine,1}.

+ 1 - 1
apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl

@@ -315,7 +315,7 @@ delete_all_bridges() ->
     %% at some point during the tests, sometimes `emqx_bridge:list()'
     %% returns an empty list, but `emqx:get_config([bridges])' returns
     %% a bunch of orphan test bridges...
-    lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()),
+    lists:foreach(fun emqx_resource:remove_local/1, emqx_resource:list_instances()),
     emqx_config:put([bridges], #{}),
     ok.
 

+ 1 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -1234,7 +1234,7 @@ delete_all_bridges() ->
     %% at some point during the tests, sometimes `emqx_bridge:list()'
     %% returns an empty list, but `emqx:get_config([bridges])' returns
     %% a bunch of orphan test bridges...
-    lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()),
+    lists:foreach(fun emqx_resource:remove_local/1, emqx_resource:list_instances()),
     emqx_config:put([bridges], #{}),
     ok.
 

+ 1 - 1
apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl

@@ -680,7 +680,7 @@ to_schema(Object) ->
         post => #{responses => #{200 => Object, 201 => Object}}
     }.
 
-rotos() -> [].
+roots() -> [].
 namespace() -> undefined.
 
 fields(good_ref) ->

+ 1 - 80
apps/emqx_resource/src/emqx_resource.erl

@@ -28,11 +28,8 @@
 
 -export([
     check_config/2,
-    check_and_create/4,
-    check_and_create/5,
     check_and_create_local/4,
     check_and_create_local/5,
-    check_and_recreate/4,
     check_and_recreate_local/4
 ]).
 
@@ -42,22 +39,14 @@
 
 %% store the config and start the instance
 -export([
-    create/4,
-    create/5,
     create_local/4,
     create_local/5,
-    %% run start/2, health_check/2 and stop/1 sequentially
-    create_dry_run/2,
     create_dry_run_local/2,
     create_dry_run_local/3,
     create_dry_run_local/4,
-    %% this will do create_dry_run, stop the old instance and start a new one
-    recreate/3,
-    recreate/4,
     recreate_local/3,
     recreate_local/4,
     %% remove the config and stop the instance
-    remove/1,
     remove_local/1,
     reset_metrics/1,
     reset_metrics_local/1,
@@ -275,16 +264,6 @@ is_resource_mod(Module) ->
 %% =================================================================================
 %% APIs for resource instances
 %% =================================================================================
--spec create(resource_id(), resource_group(), resource_type(), resource_config()) ->
-    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create(ResId, Group, ResourceType, Config) ->
-    create(ResId, Group, ResourceType, Config, #{}).
-
--spec create(resource_id(), resource_group(), resource_type(), resource_config(), creation_opts()) ->
-    {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
-create(ResId, Group, ResourceType, Config, Opts) ->
-    emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts).
-% --------------------------------------------
 
 -spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) ->
     {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
@@ -302,11 +281,6 @@ create_local(ResId, Group, ResourceType, Config) ->
 create_local(ResId, Group, ResourceType, Config, Opts) ->
     emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts).
 
--spec create_dry_run(resource_type(), resource_config()) ->
-    ok | {error, Reason :: term()}.
-create_dry_run(ResourceType, Config) ->
-    emqx_resource_proto_v1:create_dry_run(ResourceType, Config).
-
 -spec create_dry_run_local(resource_type(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run_local(ResourceType, Config) ->
@@ -322,16 +296,6 @@ when
 create_dry_run_local(ResId, ResourceType, Config, OnReadyCallback) ->
     emqx_resource_manager:create_dry_run(ResId, ResourceType, Config, OnReadyCallback).
 
--spec recreate(resource_id(), resource_type(), resource_config()) ->
-    {ok, resource_data()} | {error, Reason :: term()}.
-recreate(ResId, ResourceType, Config) ->
-    recreate(ResId, ResourceType, Config, #{}).
-
--spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) ->
-    {ok, resource_data()} | {error, Reason :: term()}.
-recreate(ResId, ResourceType, Config, Opts) ->
-    emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts).
-
 -spec recreate_local(resource_id(), resource_type(), resource_config()) ->
     {ok, resource_data()} | {error, Reason :: term()}.
 recreate_local(ResId, ResourceType, Config) ->
@@ -342,10 +306,6 @@ recreate_local(ResId, ResourceType, Config) ->
 recreate_local(ResId, ResourceType, Config, Opts) ->
     emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
 
--spec remove(resource_id()) -> ok | {error, Reason :: term()}.
-remove(ResId) ->
-    emqx_resource_proto_v1:remove(ResId).
-
 -spec remove_local(resource_id()) -> ok.
 remove_local(ResId) ->
     case emqx_resource_manager:remove(ResId) of
@@ -369,7 +329,7 @@ reset_metrics_local(ResId) ->
 
 -spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.
 reset_metrics(ResId) ->
-    emqx_resource_proto_v1:reset_metrics(ResId).
+    emqx_resource_proto_v2:reset_metrics(ResId).
 
 %% =================================================================================
 -spec query(resource_id(), Request :: term()) -> Result :: term().
@@ -616,31 +576,6 @@ query_mode(Mod, Config, Opts) ->
 check_config(ResourceType, Conf) ->
     emqx_hocon:check(ResourceType, Conf).
 
--spec check_and_create(
-    resource_id(),
-    resource_group(),
-    resource_type(),
-    raw_resource_config()
-) ->
-    {ok, resource_data() | 'already_created'} | {error, term()}.
-check_and_create(ResId, Group, ResourceType, RawConfig) ->
-    check_and_create(ResId, Group, ResourceType, RawConfig, #{}).
-
--spec check_and_create(
-    resource_id(),
-    resource_group(),
-    resource_type(),
-    raw_resource_config(),
-    creation_opts()
-) ->
-    {ok, resource_data() | 'already_created'} | {error, term()}.
-check_and_create(ResId, Group, ResourceType, RawConfig, Opts) ->
-    check_and_do(
-        ResourceType,
-        RawConfig,
-        fun(ResConf) -> create(ResId, Group, ResourceType, ResConf, Opts) end
-    ).
-
 -spec check_and_create_local(
     resource_id(),
     resource_group(),
@@ -665,20 +600,6 @@ check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
         fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end
     ).
 
--spec check_and_recreate(
-    resource_id(),
-    resource_type(),
-    raw_resource_config(),
-    creation_opts()
-) ->
-    {ok, resource_data()} | {error, term()}.
-check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
-    check_and_do(
-        ResourceType,
-        RawConfig,
-        fun(ResConf) -> recreate(ResId, ResourceType, ResConf, Opts) end
-    ).
-
 -spec check_and_recreate_local(
     resource_id(),
     resource_type(),

+ 4 - 0
apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl

@@ -20,6 +20,7 @@
 
 -export([
     introduced_in/0,
+    deprecated_since/0,
     create/5,
     create_dry_run/2,
     recreate/4,
@@ -33,6 +34,9 @@
 introduced_in() ->
     "5.0.0".
 
+deprecated_since() ->
+    "5.6.0".
+
 -spec create(
     resource_id(),
     resource_group(),

+ 34 - 0
apps/emqx_resource/src/proto/emqx_resource_proto_v2.erl

@@ -0,0 +1,34 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_proto_v2).
+
+-behaviour(emqx_bpapi).
+
+-export([
+    introduced_in/0,
+    reset_metrics/1
+]).
+
+-include_lib("emqx/include/bpapi.hrl").
+-include("emqx_resource.hrl").
+
+introduced_in() ->
+    "5.6.0".
+
+-spec reset_metrics(resource_id()) -> ok | {error, any()}.
+reset_metrics(ResId) ->
+    emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [ResId]).

+ 68 - 62
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -46,7 +46,7 @@ init_per_testcase(_, Config) ->
 
 end_per_testcase(_, _Config) ->
     snabbkaffe:stop(),
-    _ = emqx_resource:remove(?ID),
+    _ = emqx_resource:remove_local(?ID),
     emqx_common_test_helpers:call_janitor(),
     ok.
 
@@ -88,7 +88,7 @@ t_create_remove(_) ->
 
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -98,7 +98,7 @@ t_create_remove(_) ->
 
             ?assertMatch(
                 {ok, _},
-                emqx_resource:recreate(
+                emqx_resource:recreate_local(
                     ?ID,
                     ?TEST_RESOURCE,
                     #{name => test_resource},
@@ -110,8 +110,8 @@ t_create_remove(_) ->
 
             ?assert(is_process_alive(Pid)),
 
-            ?assertEqual(ok, emqx_resource:remove(?ID)),
-            ?assertMatch(ok, emqx_resource:remove(?ID)),
+            ?assertEqual(ok, emqx_resource:remove_local(?ID)),
+            ?assertMatch(ok, emqx_resource:remove_local(?ID)),
 
             ?assertNot(is_process_alive(Pid))
         end,
@@ -136,7 +136,7 @@ t_create_remove_local(_) ->
 
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create_local(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -185,7 +185,7 @@ t_do_not_start_after_created(_) ->
         begin
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create_local(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -226,7 +226,7 @@ t_do_not_start_after_created(_) ->
     ).
 
 t_query(_) ->
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -243,7 +243,7 @@ t_query(_) ->
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter(_) ->
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -260,7 +260,7 @@ t_query_counter(_) ->
 
 t_batch_query_counter(_) ->
     BatchSize = 100,
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -313,7 +313,7 @@ t_batch_query_counter(_) ->
     ok = emqx_resource:remove_local(?ID).
 
 t_query_counter_async_query(_) ->
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -371,7 +371,7 @@ t_query_counter_async_callback(_) ->
         ets:insert(Tab, {make_ref(), Result})
     end,
     ReqOpts = #{async_reply_fun => {Insert, [Tab0]}},
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -452,7 +452,7 @@ t_query_counter_async_inflight(_) ->
     end,
     ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
     WindowSize = 15,
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -635,7 +635,7 @@ t_query_counter_async_inflight_batch(_) ->
     ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end,
     BatchSize = 2,
     WindowSize = 15,
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -837,7 +837,7 @@ t_healthy_timeout(_) ->
         begin
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create_local(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -866,7 +866,7 @@ t_healthy(_) ->
         begin
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create_local(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -904,7 +904,7 @@ t_unhealthy_target(_) ->
     HealthCheckError = {unhealthy_target, "some message"},
     ?assertMatch(
         {ok, _},
-        emqx_resource:create_local(
+        create(
             ?ID,
             ?DEFAULT_RESOURCE_GROUP,
             ?TEST_RESOURCE,
@@ -937,7 +937,7 @@ t_stop_start(_) ->
         begin
             ?assertMatch(
                 {error, _},
-                emqx_resource:check_and_create(
+                emqx_resource:check_and_create_local(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -947,7 +947,7 @@ t_stop_start(_) ->
 
             ?assertMatch(
                 {ok, _},
-                emqx_resource:check_and_create(
+                emqx_resource:check_and_create_local(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -964,7 +964,7 @@ t_stop_start(_) ->
 
             ?assertMatch(
                 {ok, _},
-                emqx_resource:check_and_recreate(
+                emqx_resource:check_and_recreate_local(
                     ?ID,
                     ?TEST_RESOURCE,
                     #{<<"name">> => <<"test_resource">>},
@@ -1071,13 +1071,13 @@ t_stop_start_local(_) ->
     ).
 
 t_list_filter(_) ->
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         emqx_resource:generate_id(<<"a">>),
         <<"group1">>,
         ?TEST_RESOURCE,
         #{name => a}
     ),
-    {ok, _} = emqx_resource:create_local(
+    {ok, _} = create(
         emqx_resource:generate_id(<<"a">>),
         <<"group2">>,
         ?TEST_RESOURCE,
@@ -1208,7 +1208,7 @@ t_test_func(_) ->
     ).
 
 t_reset_metrics(_) ->
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1218,11 +1218,11 @@ t_reset_metrics(_) ->
     {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
     emqx_resource:reset_metrics(?ID),
     ?assert(is_process_alive(Pid)),
-    ok = emqx_resource:remove(?ID),
+    ok = emqx_resource:remove_local(?ID),
     ?assertNot(is_process_alive(Pid)).
 
 t_auto_retry(_) ->
-    {Res, _} = emqx_resource:create_local(
+    {Res, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1239,7 +1239,7 @@ t_start_throw_error(_Config) ->
     ?assertMatch(
         {{ok, _}, {ok, _}},
         ?wait_async_action(
-            emqx_resource:create_local(
+            create(
                 ?ID,
                 ?DEFAULT_RESOURCE_GROUP,
                 ?TEST_RESOURCE,
@@ -1257,7 +1257,7 @@ t_start_throw_error(_Config) ->
 t_health_check_disconnected(_) ->
     ?check_trace(
         begin
-            _ = emqx_resource:create_local(
+            _ = create(
                 ?ID,
                 ?DEFAULT_RESOURCE_GROUP,
                 ?TEST_RESOURCE,
@@ -1276,7 +1276,7 @@ t_health_check_disconnected(_) ->
     ).
 
 t_unblock_only_required_buffer_workers(_) ->
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1292,7 +1292,7 @@ t_unblock_only_required_buffer_workers(_) ->
         fun emqx_resource_buffer_worker:block/1,
         emqx_resource_buffer_worker_sup:worker_pids(?ID)
     ),
-    emqx_resource:create(
+    create(
         ?ID1,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1323,7 +1323,7 @@ t_unblock_only_required_buffer_workers(_) ->
     ).
 
 t_retry_batch(_Config) ->
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1422,7 +1422,7 @@ t_retry_batch(_Config) ->
 
 t_delete_and_re_create_with_same_name(_Config) ->
     NumBufferWorkers = 2,
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1489,7 +1489,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
             %% re-create the resource with the *same name*
             {{ok, _}, {ok, _Events}} =
                 ?wait_async_action(
-                    emqx_resource:create(
+                    create(
                         ?ID,
                         ?DEFAULT_RESOURCE_GROUP,
                         ?TEST_RESOURCE,
@@ -1521,7 +1521,7 @@ t_delete_and_re_create_with_same_name(_Config) ->
 %% check that, if we configure a max queue size too small, then we
 %% never send requests and always overflow.
 t_always_overflow(_Config) ->
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1560,7 +1560,7 @@ t_always_overflow(_Config) ->
 t_retry_sync_inflight(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1610,7 +1610,7 @@ t_retry_sync_inflight(_Config) ->
 t_retry_sync_inflight_batch(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1662,7 +1662,7 @@ t_retry_sync_inflight_batch(_Config) ->
 t_retry_async_inflight(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1706,7 +1706,7 @@ t_retry_async_inflight_full(_Config) ->
     ResumeInterval = 1_000,
     AsyncInflightWindow = 5,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1769,7 +1769,7 @@ t_async_reply_multi_eval(_Config) ->
     AsyncInflightWindow = 3,
     TotalQueries = AsyncInflightWindow * 5,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1818,7 +1818,7 @@ t_async_reply_multi_eval(_Config) ->
 t_retry_async_inflight_batch(_Config) ->
     ResumeInterval = 1_000,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1865,7 +1865,7 @@ t_async_pool_worker_death(_Config) ->
     ResumeInterval = 1_000,
     NumBufferWorkers = 2,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1956,7 +1956,7 @@ t_async_pool_worker_death(_Config) ->
 
 t_expiration_sync_before_sending(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1973,7 +1973,7 @@ t_expiration_sync_before_sending(_Config) ->
 
 t_expiration_sync_batch_before_sending(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -1991,7 +1991,7 @@ t_expiration_sync_batch_before_sending(_Config) ->
 
 t_expiration_async_before_sending(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2008,7 +2008,7 @@ t_expiration_async_before_sending(_Config) ->
 
 t_expiration_async_batch_before_sending(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2089,7 +2089,7 @@ do_t_expiration_before_sending(QueryMode) ->
 
 t_expiration_sync_before_sending_partial_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2108,7 +2108,7 @@ t_expiration_sync_before_sending_partial_batch(_Config) ->
 
 t_expiration_async_before_sending_partial_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2265,7 +2265,7 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
 
 t_expiration_async_after_reply(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2282,7 +2282,7 @@ t_expiration_async_after_reply(_Config) ->
 
 t_expiration_async_batch_after_reply(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2407,7 +2407,7 @@ do_t_expiration_async_after_reply(IsBatch) ->
 t_expiration_batch_all_expired_after_reply(_Config) ->
     ResumeInterval = 300,
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2496,7 +2496,7 @@ t_expiration_batch_all_expired_after_reply(_Config) ->
 
 t_expiration_retry(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2512,7 +2512,7 @@ t_expiration_retry(_Config) ->
 
 t_expiration_retry_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2612,7 +2612,7 @@ do_t_expiration_retry() ->
 t_expiration_retry_batch_multiple_times(_Config) ->
     ResumeInterval = 300,
     emqx_connector_demo:set_callback_mode(always_sync),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2680,7 +2680,7 @@ t_expiration_retry_batch_multiple_times(_Config) ->
 
 t_recursive_flush(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2695,7 +2695,7 @@ t_recursive_flush(_Config) ->
 
 t_recursive_flush_batch(_Config) ->
     emqx_connector_demo:set_callback_mode(async_if_possible),
-    {ok, _} = emqx_resource:create(
+    {ok, _} = create(
         ?ID,
         ?DEFAULT_RESOURCE_GROUP,
         ?TEST_RESOURCE,
@@ -2750,7 +2750,7 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
             %% calls, even if the underlying connector itself only
             %% supports sync calls.
             emqx_connector_demo:set_callback_mode(always_sync),
-            {ok, _} = emqx_resource:create(
+            {ok, _} = create(
                 ?ID,
                 ?DEFAULT_RESOURCE_GROUP,
                 ?TEST_RESOURCE,
@@ -2775,7 +2775,7 @@ t_call_mode_uncoupled_from_query_mode(_Config) ->
             %% calls can be called synchronously, but the underlying
             %% call should be async.
             emqx_connector_demo:set_callback_mode(async_if_possible),
-            {ok, _} = emqx_resource:create(
+            {ok, _} = create(
                 ?ID,
                 ?DEFAULT_RESOURCE_GROUP,
                 ?TEST_RESOURCE,
@@ -2829,7 +2829,7 @@ t_volatile_offload_mode(_Config) ->
             %% default to equal max bytes.
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -2842,7 +2842,7 @@ t_volatile_offload_mode(_Config) ->
             %% Create with segment bytes < max bytes
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -2857,7 +2857,7 @@ t_volatile_offload_mode(_Config) ->
             %% Create with segment bytes = max bytes
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -2874,7 +2874,7 @@ t_volatile_offload_mode(_Config) ->
             %% to max bytes.
             ?assertMatch(
                 {ok, _},
-                emqx_resource:create(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -2931,7 +2931,7 @@ t_late_call_reply(_Config) ->
     RequestTTL = 500,
     ?assertMatch(
         {ok, _},
-        emqx_resource:create(
+        create(
             ?ID,
             ?DEFAULT_RESOURCE_GROUP,
             ?TEST_RESOURCE,
@@ -2987,7 +2987,7 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) ->
     ?check_trace(
         begin
             ?wait_async_action(
-                emqx_resource:create_local(
+                create(
                     ?ID,
                     ?DEFAULT_RESOURCE_GROUP,
                     ?TEST_RESOURCE,
@@ -3288,3 +3288,9 @@ gauge_metric_set_fns() ->
             _ -> false
         end
     ].
+
+create(Id, Group, Type, Config) ->
+    emqx_resource:create_local(Id, Group, Type, Config).
+
+create(Id, Group, Type, Config, Opts) ->
+    emqx_resource:create_local(Id, Group, Type, Config, Opts).