Просмотр исходного кода

fix(rule tracing): format result traces in a more structured way

Kjell Winblad 1 год назад
Родитель
Сommit
ca88f5731b

+ 7 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -29,7 +29,8 @@
     on_query_async/4,
     on_query_async/4,
     on_batch_query/3,
     on_batch_query/3,
     on_batch_query_async/4,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    on_format_query_result/1
 ]).
 ]).
 
 
 %% callbacks of ecpool
 %% callbacks of ecpool
@@ -459,6 +460,11 @@ handle_result({error, Error}) ->
 handle_result(Res) ->
 handle_result(Res) ->
     Res.
     Res.
 
 
+on_format_query_result({ok, Result}) ->
+    #{result => ok, info => Result};
+on_format_query_result(Result) ->
+    Result.
+
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% utils
 %% utils
 
 

+ 9 - 1
apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl

@@ -38,7 +38,8 @@
     on_get_channels/1,
     on_get_channels/1,
     on_query/3,
     on_query/3,
     on_batch_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_format_query_result/1
 ]).
 ]).
 
 
 %% callbacks for ecpool
 %% callbacks for ecpool
@@ -519,6 +520,13 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
             to_error_tuple(ClickhouseErrorResult)
             to_error_tuple(ClickhouseErrorResult)
     end.
     end.
 
 
+on_format_query_result(ok) ->
+    #{result => ok, message => <<"">>};
+on_format_query_result({ok, Message}) ->
+    #{result => ok, message => Message};
+on_format_query_result(Result) ->
+    Result.
+
 to_recoverable_error({error, Reason}) ->
 to_recoverable_error({error, Reason}) ->
     {error, {recoverable_error, Reason}};
     {error, {recoverable_error, Reason}};
 to_recoverable_error(Error) ->
 to_recoverable_error(Error) ->

+ 7 - 1
apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl

@@ -26,7 +26,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([
 -export([
@@ -184,6 +185,11 @@ on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) ->
 on_batch_query(_InstanceId, Query, _State) ->
 on_batch_query(_InstanceId, Query, _State) ->
     {error, {unrecoverable_error, {invalid_request, Query}}}.
     {error, {unrecoverable_error, {invalid_request, Query}}}.
 
 
+on_format_query_result({ok, Result}) ->
+    #{result => ok, info => Result};
+on_format_query_result(Result) ->
+    Result.
+
 health_check_timeout() ->
 health_check_timeout() ->
     2500.
     2500.
 
 

+ 5 - 1
apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl

@@ -23,7 +23,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([
 -export([
@@ -288,6 +289,9 @@ on_query_async(
         InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
         InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State
     ).
     ).
 
 
+on_format_query_result(Result) ->
+    emqx_bridge_http_connector:on_format_query_result(Result).
+
 on_add_channel(
 on_add_channel(
     InstanceId,
     InstanceId,
     #{channels := Channels} = State0,
     #{channels := Channels} = State0,

+ 7 - 1
apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl

@@ -53,7 +53,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([reply_delegator/2]).
 -export([reply_delegator/2]).
@@ -489,6 +490,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) ->
 handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
 handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) ->
     Result.
     Result.
 
 
+on_format_query_result({ok, Info}) ->
+    #{result => ok, info => Info};
+on_format_query_result(Result) ->
+    Result.
+
 reply_delegator(ReplyFunAndArgs, Response) ->
 reply_delegator(ReplyFunAndArgs, Response) ->
     case Response of
     case Response of
         {error, Reason} when
         {error, Reason} when

+ 7 - 1
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl

@@ -27,7 +27,8 @@
     on_batch_query/3,
     on_batch_query/3,
     on_query_async/4,
     on_query_async/4,
     on_batch_query_async/4,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    on_format_query_result/1
 ]).
 ]).
 -export([reply_callback/2]).
 -export([reply_callback/2]).
 
 
@@ -453,6 +454,11 @@ do_query(InstId, Channel, Client, Points) ->
             end
             end
     end.
     end.
 
 
+on_format_query_result({ok, {affected_rows, Rows}}) ->
+    #{result => ok, affected_rows => Rows};
+on_format_query_result(Result) ->
+    Result.
+
 do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
 do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) ->
     ?SLOG(info, #{
     ?SLOG(info, #{
         msg => "greptimedb_write_point_async",
         msg => "greptimedb_write_point_async",

+ 5 - 1
apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl

@@ -27,7 +27,8 @@
     on_batch_query/3,
     on_batch_query/3,
     on_query_async/4,
     on_query_async/4,
     on_batch_query_async/4,
     on_batch_query_async/4,
-    on_get_status/2
+    on_get_status/2,
+    on_format_query_result/1
 ]).
 ]).
 -export([reply_callback/2]).
 -export([reply_callback/2]).
 
 
