Просмотр исходного кода

Merge pull request #10356 from HJianBo/async-batch-cassa

feat: support async and batch callback for cassandra connector
JianBo He 2 года назад
Родитель
Коммит
d16b5c40d6

+ 1 - 2
apps/emqx/test/emqx_banned_SUITE.erl

@@ -186,9 +186,8 @@ t_session_taken(_) ->
                     false
             end
         end,
-        3000
+        6000
     ),
-
     Publish(),
 
     C2 = Connect(),

+ 1 - 0
apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl

@@ -66,6 +66,7 @@ end_per_group(AuthName, Conf) ->
     Conf.
 
 init_per_suite(Config) ->
+    emqx_gateway_test_utils:load_all_gateway_apps(),
     emqx_config:erase(gateway),
     init_gateway_conf(),
     emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),

+ 0 - 2
changes/ee/feat-10140.en.md

@@ -1,4 +1,2 @@
 Integrate `Cassandra` into `bridges` as a new backend. At the current stage:
 - Only support Cassandra version 3.x, not yet 4.x.
-- Only support storing data in synchronously, not yet asynchronous and batch
-  method to store data to Cassandra.

+ 0 - 1
changes/ee/feat-10140.zh.md

@@ -1,3 +1,2 @@
 支持 Cassandra 数据桥接。在当前阶段:
 - 仅支持 Cassandra 3.x 版本,暂不支持 4.x。
-- 仅支持以同步的方式存储数据,暂不支持异步和批量的方式来存储数据到 Cassandra。

+ 1 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -3,7 +3,7 @@
        , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.2"}}}
        , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
        , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}
-       , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.4.2"}}}
+       , {ecql, {git, "https://github.com/emqx/ecql.git", {tag, "v0.5.1"}}}
        , {emqx_connector, {path, "../../apps/emqx_connector"}}
        , {emqx_resource, {path, "../../apps/emqx_resource"}}
        , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

+ 26 - 47
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl

@@ -72,10 +72,10 @@ all() ->
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
-    NonBatchCases = [t_write_timeout],
+    NonBatchCases = [t_write_timeout, t_simple_sql_query],
     QueryModeGroups = [{group, async}, {group, sync}],
     BatchingGroups = [
-        %{group, with_batch},
+        {group, with_batch},
         {group, without_batch}
     ],
     [
@@ -404,12 +404,7 @@ t_setup_via_config_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(cassandra_connector_query_return, Trace0),
-            case ?config(enable_batch, Config) of
-                true ->
-                    ?assertMatch([#{result := {_, [ok]}}], Trace);
-                false ->
-                    ?assertMatch([#{result := ok}], Trace)
-            end,
+            ?assertMatch([#{result := ok}], Trace),
             ok
         end
     ),
@@ -448,12 +443,7 @@ t_setup_via_http_api_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(cassandra_connector_query_return, Trace0),
-            case ?config(enable_batch, Config) of
-                true ->
-                    ?assertMatch([#{result := {_, [{ok, 1}]}}], Trace);
-                false ->
-                    ?assertMatch([#{result := ok}], Trace)
-            end,
+            ?assertMatch([#{result := ok}], Trace),
             ok
         end
     ),
@@ -540,8 +530,8 @@ t_write_failure(Config) ->
         fun(Trace0) ->
             ct:pal("trace: ~p", [Trace0]),
             Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
-            ?assertMatch([#{result := {error, _}} | _], Trace),
-            [#{result := {error, Error}} | _] = Trace,
+            ?assertMatch([#{result := {async_return, {error, _}}} | _], Trace),
+            [#{result := {async_return, {error, Error}}} | _] = Trace,
             case Error of
                 {resource_error, _} ->
                     ok;
@@ -576,7 +566,6 @@ t_write_failure(Config) ->
 %    ok.
 
 t_simple_sql_query(Config) ->
-    EnableBatch = ?config(enable_batch, Config),
     QueryMode = ?config(query_mode, Config),
     ?assertMatch(
         {ok, _},
@@ -592,12 +581,7 @@ t_simple_sql_query(Config) ->
                 {ok, Res} = receive_result(Ref, 2_000),
                 Res
         end,
-    case EnableBatch of
-        true ->
-            ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
-        false ->
-            ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
-    end,
+    ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result),
     ok.
 
 t_missing_data(Config) ->
@@ -607,27 +591,29 @@ t_missing_data(Config) ->
     ),
     %% emqx_ee_connector_cassa will send missed data as a `null` atom
     %% to ecql driver
-    {_, {ok, Event}} =
-        ?wait_async_action(
-            send_message(Config, #{}),
-            #{?snk_kind := buffer_worker_flush_ack},
-            2_000
-        ),
-    ?assertMatch(
-        %% TODO: match error msgs
-        #{
-            result :=
-                {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}}
-        },
-        Event
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                send_message(Config, #{}),
+                #{?snk_kind := handle_async_reply, result := {error, {8704, _}}},
+                10_000
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            %% 1. ecql driver will return `ok` first in async query
+            Trace = ?of_kind(cassandra_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            %% 2. then it will return an error in callback function
+            Trace1 = ?of_kind(handle_async_reply, Trace0),
+            ?assertMatch([#{result := {error, {8704, _}}}], Trace1),
+            ok
+        end
     ),
     ok.
 
 t_bad_sql_parameter(Config) ->
     QueryMode = ?config(query_mode, Config),
-    EnableBatch = ?config(enable_batch, Config),
-    Name = ?config(cassa_name, Config),
-    ResourceId = emqx_bridge_resource:resource_id(cassandra, Name),
     ?assertMatch(
         {ok, _},
         create_bridge(
@@ -656,14 +642,7 @@ t_bad_sql_parameter(Config) ->
                         ct:fail("no response received")
                 end
         end,
-    case EnableBatch of
-        true ->
-            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
-        false ->
-            ?assertMatch(
-                {error, {unrecoverable_error, _}}, Result
-            )
-    end,
+    ?assertMatch({error, _}, Result),
     ok.
 
 t_nasty_sql_string(Config) ->

+ 112 - 13
lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl

@@ -33,8 +33,9 @@
     on_start/2,
     on_stop/2,
     on_query/3,
-    %% TODO: not_supported_now
-    %%on_batch_query/3,
+    on_query_async/4,
+    on_batch_query/3,
+    on_batch_query_async/4,
     on_get_status/2
 ]).
 
@@ -45,7 +46,7 @@
 ]).
 
 %% callbacks for query executing
--export([query/3, prepared_query/3]).
+-export([query/4, prepared_query/4, batch_query/3]).
 
 -export([do_get_status/1]).
 
@@ -96,7 +97,7 @@ keyspace(_) -> undefined.
 %%--------------------------------------------------------------------
 %% callbacks for emqx_resource
 
-callback_mode() -> always_sync.
+callback_mode() -> async_if_possible.
 
 -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
 on_start(
@@ -172,6 +173,28 @@ on_stop(InstId, #{poolname := PoolName}) ->
 on_query(
     InstId,
     Request,
+    State
+) ->
+    do_single_query(InstId, Request, sync, State).
+
+-spec on_query_async(
+    emqx_resource:resource_id(),
+    request(),
+    {function(), list()},
+    state()
+) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
+on_query_async(
+    InstId,
+    Request,
+    Callback,
+    State
+) ->
+    do_single_query(InstId, Request, {async, Callback}, State).
+
+do_single_query(
+    InstId,
+    Request,
+    Async,
     #{poolname := PoolName} = State
 ) ->
     {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
@@ -187,7 +210,59 @@ on_query(
         }
     ),
     {PreparedKeyOrSQL1, Data} = proc_cql_params(Type, PreparedKeyOrSQL, Params, State),
-    Res = exec_cql_query(InstId, PoolName, Type, PreparedKeyOrSQL1, Data),
+    Res = exec_cql_query(InstId, PoolName, Type, Async, PreparedKeyOrSQL1, Data),
+    handle_result(Res).
+
+-spec on_batch_query(
+    emqx_resource:resource_id(),
+    [request()],
+    state()
+) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
+on_batch_query(
+    InstId,
+    Requests,
+    State
+) ->
+    do_batch_query(InstId, Requests, sync, State).
+
+-spec on_batch_query_async(
+    emqx_resource:resource_id(),
+    [request()],
+    {function(), list()},
+    state()
+) -> ok | {error, {recoverable_error | unrecoverable_error, term()}}.
+on_batch_query_async(
+    InstId,
+    Requests,
+    Callback,
+    State
+) ->
+    do_batch_query(InstId, Requests, {async, Callback}, State).
+
+do_batch_query(
+    InstId,
+    Requests,
+    Async,
+    #{poolname := PoolName} = State
+) ->
+    CQLs =
+        lists:map(
+            fun(Request) ->
+                {Type, PreparedKeyOrSQL, Params} = parse_request_to_cql(Request),
+                proc_cql_params(Type, PreparedKeyOrSQL, Params, State)
+            end,
+            Requests
+        ),
+    ?tp(
+        debug,
+        cassandra_connector_received_cql_batch_query,
+        #{
+            connector => InstId,
+            cqls => CQLs,
+            state => State
+        }
+    ),
+    Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs),
     handle_result(Res).
 
 parse_request_to_cql({send_message, Params}) ->
@@ -203,17 +278,32 @@ proc_cql_params(
     Params,
     #{prepare_statement := Prepares, params_tokens := ParamsTokens}
 ) ->
-    PreparedKey = maps:get(PreparedKey0, Prepares),
+    %% assert
+    _PreparedKey = maps:get(PreparedKey0, Prepares),
     Tokens = maps:get(PreparedKey0, ParamsTokens),
-    {PreparedKey, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
+    {PreparedKey0, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))};
 proc_cql_params(query, SQL, Params, _State) ->
     {SQL1, Tokens} = emqx_plugin_libs_rule:preproc_sql(SQL, '?'),
     {SQL1, assign_type_for_params(emqx_plugin_libs_rule:proc_sql(Tokens, Params))}.
 
-exec_cql_query(InstId, PoolName, Type, PreparedKey, Data) when
+exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
     Type == query; Type == prepared_query
 ->
-    case ecpool:pick_and_do(PoolName, {?MODULE, Type, [PreparedKey, Data]}, no_handover) of
+    case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of
+        {error, Reason} = Result ->
+            ?tp(
+                error,
+                cassandra_connector_query_return,
+                #{connector => InstId, error => Reason}
+            ),
+            Result;
+        Result ->
+            ?tp(debug, cassandra_connector_query_return, #{result => Result}),
+            Result
+    end.
+
+exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
+    case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of
         {error, Reason} = Result ->
             ?tp(
                 error,
@@ -261,11 +351,20 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare
 %%--------------------------------------------------------------------
 %% callbacks query
 
-query(Conn, SQL, Params) ->
-    ecql:query(Conn, SQL, Params).
+query(Conn, sync, CQL, Params) ->
+    ecql:query(Conn, CQL, Params);
+query(Conn, {async, Callback}, CQL, Params) ->
+    ecql:async_query(Conn, CQL, Params, one, Callback).
+
+prepared_query(Conn, sync, PreparedKey, Params) ->
+    ecql:execute(Conn, PreparedKey, Params);
+prepared_query(Conn, {async, Callback}, PreparedKey, Params) ->
+    ecql:async_execute(Conn, PreparedKey, Params, Callback).
 
-prepared_query(Conn, PreparedKey, Params) ->
-    ecql:execute(Conn, PreparedKey, Params).
+batch_query(Conn, sync, Rows) ->
+    ecql:batch(Conn, Rows);
+batch_query(Conn, {async, Callback}, Rows) ->
+    ecql:async_batch(Conn, Rows, Callback).
 
 %%--------------------------------------------------------------------
 %% callbacks for ecpool