|
@@ -315,7 +315,7 @@ on_query(InstId, {send_message, Msg}, State) ->
|
|
|
ClientId = maps:get(clientid, Msg, undefined),
|
|
ClientId = maps:get(clientid, Msg, undefined),
|
|
|
on_query(
|
|
on_query(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
|
|
|
|
|
|
|
+ {undefined, ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
|
|
|
State
|
|
State
|
|
|
)
|
|
)
|
|
|
end;
|
|
end;
|
|
@@ -345,19 +345,19 @@ on_query(
|
|
|
ClientId = clientid(Msg),
|
|
ClientId = clientid(Msg),
|
|
|
on_query(
|
|
on_query(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
|
|
|
|
|
|
|
+ {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout, Retry},
|
|
|
State
|
|
State
|
|
|
)
|
|
)
|
|
|
end;
|
|
end;
|
|
|
on_query(InstId, {Method, Request}, State) ->
|
|
on_query(InstId, {Method, Request}, State) ->
|
|
|
%% TODO: Get retry from State
|
|
%% TODO: Get retry from State
|
|
|
- on_query(InstId, {undefined, Method, Request, 5000, _Retry = 2}, State);
|
|
|
|
|
|
|
+ on_query(InstId, {undefined, undefined, Method, Request, 5000, _Retry = 2}, State);
|
|
|
on_query(InstId, {Method, Request, Timeout}, State) ->
|
|
on_query(InstId, {Method, Request, Timeout}, State) ->
|
|
|
%% TODO: Get retry from State
|
|
%% TODO: Get retry from State
|
|
|
- on_query(InstId, {undefined, Method, Request, Timeout, _Retry = 2}, State);
|
|
|
|
|
|
|
+ on_query(InstId, {undefined, undefined, Method, Request, Timeout, _Retry = 2}, State);
|
|
|
on_query(
|
|
on_query(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {KeyOrNum, Method, Request, Timeout, Retry},
|
|
|
|
|
|
|
+ {ActionId, KeyOrNum, Method, Request, Timeout, Retry},
|
|
|
#{base_path := BasePath} = State
|
|
#{base_path := BasePath} = State
|
|
|
) ->
|
|
) ->
|
|
|
?TRACE(
|
|
?TRACE(
|
|
@@ -367,11 +367,12 @@ on_query(
|
|
|
request => redact_request(Request),
|
|
request => redact_request(Request),
|
|
|
note => ?READACT_REQUEST_NOTE,
|
|
note => ?READACT_REQUEST_NOTE,
|
|
|
connector => InstId,
|
|
connector => InstId,
|
|
|
|
|
+ action_id => ActionId,
|
|
|
state => redact(State)
|
|
state => redact(State)
|
|
|
}
|
|
}
|
|
|
),
|
|
),
|
|
|
NRequest = formalize_request(Method, BasePath, Request),
|
|
NRequest = formalize_request(Method, BasePath, Request),
|
|
|
- trace_rendered_action_template(InstId, Method, NRequest, Timeout),
|
|
|
|
|
|
|
+ trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
|
|
|
Worker = resolve_pool_worker(State, KeyOrNum),
|
|
Worker = resolve_pool_worker(State, KeyOrNum),
|
|
|
Result0 = ehttpc:request(
|
|
Result0 = ehttpc:request(
|
|
|
Worker,
|
|
Worker,
|
|
@@ -428,7 +429,7 @@ on_query_async(InstId, {send_message, Msg}, ReplyFunAndArgs, State) ->
|
|
|
ClientId = maps:get(clientid, Msg, undefined),
|
|
ClientId = maps:get(clientid, Msg, undefined),
|
|
|
on_query_async(
|
|
on_query_async(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {ClientId, Method, {Path, Headers, Body}, Timeout},
|
|
|
|
|
|
|
+ {undefined, ClientId, Method, {Path, Headers, Body}, Timeout},
|
|
|
ReplyFunAndArgs,
|
|
ReplyFunAndArgs,
|
|
|
State
|
|
State
|
|
|
)
|
|
)
|
|
@@ -458,14 +459,14 @@ on_query_async(
|
|
|
ClientId = clientid(Msg),
|
|
ClientId = clientid(Msg),
|
|
|
on_query_async(
|
|
on_query_async(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {ClientId, Method, {Path, Headers, Body}, Timeout},
|
|
|
|
|
|
|
+ {ActionId, ClientId, Method, {Path, Headers, Body}, Timeout},
|
|
|
ReplyFunAndArgs,
|
|
ReplyFunAndArgs,
|
|
|
State
|
|
State
|
|
|
)
|
|
)
|
|
|
end;
|
|
end;
|
|
|
on_query_async(
|
|
on_query_async(
|
|
|
InstId,
|
|
InstId,
|
|
|
- {KeyOrNum, Method, Request, Timeout},
|
|
|
|
|
|
|
+ {ActionId, KeyOrNum, Method, Request, Timeout},
|
|
|
ReplyFunAndArgs,
|
|
ReplyFunAndArgs,
|
|
|
#{base_path := BasePath} = State
|
|
#{base_path := BasePath} = State
|
|
|
) ->
|
|
) ->
|
|
@@ -481,7 +482,7 @@ on_query_async(
|
|
|
}
|
|
}
|
|
|
),
|
|
),
|
|
|
NRequest = formalize_request(Method, BasePath, Request),
|
|
NRequest = formalize_request(Method, BasePath, Request),
|
|
|
- trace_rendered_action_template(InstId, Method, NRequest, Timeout),
|
|
|
|
|
|
|
+ trace_rendered_action_template(ActionId, Method, NRequest, Timeout),
|
|
|
MaxAttempts = maps:get(max_attempts, State, 3),
|
|
MaxAttempts = maps:get(max_attempts, State, 3),
|
|
|
Context = #{
|
|
Context = #{
|
|
|
attempt => 1,
|
|
attempt => 1,
|
|
@@ -501,11 +502,11 @@ on_query_async(
|
|
|
),
|
|
),
|
|
|
{ok, Worker}.
|
|
{ok, Worker}.
|
|
|
|
|
|
|
|
-trace_rendered_action_template(InstId, Method, NRequest, Timeout) ->
|
|
|
|
|
|
|
+trace_rendered_action_template(ActionId, Method, NRequest, Timeout) ->
|
|
|
case NRequest of
|
|
case NRequest of
|
|
|
{Path, Headers} ->
|
|
{Path, Headers} ->
|
|
|
emqx_trace:rendered_action_template(
|
|
emqx_trace:rendered_action_template(
|
|
|
- InstId,
|
|
|
|
|
|
|
+ ActionId,
|
|
|
#{
|
|
#{
|
|
|
path => Path,
|
|
path => Path,
|
|
|
method => Method,
|
|
method => Method,
|
|
@@ -515,7 +516,7 @@ trace_rendered_action_template(InstId, Method, NRequest, Timeout) ->
|
|
|
);
|
|
);
|
|
|
{Path, Headers, Body} ->
|
|
{Path, Headers, Body} ->
|
|
|
emqx_trace:rendered_action_template(
|
|
emqx_trace:rendered_action_template(
|
|
|
- InstId,
|
|
|
|
|
|
|
+ ActionId,
|
|
|
#{
|
|
#{
|
|
|
path => Path,
|
|
path => Path,
|
|
|
method => Method,
|
|
method => Method,
|