Просмотр исходного кода

feat(oracle): check whether target table exists

Fixes https://emqx.atlassian.net/browse/EMQX-9026
Paulo Zulato 2 лет назад
Родитель
Сommit
5f10936091

+ 69 - 11
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -179,18 +179,39 @@ sql_drop_table() ->
 sql_check_table_exist() ->
     "SELECT COUNT(*) FROM user_tables WHERE table_name = 'MQTT_TEST'".
 
+new_jamdb_connection(Config) ->
+    JamdbOpts = [
+        {host, ?config(oracle_host, Config)},
+        {port, ?config(oracle_port, Config)},
+        {user, "system"},
+        {password, "oracle"},
+        {sid, ?SID}
+    ],
+    jamdb_oracle:start(JamdbOpts).
+
+close_jamdb_connection(Conn) ->
+    jamdb_oracle:stop(Conn).
+
 reset_table(Config) ->
-    ResourceId = resource_id(Config),
-    drop_table_if_exists(Config),
-    {ok, [{proc_result, 0, _}]} = emqx_resource:simple_sync_query(
-        ResourceId, {sql, sql_create_table()}
-    ),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn),
+        {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_create_table())
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
+drop_table_if_exists(Conn) when is_pid(Conn) ->
+    {ok, [{proc_result, 0, _}]} = jamdb_oracle:sql_query(Conn, sql_drop_table()),
+    ok;
 drop_table_if_exists(Config) ->
-    ResourceId = resource_id(Config),
-    {ok, [{proc_result, 0, _}]} =
-        emqx_resource:simple_sync_query(ResourceId, {query, sql_drop_table()}),
+    {ok, Conn} = new_jamdb_connection(Config),
+    try
+        ok = drop_table_if_exists(Conn)
+    after
+        close_jamdb_connection(Conn)
+    end,
     ok.
 
 oracle_config(TestCase, _ConnectionType, Config) ->
@@ -216,7 +237,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
             "  pool_size = 1\n"
             "  sql = \"~s\"\n"
             "  resource_opts = {\n"
-            "     health_check_interval = \"5s\"\n"
+            "     health_check_interval = \"15s\"\n"
             "     request_ttl = \"30s\"\n"
             "     query_mode = \"async\"\n"
             "     batch_size = 3\n"
@@ -349,13 +370,13 @@ t_sync_query(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 20,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -381,13 +402,13 @@ t_batch_sync_query(Config) ->
     BridgeId = bridge_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,
                 _Attempts = 30,
                 ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
             ),
-            reset_table(Config),
             MsgId = erlang:unique_integer(),
             Params = #{
                 topic => ?config(mqtt_topic, Config),
@@ -464,6 +485,7 @@ t_start_stop(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge(Config)),
             %% Since the connection process is async, we give it some time to
             %% stabilize and avoid flakiness.
@@ -515,6 +537,7 @@ t_on_get_status(Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyName = ?config(proxy_name, Config),
     ResourceId = resource_id(Config),
+    reset_table(Config),
     ?assertMatch({ok, _}, create_bridge(Config)),
     %% Since the connection process is async, we give it some time to
     %% stabilize and avoid flakiness.
@@ -547,10 +570,45 @@ t_no_sid_nor_service_name(Config0) ->
     ),
     ok.
 
+t_missing_table(Config) ->
+    ResourceId = resource_id(Config),
+    ?check_trace(
+        begin
+            drop_table_if_exists(Config),
+            ?assertMatch({ok, _}, create_bridge_api(Config)),
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertMatch(
+                    {ok, Status} when Status =:= disconnected orelse Status =:= connecting,
+                    emqx_resource_manager:health_check(ResourceId)
+                )
+            ),
+            MsgId = erlang:unique_integer(),
+            Params = #{
+                topic => ?config(mqtt_topic, Config),
+                id => MsgId,
+                payload => ?config(oracle_name, Config),
+                retain => true
+            },
+            Message = {send_message, Params},
+            ?assertMatch(
+                {error, {resource_error, #{reason := not_connected}}},
+                emqx_resource:simple_sync_query(ResourceId, Message)
+            ),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertNotMatch([], ?of_kind(oracle_undefined_table, Trace)),
+            ok
+        end
+    ).
+
 t_table_removed(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
         begin
+            reset_table(Config),
             ?assertMatch({ok, _}, create_bridge_api(Config)),
             ?retry(
                 _Sleep = 1_000,

+ 116 - 39
apps/emqx_oracle/src/emqx_oracle.erl

@@ -9,8 +9,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
--define(ORACLE_DEFAULT_PORT, 1521).
-
 %%====================================================================
 %% Exports
 %%====================================================================
@@ -26,7 +24,7 @@
 ]).
 
 %% callbacks for ecpool
--export([connect/1, prepare_sql_to_conn/2]).
+-export([connect/1, prepare_sql_to_conn/3]).
 
 %% Internal exports used to execute code with ecpool worker
 -export([
@@ -39,18 +37,15 @@
     oracle_host_options/0
 ]).
 
--define(ACTION_SEND_MESSAGE, send_message).
-
+-define(ORACLE_DEFAULT_PORT, 1521).
 -define(SYNC_QUERY_MODE, no_handover).
-
+-define(DEFAULT_POOL_SIZE, 8).
+-define(OPT_TIMEOUT, 30000).
+-define(MAX_CURSORS, 10).
 -define(ORACLE_HOST_OPTIONS, #{
     default_port => ?ORACLE_DEFAULT_PORT
 }).
 
