Kaynağa Gözat

fix(oracle): drop support for async queries

jamdb_oracle does not provide interface for performing async queries and
ecpool does not monitor the worker which calls jamdb_oracle, so it's
safer to keep support for sync queries only.
Paulo Zulato 2 yıl önce
ebeveyn
işleme
43bb6f00ca

+ 0 - 80
apps/emqx_bridge_oracle/test/emqx_bridge_oracle_SUITE.erl

@@ -330,8 +330,6 @@ create_rule_and_action_http(Config) ->
 %% Testcases
 %%------------------------------------------------------------------------------
 
-% Under normal operations, the bridge will be called async via
-% `simple_async_query'.
 t_sync_query(Config) ->
     ResourceId = resource_id(Config),
     ?check_trace(
@@ -360,48 +358,6 @@ t_sync_query(Config) ->
     ),
     ok.
 
-t_async_query(Config) ->
-    Overrides = #{
-        <<"resource_opts">> => #{
-            <<"enable_batch">> => <<"false">>,
-            <<"batch_size">> => 1
-        }
-    },
-    ResourceId = resource_id(Config),
-    ?check_trace(
-        begin
-            ?assertMatch({ok, _}, create_bridge_api(Config, Overrides)),
-            ?retry(
-                _Sleep = 1_000,
-                _Attempts = 20,
-                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
-            ),
-            reset_table(Config),
-            MsgId = erlang:unique_integer(),
-            Params = #{
-                topic => ?config(mqtt_topic, Config),
-                id => MsgId,
-                payload => ?config(oracle_name, Config),
-                retain => false
-            },
-            Message = {send_message, Params},
-            ?assertMatch(
-                {
-                    ok,
-                    {ok, #{result := {ok, [{affected_rows, 1}]}}}
-                },
-                ?wait_async_action(
-                    emqx_resource:query(ResourceId, Message),
-                    #{?snk_kind := oracle_query},
-                    5_000
-                )
-            ),
-            ok
-        end,
-        []
-    ),
-    ok.
-
 t_batch_sync_query(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
@@ -449,42 +405,6 @@ t_batch_sync_query(Config) ->
     ),
     ok.
 
-t_batch_async_query(Config) ->
-    ResourceId = resource_id(Config),
-    ?check_trace(
-        begin
-            ?assertMatch({ok, _}, create_bridge_api(Config)),
-            ?retry(
-                _Sleep = 1_000,
-                _Attempts = 20,
-                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
-            ),
-            reset_table(Config),
-            MsgId = erlang:unique_integer(),
-            Params = #{
-                topic => ?config(mqtt_topic, Config),
-                id => MsgId,
-                payload => ?config(oracle_name, Config),
-                retain => false
-            },
-            Message = {send_message, Params},
-            ?assertMatch(
-                {
-                    ok,
-                    {ok, #{result := {ok, [{affected_rows, 1}]}}}
-                },
-                ?wait_async_action(
-                    emqx_resource:query(ResourceId, Message),
-                    #{?snk_kind := oracle_batch_query},
-                    5_000
-                )
-            ),
-            ok
-        end,
-        []
-    ),
-    ok.
-
 t_create_via_http(Config) ->
     ?check_trace(
         begin

+ 4 - 71
apps/emqx_oracle/src/emqx_oracle.erl

@@ -23,8 +23,6 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
-    on_query_async/4,
-    on_batch_query_async/4,
     on_get_status/2
 ]).
 
@@ -35,7 +33,6 @@
 -export([
     query/3,
     execute_batch/3,
-    do_async_reply/2,
     do_get_status/1
 ]).
 
@@ -46,7 +43,6 @@
 -define(ACTION_SEND_MESSAGE, send_message).
 
 -define(SYNC_QUERY_MODE, no_handover).
--define(ASYNC_QUERY_MODE(REPLY), {handover_async, {?MODULE, do_async_reply, [REPLY]}}).
 
 -define(ORACLE_HOST_OPTIONS, #{
     default_port => ?ORACLE_DEFAULT_PORT
@@ -67,7 +63,10 @@
         batch_params_tokens := params_tokens()
     }.
 
-callback_mode() -> async_if_possible.
+% As ecpool is not monitoring the worker's PID when doing a handover_async, the
+% request can be lost if worker crashes. Thus, it's better to force requests to
+% be sync for now.
+callback_mode() -> always_sync.
 
 is_buffer_supported() -> false.
 
@@ -147,24 +146,6 @@ on_query(
     Res = on_sql_query(InstId, PoolName, Type, ?SYNC_QUERY_MODE, NameOrSQL2, Data),
     handle_result(Res).
 
-on_query_async(InstId, {TypeOrKey, NameOrSQL}, Reply, State) ->
-    on_query_async(InstId, {TypeOrKey, NameOrSQL, []}, Reply, State);
-on_query_async(
-    InstId, {TypeOrKey, NameOrSQL, Params} = Query, Reply, #{pool_name := PoolName} = State
-) ->
-    ?SLOG(debug, #{
-        msg => "oracle database connector received async sql query",
-        connector => InstId,
-        query => Query,
-        reply => Reply,
-        state => State
-    }),
-    ApplyMode = ?ASYNC_QUERY_MODE(Reply),
-    Type = query,
-    {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
-    Res = on_sql_query(InstId, PoolName, Type, ApplyMode, NameOrSQL2, Data),
-    handle_result(Res).
-
 on_batch_query(
     InstId,
     BatchReq,
@@ -207,51 +188,6 @@ on_batch_query(
             {error, {unrecoverable_error, invalid_request}}
     end.
 
-on_batch_query_async(
-    InstId,
-    BatchReq,
-    Reply,
-    #{pool_name := PoolName, params_tokens := Tokens, prepare_statement := Sts} = State
-) ->
-    case BatchReq of
-        [{Key, _} = Request | _] ->
-            BinKey = to_bin(Key),
-            case maps:get(BinKey, Tokens, undefined) of
-                undefined ->
-                    Log = #{
-                        connector => InstId,
-                        first_request => Request,
-                        state => State,
-                        msg => "batch prepare not implemented"
-                    },
-                    ?SLOG(error, Log),
-                    {error, {unrecoverable_error, batch_prepare_not_implemented}};
-                TokenList ->
-                    {_, Datas} = lists:unzip(BatchReq),
-                    Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
-                    St = maps:get(BinKey, Sts),
-                    case
-                        on_sql_query(
-                            InstId, PoolName, execute_batch, ?ASYNC_QUERY_MODE(Reply), St, Datas2
-                        )
-                    of
-                        {ok, Results} ->
-                            handle_batch_result(Results, 0);
-                        Result ->
-                            Result
-                    end
-            end;
-        _ ->
-            Log = #{
-                connector => InstId,
-                request => BatchReq,
-                state => State,
-                msg => "invalid request"
-            },
-            ?SLOG(error, Log),
-            {error, {unrecoverable_error, invalid_request}}
-    end.
-
 proc_sql_params(query, SQLOrKey, Params, _State) ->
     {SQLOrKey, Params};
 proc_sql_params(TypeOrKey, SQLOrData, Params, #{
@@ -429,6 +365,3 @@ handle_batch_result([{proc_result, RetCode, Reason} | _Rest], _Acc) ->
     {error, {unrecoverable_error, {RetCode, Reason}}};
 handle_batch_result([], Acc) ->
     {ok, Acc}.
-
-do_async_reply(Result, {ReplyFun, [Context]}) ->
-    ReplyFun(Context, Result).