Quellcode durchsuchen

fix: add test cases for query async

Shawn vor 3 Jahren
Ursprung
Commit
82550a585a

+ 6 - 0
apps/emqx_resource/include/emqx_resource.hrl

@@ -23,6 +23,12 @@
 -type resource_state() :: term().
 -type resource_status() :: connected | disconnected | connecting | stopped.
 -type callback_mode() :: always_sync | async_if_possible.
+-type result() :: term().
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
+-type query_opts() :: #{
+    %% The key used for picking a resource worker
+    pick_key => term()
+}.
 -type resource_data() :: #{
     id := resource_id(),
     mod := module(),

+ 12 - 1
apps/emqx_resource/src/emqx_resource.erl

@@ -75,7 +75,10 @@
     %% stop the instance
     stop/1,
     %% query the instance
-    query/2
+    query/2,
+    %% query the instance without batching and queuing messages.
+    simple_sync_query/2,
+    simple_async_query/3
 ]).
 
 %% Direct calls to the callback module
@@ -232,6 +235,14 @@ query(ResId, Request) ->
 query(ResId, Request, Opts) ->
     emqx_resource_worker:query(ResId, Request, Opts).
 
+-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
+simple_sync_query(ResId, Request) ->
+    emqx_resource_worker:simple_sync_query(ResId, Request).
+
+-spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term().
+simple_async_query(ResId, Request, ReplyFun) ->
+    emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun).
+
 -spec start(resource_id()) -> ok | {error, Reason :: term()}.
 start(ResId) ->
     start(ResId, #{}).

+ 34 - 14
apps/emqx_resource/src/emqx_resource_worker.erl

@@ -34,6 +34,11 @@
     resume/1
 ]).
 
+-export([
+    simple_sync_query/2,
+    simple_async_query/3
+]).
+
 -export([
     callback_mode/0,
     init/1,
@@ -68,13 +73,7 @@
 -type id() :: binary().
 -type query() :: {query, from(), request()}.
 -type request() :: term().
--type result() :: term().
--type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type from() :: pid() | reply_fun().
--type query_opts() :: #{
-    %% The key used for picking a resource worker
-    pick_key => term()
-}.
 
 -export_type([query_opts/0]).
 
@@ -92,6 +91,19 @@ query(Id, Request, Opts) ->
     Timeout = maps:get(timeout, Opts, infinity),
     pick_call(Id, PickKey, {query, Request}, Timeout).
 
+%% simple query the resource without batching and queuing messages.
+-spec simple_sync_query(id(), request()) -> Result :: term().
+simple_sync_query(Id, Request) ->
+    Result = call_query(sync, Id, ?QUERY(self(), Request), 1),
+    _ = handle_query_result(Id, Result, false),
+    Result.
+
+-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
+simple_async_query(Id, Request, ReplyFun) ->
+    Result = call_query(async, Id, ?QUERY(ReplyFun, Request), 1),
+    _ = handle_query_result(Id, Result, false),
+    Result.
+
 -spec block(pid() | atom()) -> ok.
 block(ServerRef) ->
     gen_statem:cast(ServerRef, block).
@@ -188,7 +200,7 @@ estimate_size(QItem) ->
 maybe_quick_return(sync, From, _ReplyFun) ->
     From;
 maybe_quick_return(async, From, ReplyFun) ->
-    ok = gen_statem:reply(From),
+    gen_statem:reply(From, ok),
     ReplyFun.
 
 pick_call(Id, Key, Query, Timeout) ->
@@ -295,7 +307,11 @@ reply_caller(Id, Reply) ->
 reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
     handle_query_result(Id, Result, BlockWorker);
 reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
-    ?SAFE_CALL(ReplyFun(Result, Args)),
+    _ =
+        case Result of
+            {async_return, _} -> ok;
+            _ -> apply(ReplyFun, Args ++ [Result])
+        end,
     handle_query_result(Id, Result, BlockWorker);
 reply_caller(Id, ?REPLY(From, _, Result), BlockWorker) ->
     gen_statem:reply(From, Result),
@@ -316,6 +332,10 @@ handle_query_result(Id, {error, _}, BlockWorker) ->
 handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
     true;
+handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
+    true;
+handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
+    BlockWorker;
 handle_query_result(Id, Result, BlockWorker) ->
     %% assert
     true = is_ok_result(Result),
@@ -352,16 +372,16 @@ call_query(QM, Id, Query, QueryLen) ->
     end
 ).
 
-apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request) = _Query, ResSt) ->
+apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}),
     ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
-apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) ->
+apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt) ->
     ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}),
     ReplyFun = fun ?MODULE:reply_after_query/4,
     ?APPLY_RESOURCE(
         begin
-            _ = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt),
-            ok_async
+            Result = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt),
+            {async_return, Result}
         end,
         Request
     );
@@ -375,8 +395,8 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
     ReplyFun = fun ?MODULE:batch_reply_after_query/4,
     ?APPLY_RESOURCE(
         begin
-            _ = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt),
-            ok_async
+            Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt),
+            {async_return, Result}
         end,
         Batch
     ).

+ 15 - 2
apps/emqx_resource/test/emqx_connector_demo.erl

@@ -26,11 +26,12 @@
     on_start/2,
     on_stop/2,
     on_query/3,
+    on_query_async/4,
     on_batch_query/3,
     on_get_status/2
 ]).
 
--export([counter_loop/1]).
+-export([counter_loop/1, set_callback_mode/1]).
 
 %% callbacks for emqx_resource config schema
 -export([roots/0]).
@@ -50,7 +51,12 @@ register(required) -> true;
 register(default) -> false;
 register(_) -> undefined.
 
-callback_mode() -> always_sync.
+-define(CM_KEY, {?MODULE, callback_mode}).
+callback_mode() ->
+    persistent_term:get(?CM_KEY).
+
+set_callback_mode(Mode) ->
+    persistent_term:put(?CM_KEY, Mode).
 
 on_start(_InstId, #{create_error := true}) ->
     error("some error");
@@ -91,6 +97,10 @@ on_query(_InstId, get_counter, #{pid := Pid}) ->
         {error, timeout}
     end.
 
+on_query_async(_InstId, Query, ReplyFun, State) ->
+    Result = on_query(_InstId, Query, State),
+    apply_reply(ReplyFun, Result).
+
 on_batch_query(InstId, BatchReq, State) ->
     %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed.
     case hd(BatchReq) of
@@ -147,3 +157,6 @@ maybe_register(Name, Pid, true) ->
     erlang:register(Name, Pid);
 maybe_register(_Name, _Pid, false) ->
     true.
+
+apply_reply({ReplyFun, Args}, Result) when is_function(ReplyFun) ->
+    apply(ReplyFun, Args ++ [Result]).

+ 104 - 18
apps/emqx_resource/test/emqx_resource_SUITE.erl

@@ -28,6 +28,7 @@
 -define(ID, <<"id">>).
 -define(DEFAULT_RESOURCE_GROUP, <<"default">>).
 -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}).
+-define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -36,6 +37,7 @@ groups() ->
     [].
 
 init_per_testcase(_, Config) ->
+    emqx_connector_demo:set_callback_mode(always_sync),
     Config.
 end_per_testcase(_, _Config) ->
     _ = emqx_resource:remove(?ID).
