Selaa lähdekoodia

fix: prepared statement for oracle

firest 1 vuosi sitten
vanhempi
commit
cc4387ddbf

+ 52 - 0
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -795,3 +795,55 @@ t_table_removed(Config) ->
         []
     ),
     ok.
+
+t_update_with_invalid_prepare(Config) ->
+    reset_table(Config),
+
+    {ok, _} = create_bridge_api(Config),
+
+    %% retainx is a bad column name
+    BadSQL =
+        <<"INSERT INTO mqtt_test(topic, msgid, payload, retainx) VALUES (${topic}, ${id}, ${payload}, ${retain})">>,
+
+    Override = #{<<"sql">> => BadSQL},
+    {ok, Body1} =
+        update_bridge_api(Config, Override),
+
+    ?assertMatch(#{<<"status">> := <<"disconnected">>}, Body1),
+    Error1 = maps:get(<<"status_reason">>, Body1),
+    case re:run(Error1, <<"unhealthy_target">>, [{capture, none}]) of
+        match ->
+            ok;
+        nomatch ->
+            ct:fail(#{
+                expected_pattern => "undefined_column",
+                got => Error1
+            })
+    end,
+
+    %% assert that although there was an error returned, the invliad SQL is actually put
+    BridgeName = ?config(oracle_name, Config),
+    C1 = [{action_name, BridgeName}, {action_type, oracle} | Config],
+    {ok, {{_, 200, "OK"}, _, Action}} = emqx_bridge_v2_testlib:get_action_api(C1),
+    #{<<"parameters">> := #{<<"sql">> := FetchedSQL}} = Action,
+    ?assertEqual(FetchedSQL, BadSQL),
+
+    %% update again with the original sql
+    {ok, Body2} = update_bridge_api(Config),
+    %% the error should be gone now, and status should be 'connected'
+    ?assertMatch(#{<<"status">> := <<"connected">>}, Body2),
+    %% finally check if ecpool worker should have exactly one of reconnect callback
+    ConnectorResId = <<"connector:oracle:", BridgeName/binary>>,
+    Workers = ecpool:workers(ConnectorResId),
+    [_ | _] = WorkerPids = lists:map(fun({_, Pid}) -> Pid end, Workers),
+    lists:foreach(
+        fun(Pid) ->
+            [{emqx_oracle, prepare_sql_to_conn, Args}] =
+                ecpool_worker:get_reconnect_callbacks(Pid),
+            Sig = emqx_postgresql:get_reconnect_callback_signature(Args),
+            BridgeResId = <<"action:oracle:", BridgeName/binary, $:, ConnectorResId/binary>>,
+            ?assertEqual(BridgeResId, Sig)
+        end,
+        WorkerPids
+    ),
+    ok.

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.2.4"},
+    {vsn, "0.2.5"},
     {registered, []},
     {applications, [
         kernel,

+ 7 - 1
apps/emqx_oracle/src/emqx_oracle.erl

@@ -35,7 +35,7 @@
 ]).
 
 %% callbacks for ecpool
--export([connect/1, prepare_sql_to_conn/3]).
+-export([connect/1, prepare_sql_to_conn/3, get_reconnect_callback_signature/1]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([
@@ -496,6 +496,12 @@ prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) whe
             Error
     end.
 
+%% this callback accepts the arg list provided to
+%% ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Templates]})
+%% so ecpool_worker can de-duplicate the callbacks based on the signature.
+get_reconnect_callback_signature([[{ChannelId, _Template}]]) ->
+    ChannelId.
+
 check_if_table_exists(Conn, SQL, Tokens0) ->
     % Discard nested tokens for checking if table exist. As payload here is defined as
     % a single string, it would fail if Token is, for instance, ${payload.msg}, causing

+ 6 - 0
changes/ee/fix-14126.en.md

@@ -0,0 +1,6 @@
+Fix prepared statements for Oracle integration.
+
+Prior to this fix, when updating a Oracle integration action,
+if an invalid prepared-statements is used, for example reference to an unknown table column name,
+it may cause the action to apply the oldest version prepared-statement from the past.
+