Quellcode durchsuchen

Merge pull request #8783 from terry-xiaoyu/refactor_resource_query_mode

fix: use gen_statem:cast/2 for async query
Xinyu Liu vor 3 Jahren
Ursprung
Commit
8db9b6690c

+ 3 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -23,6 +23,7 @@
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting | stopped.
 -type callback_mode() :: always_sync | async_if_possible.
+-type query_mode() :: async | sync | dynamic.
 -type result() :: term().
 -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type query_opts() :: #{
@@ -34,6 +35,7 @@
     id := resource_id(),
     mod := module(),
     callback_mode := callback_mode(),
+    query_mode := query_mode(),
     config := resource_config(),
     state := resource_state(),
     status := resource_status(),
@@ -67,7 +69,7 @@
     batch_time => pos_integer(),
     enable_queue => boolean(),
     queue_max_bytes => pos_integer(),
-    query_mode => async | sync | dynamic,
+    query_mode => query_mode(),
     resume_interval => pos_integer(),
     async_inflight_window => pos_integer()
 }.

+ 14 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -18,6 +18,7 @@
 
 -include("emqx_resource.hrl").
 -include("emqx_resource_utils.hrl").
+-include("emqx_resource_errors.hrl").
 
 %% APIs for resource types
 
@@ -254,7 +255,19 @@ query(ResId, Request) ->
 -spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) ->
     Result :: term().
 query(ResId, Request, Opts) ->
-    emqx_resource_worker:query(ResId, Request, Opts).
+    case emqx_resource_manager:ets_lookup(ResId) of
+        {ok, _Group, #{query_mode := QM, status := connected}} ->
+            case QM of
+                sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
+                async -> emqx_resource_worker:async_query(ResId, Request, Opts)
+            end;
+        {ok, _Group, #{status := stopped}} ->
+            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
+        {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
+            ?RESOURCE_ERROR(not_connected, "resource not connected");
+        {error, not_found} ->
+            ?RESOURCE_ERROR(not_found, "resource not found")
+    end.
 
 -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
 simple_sync_query(ResId, Request) ->

+ 9 - 1
apps/emqx_resource/src/emqx_resource_manager.erl

@@ -53,7 +53,9 @@
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
 % State record
--record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
+-record(data, {
+    id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error
+}).
 -type data() :: #data{}.
 
 -define(ETS_TABLE, ?MODULE).
@@ -264,6 +266,11 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
         group = Group,
         mod = ResourceType,
         callback_mode = emqx_resource:get_callback_mode(ResourceType),
+        %% query_mode = dynamic | sync | async
+        %% TODO:
+        %%  dynamic mode is async mode when things are going well, but becomes sync mode
+        %%  if the resource worker is overloaded
+        query_mode = maps:get(query_mode, Opts, sync),
         config = Config,
         opts = Opts,
         status = connecting,
@@ -585,6 +592,7 @@ data_record_to_external_map_with_metrics(Data) ->
         id => Data#data.id,
         mod => Data#data.mod,
         callback_mode => Data#data.callback_mode,
+        query_mode => Data#data.query_mode,
         config => Data#data.config,
         status => Data#data.status,
         state => Data#data.state,

+ 47 - 34
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -29,7 +29,8 @@
 
 -export([
     start_link/3,
-    query/3,
+    sync_query/3,
+    async_query/3,
     block/1,
     block/2,
     resume/1
@@ -72,12 +73,17 @@ callback_mode() -> [state_functions, state_enter].
 start_link(Id, Index, Opts) ->
     gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
 
--spec query(id(), request(), query_opts()) -> Result :: term().
-query(Id, Request, Opts) ->
+-spec sync_query(id(), request(), query_opts()) -> Result :: term().
+sync_query(Id, Request, Opts) ->
     PickKey = maps:get(pick_key, Opts, self()),
     Timeout = maps:get(timeout, Opts, infinity),
     pick_call(Id, PickKey, {query, Request, Opts}, Timeout).
 
+-spec async_query(id(), request(), query_opts()) -> Result :: term().
+async_query(Id, Request, Opts) ->
+    PickKey = maps:get(pick_key, Opts, self()),
+    pick_cast(Id, PickKey, {query, Request, Opts}).
+
 %% simple query the resource without batching and queuing messages.
 -spec simple_sync_query(id(), request()) -> Result :: term().
 simple_sync_query(Id, Request) ->
@@ -125,11 +131,6 @@ init({Id, Index, Opts}) ->
         id => Id,
         index => Index,
         name => Name,
-        %% query_mode = dynamic | sync | async
-        %% TODO:
-        %%  dynamic mode is async mode when things are going well, but becomes sync mode
-        %%  if the resource worker is overloaded
-        query_mode => maps:get(query_mode, Opts, sync),
         async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
         enable_batch => maps:get(enable_batch, Opts, false),
         batch_size => BatchSize,
@@ -151,9 +152,11 @@ running(cast, block, St) ->
 running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
     Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
     {next_state, block, St#{queue := Q1}};
-running({call, From0}, {query, Request, Opts}, #{query_mode := QM} = St) ->
-    From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)),
+running({call, From}, {query, Request, _Opts}, St) ->
     query_or_acc(From, Request, St);
+running(cast, {query, Request, Opts}, St) ->
+    ReplayFun = maps:get(async_reply_fun, Opts, undefined),
+    query_or_acc(ReplayFun, Request, St);
 running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
     flush(St#{tref := undefined});
 running(info, {flush, _Ref}, _St) ->
@@ -173,11 +176,15 @@ blocked(cast, resume, St) ->
     do_resume(St);
 blocked(state_timeout, resume, St) ->
     do_resume(St);
-blocked({call, From0}, {query, Request, Opts}, #{id := Id, queue := Q, query_mode := QM} = St) ->
-    From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)),
+blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
     Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
     _ = reply_caller(Id, ?REPLY(From, Request, Error)),
-    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}.
+    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}};
+blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
+    ReplayFun = maps:get(async_reply_fun, Opts, undefined),
+    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
+    _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)),
+    {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}.
 
 terminate(_Reason, #{id := Id, index := Index}) ->
     gproc_pool:disconnect_worker(Id, {Id, Index}).
@@ -194,24 +201,25 @@ estimate_size(QItem) ->
     size(queue_item_marshaller(QItem)).
 
 %%==============================================================================
-maybe_quick_return(sync, From, _ReplyFun) ->
-    From;
-maybe_quick_return(async, From, ReplyFun) ->
-    gen_statem:reply(From, ok),
-    ReplyFun.
-
-pick_call(Id, Key, Query, Timeout) ->
-    try gproc_pool:pick_worker(Id, Key) of
+-define(PICK(ID, KEY, EXPR),
+    try gproc_pool:pick_worker(ID, KEY) of
         Pid when is_pid(Pid) ->
-            gen_statem:call(Pid, Query, {clean_timeout, Timeout});
+            EXPR;
         _ ->
-            ?RESOURCE_ERROR(not_created, "resource not found")
+            ?RESOURCE_ERROR(not_created, "resource not created")
     catch
         error:badarg ->
-            ?RESOURCE_ERROR(not_created, "resource not found");
+            ?RESOURCE_ERROR(not_created, "resource not created");
         exit:{timeout, _} ->
             ?RESOURCE_ERROR(timeout, "call resource timeout")
-    end.
+    end
+).
+
+pick_call(Id, Key, Query, Timeout) ->
+    ?PICK(Id, Key, gen_statem:call(Pid, Query, {clean_timeout, Timeout})).
+
+pick_cast(Id, Key, Query) ->
+    ?PICK(Id, Key, gen_statem:cast(Pid, Query)).
 
 do_resume(#{queue := Q, id := Id, name := Name} = St) ->
     case inflight_get_first(Name) of
@@ -264,12 +272,12 @@ query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left
         true -> flush(St);
         false -> {keep_state, ensure_flush_timer(St)}
     end;
-query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) ->
+query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) ->
     QueryOpts = #{
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
-    case send_query(QM, From, Request, Id, QueryOpts) of
+    case send_query(From, Request, Id, QueryOpts) of
         true ->
             Query = ?QUERY(From, Request),
             {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
@@ -277,8 +285,8 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query
             {keep_state, St}
     end.
 
-send_query(QM, From, Request, Id, QueryOpts) ->
-    Result = call_query(QM, Id, ?QUERY(From, Request), QueryOpts),
+send_query(From, Request, Id, QueryOpts) ->
+    Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts),
     reply_caller(Id, ?REPLY(From, Request, Result)).
 
 flush(#{acc := []} = St) ->
@@ -288,15 +296,14 @@ flush(
         id := Id,
         acc := Batch,
         batch_size := Size,
-        queue := Q0,
-        query_mode := QM
+        queue := Q0
     } = St
 ) ->
     QueryOpts = #{
         inflight_name => maps:get(name, St),
         inflight_window => maps:get(async_inflight_window, St)
     },