@@ -213,7 +215,7 @@ t_batch_query_counter(_) ->
     ),
 
     ?check_trace(
-        #{timetrap => 10000, timeout => 1000},
+        ?TRACE_OPTS,
         emqx_resource:query(?ID, get_counter),
         fun(Result, Trace) ->
             ?assertMatch({ok, 0}, Result),
@@ -223,7 +225,7 @@ t_batch_query_counter(_) ->
     ),
 
     ?check_trace(
-        #{timetrap => 10000, timeout => 1000},
+        ?TRACE_OPTS,
         inc_counter_in_parallel(1000),
         fun(Trace) ->
             QueryTrace = ?of_kind(call_batch_query, Trace),
@@ -234,23 +236,90 @@ t_batch_query_counter(_) ->
 
     ok = emqx_resource:remove_local(?ID).
 
-inc_counter_in_parallel(N) ->
-    Parent = self(),
-    Pids = [
-        erlang:spawn(fun() ->
-            ok = emqx_resource:query(?ID, {inc_counter, 1}),
-            Parent ! {complete, self()}
-        end)
-     || _ <- lists:seq(1, N)
-    ],
-    [
-        receive
-            {complete, Pid} -> ok
-        after 1000 ->
-            ct:fail({wait_for_query_timeout, Pid})
+t_query_counter_async(_) ->
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{query_mode => async}
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(1000),
+        fun(Trace) ->
+            %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
         end
-     || Pid <- Pids
-    ].
+    ),
+    %% wait for 1s to make sure all the aysnc query is sent to the resource.
+    timer:sleep(1000),
+    %% simple query ignores the query_mode and batching settings in the resource_worker
+    ?check_trace(
+        ?TRACE_OPTS,
+        emqx_resource:simple_sync_query(?ID, get_counter),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, 1000}, Result),
+            %% the callback_mode if 'emqx_connector_demo' is 'always_sync'.
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace)
+        end
+    ),
+    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
+    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ok = emqx_resource:remove_local(?ID).
+
+t_query_counter_async_2(_) ->
+    emqx_connector_demo:set_callback_mode(async_if_possible),
+
+    Tab0 = ets:new(?FUNCTION_NAME, [bag, public]),
+    Insert = fun(Tab, Result) ->
+        ets:insert(Tab, {make_ref(), Result})
+    end,
+    {ok, _} = emqx_resource:create_local(
+        ?ID,
+        ?DEFAULT_RESOURCE_GROUP,
+        ?TEST_RESOURCE,
+        #{name => test_resource, register => true},
+        #{query_mode => async, async_reply_fun => {Insert, [Tab0]}}
+    ),
+    ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)),
+    ?check_trace(
+        ?TRACE_OPTS,
+        inc_counter_in_parallel(1000),
+        fun(Trace) ->
+            QueryTrace = ?of_kind(call_query_async, Trace),
+            ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace)
+        end
+    ),
+
+    %% wait for 1s to make sure all the aysnc query is sent to the resource.
+    timer:sleep(1000),
+    %% simple query ignores the query_mode and batching settings in the resource_worker
+    ?check_trace(
+        ?TRACE_OPTS,
+        emqx_resource:simple_sync_query(?ID, get_counter),
+        fun(Result, Trace) ->
+            ?assertMatch({ok, 1000}, Result),
+            QueryTrace = ?of_kind(call_query, Trace),
+            ?assertMatch([#{query := {query, _, get_counter}}], QueryTrace)
+        end
+    ),
+    {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
+    ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C),
+    ?assertMatch(1000, ets:info(Tab0, size)),
+    ?assert(
+        lists:all(
+            fun
+                ({_, ok}) -> true;
+                (_) -> false
+            end,
+            ets:tab2list(Tab0)
+        )
+    ),
+    ok = emqx_resource:remove_local(?ID).
 
 t_healthy_timeout(_) ->
     {ok, _} = emqx_resource:create_local(
@@ -480,6 +549,23 @@ t_auto_retry(_) ->
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
+inc_counter_in_parallel(N) ->
+    Parent = self(),
+    Pids = [
+        erlang:spawn(fun() ->
+            emqx_resource:query(?ID, {inc_counter, 1}),
+            Parent ! {complete, self()}
+        end)
+     || _ <- lists:seq(1, N)
+    ],
+    [
+        receive
+            {complete, Pid} -> ok
+        after 1000 ->
+            ct:fail({wait_for_query_timeout, Pid})
+        end
+     || Pid <- Pids
+    ].
 
 bin_config() ->
     <<"\"name\": \"test_resource\"">>.