|
|
@@ -128,7 +128,7 @@ on_query(InstId, {Channel, Message}, State) ->
|
|
|
greptimedb_connector_send_query,
|
|
|
#{points => Points, batch => false, mode => sync}
|
|
|
),
|
|
|
- do_query(InstId, Client, Points);
|
|
|
+ do_query(InstId, Channel, Client, Points);
|
|
|
{error, ErrorPoints} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query_error,
|
|
|
@@ -152,7 +152,7 @@ on_batch_query(InstId, [{Channel, _} | _] = BatchData, State) ->
|
|
|
greptimedb_connector_send_query,
|
|
|
#{points => Points, batch => true, mode => sync}
|
|
|
),
|
|
|
- do_query(InstId, Client, Points);
|
|
|
+ do_query(InstId, Channel, Client, Points);
|
|
|
{error, Reason} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query_error,
|
|
|
@@ -173,7 +173,7 @@ on_query_async(InstId, {Channel, Message}, {ReplyFun, Args}, State) ->
|
|
|
greptimedb_connector_send_query,
|
|
|
#{points => Points, batch => false, mode => async}
|
|
|
),
|
|
|
- do_async_query(InstId, Client, Points, {ReplyFun, Args});
|
|
|
+ do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
|
|
|
{error, ErrorPoints} = Err ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query_error,
|
|
|
@@ -195,7 +195,7 @@ on_batch_query_async(InstId, [{Channel, _} | _] = BatchData, {ReplyFun, Args}, S
|
|
|
greptimedb_connector_send_query,
|
|
|
#{points => Points, batch => true, mode => async}
|
|
|
),
|
|
|
- do_async_query(InstId, Client, Points, {ReplyFun, Args});
|
|
|
+ do_async_query(InstId, Channel, Client, Points, {ReplyFun, Args});
|
|
|
{error, Reason} ->
|
|
|
?tp(
|
|
|
greptimedb_connector_send_query_error,
|
|
|
@@ -420,7 +420,8 @@ is_auth_key(_) ->
|
|
|
|
|
|
%% -------------------------------------------------------------------------------------------------
|
|
|
%% Query
|
|
|
-do_query(InstId, Client, Points) ->
|
|
|
+do_query(InstId, Channel, Client, Points) ->
|
|
|
+ emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}),
|
|
|
case greptimedb:write_batch(Client, Points) of
|
|
|
{ok, #{response := {affected_rows, #{value := Rows}}}} ->
|
|
|
?SLOG(debug, #{
|
|
|
@@ -452,12 +453,13 @@ do_query(InstId, Client, Points) ->
|
|
|
end
|
|
|
end.
|
|
|
|
|
|
-do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
|
|
|
+do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "greptimedb_write_point_async",
|
|
|
connector => InstId,
|
|
|
points => Points
|
|
|
}),
|
|
|
+ emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}),
|
|
|
WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
|
|
|
ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
|
|
|
|