@@ -209,6 +210,9 @@ on_batch_query_async(
             {error, {unrecoverable_error, Reason}}
             {error, {unrecoverable_error, Reason}}
     end.
     end.
 
 
+on_format_query_result(Result) ->
+    emqx_bridge_http_connector:on_format_query_result(Result).
+
 on_get_status(_InstId, #{client := Client}) ->
 on_get_status(_InstId, #{client := Client}) ->
     case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
     case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
         true ->
         true ->

+ 5 - 1
apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl

@@ -26,7 +26,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([
 -export([
@@ -390,6 +391,9 @@ on_batch_query(
             Error
             Error
     end.
     end.
 
 
+on_format_query_result(Result) ->
+    emqx_bridge_http_connector:on_format_query_result(Result).
+
 on_add_channel(
 on_add_channel(
     InstanceId,
     InstanceId,
     #{iotdb_version := Version, channels := Channels} = OldState0,
     #{iotdb_version := Version, channels := Channels} = OldState0,

+ 7 - 1
apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl

@@ -39,7 +39,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([
 -export([
@@ -318,6 +319,11 @@ handle_result({error, Reason} = Error, Requests, InstanceId) ->
     }),
     }),
     Error.
     Error.
 
 
+on_format_query_result({ok, Result}) ->
+    #{result => ok, info => Result};
+on_format_query_result(Result) ->
+    Result.
+
 parse_template(Config) ->
 parse_template(Config) ->
     #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
     #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config,
     Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate},
     Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate},

+ 7 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -18,7 +18,8 @@
     on_get_status/2,
     on_get_status/2,
     on_query/3,
     on_query/3,
     on_start/2,
     on_start/2,
-    on_stop/2
+    on_stop/2,
+    on_format_query_result/1
 ]).
 ]).
 
 
 %%========================================================================================
 %%========================================================================================
@@ -85,6 +86,11 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat
 on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
 on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
     emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
     emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
 
 
+on_format_query_result({{Result, Info}, Documents}) ->
+    #{result => Result, info => Info, documents => Documents};
+on_format_query_result(Result) ->
+    Result.
+
 on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
 on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
     NewState = State#{channels => maps:remove(ChannelId, Channels)},
     NewState = State#{channels => maps:remove(ChannelId, Channels)},
     {ok, NewState}.
     {ok, NewState}.

+ 7 - 1
apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl

@@ -27,7 +27,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([connector_examples/1]).
 -export([connector_examples/1]).
@@ -175,6 +176,11 @@ on_batch_query(
             Error
             Error
     end.
     end.
 
 
+on_format_query_result({ok, StatusCode, BodyMap}) ->
+    #{result => ok, status_code => StatusCode, body => BodyMap};
+on_format_query_result(Result) ->
+    Result.
+
 on_get_status(_InstanceId, #{server := Server}) ->
 on_get_status(_InstanceId, #{server := Server}) ->
     Result =
     Result =
         case opentsdb_connectivity(Server) of
         case opentsdb_connectivity(Server) of

+ 7 - 1
apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl

@@ -20,7 +20,8 @@
     on_get_status/2,
     on_get_status/2,
     on_get_channel_status/3,
     on_get_channel_status/3,
     on_query/3,
     on_query/3,
-    on_query_async/4
+    on_query_async/4,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -type pulsar_client_id() :: atom().
 -type pulsar_client_id() :: atom().
@@ -234,6 +235,11 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
     }),
     }),
     pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
     pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
 
 
+on_format_query_result({ok, Info}) ->
+    #{result => ok, info => Info};
+on_format_query_result(Result) ->
+    Result.
+
 %%-------------------------------------------------------------------------------------
 %%-------------------------------------------------------------------------------------
 %% Internal fns
 %% Internal fns
 %%-------------------------------------------------------------------------------------
 %%-------------------------------------------------------------------------------------

+ 7 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -39,7 +39,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 %% callbacks for ecpool
 %% callbacks for ecpool
@@ -320,6 +321,11 @@ on_batch_query(ResourceId, BatchRequests, State) ->
     ),
     ),
     do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
     do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State).
 
 
