|
|
@@ -165,6 +165,9 @@ sql_insert_template_for_bridge() ->
|
|
|
sql_insert_template_with_nested_token_for_bridge() ->
|
|
|
"INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload.msg}, ${retain})".
|
|
|
|
|
|
+sql_insert_template_with_inconsistent_datatype() ->
|
|
|
+ "INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${flags})".
|
|
|
+
|
|
|
sql_create_table() ->
|
|
|
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
|
|
|
|
|
|
@@ -333,10 +336,11 @@ update_bridge_api(Config, Overrides) ->
|
|
|
probe_bridge_api(Config) ->
|
|
|
probe_bridge_api(Config, _Overrides = #{}).
|
|
|
|
|
|
-probe_bridge_api(Config, _Overrides) ->
|
|
|
+probe_bridge_api(Config, Overrides) ->
|
|
|
TypeBin = ?BRIDGE_TYPE_BIN,
|
|
|
Name = ?config(oracle_name, Config),
|
|
|
- OracleConfig = ?config(oracle_config, Config),
|
|
|
+ OracleConfig0 = ?config(oracle_config, Config),
|
|
|
+ OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
|
|
|
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
|
|
|
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
|
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
|
|
@@ -539,6 +543,14 @@ t_start_stop(Config) ->
|
|
|
ok.
|
|
|
|
|
|
t_probe_with_nested_tokens(Config) ->
|
|
|
+ ProbeRes0 = probe_bridge_api(
|
|
|
+ Config,
|
|
|
+ #{<<"sql">> => sql_insert_template_with_nested_token_for_bridge()}
|
|
|
+ ),
|
|
|
+ ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0).
|
|
|
+
|
|
|
+t_message_with_nested_tokens(Config) ->
|
|
|
+ BridgeId = bridge_id(Config),
|
|
|
ResourceId = resource_id(Config),
|
|
|
reset_table(Config),
|
|
|
?assertMatch(
|
|
|
@@ -553,7 +565,34 @@ t_probe_with_nested_tokens(Config) ->
|
|
|
_Sleep = 1_000,
|
|
|
_Attempts = 20,
|
|
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
|
|
- ).
|
|
|
+ ),
|
|
|
+ MsgId = erlang:unique_integer(),
|
|
|
+ Data = binary_to_list(?config(oracle_name, Config)),
|
|
|
+ Params = #{
|
|
|
+ topic => ?config(mqtt_topic, Config),
|
|
|
+ id => MsgId,
|
|
|
+ payload => emqx_utils_json:encode(#{<<"msg">> => Data}),
|
|
|
+ retain => false
|
|
|
+ },
|
|
|
+ emqx_bridge:send_message(BridgeId, Params),
|
|
|
+ ?retry(
|
|
|
+ _Sleep = 1_000,
|
|
|
+ _Attempts = 20,
|
|
|
+ ?assertMatch(
|
|
|
+ {ok, [{result_set, [<<"PAYLOAD">>], _, [[Data]]}]},
|
|
|
+ emqx_resource:simple_sync_query(
|
|
|
+ ResourceId, {query, "SELECT payload FROM mqtt_test"}
|
|
|
+ )
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ ok.
|
|
|
+
|
|
|
+t_probe_with_inconsistent_datatype(Config) ->
|
|
|
+ ProbeRes0 = probe_bridge_api(
|
|
|
+ Config,
|
|
|
+ #{<<"sql">> => sql_insert_template_with_inconsistent_datatype()}
|
|
|
+ ),
|
|
|
+ ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0).
|
|
|
|
|
|
t_on_get_status(Config) ->
|
|
|
ProxyPort = ?config(proxy_port, Config),
|