|
|
@@ -50,7 +50,9 @@ groups() ->
|
|
|
NonBatchCases = [
|
|
|
t_write_timeout,
|
|
|
t_uninitialized_prepared_statement,
|
|
|
- t_non_batch_update_is_allowed
|
|
|
+ t_non_batch_update_is_allowed,
|
|
|
+ t_delete_with_undefined_field_in_sql,
|
|
|
+ t_undefined_field_in_sql
|
|
|
],
|
|
|
OnlyBatchCases = [
|
|
|
t_batch_update_is_forbidden
|
|
|
@@ -801,27 +803,13 @@ t_missing_table(Config) ->
|
|
|
sync ->
|
|
|
query_resource(Config, Request);
|
|
|
async ->
|
|
|
- {_, Ref} = query_resource_async(Config, Request),
|
|
|
- {ok, Res} = receive_result(Ref, 2_000),
|
|
|
+ {Res, _Ref} = query_resource_async(Config, Request),
|
|
|
Res
|
|
|
end,
|
|
|
-
|
|
|
- BatchSize = ?config(batch_size, Config),
|
|
|
- IsBatch = BatchSize > 1,
|
|
|
- case IsBatch of
|
|
|
- true ->
|
|
|
- ?assertMatch(
|
|
|
- {error,
|
|
|
- {unrecoverable_error,
|
|
|
- {1146, <<"42S02">>, <<"Table 'mqtt.mqtt_test' doesn't exist">>}}},
|
|
|
- Result
|
|
|
- );
|
|
|
- false ->
|
|
|
- ?assertMatch(
|
|
|
- {error, undefined_table},
|
|
|
- Result
|
|
|
- )
|
|
|
- end,
|
|
|
+ ?assertMatch(
|
|
|
+ {error, {resource_error, #{reason := unhealthy_target}}},
|
|
|
+ Result
|
|
|
+ ),
|
|
|
ok
|
|
|
end,
|
|
|
fun(Trace) ->
|
|
|
@@ -974,3 +962,58 @@ t_non_batch_update_is_allowed(Config) ->
|
|
|
[]
|
|
|
),
|
|
|
ok.
|
|
|
+
|
|
|
+t_undefined_field_in_sql(Config) ->
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ Overrides = #{
|
|
|
+ <<"sql">> =>
|
|
|
+ <<
|
|
|
+ "INSERT INTO mqtt_test(wrong_column, arrived) "
|
|
|
+ "VALUES (${payload}, FROM_UNIXTIME(${timestamp}/1000))"
|
|
|
+ >>
|
|
|
+ },
|
|
|
+ ProbeRes = emqx_bridge_testlib:probe_bridge_api(Config, Overrides),
|
|
|
+ ?assertMatch({error, {{_, 400, _}, _, _BodyRaw}}, ProbeRes),
|
|
|
+ {error, {{_, 400, _}, _, BodyRaw}} = ProbeRes,
|
|
|
+ ?assertEqual(
|
|
|
+ match,
|
|
|
+ re:run(
|
|
|
+ BodyRaw,
|
|
|
+ <<"Unknown column 'wrong_column' in 'field list'">>,
|
|
|
+ [{capture, none}]
|
|
|
+ ),
|
|
|
+ #{body => BodyRaw}
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ []
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_delete_with_undefined_field_in_sql(Config) ->
|
|
|
+ ?check_trace(
|
|
|
+ begin
|
|
|
+ Name = ?config(bridge_name, Config),
|
|
|
+ Type = ?config(bridge_type, Config),
|
|
|
+ Overrides = #{
|
|
|
+ <<"sql">> =>
|
|
|
+ <<
|
|
|
+ "INSERT INTO mqtt_test(wrong_column, arrived) "
|
|
|
+ "VALUES (${payload}, FROM_UNIXTIME(${timestamp}/1000))"
|
|
|
+ >>
|
|
|
+ },
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, {{_, 201, _}, _, #{<<"status">> := Status}}} when
|
|
|
+ Status =:= <<"connecting">> orelse Status =:= <<"disconnected">>,
|
|
|
+ emqx_bridge_testlib:create_bridge_api(Config, Overrides)
|
|
|
+ ),
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, {{_, 204, _}, _, _}},
|
|
|
+ emqx_bridge_testlib:delete_bridge_http_api_v1(#{type => Type, name => Name})
|
|
|
+ ),
|
|
|
+ ok
|
|
|
+ end,
|
|
|
+ []
|
|
|
+ ),
|
|
|
+ ok.
|