--define(MAX_CURSORS, 10).
--define(DEFAULT_POOL_SIZE, 8).
--define(OPT_TIMEOUT, 30000).
-
 -type prepares() :: #{atom() => binary()}.
 -type params_tokens() :: #{atom() => list()}.
 
@@ -105,7 +100,7 @@ on_start(
     ],
     PoolName = InstId,
     Prepares = parse_prepare_sql(Config),
-    InitState = #{pool_name => PoolName, prepare_statement => #{}},
+    InitState = #{pool_name => PoolName},
     State = maps:merge(InitState, Prepares),
     case emqx_resource_pool:start(InstId, ?MODULE, Options) of
         ok ->
@@ -148,7 +143,7 @@ on_query(
 on_batch_query(
     InstId,
     BatchReq,
-    #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
+    #{pool_name := PoolName, params_tokens := Tokens, prepare_sql := Sts} = State
 ) ->
     case BatchReq of
         [{Key, _} = Request | _] ->
@@ -241,7 +236,13 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
                     connected;
                 {ok, NState} ->
                     %% return new state with prepared statements
-                    {connected, NState}
+                    {connected, NState};
+                {error, _Reason} ->
+                    %% do not log error, it is logged in prepare_sql_to_conn
+                    connecting;
+                {undefined_table, NState} ->
+                    %% return new state indicating that we are connected but the target table is not created
+                    {disconnected, NState, unhealthy_target}
             end;
         false ->
             disconnected
@@ -250,11 +251,42 @@ on_get_status(_InstId, #{pool_name := Pool} = State) ->
 do_get_status(Conn) ->
     ok == element(1, jamdb_oracle:sql_query(Conn, "select 1 from dual")).
 
-do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
-    ok;
-do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    {ok, State#{prepare_sql => Prepares, prepare_statement := Sts}}.
+do_check_prepares(
+    #{
+        pool_name := PoolName,
+        prepare_sql := #{<<"send_message">> := SQL},
+        params_tokens := #{<<"send_message">> := Tokens}
+    } = State
+) ->
+    % it's already connected. Verify if target table still exists
+    Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
+    lists:foldl(
+        fun
+            (WorkerPid, ok) ->
+                {ok, Conn} = ecpool_worker:client(WorkerPid),
+                case check_if_table_exists(Conn, SQL, Tokens) of
+                    {error, undefined_table} -> {undefined_table, State};
+                    _ -> ok
+                end;
+            (_, Acc) ->
+                Acc
+        end,
+        ok,
+        Workers
+    );
+do_check_prepares(
+    State = #{pool_name := PoolName, prepare_sql := {error, Prepares}, params_tokens := TokensMap}
+) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        %% remove the error
+        {ok, Sts} ->
+            {ok, State#{prepare_sql => Sts}};
+        {error, undefined_table} ->
+            %% indicate the error
+            {undefined_table, State#{prepare_sql => {error, Prepares}}};
+        {error, _Reason} = Error ->
+            Error
+    end.
 
 %% ===================================================================
 
@@ -312,36 +344,81 @@ parse_prepare_sql([], Prepares, Tokens) ->
         params_tokens => Tokens
     }.
 
-init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName}) ->
-    {ok, Sts} = prepare_sql(Prepares, PoolName),
-    State#{prepare_statement := Sts}.
+init_prepare(State = #{prepare_sql := Prepares, pool_name := PoolName, params_tokens := TokensMap}) ->
+    case prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, Sts} ->
+            State#{prepare_sql := Sts};
+        Error ->
+            LogMeta = #{
+                msg => <<"Oracle Database init prepare statement failed">>, error => Error
+            },
+            ?SLOG(error, LogMeta),
+            %% mark the prepare_sql as failed
+            State#{prepare_sql => {error, Prepares}}
+    end.
 
-prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
-    prepare_sql(maps:to_list(Prepares), PoolName);
-prepare_sql(Prepares, PoolName) ->
-    Data = do_prepare_sql(Prepares, PoolName),
-    {ok, _Sts} = Data,
-    ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
-    Data.
+prepare_sql(Prepares, PoolName, TokensMap) when is_map(Prepares) ->
+    prepare_sql(maps:to_list(Prepares), PoolName, TokensMap);
+prepare_sql(Prepares, PoolName, TokensMap) ->
+    case do_prepare_sql(Prepares, PoolName, TokensMap) of
+        {ok, _Sts} = Ok ->
+            %% prepare for reconnect
+            ecpool:add_reconnect_callback(PoolName, {?MODULE, prepare_sql_to_conn, [Prepares]}),
+            Ok;
+        Error ->
+            Error
+    end.
 
-do_prepare_sql(Prepares, PoolName) ->
-    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, #{}).
+do_prepare_sql(Prepares, PoolName, TokensMap) ->
+    do_prepare_sql(ecpool:workers(PoolName), Prepares, PoolName, TokensMap, #{}).
 
-do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, _LastSts) ->
+do_prepare_sql([{_Name, Worker} | T], Prepares, PoolName, TokensMap, _LastSts) ->
     {ok, Conn} = ecpool_worker:client(Worker),
-    {ok, Sts} = prepare_sql_to_conn(Conn, Prepares),
-    do_prepare_sql(T, Prepares, PoolName, Sts);
-do_prepare_sql([], _Prepares, _PoolName, LastSts) ->
+    case prepare_sql_to_conn(Conn, Prepares, TokensMap) of
+        {ok, Sts} ->
+            do_prepare_sql(T, Prepares, PoolName, TokensMap, Sts);
+        Error ->
+            Error
+    end;
+do_prepare_sql([], _Prepares, _PoolName, _TokensMap, LastSts) ->
     {ok, LastSts}.
 
-prepare_sql_to_conn(Conn, Prepares) ->
-    prepare_sql_to_conn(Conn, Prepares, #{}).
+prepare_sql_to_conn(Conn, Prepares, TokensMap) ->
+    prepare_sql_to_conn(Conn, Prepares, TokensMap, #{}).
 
-prepare_sql_to_conn(Conn, [], Statements) when is_pid(Conn) -> {ok, Statements};
-prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], Statements) when is_pid(Conn) ->
+prepare_sql_to_conn(Conn, [], _TokensMap, Statements) when is_pid(Conn) -> {ok, Statements};
+prepare_sql_to_conn(Conn, [{Key, SQL} | PrepareList], TokensMap, Statements) when is_pid(Conn) ->
     LogMeta = #{msg => "Oracle Database Prepare Statement", name => Key, prepare_sql => SQL},
+    Tokens = maps:get(Key, TokensMap, []),
     ?SLOG(info, LogMeta),
-    prepare_sql_to_conn(Conn, PrepareList, Statements#{Key => SQL}).
+    case check_if_table_exists(Conn, SQL, Tokens) of
+        ok ->
+            ?SLOG(info, LogMeta#{result => success}),
+            prepare_sql_to_conn(Conn, PrepareList, TokensMap, Statements#{Key => SQL});
+        {error, undefined_table} = Error ->
+            %% Target table is not created
+            ?SLOG(error, LogMeta#{result => failed, reason => "table does not exist"}),
+            ?tp(oracle_undefined_table, #{}),
+            Error;
+        Error ->
+            Error
+    end.
+
+check_if_table_exists(Conn, SQL, Tokens) ->
+    {Event, _Headers} = emqx_rule_events:eventmsg_publish(
+        emqx_message:make(<<"t/opic">>, "test query")
+    ),
+    SqlQuery = "begin " ++ binary_to_list(SQL) ++ "; rollback; end;",
+    Params = emqx_placeholder:proc_sql(Tokens, Event),
+    case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
+        {ok, [{proc_result, 0, _Description}]} ->
+            ok;
+        {ok, [{proc_result, 6550, _Description}]} ->
+            %% Target table is not created
+            {error, undefined_table};
+        Reason ->
+            {error, Reason}
+    end.
 
 to_bin(Bin) when is_binary(Bin) ->
     Bin;