Просмотр исходного кода

Merge pull request #13838 from zmstone/0918-store-connector-state-in-pt

perf: optimize resource status/state/error cache
zmstone 1 год назад
Родитель
Сommit
5622735f09

+ 4 - 0
Makefile

@@ -332,6 +332,10 @@ fmt: $(REBAR)
 	@$(SCRIPTS)/erlfmt -w 'bin/nodetool'
 	@mix format
 
+# Run the git pre-commit hook script with ERLFMT_WRITE=true in its environment.
+# NOTE(review): presumably this formats only the changed files in place --
+# confirm against scripts/git-hook-pre-commit.sh.
+.PHONY: fmt-diff
+fmt-diff:
+	@env ERLFMT_WRITE=true ./scripts/git-hook-pre-commit.sh
+
 .PHONY: clean-test-cluster-config
 clean-test-cluster-config:
 	@rm -f apps/emqx_conf/data/configs/cluster.hocon || true

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge_resource.erl

@@ -282,7 +282,7 @@ create_dry_run(Type0, Conf0) ->
     end.
 
 create_dry_run_bridge_v1(Type, Conf0) ->
-    TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
+    TmpName = ?PROBE_ID_NEW(),
     TmpPath = emqx_utils:safe_filename(TmpName),
     %% Already type checked, no need to catch errors
     TypeBin = bin(Type),

+ 2 - 2
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -781,7 +781,7 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
     end.
 
 create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
-    BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
+    BridgeName = ?PROBE_ID_NEW(),
     ConnectorType = connector_type(BridgeV2Type),
     OnReadyCallback =
         fun(ConnectorId) ->
@@ -1708,7 +1708,7 @@ get_conf_root_key(_NoMatch) ->
 
 bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
     RawConf = maps:without([<<"name">>], RawConfig0),
-    TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
+    TmpName = ?PROBE_ID_NEW(),
     PreviousRawConf = undefined,
     try
         #{

+ 2 - 2
apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl

@@ -532,11 +532,11 @@ do_send_msg(sync, KafkaTopic, KafkaMessage, Producers, SyncTimeout) ->
             {error, timeout}
     end;
 do_send_msg(async, KafkaTopic, KafkaMessage, Producers, AsyncReplyFn) ->
-    %% * Must be a batch because wolff:cast2 are batch APIs
+    %% * Must be a batch because wolff send and cast are batch APIs
     %% * Must be a single element batch because wolff books calls, but not batch sizes
     %%   for counters and gauges.
     Batch = [KafkaMessage],
-    {_Partition, Pid} = wolff:cast2(
+    {_Partition, Pid} = wolff:send2(
         Producers, KafkaTopic, Batch, {fun ?MODULE:on_kafka_ack/3, [AsyncReplyFn]}
     ),
     %% This Pid is returned, but not monitored by caller

+ 0 - 9
apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl

@@ -234,14 +234,6 @@ t_rest_api(Config) ->
     },
     ok = kafka_bridge_rest_api_helper(Cfg).
 
-%% So that we can check if new atoms are created when they are not supposed to be created
-pre_create_atoms() ->
-    [
-        kafka_producer__probe_,
-        probedryrun,
-        kafka__probe_
-    ].
-
 http_get_bridges(UrlPath, Name0) ->
     Name = iolist_to_binary(Name0),
     {ok, _Code, BridgesData} = http_get(UrlPath),
