Quellcode durchsuchen

test: improve basic tests for EE mysql bridge

Erik Timan vor 3 Jahren
Ursprung
Commit
96bff1d32e

+ 9 - 0
apps/emqx_connector/src/emqx_connector_mysql.erl

@@ -19,6 +19,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("hocon/include/hoconsc.hrl").
 -include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 -behaviour(emqx_resource).
 
@@ -398,6 +399,10 @@ on_sql_query(
     ?TRACE("QUERY", "mysql_connector_received", LogMeta),
     Worker = ecpool:get_client(PoolName),
     {ok, Conn} = ecpool_worker:client(Worker),
+    ?tp(
+        mysql_connector_send_query,
+        #{sql_or_key => SQLOrKey, data => Data}
+    ),
     try mysql:SQLFunc(Conn, SQLOrKey, Data, Timeout) of
         {error, disconnected} = Result ->
             ?SLOG(
@@ -427,6 +432,10 @@ on_sql_query(
             ),
             Result;
         Result ->
+            ?tp(
+                mysql_connector_query_return,
+                #{result => Result}
+            ),
             Result
     catch
         error:badarg ->

+ 141 - 69
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl

@@ -9,6 +9,7 @@
 
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 % SQL definitions
 -define(SQL_BRIDGE,
@@ -34,34 +35,68 @@
 
 all() ->
     [
-        {group, tcp},
-        {group, tls}
-        | (emqx_common_test_helpers:all(?MODULE) -- group_tests())
-    ].
-
-group_tests() ->
-    [
-        t_setup_via_config_and_publish,
-        t_setup_via_http_api_and_publish
+        {group, with_batch},
+        {group, without_batch}
     ].
 
 groups() ->
+    TCs = emqx_common_test_helpers:all(?MODULE),
     [
-        {tcp, group_tests()},
-        {tls, group_tests()}
+        {with_batch, [
+            {group, sync_query},
+            {group, async_query}
+        ]},
+        {without_batch, [
+            {group, sync_query},
+            {group, async_query}
+        ]},
+        {sync_query, [
+            {group, tcp},
+            {group, tls}
+        ]},
+        {async_query, [
+            {group, tcp},
+            {group, tls}
+        ]},
+        {tcp, TCs},
+        {tls, TCs}
     ].
 
-init_per_group(GroupType = tcp, Config) ->
+init_per_group(tcp, Config0) ->
     MysqlHost = os:getenv("MYSQL_TCP_HOST", "mysql"),
     MysqlPort = list_to_integer(os:getenv("MYSQL_TCP_PORT", "3306")),
-    common_init(GroupType, Config, MysqlHost, MysqlPort);
-init_per_group(GroupType = tls, Config) ->
+    Config = [
+        {mysql_host, MysqlHost},
+        {mysql_port, MysqlPort},
+        {enable_tls, false}
+        | Config0
+    ],
+    common_init(Config);
+init_per_group(tls, Config0) ->
     MysqlHost = os:getenv("MYSQL_TLS_HOST", "mysql-tls"),
     MysqlPort = list_to_integer(os:getenv("MYSQL_TLS_PORT", "3306")),
-    common_init(GroupType, Config, MysqlHost, MysqlPort).
+    Config = [
+        {mysql_host, MysqlHost},
+        {mysql_port, MysqlPort},
+        {enable_tls, true}
+        | Config0
+    ],
+    common_init(Config);
+init_per_group(sync_query, Config) ->
+    [{query_mode, sync} | Config];
+init_per_group(async_query, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(with_batch, Config) ->
+    [{enable_batch, true} | Config];
+init_per_group(without_batch, Config) ->
+    [{enable_batch, false} | Config];
+init_per_group(_Group, Config) ->
+    Config.
 
-end_per_group(GroupType, Config) ->
-    drop_table_raw(GroupType, Config),
+end_per_group(Group, Config) when Group =:= tcp; Group =:= tls ->
+    connect_and_drop_table(Config),
+    ok;
+end_per_group(_Group, _Config) ->
     ok.
 
 init_per_suite(Config) ->
@@ -72,13 +107,16 @@ end_per_suite(_Config) ->
     ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
     ok.
 
-init_per_testcase(_Testcase, Config) ->
+init_per_testcase(_Testcase, Config0) ->
+    Config = [{mysql_direct_pid, connect_direct_mysql(Config0)} | Config0],
     catch clear_table(Config),
     delete_bridge(Config),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
     catch clear_table(Config),
+    DirectPid = ?config(mysql_direct_pid, Config),
+    mysql:stop(DirectPid),
     delete_bridge(Config),
     ok.
 
@@ -86,8 +124,10 @@ end_per_testcase(_Testcase, Config) ->
 %% Helper fns
 %%------------------------------------------------------------------------------
 
-common_init(GroupType, Config0, MysqlHost, MysqlPort) ->
+common_init(Config0) ->
     BridgeType = <<"mysql">>,
+    MysqlHost = ?config(mysql_host, Config0),
+    MysqlPort = ?config(mysql_port, Config0),
     case emqx_common_test_helpers:is_tcp_server_available(MysqlHost, MysqlPort) of
         true ->
             % Ensure EE bridge module is loaded
@@ -95,31 +135,28 @@ common_init(GroupType, Config0, MysqlHost, MysqlPort) ->
             _ = emqx_ee_bridge:module_info(),
             ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
             emqx_mgmt_api_test_util:init_suite(),
-            {Name, MysqlConfig} = mysql_config(MysqlHost, MysqlPort, GroupType, BridgeType),
+            % Connect to mysql directly and create the table
+            connect_and_create_table(Config0),
+            {Name, MysqlConfig} = mysql_config(BridgeType, Config0),
             Config =
                 [
-                    {mysql_host, MysqlHost},
-                    {mysql_port, MysqlPort},
                     {mysql_config, MysqlConfig},
                     {mysql_bridge_type, BridgeType},
                     {mysql_name, Name}
                     | Config0
                 ],
-            create_table_raw(GroupType, Config),
             Config;
         false ->
             {skip, no_mysql}
     end.
 
-mysql_config(MysqlHost, MysqlPort0, GroupType, BridgeType) ->
-    MysqlPort = integer_to_list(MysqlPort0),
-    Server = MysqlHost ++ ":" ++ MysqlPort,
-    Name = iolist_to_binary(io_lib:format("~s-~s", [?MODULE, GroupType])),
-    SslEnabled =
-        case GroupType of
-            tcp -> "false";
-            tls -> "true"
-        end,
+mysql_config(BridgeType, Config) ->
+    MysqlPort = integer_to_list(?config(mysql_port, Config)),
+    Server = ?config(mysql_host, Config) ++ ":" ++ MysqlPort,
+    Name = atom_to_binary(?MODULE),
+    EnableBatch = ?config(enable_batch, Config),
+    QueryMode = ?config(query_mode, Config),
+    TlsEnabled = ?config(enable_tls, Config),
     ConfigString =
         io_lib:format(
             "bridges.~s.~s {\n"
@@ -129,6 +166,10 @@ mysql_config(MysqlHost, MysqlPort0, GroupType, BridgeType) ->
             "  username = ~p\n"
             "  password = ~p\n"
             "  sql = ~p\n"
+            "  resource_opts = {\n"
+            "    enable_batch = ~p\n"
+            "    query_mode = ~s\n"
+            "  }\n"
             "  ssl = {\n"
             "                 enable = ~w\n"
             "  }\n"
@@ -141,7 +182,9 @@ mysql_config(MysqlHost, MysqlPort0, GroupType, BridgeType) ->
                 ?MYSQL_USERNAME,
                 ?MYSQL_PASSWORD,
                 ?SQL_BRIDGE,
-                SslEnabled
+                EnableBatch,
+                QueryMode,
+                TlsEnabled
             ]
         ),
     {Name, parse_and_check(ConfigString, BridgeType, Name)}.
@@ -171,33 +214,19 @@ create_bridge_http(Params) ->
         Error -> Error
     end.
 
-query(Config, SqlQuery) ->
-    BridgeType = ?config(mysql_bridge_type, Config),
-    Name = ?config(mysql_name, Config),
-    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
-    ?assertMatch({ok, connected}, emqx_resource:health_check(ResourceID)),
-    emqx_resource:simple_sync_query(ResourceID, {sql, SqlQuery}).
-
 send_message(Config, Payload) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name),
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     ?assertMatch({ok, connected}, emqx_resource:health_check(ResourceID)),
-    % TODO: Check why we can't use send_message directly!
-    % BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
-    % emqx_bridge:send_message(BridgeID, Payload).
-    emqx_resource:simple_sync_query(ResourceID, {send_message, Payload}).
-
-clear_table(Config) ->
-    query(Config, ?SQL_DELETE).
-
-get_payload(Config) ->
-    query(Config, ?SQL_SELECT).
+    emqx_bridge:send_message(BridgeID, Payload).
 
 % We need to create and drop the test table outside of using bridges
 % since a bridge expects the table to exist when enabling it. We
-% therefore call the mysql module directly.
-connect_raw_and_run_sql(GroupType, Config, Sql) ->
+% therefore call the mysql module directly, in addition to using it
+% for querying the DB directly.
+connect_direct_mysql(Config) ->
     Opts = [
         {host, ?config(mysql_host, Config)},
         {port, ?config(mysql_port, Config)},
@@ -206,21 +235,34 @@ connect_raw_and_run_sql(GroupType, Config, Sql) ->
         {database, ?MYSQL_DATABASE}
     ],
     SslOpts =
-        case GroupType of
-            tls ->
+        case ?config(enable_tls, Config) of
+            true ->
                 [{ssl, emqx_tls_lib:to_client_opts(#{enable => true})}];
-            tcp ->
+            false ->
                 []
         end,
     {ok, Pid} = mysql:start_link(Opts ++ SslOpts),
-    ok = mysql:query(Pid, Sql),
-    mysql:stop(Pid).
+    Pid.
+
+% These funs connect and then stop the mysql connection
+connect_and_create_table(Config) ->
+    DirectPid = connect_direct_mysql(Config),
+    ok = mysql:query(DirectPid, ?SQL_CREATE_TABLE),
+    mysql:stop(DirectPid).
+
+connect_and_drop_table(Config) ->
+    DirectPid = connect_direct_mysql(Config),
+    ok = mysql:query(DirectPid, ?SQL_DROP_TABLE),
+    mysql:stop(DirectPid).
 
-create_table_raw(GroupType, Config) ->
-    connect_raw_and_run_sql(GroupType, Config, ?SQL_CREATE_TABLE).
+% These funs expects a connection to already exist
+clear_table(Config) ->
+    DirectPid = ?config(mysql_direct_pid, Config),
+    ok = mysql:query(DirectPid, ?SQL_DELETE).
 
-drop_table_raw(GroupType, Config) ->
-    connect_raw_and_run_sql(GroupType, Config, ?SQL_DROP_TABLE).
+get_payload(Config) ->
+    DirectPid = ?config(mysql_direct_pid, Config),
+    mysql:query(DirectPid, ?SQL_SELECT).
 
 %%------------------------------------------------------------------------------
 %% Testcases
@@ -232,10 +274,25 @@ t_setup_via_config_and_publish(Config) ->
         create_bridge(Config)
     ),
     Val = integer_to_binary(erlang:unique_integer()),
-    ?assertMatch(ok, send_message(Config, #{payload => Val, timestamp => 1668602148000})),
-    ?assertMatch(
-        {ok, [<<"payload">>], [[Val]]},
-        get_payload(Config)
+    SentData = #{payload => Val, timestamp => 1668602148000},
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := mysql_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                {ok, [<<"payload">>], [[Val]]},
+                get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(mysql_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
     ),
     ok.
 
@@ -252,9 +309,24 @@ t_setup_via_http_api_and_publish(Config) ->
         create_bridge_http(MysqlConfig)
     ),
     Val = integer_to_binary(erlang:unique_integer()),
-    ?assertMatch(ok, send_message(Config, #{payload => Val, timestamp => 1668602148000})),
-    ?assertMatch(
-        {ok, [<<"payload">>], [[Val]]},
-        get_payload(Config)
+    SentData = #{payload => Val, timestamp => 1668602148000},
+    ?check_trace(
+        begin
+            ?wait_async_action(
+                ?assertEqual(ok, send_message(Config, SentData)),
+                #{?snk_kind := mysql_connector_query_return},
+                10_000
+            ),
+            ?assertMatch(
+                {ok, [<<"payload">>], [[Val]]},
+                get_payload(Config)
+            ),
+            ok
+        end,
+        fun(Trace0) ->
+            Trace = ?of_kind(mysql_connector_query_return, Trace0),
+            ?assertMatch([#{result := ok}], Trace),
+            ok
+        end
     ),
     ok.