|
|
@@ -28,13 +28,15 @@
|
|
|
-include("emqx_resource_runtime.hrl").
|
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
|
|
+-define(CACHE, ?RESOURCE_CACHE).
|
|
|
+
|
|
|
-record(connector, {
|
|
|
id :: binary(),
|
|
|
group :: binary(),
|
|
|
manager_pid :: pid(),
|
|
|
st_err :: st_err(),
|
|
|
config :: term(),
|
|
|
- stable :: term(),
|
|
|
+ cb :: term(),
|
|
|
extra = []
|
|
|
}).
|
|
|
|
|
|
@@ -47,10 +49,10 @@
|
|
|
extra = []
|
|
|
}).
|
|
|
|
|
|
--define(STATE_PT_KEY(ID), {?MODULE, ID}).
|
|
|
+-define(CB_PT_KEY(ID), {?MODULE, ID}).
|
|
|
|
|
|
new() ->
|
|
|
- emqx_utils_ets:new(?RESOURCE_STATE_CACHE, [
|
|
|
+ emqx_utils_ets:new(?CACHE, [
|
|
|
ordered_set,
|
|
|
public,
|
|
|
{read_concurrency, true},
|
|
|
@@ -70,7 +72,7 @@ write(ManagerPid, Group, Data) ->
|
|
|
status := Status,
|
|
|
added_channels := AddedChannels
|
|
|
} = Data,
|
|
|
- Stable = #{
|
|
|
+ Cb = #{
|
|
|
mod => Mod,
|
|
|
callback_mode => CallbackMode,
|
|
|
query_mode => QueryMode,
|
|
|
@@ -86,11 +88,11 @@ write(ManagerPid, Group, Data) ->
|
|
|
error => external_error(Error)
|
|
|
},
|
|
|
config = Config,
|
|
|
- stable =
|
|
|
+ cb =
|
|
|
case IsDryrun of
|
|
|
true ->
|
|
|
- %% save stable part in ets for dryrun
|
|
|
- Stable;
|
|
|
+ %% save callback state in ets for dryrun
|
|
|
+ Cb;
|
|
|
false ->
|
|
|
undefined
|
|
|
end,
|
|
|
@@ -99,16 +101,16 @@ write(ManagerPid, Group, Data) ->
|
|
|
Channels = lists:map(fun to_channel_record/1, maps:to_list(AddedChannels)),
|
|
|
%% erase old channels (if any)
|
|
|
ok = erase_old_channels(ID, maps:keys(AddedChannels)),
|
|
|
- %% put stable state in persistent_term
|
|
|
+ %% put callback state in persistent_term
|
|
|
case IsDryrun of
|
|
|
true ->
|
|
|
%% do not write persistent_term for dryrun
|
|
|
ok;
|
|
|
false ->
|
|
|
- ok = put_state_pt(ID, Stable)
|
|
|
+ ok = put_state_pt(ID, Cb)
|
|
|
end,
|
|
|
%% insert connector and channel states
|
|
|
- true = ets:insert(?RESOURCE_STATE_CACHE, [Connector | Channels]),
|
|
|
+ true = ets:insert(?CACHE, [Connector | Channels]),
|
|
|
ok.
|
|
|
|
|
|
%% @doc Read cached pieces and return a externalized map.
|
|
|
@@ -116,7 +118,7 @@ write(ManagerPid, Group, Data) ->
|
|
|
%% TODO: move `group' into `resource_data()'.
|
|
|
-spec read(resource_id()) -> [{resource_group(), resource_data()}].
|
|
|
read(ID) ->
|
|
|
- case ets:lookup(?RESOURCE_STATE_CACHE, ID) of
|
|
|
+ case ets:lookup(?CACHE, ID) of
|
|
|
[] ->
|
|
|
[];
|
|
|
[#connector{group = G} = C] ->
|
|
|
@@ -126,26 +128,26 @@ read(ID) ->
|
|
|
|
|
|
-spec read_status(resource_id()) -> not_found | st_err().
|
|
|
read_status(ID) ->
|
|
|
- ets:lookup_element(?RESOURCE_STATE_CACHE, ID, #connector.st_err, not_found).
|
|
|
+ ets:lookup_element(?CACHE, ID, #connector.st_err, not_found).
|
|
|
|
|
|
-spec read_manager_pid(resource_id()) -> not_found | pid().
|
|
|
read_manager_pid(ID) ->
|
|
|
- ets:lookup_element(?RESOURCE_STATE_CACHE, ID, #connector.manager_pid, not_found).
|
|
|
+ ets:lookup_element(?CACHE, ID, #connector.manager_pid, not_found).
|
|
|
|
|
|
-spec read_mod(resource_id()) -> not_found | {ok, module()}.
|
|
|
read_mod(ID) ->
|
|
|
- case get_state(ID) of
|
|
|
+ case get_cb(ID) of
|
|
|
undefined ->
|
|
|
not_found;
|
|
|
#{mod := Mod} ->
|
|
|
{ok, Mod}
|
|
|
end.
|
|
|
|
|
|
-get_state(ID) ->
|
|
|
- case get_state_pt(ID) of
|
|
|
+get_cb(ID) ->
|
|
|
+ case get_cb_pt(ID) of
|
|
|
undefined ->
|
|
|
%% maybe it's a dryrun connector
|
|
|
- ets:lookup_element(?RESOURCE_STATE_CACHE, ConnectorId, #connector.stable, undefined);
|
|
|
+ ets:lookup_element(?CACHE, ID, #connector.cb, undefined);
|
|
|
InPt ->
|
|
|
InPt
|
|
|
end.
|
|
|
@@ -153,9 +155,9 @@ get_state(ID) ->
|
|
|
-spec erase(resource_id()) -> ok.
|
|
|
erase(ID) ->
|
|
|
MS = ets:fun2ms(fun(#channel{id = {C, _}}) when C =:= ID -> true end),
|
|
|
- _ = ets:select_delete(?RESOURCE_STATE_CACHE, MS),
|
|
|
- _ = ets:delete(?RESOURCE_STATE_CACHE, ID),
|
|
|
- _ = del_state_pt(?STATE_PT_KEY(ID)),
|
|
|
+ _ = ets:select_delete(?CACHE, MS),
|
|
|
+ _ = ets:delete(?CACHE, ID),
|
|
|
+ _ = del_state_pt(?CB_PT_KEY(ID)),
|
|
|
ok.
|
|
|
|
|
|
erase_old_channels(ID, NewChanIds) ->
|
|
|
@@ -165,7 +167,7 @@ erase_old_channels(ID, NewChanIds) ->
|
|
|
|
|
|
erase_channel(ChanId) ->
|
|
|
Key = split_channel_id(ChanId),
|
|
|
- ets:delete(?RESOURCE_STATE_CACHE, Key).
|
|
|
+ ets:delete(?CACHE, Key).
|
|
|
|
|
|
-spec list_all() -> [resource_data()].
|
|
|
list_all() ->
|
|
|
@@ -185,11 +187,11 @@ list_all() ->
|
|
|
|
|
|
group_ids(Group) ->
|
|
|
MS = ets:fun2ms(fun(#connector{id = ID, group = G}) when G =:= Group -> ID end),
|
|
|
- ets:select(?RESOURCE_STATE_CACHE, MS).
|
|
|
+ ets:select(?CACHE, MS).
|
|
|
|
|
|
all_ids() ->
|
|
|
MS = ets:fun2ms(fun(#connector{id = ID}) -> ID end),
|
|
|
- ets:select(?RESOURCE_STATE_CACHE, MS).
|
|
|
+ ets:select(?CACHE, MS).
|
|
|
|
|
|
%% @doc The most performance-critical call.
|
|
|
%% NOTE: ID is the action ID, but not connector ID.
|
|
|
@@ -197,12 +199,12 @@ all_ids() ->
|
|
|
get_runtime(ID) ->
|
|
|
ChanKey = {ConnectorId, _ChanID} = split_channel_id(ID),
|
|
|
try
|
|
|
- Stable = get_state(ConnectorId),
|
|
|
+ Cb = get_cb(ConnectorId),
|
|
|
ChannelStatus = get_channel_status(ChanKey),
|
|
|
- StErr = ets:lookup_element(?RESOURCE_STATE_CACHE, ConnectorId, #connector.st_err),
|
|
|
+ StErr = ets:lookup_element(?CACHE, ConnectorId, #connector.st_err),
|
|
|
{ok, #rt{
|
|
|
st_err = StErr,
|
|
|
- stable = Stable,
|
|
|
+ cb = Cb,
|
|
|
channel_status = ChannelStatus
|
|
|
}}
|
|
|
catch
|
|
|
@@ -213,11 +215,11 @@ get_runtime(ID) ->
|
|
|
get_channel_status({_, ?NO_CHANNEL}) ->
|
|
|
?NO_CHANNEL;
|
|
|
get_channel_status(ChanKey) ->
|
|
|
- ets:lookup_element(?RESOURCE_STATE_CACHE, ChanKey, #channel.status, ?NO_CHANNEL).
|
|
|
+ ets:lookup_element(?CACHE, ChanKey, #channel.status, ?NO_CHANNEL).
|
|
|
|
|
|
-get_state_pt(ID) ->
|
|
|
+get_cb_pt(ID) ->
|
|
|
try
|
|
|
- persistent_term:get(?STATE_PT_KEY(ID))
|
|
|
+ persistent_term:get(?CB_PT_KEY(ID))
|
|
|
catch
|
|
|
error:badarg ->
|
|
|
undefined
|
|
|
@@ -256,21 +258,21 @@ split_channel_id(Id) when is_binary(Id) ->
|
|
|
%% Connector state is relatively static, so persistent_term update triggered GC is less of a concern
|
|
|
%% comparing to other fields such as `status' and `error', which may change very often.
|
|
|
put_state_pt(ID, State) ->
|
|
|
- case get_state_pt(ID) of
|
|
|
+ case get_cb_pt(ID) of
|
|
|
S when S =:= State ->
|
|
|
%% identical
|
|
|
ok;
|
|
|
_ ->
|
|
|
- _ = persistent_term:put(?STATE_PT_KEY(ID), State),
|
|
|
+ _ = persistent_term:put(?CB_PT_KEY(ID), State),
|
|
|
ok
|
|
|
end.
|
|
|
|
|
|
del_state_pt(ID) ->
|
|
|
- _ = persistent_term:erase(?STATE_PT_KEY(ID)),
|
|
|
+ _ = persistent_term:erase(?CB_PT_KEY(ID)),
|
|
|
ok.
|
|
|
|
|
|
is_exist(ID) ->
|
|
|
- ets:member(?RESOURCE_STATE_CACHE, ID).
|
|
|
+ ets:member(?CACHE, ID).
|
|
|
|
|
|
make_resource_data(ID, Connector, Channels) ->
|
|
|
#connector{
|
|
|
@@ -279,12 +281,12 @@ make_resource_data(ID, Connector, Channels) ->
|
|
|
status := Status
|
|
|
},
|
|
|
config = Config,
|
|
|
- stable = Stable0
|
|
|
+ cb = Cb0
|
|
|
} = Connector,
|
|
|
- Stable =
|
|
|
- case Stable0 of
|
|
|
+ Cb =
|
|
|
+ case Cb0 of
|
|
|
undefined ->
|
|
|
- get_state_pt(ID);
|
|
|
+ get_cb_pt(ID);
|
|
|
X ->
|
|
|
X
|
|
|
end,
|
|
|
@@ -293,7 +295,7 @@ make_resource_data(ID, Connector, Channels) ->
|
|
|
callback_mode := CallbackMode,
|
|
|
query_mode := QueryMode,
|
|
|
state := State
|
|
|
- } = Stable,
|
|
|
+ } = Cb,
|
|
|
#{
|
|
|
id => ID,
|
|
|
mod => Mod,
|
|
|
@@ -308,7 +310,7 @@ make_resource_data(ID, Connector, Channels) ->
|
|
|
|
|
|
find_channels(ConnectorID) ->
|
|
|
MS = ets:fun2ms(fun(#channel{id = {Cid, _}} = C) when Cid =:= ConnectorID -> C end),
|
|
|
- List = ets:select(?RESOURCE_STATE_CACHE, MS),
|
|
|
+ List = ets:select(?CACHE, MS),
|
|
|
lists:foldl(
|
|
|
fun(
|
|
|
#channel{
|