@@ -307,7 +299,6 @@ kafka_bridge_rest_api_helper(Config) ->
         ?assertMatch([#{<<"type">> := <<"kafka">>}], http_get_bridges(BridgesParts, BridgeName)),
         %% Probe should work
         %% no extra atoms should be created when probing
-        %% See pre_create_atoms() above
         AtomsBefore = erlang:system_info(atom_count),
         {ok, 204, _} = http_post(BridgesProbeParts, CreateBody),
         AtomsAfter = erlang:system_info(atom_count),

+ 1 - 1
apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl

@@ -859,7 +859,7 @@ t_invalid_partition_count_metrics(Config) ->
             %% Simulate `invalid_partition_count'
             emqx_common_test_helpers:with_mock(
                 wolff,
-                cast2,
+                send2,
                 fun(_Producers, _Topic, _Msgs, _AckCallback) ->
                     throw(#{
                         cause => invalid_partition_count,

+ 6 - 4
apps/emqx_connector/src/emqx_connector_resource.erl

@@ -52,6 +52,8 @@
 
 -export([parse_url/1]).
 
+-define(PROBE_ID_SEP, $_).
+
 -callback connector_config(ParsedConfig, Context) ->
     ParsedConfig
 when
@@ -90,7 +92,8 @@ parse_connector_id(ConnectorId) ->
     {atom(), atom() | binary()}.
 parse_connector_id(<<"connector:", ConnectorId/binary>>, Opts) ->
     parse_connector_id(ConnectorId, Opts);
-parse_connector_id(<<?TEST_ID_PREFIX, _:16/binary, ConnectorId/binary>>, Opts) ->
+parse_connector_id(?PROBE_ID_MATCH(Suffix), Opts) ->
+    <<?PROBE_ID_SEP, ConnectorId/binary>> = Suffix,
     parse_connector_id(ConnectorId, Opts);
 parse_connector_id(ConnectorId, Opts) ->
     emqx_resource:parse_resource_id(ConnectorId, Opts).
@@ -214,9 +217,8 @@ create_dry_run(Type, Conf0, Callback) ->
     TypeAtom = safe_atom(Type),
     %% We use a fixed name here to avoid creating an atom
     %% to avoid potential race condition, the resource id should be unique
-    Prefix = emqx_resource_manager:make_test_id(),
-    TmpName =
-        iolist_to_binary([Prefix, TypeBin, ":", <<"probedryrun">>]),
+    Prefix = ?PROBE_ID_NEW(),
+    TmpName = iolist_to_binary([Prefix, ?PROBE_ID_SEP, TypeBin, $:, "dryrun"]),
     TmpPath = emqx_utils:safe_filename(TmpName),
     Conf1 = maps:without([<<"name">>], Conf0),
     RawConf = #{<<"connectors">> => #{TypeBin => #{<<"temp_name">> => Conf1}}},

+ 14 - 4
apps/emqx_resource/include/emqx_resource.hrl

@@ -14,6 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
+-ifndef(EMQX_RESOURCE_HRL).
+-define(EMQX_RESOURCE_HRL, true).
 %% bridge/connector/action status
 -define(status_connected, connected).
 -define(status_connecting, connecting).
@@ -26,7 +28,7 @@
 -type resource_type() :: atom().
 -type resource_module() :: module().
 -type resource_id() :: binary().
--type channel_id() :: binary().
+-type channel_id() :: action_resource_id() | source_resource_id().
 -type raw_resource_config() :: binary() | raw_term_resource_config().
 -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
 -type resource_config() :: term().
@@ -71,9 +73,9 @@
     query_mode := query_mode(),
     config := resource_config(),
     error := term(),
-    state := resource_state(),
     status := resource_status(),
-    added_channels := term()
+    added_channels := term(),
+    state := resource_state()
 }.
 -type resource_group() :: binary().
 -type creation_opts() :: #{
@@ -157,7 +159,12 @@
 
 %% Keep this probe ID prefix matching "^[A-Za-z0-9]+[A-Za-z0-9-_]*$".
 %% See `hocon_tconf`
--define(TEST_ID_PREFIX, "t_probe_").
+-define(PROBE_ID_PREFIX, "PROBE_").
+-define(PROBE_ID_RAND_BYTES, 8).
+-define(PROBE_ID_NEW(),
+    iolist_to_binary([?PROBE_ID_PREFIX, emqx_utils:rand_id(?PROBE_ID_RAND_BYTES)])
+).
+-define(PROBE_ID_MATCH(Suffix), <<?PROBE_ID_PREFIX, _:?PROBE_ID_RAND_BYTES/binary, Suffix/binary>>).
 -define(RES_METRICS, resource_metrics).
 -define(LOG_LEVEL(_L_),
     case _L_ of
@@ -168,3 +175,6 @@
 -define(TAG, "RESOURCE").
 
 -define(RESOURCE_ALLOCATION_TAB, emqx_resource_allocations).
+-define(RESOURCE_CACHE, emqx_resource_cache).
+
+-endif.

+ 47 - 0
apps/emqx_resource/include/emqx_resource_runtime.hrl

@@ -0,0 +1,47 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_RESOURCE_RUNTIME_HRL).
+-define(EMQX_RESOURCE_RUNTIME_HRL, true).
+
+-include("emqx_resource.hrl").
+
+-define(NO_CHANNEL, no_channel).
+
+%% status and error, cached in ets for each connector
+-type st_err() :: #{
+    status := resource_status(),
+    error := term()
+}.
+
+%% the relatively stable part to be cached in persistent_term for each connector
+-type cb() :: #{
+    mod := module(),
+    callback_mode := callback_mode(),
+    query_mode := query_mode(),
+    state := term()
+}.
+
+%% the runtime context to be used for each channel
+-record(rt, {
+    st_err :: st_err(),
+    cb :: cb(),
+    channel_status :: ?NO_CHANNEL | channel_status()
+}).
+
+-type runtime() :: #rt{}.
+
+-endif.

+ 14 - 10
apps/emqx_resource/src/emqx_resource.erl

@@ -473,14 +473,16 @@ fetch_creation_opts(Opts) ->
 
 -spec list_instances() -> [resource_id()].
 list_instances() ->
-    [Id || #{id := Id} <- list_instances_verbose()].
+    emqx_resource_cache:all_ids().
 
 -spec list_instances_verbose() -> [_ResourceDataWithMetrics :: map()].
 list_instances_verbose() ->
-    [
-        Res#{metrics => get_metrics(ResId)}
-     || #{id := ResId} = Res <- emqx_resource_manager:list_all()
-    ].
+    lists:map(
+        fun(#{id := ResId} = Res) ->
+            Res#{metrics => get_metrics(ResId)}
+        end,
+        emqx_resource_manager:list_all()
+    ).
 
 -spec list_instances_by_type(module()) -> [resource_id()].
 list_instances_by_type(ResourceType) ->
@@ -797,11 +799,13 @@ validate_name(Name) ->
     ok.
 
 -spec is_dry_run(resource_id()) -> boolean().
-is_dry_run(ResId) ->
-    case string:find(ResId, ?TEST_ID_PREFIX) of
-        nomatch -> false;
-        TestIdStart -> string:equal(TestIdStart, ResId)
-    end.
+%% Tell whether a resource ID belongs to a probe (dry-run) resource.
+%% Two shapes are recognized:
+%%   * the ID itself starts with a probe ID (a probe connector);
+%%   * a probe ID appears as a `:'-delimited segment inside the ID
+%%     (a probe action/source).
+is_dry_run(?PROBE_ID_MATCH(_)) ->
+    %% A probe connector
+    true;
+is_dry_run(ID) ->
+    %% A probe action/source.
+    %% The repetition count is derived from ?PROBE_ID_RAND_BYTES so that the
+    %% regex stays in sync with ?PROBE_ID_NEW() and ?PROBE_ID_MATCH().
+    RandLen = integer_to_list(?PROBE_ID_RAND_BYTES),
+    RE = ":" ++ ?PROBE_ID_PREFIX ++ "[a-zA-Z0-9]{" ++ RandLen ++ "}:",
+    match =:= re:run(ID, RE, [{capture, none}]).
 
 validate_name(<<>>, _Opts) ->
     invalid_data("Name cannot be empty string");

+ 58 - 71
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -19,7 +19,7 @@
 
 -module(emqx_resource_buffer_worker).
 
--include("emqx_resource.hrl").
+-include("emqx_resource_runtime.hrl").
 -include("emqx_resource_errors.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("stdlib/include/ms_transform.hrl").
@@ -1205,27 +1205,34 @@ handle_async_worker_down(Data0, Pid) ->
 -spec call_query(force_sync | async_if_possible, _, _, _, _, _) -> _.
 call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
     ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM}),
-    case emqx_resource_manager:lookup_cached(extract_connector_id(Id)) of
+    case emqx_resource_cache:get_runtime(Id) of
         %% This seems to be the only place where the `rm_status_stopped' status matters,
         %% to distinguish from the `disconnected' status.
-        {ok, _Group, #{status := ?rm_status_stopped}} ->
+        {ok, #rt{st_err = #{status := ?rm_status_stopped}}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
-        {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
+        {ok, #rt{st_err = #{status := ?status_connecting, error := unhealthy_target}}} ->
             {error, {unrecoverable_error, unhealthy_target}};
-        {ok, _Group, Resource} ->
-            PrevLoggerProcessMetadata = logger:get_process_metadata(),
-            QueryResult =
-                try
-                    set_rule_id_trace_meta_data(Query),
-                    do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource)
-                after
-                    reset_logger_process_metadata(PrevLoggerProcessMetadata)
-                end,
-            QueryResult;
+        {ok, #rt{st_err = #{status := Status}, cb = Resource, channel_status = ChanSt}} ->
+            IsAlwaysSend = is_always_send(QueryOpts, Resource),
+            case Status =:= ?status_connected orelse IsAlwaysSend of
+                true ->
+                    call_query2(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt);
+                false ->
+                    ?RESOURCE_ERROR(not_connected, "resource not connected")
+            end;
         {error, not_found} ->
             ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
+%% Run the query with rule-id trace metadata installed in the logger process
+%% metadata, and always put the previous metadata back afterwards, whether the
+%% query returns or raises.
+call_query2(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt) ->
+    SavedMeta = logger:get_process_metadata(),
+    try
+        set_rule_id_trace_meta_data(Query),
+        do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt)
+    after
+        reset_logger_process_metadata(SavedMeta)
+    end.
+
 set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
@@ -1290,75 +1297,53 @@ extract_connector_id(Id) when is_binary(Id) ->
             <<"connector:", ConnectorType/binary, ":", ConnectorName/binary>>;
         _ ->
             Id
-    end;
-extract_connector_id(Id) ->
-    Id.
-
-is_channel_id(Id) ->
-    extract_connector_id(Id) =/= Id.
+    end.
 
 %% Check if channel is installed in the connector state.
 %% There is no need to query the connector if the channel is not
 %% installed as the query will fail anyway.
-pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when
-    is_map_key(Id, Channels)
-->
-    ChannelStatus = maps:get(Id, Channels),
-    case emqx_resource_manager:channel_status_is_channel_added(ChannelStatus) of
+pre_query_channel_check(Id, {Id, _} = _Request, ChanSt, IsSimpleQuery) ->
+    case emqx_resource_manager:channel_status_is_channel_added(ChanSt) of
         true ->
             ok;
         false ->
-            error_if_channel_is_not_installed(Id, QueryOpts)
+            error_if_channel_is_not_installed(Id, IsSimpleQuery)
     end;
-pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) ->
-    error_if_channel_is_not_installed(Id, QueryOpts);
-pre_query_channel_check(_Request, _Channels, _QueryOpts) ->
+pre_query_channel_check(_Id, _Request, _ChanSt, _IsSimpleQuery) ->
+    %% Not a per-channel request
     ok.
 
-error_if_channel_is_not_installed(Id, QueryOpts) ->
+error_if_channel_is_not_installed(Id, IsSimpleQuery) ->
     %% Fail with a recoverable error if the channel is not installed and there are buffer
     %% workers involved so that the operation can be retried.  Otherwise, this is
     %% unrecoverable.  It is emqx_resource_manager's responsibility to ensure that the
     %% channel installation is retried.
-    IsSimpleQuery = is_simple_query(QueryOpts),
-    case is_channel_id(Id) of
-        true when IsSimpleQuery ->
-            {error,
-                {unrecoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
-        true ->
-            {error,
-                {recoverable_error,
-                    iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))}};
-        false ->
-            ok
-    end.
+    Msg = iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id])),
+    ErrorType =
+        case IsSimpleQuery of
+            true ->
+                unrecoverable_error;
+            false ->
+                recoverable_error
+        end,
+    {error, {ErrorType, Msg}}.
 