+on_format_query_result({ok, Rows}) ->
+    #{result => ok, rows => Rows};
+on_format_query_result(Result) ->
+    Result.
+
 on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
 on_get_status(_InstanceId, #{pool_name := PoolName} = _State) ->
     Health = emqx_resource_pool:health_check_workers(
     Health = emqx_resource_pool:health_check_workers(
         PoolName,
         PoolName,

+ 7 - 1
apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl

@@ -28,7 +28,8 @@
     on_add_channel/4,
     on_add_channel/4,
     on_remove_channel/3,
     on_remove_channel/3,
     on_get_channels/1,
     on_get_channels/1,
-    on_get_channel_status/3
+    on_get_channel_status/3,
+    on_format_query_result/1
 ]).
 ]).
 
 
 -export([connector_examples/1]).
 -export([connector_examples/1]).
@@ -215,6 +216,11 @@ on_batch_query(InstanceId, BatchReq, State) ->
     ?SLOG(error, LogMeta#{msg => "invalid_request"}),
     ?SLOG(error, LogMeta#{msg => "invalid_request"}),
     {error, {unrecoverable_error, invalid_request}}.
     {error, {unrecoverable_error, invalid_request}}.
 
 
+on_format_query_result({ok, ResultMap}) ->
+    #{result => ok, info => ResultMap};
+on_format_query_result(Result) ->
+    Result.
+
 on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
 on_get_status(_InstanceId, #{pool_name := PoolName} = State) ->
     case
     case
         emqx_resource_pool:health_check_workers(
         emqx_resource_pool:health_check_workers(

+ 9 - 1
apps/emqx_mysql/src/emqx_mysql.erl

@@ -30,7 +30,8 @@
     on_stop/2,
     on_stop/2,
     on_query/3,
     on_query/3,
     on_batch_query/3,
     on_batch_query/3,
-    on_get_status/2
+    on_get_status/2,
+    on_format_query_result/1
 ]).
 ]).
 
 
 %% ecpool connect & reconnect
 %% ecpool connect & reconnect
@@ -214,6 +215,13 @@ on_batch_query(
     }),
     }),
     {error, {unrecoverable_error, invalid_request}}.
     {error, {unrecoverable_error, invalid_request}}.
 
 
+on_format_query_result({ok, ColumnNames, Rows}) ->
+    #{result => ok, column_names => ColumnNames, rows => Rows};
+on_format_query_result({ok, DataList}) ->
+    #{result => ok, column_names_rows_list => DataList};
+on_format_query_result(Result) ->
+    Result.
+
 mysql_function(sql) ->
 mysql_function(sql) ->
     query;
     query;
 mysql_function(prepared_query) ->
 mysql_function(prepared_query) ->