Просмотр исходного кода

Merge pull request #10529 from zmstone/0426-ensure-buffer-worker-monitors-cassandra-conn-pid

0426 ensure buffer worker monitors cassandra conn pid
Zaiming (Stone) Shi 2 лет назад
Родитель
Сommit
b58d3e8f94

+ 3 - 3
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_cassa_SUITE.erl

@@ -404,7 +404,7 @@ t_setup_via_config_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(cassandra_connector_query_return, Trace0),
-            ?assertMatch([#{result := ok}], Trace),
+            ?assertMatch([#{result := {ok, _Pid}}], Trace),
             ok
         end
     ),
@@ -443,7 +443,7 @@ t_setup_via_http_api_and_publish(Config) ->
         end,
         fun(Trace0) ->
             Trace = ?of_kind(cassandra_connector_query_return, Trace0),
-            ?assertMatch([#{result := ok}], Trace),
+            ?assertMatch([#{result := {ok, _Pid}}], Trace),
             ok
         end
     ),
@@ -603,7 +603,7 @@ t_missing_data(Config) ->
         fun(Trace0) ->
             %% 1. ecql driver will return `ok` first in async query
             Trace = ?of_kind(cassandra_connector_query_return, Trace0),
-            ?assertMatch([#{result := ok}], Trace),
+            ?assertMatch([#{result := {ok, _Pid}}], Trace),
             %% 2. then it will return an error in callback function
             Trace1 = ?of_kind(handle_async_reply, Trace0),
             ?assertMatch([#{result := {error, {8704, _}}}], Trace1),

+ 18 - 5
lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl

@@ -278,7 +278,7 @@ proc_cql_params(query, SQL, Params, _State) ->
 exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
     Type == query; Type == prepared_query
 ->
-    case ecpool:pick_and_do(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}, no_handover) of
+    case exec(PoolName, {?MODULE, Type, [Async, PreparedKey, Data]}) of
         {error, Reason} = Result ->
             ?tp(
                 error,
@@ -292,7 +292,7 @@ exec_cql_query(InstId, PoolName, Type, Async, PreparedKey, Data) when
     end.
 
 exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
-    case ecpool:pick_and_do(PoolName, {?MODULE, batch_query, [Async, CQLs]}, no_handover) of
+    case exec(PoolName, {?MODULE, batch_query, [Async, CQLs]}) of
         {error, Reason} = Result ->
             ?tp(
                 error,
@@ -305,6 +305,13 @@ exec_cql_batch_query(InstId, PoolName, Async, CQLs) ->
             Result
     end.
 
+%% Pick one of the pool members to do the query.
+%% Using 'no_handoever' strategy,
+%% meaning the buffer worker does the gen_server call or gen_server cast
+%% towards the connection process.
+exec(PoolName, Query) ->
+    ecpool:pick_and_do(PoolName, Query, no_handover).
+
 on_get_status(_InstId, #{poolname := Pool} = State) ->
     case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
         true ->
@@ -343,17 +350,23 @@ do_check_prepares(State = #{poolname := PoolName, prepare_cql := {error, Prepare
 query(Conn, sync, CQL, Params) ->
     ecql:query(Conn, CQL, Params);
 query(Conn, {async, Callback}, CQL, Params) ->
-    ecql:async_query(Conn, CQL, Params, one, Callback).
+    ok = ecql:async_query(Conn, CQL, Params, one, Callback),
+    %% return the connection pid for buffer worker to monitor
+    {ok, Conn}.
 
 prepared_query(Conn, sync, PreparedKey, Params) ->
     ecql:execute(Conn, PreparedKey, Params);
 prepared_query(Conn, {async, Callback}, PreparedKey, Params) ->
-    ecql:async_execute(Conn, PreparedKey, Params, Callback).
+    ok = ecql:async_execute(Conn, PreparedKey, Params, Callback),
+    %% return the connection pid for buffer worker to monitor
+    {ok, Conn}.
 
 batch_query(Conn, sync, Rows) ->
     ecql:batch(Conn, Rows);
 batch_query(Conn, {async, Callback}, Rows) ->
-    ecql:async_batch(Conn, Rows, Callback).
+    ok = ecql:async_batch(Conn, Rows, Callback),
+    %% return the connection pid for buffer worker to monitor
+    {ok, Conn}.
 
 %%--------------------------------------------------------------------
 %% callbacks for ecpool

+ 1 - 1
scripts/ct/run.sh

@@ -47,7 +47,7 @@ while [ "$#" -gt 0 ]; do
             exit 0
             ;;
         --app)
-            WHICH_APP="$2"
+            WHICH_APP="${2%/}"
             shift 2
             ;;
         --only-up)