Преглед на файлове

fix: use ReplyTo in QUERY for async

Stefan Strigler преди 2 години
родител
ревизия
321fd53132

+ 2 - 1
apps/emqx_resource/include/emqx_resource.hrl

@@ -32,7 +32,8 @@
     expire_at => infinity | integer(),
     expire_at => infinity | integer(),
     async_reply_fun => reply_fun(),
     async_reply_fun => reply_fun(),
     simple_query => boolean(),
     simple_query => boolean(),
-    is_buffer_supported => boolean()
+    is_buffer_supported => boolean(),
+    reply_to => reply_fun()
 }.
 }.
 -type resource_data() :: #{
 -type resource_data() :: #{
     id := resource_id(),
     id := resource_id(),

+ 19 - 8
apps/emqx_resource/src/emqx_resource_buffer_worker.erl

@@ -62,7 +62,7 @@
 -define(COLLECT_REQ_LIMIT, 1000).
 -define(COLLECT_REQ_LIMIT, 1000).
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
 -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
--define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
+-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
 -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
 -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
 -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
 -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
     {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
     {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
@@ -149,9 +149,10 @@ simple_sync_query(Id, Request, QueryOpts0) ->
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     Ref = make_request_ref(),
-    Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
+    Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
-    maybe_reply_to(Result, QueryOpts).
+    Result.
 
 
 %% simple async-query the resource without batching and queuing.
 %% simple async-query the resource without batching and queuing.
 -spec simple_async_query(id(), request(), query_opts()) -> term().
 -spec simple_async_query(id(), request(), query_opts()) -> term().
@@ -161,9 +162,12 @@ simple_async_query(Id, Request, QueryOpts0) ->
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
     emqx_resource_metrics:matched_inc(Id),
     emqx_resource_metrics:matched_inc(Id),
     Ref = make_request_ref(),
     Ref = make_request_ref(),
-    Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
+    ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
+    Result = call_query(
+        async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
+    ),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
     _ = handle_query_result(Id, Result, _HasBeenSent = false),
-    maybe_reply_to(Result, QueryOpts).
+    Result.
 
 
 simple_query_opts() ->
 simple_query_opts() ->
     ensure_expire_at(#{simple_query => true, timeout => infinity}).
     ensure_expire_at(#{simple_query => true, timeout => infinity}).
@@ -1056,9 +1060,14 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
     end
     end
 ).
 ).
 
 
-apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
+apply_query_fun(
+    sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts
+) ->
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
     ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
-    ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
+    maybe_reply_to(
+        ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
+        QueryOpts
+    );
 apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
 apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
     ?tp(call_query_async, #{
     ?tp(call_query_async, #{
         id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
         id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
@@ -1086,7 +1095,9 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
         end,
         end,
         Request
         Request
     );
     );
-apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
+apply_query_fun(
+    sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts
+) ->
     ?tp(call_batch_query, #{
     ?tp(call_batch_query, #{
         id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
         id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
     }),
     }),

+ 5 - 1
apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl

@@ -200,7 +200,11 @@ init_per_group(metrics_fail_simple, Config) ->
         (_) -> simple_async
         (_) -> simple_async
     end),
     end),
     meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
     meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
-    meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}),
+    meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) ->
+        Result = {error, {unrecoverable_error, mecked_failure}},
+        erlang:apply(ReplyFun, Args ++ [Result]),
+        Result
+    end),
     [{mecked, [?BRIDGE_IMPL]} | Config];
     [{mecked, [?BRIDGE_IMPL]} | Config];
 init_per_group(_Groupname, Config) ->
 init_per_group(_Groupname, Config) ->
     Config.
     Config.