-    Result = call_query(QM, Id, Batch, QueryOpts),
+    Result = call_query(configured, Id, Batch, QueryOpts),
     St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
     case batch_reply_caller(Id, Result, Batch) of
         true ->
@@ -362,9 +369,15 @@ handle_query_result(Id, Result, BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
     BlockWorker.
 
-call_query(QM, Id, Query, QueryOpts) ->
+call_query(QM0, Id, Query, QueryOpts) ->
     case emqx_resource_manager:ets_lookup(Id) of
-        {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} ->
+        {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} ->
+            QM =
+                case QM0 of
+                    configured -> maps:get(query_mode, Data);
+                    _ -> QM0
+                end,
+            CM = maps:get(callback_mode, Data),
             apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts);
         {ok, _Group, #{status := stopped}} ->
             ?RESOURCE_ERROR(stopped, "resource stopped or disabled");

+ 3 - 2
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -133,7 +133,7 @@ t_create_remove_local(_) ->
     {error, _} = emqx_resource:remove_local(?ID),
 
     ?assertMatch(
-        ?RESOURCE_ERROR(not_created),
+        ?RESOURCE_ERROR(not_found),
         emqx_resource:query(?ID, get_state)
     ),
     ?assertNot(is_process_alive(Pid)).
@@ -183,7 +183,7 @@ t_query(_) ->
     {ok, #{pid := _}} = emqx_resource:query(?ID, get_state),
 
     ?assertMatch(
-        ?RESOURCE_ERROR(not_created),
+        ?RESOURCE_ERROR(not_found),
         emqx_resource:query(<<"unknown">>, get_state)
     ),
 
@@ -371,6 +371,7 @@ t_query_counter_async_inflight(_) ->
     ok = emqx_resource:query(?ID, {inc_counter, 1}, #{
         async_reply_fun => {Insert, [Tab0, tmp_query]}
     }),
+    timer:sleep(100),
     ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)),
 
     %% all response should be received after the resource is resumed.