|
|
@@ -9,6 +9,7 @@
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
+-include_lib("emqx/include/logger.hrl").
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
|
%% CT boilerplate
|
|
|
@@ -284,7 +285,8 @@ send_message(Config, Payload) ->
|
|
|
Name = ?config(greptimedb_name, Config),
|
|
|
Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
|
|
|
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
|
|
- emqx_bridge:send_message(BridgeId, Payload).
|
|
|
+ Resp = emqx_bridge:send_message(BridgeId, Payload),
|
|
|
+ Resp.
|
|
|
|
|
|
query_by_clientid(Topic, ClientId, Config) ->
|
|
|
GreptimedbHost = ?config(greptimedb_host, Config),
|
|
|
@@ -308,7 +310,7 @@ query_by_clientid(Topic, ClientId, Config) ->
|
|
|
{"Authorization", "Basic Z3JlcHRpbWVfdXNlcjpncmVwdGltZV9wd2Q="},
|
|
|
{"Content-Type", "application/x-www-form-urlencoded"}
|
|
|
],
|
|
|
- Body = <<"sql=select * from ", Topic/binary, " where clientid='", ClientId/binary, "'">>,
|
|
|
+ Body = <<"sql=select * from \"", Topic/binary, "\" where clientid='", ClientId/binary, "'">>,
|
|
|
{ok, 200, _Headers, RawBody0} =
|
|
|
ehttpc:request(
|
|
|
EHttpcPoolName,
|
|
|
@@ -317,29 +319,49 @@ query_by_clientid(Topic, ClientId, Config) ->
|
|
|
_Timeout = 10_000,
|
|
|
_Retry = 0
|
|
|
),
|
|
|
- #{
|
|
|
- <<"code">> := 0,
|
|
|
- <<"output">> := [
|
|
|
- #{
|
|
|
- <<"records">> := #{
|
|
|
- <<"rows">> := Rows,
|
|
|
- <<"schema">> := Schema
|
|
|
+
|
|
|
+ case emqx_utils_json:decode(RawBody0, [return_maps]) of
|
|
|
+ #{
|
|
|
+ <<"code">> := 0,
|
|
|
+ <<"output">> := [
|
|
|
+ #{
|
|
|
+ <<"records">> := #{
|
|
|
+ <<"rows">> := Rows,
|
|
|
+ <<"schema">> := Schema
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- ]
|
|
|
- } = emqx_utils_json:decode(RawBody0, [return_maps]),
|
|
|
-
|
|
|
- case Schema of
|
|
|
- null ->
|
|
|
- #{};
|
|
|
- #{<<"column_schemas">> := ColumnsSchemas} ->
|
|
|
- Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas),
|
|
|
- index_by_field(Rows, Columns)
|
|
|
+ ]
|
|
|
+ } ->
|
|
|
+ make_row(Schema, Rows);
|
|
|
+ #{
|
|
|
+ <<"code">> := Code,
|
|
|
+ <<"error">> := Error
|
|
|
+ } ->
|
|
|
+ GreptimedbName = ?config(greptimedb_name, Config),
|
|
|
+ Type = greptimedb_type_bin(?config(greptimedb_type, Config)),
|
|
|
+ BridgeId = emqx_bridge_resource:bridge_id(Type, GreptimedbName),
|
|
|
+
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => io_lib:format("Failed to query: ~p, ~p", [Code, Error]),
|
|
|
+ connector => BridgeId,
|
|
|
+ reason => Error
|
|
|
+ }),
|
|
|
+ %% TODO(dennis): check the error by code
|
|
|
+ case binary:match(Error, <<"Table not found">>) of
|
|
|
+ nomatch ->
|
|
|
+ {error, Error};
|
|
|
+ _ ->
|
|
|
+ %% Table not found
|
|
|
+ #{}
|
|
|
+ end
|
|
|
end.
|
|
|
|
|
|
-index_by_field([], Columns) ->
|
|
|
+make_row(null, _Rows) ->
|
|
|
+ #{};
|
|
|
+make_row(_Schema, []) ->
|
|
|
#{};
|
|
|
-index_by_field([Row], Columns) ->
|
|
|
+make_row(#{<<"column_schemas">> := ColumnsSchemas}, [Row]) ->
|
|
|
+ Columns = lists:map(fun(#{<<"name">> := Name}) -> Name end, ColumnsSchemas),
|
|
|
maps:from_list(lists:zip(Columns, Row)).
|
|
|
|
|
|
assert_persisted_data(ClientId, Expected, PersistedData) ->
|
|
|
@@ -784,26 +806,22 @@ t_write_failure(Config) ->
|
|
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
|
|
case QueryMode of
|
|
|
sync ->
|
|
|
- {_, {ok, _}} =
|
|
|
- ?wait_async_action(
|
|
|
- ?assertMatch(
|
|
|
- {error, {resource_error, #{reason := timeout}}},
|
|
|
- send_message(Config, SentData)
|
|
|
- ),
|
|
|
- #{?snk_kind := handle_async_reply, action := nack},
|
|
|
- 1_000
|
|
|
- )
|
|
|
+ ?wait_async_action(
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {resource_error, #{reason := timeout}}},
|
|
|
+ send_message(Config, SentData)
|
|
|
+ ),
|
|
|
+ #{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
|
|
|
+ 16_000
|
|
|
+ )
|
|
|
end
|
|
|
end),
|
|
|
- fun(Trace0) ->
|
|
|
+ fun(Trace) ->
|
|
|
case QueryMode of
|
|
|
sync ->
|
|
|
- Trace = ?of_kind(handle_async_reply, Trace0),
|
|
|
- ?assertMatch([_ | _], Trace),
|
|
|
- [#{result := Result} | _] = Trace,
|
|
|
- ?assert(
|
|
|
- not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
|
|
|
- #{got => Result}
|
|
|
+ ?assertMatch(
|
|
|
+ [#{error := _} | _],
|
|
|
+ ?of_kind(greptimedb_connector_do_query_failure, Trace)
|
|
|
)
|
|
|
end,
|
|
|
ok
|
|
|
@@ -841,7 +859,7 @@ t_missing_field(Config) ->
|
|
|
?match_n_events(NEvents, #{
|
|
|
?snk_kind := greptimedb_connector_send_query_error
|
|
|
}),
|
|
|
- _Timeout1 = 10_000
|
|
|
+ _Timeout1 = 16_000
|
|
|
),
|
|
|
ok
|
|
|
end,
|