|
|
@@ -338,3 +338,53 @@ t_disable_prepared_statements(Config0) ->
|
|
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
|
|
ok.
|
|
|
+
|
|
|
+t_update_with_invalid_prepare(Config) ->
|
|
|
+ ConnectorName = ?config(connector_name, Config),
|
|
|
+ BridgeName = ?config(bridge_name, Config),
|
|
|
+ ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
|
|
+ %% arrivedx is a bad column name
|
|
|
+ BadSQL = <<
|
|
|
+ "INSERT INTO mqtt_test(payload, arrivedx) "
|
|
|
+ "VALUES (${payload}, TO_TIMESTAMP((${timestamp} :: bigint)/1000))"
|
|
|
+ >>,
|
|
|
+ Override = #{<<"parameters">> => #{<<"sql">> => BadSQL}},
|
|
|
+ {ok, {{_, 200, "OK"}, _Headers1, Body1}} =
|
|
|
+ emqx_bridge_v2_testlib:update_bridge_api(Config, Override),
|
|
|
+ ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
|
|
|
+ Error1 = maps:get(<<"error">>, Body1),
|
|
|
+ case re:run(Error1, <<"undefined_column">>, [{capture, none}]) of
|
|
|
+ match ->
|
|
|
+ ok;
|
|
|
+ nomatch ->
|
|
|
+ ct:fail(#{
|
|
|
+ expected_pattern => "undefined_column",
|
|
|
+ got => Error1
|
|
|
+ })
|
|
|
+ end,
|
|
|
+ %% assert that although there was an error returned, the invliad SQL is actually put
|
|
|
+ C1 = [{action_name, BridgeName}, {action_type, pgsql} | Config],
|
|
|
+ {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
|
|
|
+ #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
|
|
|
+ ?assertEqual(FetchedSQL, BadSQL),
|
|
|
+
|
|
|
+ %% update again with the original sql
|
|
|
+ {ok, {{_, 200, "OK"}, _Headers2, Body2}} =
|
|
|
+ emqx_bridge_v2_testlib:update_bridge_api(Config, #{}),
|
|
|
+ %% the error should be gone now, and status should be 'connected'
|
|
|
+ ?assertMatch(#{<<"error">> := <<>>, <<"status">> := <<"connected">>}, Body2),
|
|
|
+ %% finally check if ecpool worker should have exactly one of reconnect callback
|
|
|
+ ConnectorResId = <<"connector:pgsql:", ConnectorName/binary>>,
|
|
|
+ Workers = ecpool:workers(ConnectorResId),
|
|
|
+ [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
|
|
|
+ lists:foreach(
|
|
|
+ fun(Pid) ->
|
|
|
+ [{emqx_postgresql, prepare_sql_to_conn, Args}] =
|
|
|
+ ecpool_worker:get_reconnect_callbacks(Pid),
|
|
|
+ Sig = emqx_postgresql:get_reconnect_callback_signature(Args),
|
|
|
+ BridgeResId = <<"action:pgsql:", BridgeName/binary, $:, ConnectorResId/binary>>,
|
|
|
+ ?assertEqual(BridgeResId, Sig)
|
|
|
+ end,
|
|
|
+ WorkerPids
|
|
|
+ ),
|
|
|
+ ok.
|