-do_call_query(QM, Id, Index, Ref, Query, #{query_mode := ReqQM} = QueryOpts, Resource) when
-    ?IS_BYPASS(ReqQM)
-->
+is_always_send(#{query_mode := M}, _) when ?IS_BYPASS(M) ->
     %% The query overrides the query mode of the resource, send even in disconnected state
-    ?tp(simple_query_override, #{query_mode => ReqQM}),
-    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
-    CallMode = call_mode(QM, CBM),
+    ?tp(simple_query_override, #{query_mode => M}),
     ?tp(simple_query_enter, #{}),
-    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
-do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{query_mode := ResQM} = Resource) when
-    ?IS_BYPASS(ResQM)
-->
+    true;
+is_always_send(_, #{query_mode := M}) when ?IS_BYPASS(M) ->
     %% The connector supports buffer, send even in disconnected state
-    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
-    CallMode = call_mode(QM, CBM),
     ?tp(simple_query_enter, #{}),
-    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
-do_call_query(QM, Id, Index, Ref, Query, QueryOpts, #{status := connected} = Resource) ->
-    %% when calling from the buffer worker or other simple queries,
-    %% only apply the query fun when it's at connected status
-    #{mod := Mod, state := ResSt, callback_mode := CBM, added_channels := Channels} = Resource,
+    true;
+is_always_send(_, _) ->
+    false.
+
+do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource, ChanSt) ->
+    #{mod := Mod, state := ResSt, callback_mode := CBM} = Resource,
     CallMode = call_mode(QM, CBM),
-    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, Channels, QueryOpts);
-do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
-    ?RESOURCE_ERROR(not_connected, "resource not connected").
+    apply_query_fun(CallMode, Mod, Id, Index, Ref, Query, ResSt, ChanSt, QueryOpts).
 
 -define(APPLY_RESOURCE(NAME, EXPR, REQ),
     try
@@ -1395,7 +1380,7 @@ apply_query_fun(
     _Ref,
     ?QUERY(_, Request, _, _, _TraceCtx) = _Query,
     ResSt,
-    Channels,
+    ChanSt,
     QueryOpts
 ) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
@@ -1403,7 +1388,7 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_query,
             begin
-                case pre_query_channel_check(Request, Channels, QueryOpts) of
+                case pre_query_channel_check(Id, Request, ChanSt, is_simple_query(QueryOpts)) of
                     ok ->
                         Mod:on_query(extract_connector_id(Id), Request, ResSt);
                     Error ->
@@ -1422,7 +1407,7 @@ apply_query_fun(
     Ref,
     ?QUERY(_, Request, _, _, _TraceCtx) = Query,
     ResSt,
-    Channels,
+    ChanSt,
     QueryOpts
 ) ->
     ?tp(call_query_async, #{
@@ -1447,7 +1432,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            case pre_query_channel_check(Request, Channels, QueryOpts) of
+            case pre_query_channel_check(Id, Request, ChanSt, IsSimpleQuery) of
                 ok ->
                     case
                         Mod:on_query_async(
@@ -1476,7 +1461,7 @@ apply_query_fun(
     _Ref,
     [?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
     ResSt,
-    Channels,
+    ChanSt,
     QueryOpts
 ) ->
     ?tp(call_batch_query, #{
@@ -1489,7 +1474,9 @@ apply_query_fun(
         ?APPLY_RESOURCE(
             call_batch_query,
             begin
-                case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
+                case
+                    pre_query_channel_check(Id, FirstRequest, ChanSt, is_simple_query(QueryOpts))
+                of
                     ok ->
                         Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt);
                     Error ->
@@ -1508,7 +1495,7 @@ apply_query_fun(
     Ref,
     [?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
     ResSt,
-    Channels,
+    ChanSt,
     QueryOpts
 ) ->
     ?tp(call_batch_query_async, #{
@@ -1536,7 +1523,7 @@ apply_query_fun(
             AsyncWorkerMRef = undefined,
             InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
             ok = inflight_append(InflightTID, InflightItem),
-            case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of
+            case pre_query_channel_check(Id, FirstRequest, ChanSt, IsSimpleQuery) of
                 ok ->
                     case
                         Mod:on_batch_query_async(

+ 327 - 0
apps/emqx_resource/src/emqx_resource_cache.erl

@@ -0,0 +1,327 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_cache).
+
+%% CRUD APIs
+-export([new/0, write/3, is_exist/1, read/1, erase/1]).
+%% For Config management
+-export([all_ids/0, list_all/0, group_ids/1]).
+%% For health checks etc.
+-export([read_status/1, read_mod/1, read_manager_pid/1]).
+%% Hot-path
+-export([get_runtime/1]).
+
+-include("emqx_resource_runtime.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-define(CACHE, ?RESOURCE_CACHE).
+-define(NO_CB, no_state).
+
+-record(connector, {
+    id :: binary(),
+    group :: binary(),
+    manager_pid :: pid(),
+    st_err :: st_err(),
+    config :: term(),
+    cb = ?NO_CB :: term(),
+    extra = []
+}).
+
+-type chan_key() :: {connector_resource_id(), channel_id()}.
+
+-record(channel, {
+    id :: chan_key(),
+    error :: term(),
+    status :: channel_status(),
+    extra = []
+}).
+
+-define(CB_PT_KEY(ID), {?MODULE, ID}).
+
+%% Create the ETS table backing the resource cache.
+%% `{keypos, 2}' makes the first record field the key, which is the `id'
+%% field of both `#connector{}' and `#channel{}'.
+new() ->
+    TableOpts = [ordered_set, public, {read_concurrency, true}, {keypos, 2}],
+    emqx_utils_ets:new(?CACHE, TableOpts).
+
+%% @doc Cache the externalized data of one resource.
+%% The callback map (`mod', `callback_mode', `query_mode', `state') is put in
+%% persistent_term for regular resources, and kept inside the ETS connector
+%% record for dry-run (probe) resources. Status/error, config, group and
+%% manager pid are stored in the ETS connector record, and every entry of
+%% `added_channels' becomes its own ETS channel record.
+-spec write(pid(), binary(), resource_data()) -> ok.
+write(ManagerPid, Group, Data) ->
+    #{
+        id := ID,
+        mod := Mod,
+        callback_mode := CallbackMode,
+        query_mode := QueryMode,
+        config := Config,
+        error := Error,
+        state := State,
+        status := Status,
+        added_channels := AddedChannels
+    } = Data,
+    Cb = #{
+        mod => Mod,
+        callback_mode => CallbackMode,
+        query_mode => QueryMode,
+        state => State
+    },
+    IsDryrun = emqx_resource:is_dry_run(ID),
+    Connector = #connector{
+        id = ID,
+        group = Group,
+        manager_pid = ManagerPid,
+        st_err = #{
+            status => Status,
+            %% `{error, Reason}' is unwrapped to `Reason' before caching
+            error => external_error(Error)
+        },
+        config = Config,
+        cb =
+            case IsDryrun of
+                true ->
+                    %% save callback state in ets for dryrun
+                    Cb;
+                false ->
+                    ?NO_CB
+            end,
+        extra = []
+    },
+    Channels = lists:map(fun to_channel_record/1, maps:to_list(AddedChannels)),
+    %% erase old channels (if any)
+    ok = erase_old_channels(ID, maps:keys(AddedChannels)),
+    %% put callback state in persistent_term
+    case IsDryrun of
+        true ->
+            %% do not write persistent_term for dryrun
+            ok;
+        false ->
+            ok = put_state_pt(ID, Cb)
+    end,
+    %% insert connector and channel states
+    %% (connector and channel records go in with a single ets:insert/2 call)
+    true = ets:insert(?CACHE, [Connector | Channels]),
+    ok.
+
+%% @doc Assemble the cached pieces of a resource into an externalized map.
+%% Returns an empty list when the ID is not cached, otherwise a one-element
+%% list holding the resource group and the resource data.
+%% NOTE: Do not call this in hot-path.
+%% TODO: move `group' into `resource_data()'.
+-spec read(resource_id()) -> [{resource_group(), resource_data()}].
+read(ID) ->
+    case ets:lookup(?CACHE, ID) of
+        [#connector{group = Group} = Connector] ->
+            AddedChannels = find_channels(ID),
+            [{Group, make_resource_data(ID, Connector, AddedChannels)}];
+        [] ->
+            []
+    end.
+
+%% Fetch only the cached status/error map of a connector;
+%% `not_found' when the ID has no entry in the cache table.
+-spec read_status(resource_id()) -> not_found | st_err().
+read_status(ResId) ->
+    FieldPos = #connector.st_err,
+    ets:lookup_element(?CACHE, ResId, FieldPos, not_found).
+
+%% Fetch the pid of the manager process recorded for a connector;
+%% `not_found' when the ID has no entry in the cache table.
+-spec read_manager_pid(resource_id()) -> not_found | pid().
+read_manager_pid(ResId) ->
+    FieldPos = #connector.manager_pid,
+    ets:lookup_element(?CACHE, ResId, FieldPos, not_found).
+
+%% Return the callback module of a resource, or `not_found' when no callback
+%% map is cached for the ID.
+-spec read_mod(resource_id()) -> not_found | {ok, module()}.
+read_mod(ResId) ->
+    case get_cb(ResId) of
+        #{mod := Module} ->
+            {ok, Module};
+        ?NO_CB ->
+            not_found
+    end.
+
+%% Look up the callback map of a resource: persistent_term first, then the
+%% `cb' field of the ETS connector record (where dry-run resources keep it).
+%% Yields ?NO_CB when neither place has it.
+get_cb(ID) ->
+    FromPt = get_cb_pt(ID),
+    case FromPt =:= ?NO_CB of
+        true ->
+            %% maybe it's a dryrun connector
+            ets:lookup_element(?CACHE, ID, #connector.cb, ?NO_CB);
+        false ->
+            FromPt
+    end.
+
+%% @doc Remove everything cached for a resource: its channel records, its
+%% connector record, and its callback state in persistent_term.
+-spec erase(resource_id()) -> ok.
+erase(ID) ->
+    %% channel records are keyed by {ConnectorId, ChannelId}
+    MS = ets:fun2ms(fun(#channel{id = {C, _}}) when C =:= ID -> true end),
+    _ = ets:select_delete(?CACHE, MS),
+    _ = ets:delete(?CACHE, ID),
+    %% del_state_pt/1 builds the persistent_term key from the bare ID itself;
+    %% passing an already wrapped key would target a key that was never
+    %% written and leave the callback state behind.
+    ok = del_state_pt(ID),
+    ok.
+
+%% Delete the cached channel records of connector `ID' whose channel IDs are
+%% not listed in `NewChanIds'.
+erase_old_channels(ID, NewChanIds) ->
+    StaleChanIds = maps:keys(find_channels(ID)) -- NewChanIds,
+    lists:foreach(fun(ChanId) -> erase_channel(ChanId) end, StaleChanIds).
+
+%% Delete one channel record; the ETS key is the split form of the channel ID.
+erase_channel(ChanId) ->
+    ets:delete(?CACHE, split_channel_id(ChanId)).
+
+%% Externalized data of every cached resource, in the order given by
+%% all_ids/0. IDs for which read/1 returns nothing are skipped.
+-spec list_all() -> [resource_data()].
+list_all() ->
+    [Data || ID <- all_ids(), {_Group, Data} <- read(ID)].
+
+%% IDs of all cached connectors whose group equals `Group'.
+group_ids(Group) ->
+    MatchSpec = ets:fun2ms(
+        fun(#connector{id = ResId, group = Grp}) when Grp =:= Group -> ResId end
+    ),
+    ets:select(?CACHE, MatchSpec).
+
+%% IDs of all cached connectors (channel records are not matched).
+all_ids() ->
+    MatchSpec = ets:fun2ms(fun(#connector{id = ResId}) -> ResId end),
+    ets:select(?CACHE, MatchSpec).
+
+%% @doc The most performance-critical call.
+%% Collects the callback map, the channel status and the connector
+%% status/error into one `#rt{}' record.
+%% NOTE: ID is the action ID, but not connector ID.
+%% An ID that does not split into channel and connector parts is used as the
+%% connector ID itself, with ?NO_CHANNEL as the channel status.
+-spec get_runtime(resource_id()) -> {ok, runtime()} | {error, not_found}.
+get_runtime(ID) ->
+    ChanKey = {ConnectorId, _ChanID} = split_channel_id(ID),
+    try
+        Cb = get_cb(ConnectorId),
+        ChannelStatus = get_channel_status(ChanKey),
+        %% lookup_element/3 has no default here: a missing connector raises
+        %% badarg, which is turned into {error, not_found} below
+        StErr = ets:lookup_element(?CACHE, ConnectorId, #connector.st_err),
+        {ok, #rt{
+            st_err = StErr,
+            cb = Cb,
+            channel_status = ChannelStatus
+        }}
+    catch
+        error:badarg ->
+            {error, not_found}
+    end.
+
+%% Status of one channel. ?NO_CHANNEL is returned both for keys that carry no
+%% channel part and for channels that have no record in the cache.
+get_channel_status({_ConnectorId, ?NO_CHANNEL}) ->
+    ?NO_CHANNEL;
+get_channel_status(Key) ->
+    FieldPos = #channel.status,
+    ets:lookup_element(?CACHE, Key, FieldPos, ?NO_CHANNEL).
+
+%% Callback map stored in persistent_term for `ID', or ?NO_CB when absent.
+get_cb_pt(ID) ->
+    PtKey = ?CB_PT_KEY(ID),
+    persistent_term:get(PtKey, ?NO_CB).
+
+%% Turn one `{ChannelId, #{status, error}}' entry of the added-channels map
+%% into a `#channel{}' record keyed by the split channel ID.
+to_channel_record({ChanId, #{status := Status, error := Error}}) ->
+    #channel{
+        id = split_channel_id(ChanId),
+        error = Error,
+        status = Status,
+        extra = []
+    }.
+
+%% Split a channel ID of the form
+%% `Kind:SubType:Name:connector:ConnType:ConnName' into
+%% `{<<"connector:ConnType:ConnName">>, <<"Kind:SubType:Name">>}'.
+%% Any other binary is returned unchanged, paired with ?NO_CHANNEL.
+split_channel_id(Id) when is_binary(Id) ->
+    Parts = binary:split(Id, <<":">>, [global]),
+    case Parts of
+        [Kind, SubType, Name, <<"connector">>, ConnType, ConnName] ->
+            {
+                <<"connector:", ConnType/binary, ":", ConnName/binary>>,
+                <<Kind/binary, ":", SubType/binary, ":", Name/binary>>
+            };
+        _ ->
+            %% this is not a per-channel query, e.g. for authn/authz
+            {Id, ?NO_CHANNEL}
+    end.
+
+%% State can be quite bloated; caching it in ets would mean copying a large
+%% term for each and every query, so it is kept in persistent_term instead.
+%% Connector state is relatively static, so GC triggered by persistent_term
+%% updates is less of a concern compared to fields such as `status' and
+%% `error', which may change very often.
+%% The term is written only when it differs from what is already stored.
+put_state_pt(ID, State) ->
+    case get_cb_pt(ID) =:= State of
+        true ->
+            %% identical
+            ok;
+        false ->
+            _ = persistent_term:put(?CB_PT_KEY(ID), State),
+            ok
+    end.
+
+%% Erase the persistent_term entry of a resource. Takes the bare resource ID;
+%% the persistent_term key is derived from it here.
+del_state_pt(ID) ->
+    PtKey = ?CB_PT_KEY(ID),
+    _ = persistent_term:erase(PtKey),
+    ok.
+
+%% True when the cache table has an entry under key `ResId'.
+is_exist(ResId) ->
+    ets:member(?CACHE, ResId).
+
+%% Build the externalized resource map from a connector record, its channel
+%% map and its callback map. The callback map is taken from the record when
+%% present there, otherwise from persistent_term.
+make_resource_data(ID, Connector, Channels) ->
+    #connector{st_err = StErr, config = Config, cb = CachedCb} = Connector,
+    #{error := Error, status := Status} = StErr,
+    Cb =
+        case CachedCb of
+            ?NO_CB -> get_cb_pt(ID);
+            _ -> CachedCb
+        end,
+    #{
+        mod := Mod,
+        callback_mode := CallbackMode,
+        query_mode := QueryMode,
+        state := State
+    } = Cb,
+    #{
+        id => ID,
+        mod => Mod,
+        callback_mode => CallbackMode,
+        query_mode => QueryMode,
+        error => Error,
+        status => Status,
+        config => Config,
+        added_channels => Channels,
+        state => State
+    }.
+
+%% Collect the cached channels of a connector as a map from the full channel
+%% ID (`ChannelId:ConnectorId') to `#{status, error}'.
+find_channels(ConnectorID) ->
+    MatchSpec = ets:fun2ms(
+        fun(#channel{id = {Cid, _}} = Chan) when Cid =:= ConnectorID -> Chan end
+    ),
+    maps:from_list([
+        {
+            iolist_to_binary([ChannelId, ":", ConnectorId]),
+            #{status => Status, error => Error}
+        }
+     || #channel{id = {ConnectorId, ChannelId}, status = Status, error = Error} <-
+            ets:select(?CACHE, MatchSpec)
+    ]).
+
+%% Unwrap `{error, Reason}' to `Reason'; any other term is returned as is.
+external_error({error, Wrapped}) ->
+    Wrapped;
+external_error(NotWrapped) ->
+    NotWrapped.

+ 74 - 0
apps/emqx_resource/src/emqx_resource_cache_cleaner.erl

@@ -0,0 +1,74 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_cache_cleaner).
+
+-export([start_link/0]).
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2
+]).
+-export([add/2]).
+
+-define(SERVER, ?MODULE).
+
+%% Start the cleaner as a locally registered gen_server.
+start_link() ->
+    ServerName = {local, ?SERVER},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% Ask the cleaner to monitor `Pid' on behalf of resource `ID'.
+%% Synchronous call with no timeout.
+add(ID, Pid) ->
+    Request = {add, ID, Pid},
+    gen_server:call(?SERVER, Request, infinity).
+
+%% gen_server callback: trap exits and start with an empty process monitor.
+init(_Args) ->
+    _ = process_flag(trap_exit, true),
+    InitialState = #{pmon => emqx_pmon:new()},
+    {ok, InitialState}.
+
+%% gen_server callback.
+%% `{add, ID, Pid}' starts monitoring `Pid' with `ID' attached to it;
+%% every other request is answered with `ok' and otherwise ignored.
+handle_call({add, ID, Pid}, _From, State = #{pmon := Pmon0}) ->
+    Pmon = emqx_pmon:monitor(Pid, ID, Pmon0),
+    {reply, ok, State#{pmon => Pmon}};
+handle_call(_Unknown, _From, State) ->
+    {reply, ok, State}.
+
+%% gen_server callback: casts are not used, drop them.
+handle_cast(_Cast, State) ->
+    {noreply, State}.
+
+%% gen_server callback.
+%% On 'DOWN' of a monitored pid: erase the cache of the resource attached to
+%% it (if that pid is still the recorded manager) and forget the monitor.
+%% All other messages are dropped.
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{pmon := Pmon0}) ->
+    case emqx_pmon:find(Pid, Pmon0) of
+        {ok, ID} ->
+            maybe_erase_cache(Pid, ID),
+            {noreply, State#{pmon => emqx_pmon:erase(Pid, Pmon0)}};
+        error ->
+            {noreply, State}
+    end;
+handle_info(_Other, State) ->
+    {noreply, State}.
+
+%% gen_server callback: nothing to clean up.
+terminate(_Why, _St) ->
+    ok.
+
+%% Erase the cache of resource `ID' only when the manager pid recorded in the
+%% cache is the one that just went down.
+maybe_erase_cache(DownManager, ID) ->
+    case emqx_resource_cache:read_manager_pid(ID) of
+        DownManager ->
+            emqx_resource_cache:erase(ID);
+        _ ->
+            %% already erased, or already replaced by another manager due to
+            %% a quick restart by the supervisor
+            ok
+    end.

+ 49 - 80
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -55,8 +55,7 @@
 ]).
 
 -export([
-    set_resource_status_connecting/1,
-    make_test_id/0
+    set_resource_status_connecting/1
 ]).
 
 % Server
@@ -109,6 +108,7 @@
     },
     extra
 }).
+
 -type data() :: #data{}.
 -type channel_status_map() :: #{status := channel_status(), error := term()}.
 
@@ -230,7 +230,7 @@ create(ResId, Group, ResourceType, Config, Opts) ->
 -spec create_dry_run(resource_module(), resource_config()) ->
     ok | {error, Reason :: term()}.
 create_dry_run(ResourceType, Config) ->
-    ResId = make_test_id(),
+    ResId = ?PROBE_ID_NEW(),
     create_dry_run(ResId, ResourceType, Config).
 
 create_dry_run(ResId, ResourceType, Config) ->
@@ -380,17 +380,16 @@ lookup(ResId) ->
 %% @doc Lookup the group and data of a resource from the cache
 -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
 lookup_cached(ResId) ->
-    try read_cache(ResId) of
-        Data = #data{group = Group} ->
-            {ok, Group, data_record_to_external_map(Data)}
-    catch
-        error:badarg ->
+    case read_cache(ResId) of
+        [{Group, Data}] ->
+            {ok, Group, Data};
+        [] ->
             {error, not_found}
     end.
 
 %% @doc Check if the resource is cached.
 is_exist(ResId) ->
-    {error, not_found} =/= lookup_cached(ResId).
+    emqx_resource_cache:is_exist(ResId).
 
 %% @doc Get the metrics for the specified resource
 get_metrics(ResId) ->
@@ -404,16 +403,12 @@ reset_metrics(ResId) ->
 %% @doc Returns the data for all resources
 -spec list_all() -> [resource_data()].
 list_all() ->
-    lists:map(
-        fun data_record_to_external_map/1,
-        gproc:select({local, names}, [{{?NAME('_'), '_', '$1'}, [], ['$1']}])
-    ).
+    emqx_resource_cache:list_all().
 
 %% @doc Returns a list of ids for all the resources in a group
 -spec list_group(resource_group()) -> [resource_id()].
 list_group(Group) ->
-    Guard = {'==', {element, #data.group, '$1'}, Group},
-    gproc:select({local, names}, [{{?NAME('$2'), '_', '$1'}, [Guard], ['$2']}]).
+    emqx_resource_cache:group_ids(Group).
 
 -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
 health_check(ResId) ->
@@ -519,11 +514,11 @@ force_kill(ResId, MRef0) ->
     end.
 
 try_clean_allocated_resources(ResId) ->
-    case try_read_cache(ResId) of
-        #data{mod = Mod} ->
+    case emqx_resource_cache:read_mod(ResId) of
+        {ok, Mod} ->
             catch emqx_resource:clean_allocated_resources(ResId, Mod),
             ok;
-        _ ->
+        not_found ->
             ok
     end.
 
@@ -558,6 +553,7 @@ init({DataIn, Opts}) ->
         true ->
             %% init the cache so that lookup/1 will always return something
             UpdatedData = update_state(Data#data{status = ?status_connecting}),
+            emqx_resource_cache_cleaner:add(Data#data.id, self()),
             {ok, ?state_connecting, UpdatedData, {next_event, internal, start_resource}};
         false ->
             %% init the cache so that lookup/1 will always return something
@@ -581,7 +577,7 @@ callback_mode() -> [handle_event_function, state_enter].
 
 % Called during testing to force a specific state
 handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
-    UpdatedData = update_state(Data#data{status = ?status_connecting}, Data),
+    UpdatedData = update_state(Data#data{status = ?status_connecting}),
     {next_state, ?state_connecting, UpdatedData, [{reply, From, ok}]};
 % Called when the resource is to be restarted
 handle_event({call, From}, restart, _State, Data) ->
@@ -600,7 +596,7 @@ handle_event({call, From}, stop, ?state_stopped, _Data) ->
     {keep_state_and_data, [{reply, From, ok}]};
 handle_event({call, From}, stop, _State, Data) ->
     UpdatedData = stop_resource(Data),
-    {next_state, ?state_stopped, update_state(UpdatedData, Data), [{reply, From, ok}]};
+    {next_state, ?state_stopped, update_state(UpdatedData), [{reply, From, ok}]};
 % Called when a resource is to be stopped and removed.
 handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
     handle_remove_event(From, ClearMetrics, Data);
@@ -733,8 +729,10 @@ handle_event(EventType, EventData, State, Data) ->
     ),
     keep_state_and_data.
 
-log_status_consistency(Status, #data{status = Status} = Data) ->
-    log_cache_consistency(read_cache(Data#data.id), remove_runtime_data(Data));
+log_status_consistency(Status, #data{status = Status} = Data0) ->
+    [{_Group, Cached}] = read_cache(Data0#data.id),
+    Data = data_record_to_external_map(Data0),
+    log_cache_consistency(Cached, Data);
 log_status_consistency(Status, Data) ->
     ?tp(warning, "inconsistent_status", #{
         status => Status,
@@ -752,21 +750,14 @@ log_cache_consistency(DataCached, Data) ->
 %%------------------------------------------------------------------------------
 %% internal functions
 %%------------------------------------------------------------------------------
-insert_cache(ResId, Data = #data{}) ->
-    gproc:set_value(?NAME(ResId), Data).
+insert_cache(Group, Data) ->
+    emqx_resource_cache:write(self(), Group, Data).
 
 read_cache(ResId) ->
-    gproc:lookup_value(?NAME(ResId)).
+    emqx_resource_cache:read(ResId).
 
-erase_cache(_Data = #data{id = ResId}) ->
-    gproc:unreg(?NAME(ResId)).
-
-try_read_cache(ResId) ->
-    try
-        read_cache(ResId)
-    catch
-        error:badarg -> not_found
-    end.
+erase_cache(#data{id = ResId}) ->
+    emqx_resource_cache:erase(ResId).
 
 retry_actions(Data) ->
     case maps:get(health_check_interval, Data#data.opts, ?HEALTHCHECK_INTERVAL) of
@@ -803,7 +794,7 @@ start_resource(Data, From) ->
             UpdatedData3 = maybe_update_callback_mode(UpdatedData2),
 
             Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
-            {next_state, ?state_connecting, update_state(UpdatedData3, Data), Actions};
+            {next_state, ?state_connecting, update_state(UpdatedData3), Actions};
         {error, Reason} = Err ->
             IsDryRun = emqx_resource:is_dry_run(ResId),
             ?SLOG(
@@ -822,7 +813,7 @@ start_resource(Data, From) ->
             %% so that the Reason can be returned when the verification call is made.
             NewData2 = NewData1#data{status = ?status_disconnected, error = Err},
             Actions = maybe_reply(retry_actions(NewData2), From, Err),
-            {next_state, ?state_disconnected, update_state(NewData2, Data), Actions}
+            {next_state, ?state_disconnected, update_state(NewData2), Actions}
     end.
 
 add_channels(Data) ->
@@ -1015,10 +1006,6 @@ terminate_health_check_workers(Data) ->
         Pending
     ).
 
-make_test_id() ->
-    RandId = iolist_to_binary(emqx_utils:gen_id(16)),
-    <<?TEST_ID_PREFIX, RandId/binary>>.
-
 handle_add_channel(From, Data, ChannelId, Config) ->
     Channels = Data#data.added_channels,
     case
@@ -1036,7 +1023,7 @@ handle_add_channel(From, Data, ChannelId, Config) ->
             %% take care of the rest
             NewChannels = maps:put(ChannelId, channel_status_not_added(Config), Channels),
             NewData = Data#data{added_channels = NewChannels},
-            {keep_state, update_state(NewData, Data), [
+            {keep_state, update_state(NewData), [
                 {reply, From, ok}
             ]};
         true ->
@@ -1049,7 +1036,7 @@ handle_not_connected_add_channel(From, ChannelId, ChannelConfig, State, Data) ->
     %% When state is not connected the channel will be added to the channels
     %% map but nothing else will happen.
     NewData = add_or_update_channel_status(Data, ChannelId, ChannelConfig, State),
-    {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
+    {keep_state, update_state(NewData), [{reply, From, ok}]}.
 
 handle_remove_channel(From, ChannelId, Data) ->
     Channels = Data#data.added_channels,
@@ -1092,7 +1079,7 @@ handle_remove_channel_exists(From, ChannelId, Data) ->
                 state = NewState,
                 added_channels = NewAddedChannelsMap
             },
-            {keep_state, update_state(UpdatedData, Data), [{reply, From, ok}]};
+            {keep_state, update_state(UpdatedData), [{reply, From, ok}]};
         {error, Reason} = Error ->
             IsDryRun = emqx_resource:is_dry_run(Id),
             ?SLOG(
@@ -1117,7 +1104,7 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
     NewData = Data#data{added_channels = NewChannels},
     IsDryRun = emqx_resource:is_dry_run(Data#data.id),
     _ = maybe_clear_alarm(IsDryRun, ChannelId),
-    {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
+    {keep_state, update_state(NewData), [{reply, From, ok}]}.
 
 handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when
     map_size(HCWorkers) > 0
@@ -1191,7 +1178,7 @@ continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
     Data1 = Data0#data{
         state = NewState, status = NewStatus, error = Err
     },
-    Data = update_state(Data1, Data0),
+    Data = update_state(Data1),
     case CurrentState of
         ?state_connected ->
             continue_resource_health_check_connected(NewStatus, Data);
@@ -1206,7 +1193,7 @@ continue_resource_health_check_connected(NewStatus, Data0) ->
         ?status_connected ->
             {Replies, Data1} = reply_pending_resource_health_check_callers(NewStatus, Data0),
             Data2 = channels_health_check(?status_connected, Data1),
-            Data = update_state(Data2, Data0),
+            Data = update_state(Data2),
             Actions = Replies ++ resource_health_check_actions(Data),
             {keep_state, Data, Actions};
         _ ->
@@ -1311,7 +1298,7 @@ channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Data1 = add_channels_in_list(ChannelsNotAddedWithConfigs, Data0),
     %% Now that we have done the adding, we can get the status of all channels
     Data2 = trigger_health_check_for_added_channels(Data1),
-    update_state(Data2, Data0);
+    update_state(Data2);
 channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
     %% Whenever the resource is connecting:
     %% 1. Change the status of all added channels to connecting
@@ -1349,7 +1336,7 @@ channels_health_check(?status_connecting = _ConnectorStatus, Data0) ->
         ChannelsWithNewAndPrevErrorStatuses
     ),
     Data1 = Data0#data{added_channels = NewChannels},
-    update_state(Data1, Data0);
+    update_state(Data1);
 channels_health_check(ConnectorStatus, Data0) ->
     %% Whenever the resource is not connected and not connecting:
     %% 1. Remove all added channels
@@ -1393,7 +1380,7 @@ channels_health_check(ConnectorStatus, Data0) ->
         ChannelsWithNewAndOldStatuses
     ),
     Data2 = Data1#data{added_channels = NewChannels},
-    update_state(Data2, Data0).
+    update_state(Data2).
 
 resource_not_connected_channel_error_msg(ResourceStatus, ChannelId, Data1) ->
     ResourceId = Data1#data.id,
@@ -1572,7 +1559,7 @@ handle_channel_health_check_worker_down(Data0, WorkerRef, ExitResult) ->
     ),
     CHCActions = start_channel_health_check_action(ChannelId, NewStatus, PreviousChanStatus, Data),
     Actions = Replies ++ CHCActions,
-    {keep_state, update_state(Data, Data0), Actions}.
+    {keep_state, update_state(Data), Actions}.
 
 handle_channel_health_check_worker_down_new_channels_and_status(
     ChannelId,
@@ -1635,28 +1622,11 @@ get_config_from_map_or_channel_status(ChannelId, ChannelIdToConfig, ChannelStatu
     end.
 
 -spec update_state(data()) -> data().
-update_state(Data) ->
-    update_state(Data, undefined).
-
--spec update_state(data(), data() | undefined) -> data().
-update_state(DataWas, DataWas) ->
-    DataWas;
-update_state(Data, _DataWas) ->
-    _ = insert_cache(Data#data.id, remove_runtime_data(Data)),
+update_state(#data{group = Group} = Data) ->
+    ToCache = data_record_to_external_map(Data),
+    ok = insert_cache(Group, ToCache),
     Data.
 
-remove_runtime_data(#data{} = Data0) ->
-    Data0#data{
-        hc_workers = #{
-            resource => #{},
-            channel => #{pending => [], ongoing => #{}}
-        },
-        hc_pending_callers = #{
-            resource => [],
-            channel => #{}
-        }
-    }.
-
 health_check_interval(Opts) ->
     maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
 
@@ -1768,12 +1738,13 @@ wait_for_ready(ResId, WaitTime) ->
 do_wait_for_ready(_ResId, 0) ->
     timeout;
 do_wait_for_ready(ResId, Retry) ->
-    case try_read_cache(ResId) of
-        #data{status = ?status_connected} ->
+    case emqx_resource_cache:read_status(ResId) of
+        #{status := ?status_connected} ->
             ok;
-        #data{status = ?status_disconnected, error = Err} ->
-            {error, external_error(Err)};
+        #{status := ?status_disconnected, error := Err} ->
+            {error, Err};
         _ ->
+            %% connecting, or not_found
             timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
             do_wait_for_ready(ResId, Retry - 1)
     end.
@@ -1862,13 +1833,11 @@ channel_error_status(Reason) ->
         error => Reason
     }.
 
-channel_status_is_channel_added(#{
-    status := ?status_connected
-}) ->
+channel_status_is_channel_added(#{status := St}) ->
+    channel_status_is_channel_added(St);
+channel_status_is_channel_added(?status_connected) ->
     true;
-channel_status_is_channel_added(#{
-    status := ?status_connecting
-}) ->
+channel_status_is_channel_added(?status_connecting) ->
     true;
 channel_status_is_channel_added(_Status) ->
     false.

+ 10 - 1
apps/emqx_resource/src/emqx_resource_sup.erl

@@ -27,8 +27,17 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
+    ok = emqx_resource_cache:new(),
     SupFlags = #{strategy => one_for_one, intensity => 10, period => 10},
     Metrics = emqx_metrics_worker:child_spec(?RES_METRICS),
+    CacheCleaner = #{
+        id => emqx_resource_cache_cleaner,
+        start => {emqx_resource_cache_cleaner, start_link, []},
+        restart => permanent,
+        shutdown => 5_000,
+        type => worker,
+        modules => [emqx_resource_cache_cleaner]
+    },
     ResourceManager =
         #{
             id => emqx_resource_manager_sup,
@@ -45,4 +54,4 @@ init([]) ->
         shutdown => infinity,
         type => supervisor
     },
-    {ok, {SupFlags, [Metrics, ResourceManager, WorkerSup]}}.
+    {ok, {SupFlags, [Metrics, CacheCleaner, ResourceManager, WorkerSup]}}.

+ 39 - 0
apps/emqx_resource/test/emqx_resource_tests.erl

@@ -0,0 +1,39 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_resource_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+-include("emqx_resource.hrl").
+
+%% EUnit test generator for emqx_resource:is_dry_run/1.
+%% Covers ids produced by ?PROBE_ID_NEW() (alone, as a prefix, and embedded
+%% in a longer colon-separated id) as well as ids that must not be
+%% classified as dry-run.
+is_dry_run_test_() ->
+    [
+        %% a freshly generated probe id on its own
+        ?_assert(emqx_resource:is_dry_run(?PROBE_ID_NEW())),
+        %% an arbitrary name is not a dry-run id
+        ?_assertNot(emqx_resource:is_dry_run("foobar")),
+        %% a probe id followed by a suffix
+        ?_assert(emqx_resource:is_dry_run(bin([?PROBE_ID_NEW(), "_abc"]))),
+        %% a probe id embedded as one segment of a longer id
+        ?_assert(
+            emqx_resource:is_dry_run(
+                bin(["action:typeA:", ?PROBE_ID_NEW(), ":connector:typeB:dryrun"])
+            )
+        ),
+        %% the literal word "dryrun" as a segment, with no probe id present,
+        %% is not classified as dry-run
+        ?_assertNot(
+            emqx_resource:is_dry_run(
+                bin(["action:type1:dryrun:connector:typeb:dryrun"])
+            )
+        )
+    ].
+
+%% Test helper: flatten iodata into a single binary.
+bin(IoData) ->
+    iolist_to_binary(IoData).

+ 14 - 10
scripts/git-hook-pre-commit.sh

@@ -6,20 +6,24 @@ if [ -n "${FORCE:-}" ]; then
     exit 0
 fi
 
-OPT="${1:--c}"
-
-# mix format check is quite fast
-which mix && mix format --check-formatted
-
 files_dirty="$(git diff --name-only | grep -E '.*\.erl' || true)"
 files_cached="$(git diff --cached --name-only | grep -E '.*\.erl' || true)"
 if [[ "${files_dirty}" == '' ]] && [[ "${files_cached}" == '' ]]; then
     exit 0
 fi
 files="$(echo -e "${files_dirty} \n ${files_cached}" | xargs)"
-# shellcheck disable=SC2086
-if ! (./scripts/erlfmt $OPT $files); then
-    echo "EXECUTE 'make fmt' to fix" >&2
-    exit 1
+
+# mix format check is quite fast
+which mix && mix format --check-formatted
+
+if [ "${ERLFMT_WRITE:-false}" = 'true' ]; then
+    # shellcheck disable=SC2086
+    ./scripts/erlfmt -w $files
+else
+    # shellcheck disable=SC2086
+    if ! (./scripts/erlfmt -c $files); then
+        echo "EXECUTE 'make fmt-diff' to fix" >&2
+        exit 1
+    fi
+    ./scripts/apps-version-check.sh
 fi
-./scripts/apps-version-check.sh