Просмотр исходного кода

test(tdengine): add test cases to cover the super table feature

firest 2 лет назад
Родитель
Сommit
142125b9e4
1 измененных файлов с 187 добавлено и 17 удалено
  1. 187 17
      apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl

+ 187 - 17
apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl

@@ -24,9 +24,21 @@
     ");"
 ).
 -define(SQL_DROP_TABLE, "DROP TABLE t_mqtt_msg").
--define(SQL_DELETE, "DELETE from t_mqtt_msg").
+-define(SQL_DROP_STABLE, "DROP STABLE s_tab").
+-define(SQL_DELETE, "DELETE FROM t_mqtt_msg").
 -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").
 
+-define(AUTO_CREATE_BRIDGE,
+    "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})"
+).
+
+-define(SQL_CREATE_STABLE,
+    "CREATE STABLE s_tab (\n"
+    "  ts timestamp,\n"
+    "  payload BINARY(1024)\n"
+    ") TAGS (clientid BINARY(128));"
+).
+
 % DB defaults
 -define(TD_DATABASE, "mqtt").
 -define(TD_USERNAME, "root").
@@ -53,12 +65,13 @@ all() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     NonBatchCases = [t_write_timeout],
+    MustBatchCases = [t_batch_insert, t_auto_create_batch_insert],
     BatchingGroups = [{group, with_batch}, {group, without_batch}],
     [
         {async, BatchingGroups},
         {sync, BatchingGroups},
         {with_batch, TCs -- NonBatchCases},
-        {without_batch, TCs}
+        {without_batch, TCs -- MustBatchCases}
     ].
 
 init_per_group(async, Config) ->
@@ -117,7 +130,8 @@ common_init(ConfigT) ->
     Config0 = [
         {td_host, Host},
         {td_port, Port},
-        {proxy_name, "tdengine_restful"}
+        {proxy_name, "tdengine_restful"},
+        {template, ?SQL_BRIDGE}
         | ConfigT
     ],
 
@@ -165,6 +179,7 @@ tdengine_config(BridgeType, Config) ->
             false -> 1
         end,
     QueryMode = ?config(query_mode, Config),
+    Template = ?config(template, Config),
     ConfigString =
         io_lib:format(
             "bridges.~s.~s {\n"
@@ -187,7 +202,7 @@ tdengine_config(BridgeType, Config) ->
                 ?TD_DATABASE,
                 ?TD_USERNAME,
                 ?TD_PASSWORD,
-                ?SQL_BRIDGE,
+                Template,
                 BatchSize,
                 QueryMode
             ]
@@ -272,11 +287,15 @@ connect_direct_tdengine(Config) ->
 connect_and_create_table(Config) ->
     ?WITH_CON(begin
         {ok, _} = directly_query(Con, ?SQL_CREATE_DATABASE, []),
-        {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE)
+        {ok, _} = directly_query(Con, ?SQL_CREATE_TABLE),
+        {ok, _} = directly_query(Con, ?SQL_CREATE_STABLE)
     end).
 
 connect_and_drop_table(Config) ->
-    ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DROP_TABLE)).
+    ?WITH_CON(begin
+        {ok, _} = directly_query(Con, ?SQL_DROP_TABLE),
+        {ok, _} = directly_query(Con, ?SQL_DROP_STABLE)
+    end).
 
 connect_and_clear_table(Config) ->
     ?WITH_CON({ok, _} = directly_query(Con, ?SQL_DELETE)).
@@ -287,6 +306,15 @@ connect_and_get_payload(Config) ->
     ),
     Result.
 
+connect_and_exec(Config, SQL) ->
+    ?WITH_CON({ok, _} = directly_query(Con, SQL)).
+
+connect_and_query(Config, SQL) ->
+    ?WITH_CON(
+        {ok, #{<<"code">> := 0, <<"data">> := Data}} = directly_query(Con, SQL)
+    ),
+    Data.
+
 directly_query(Con, Query) ->
     directly_query(Con, Query, [{db_name, ?TD_DATABASE}]).
 
@@ -407,7 +435,7 @@ t_write_failure(Config) ->
                 #{?snk_kind := buffer_worker_flush_ack},
                 2_000
             ),
-        ?assertMatch({error, econnrefused}, Result),
+        ?assertMatch({error, Reason} when Reason =:= econnrefused; Reason =:= closed, Result),
         ok
     end),
     ok.
@@ -490,26 +518,19 @@ t_missing_data(Config) ->
     ok.
 
 t_bad_sql_parameter(Config) ->
-    EnableBatch = ?config(enable_batch, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {sql, <<"">>, [bad_parameter]},
+    Request = {send_message, <<"">>},
     {_, {ok, #{result := Result}}} =
         ?wait_async_action(
             query_resource(Config, Request),
             #{?snk_kind := buffer_worker_flush_ack},
             2_000
         ),
-    case EnableBatch of
-        true ->
-            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
-        false ->
-            ?assertMatch(
-                {error, {unrecoverable_error, _}}, Result
-            )
-    end,
+
+    ?assertMatch({error, #{<<"code">> := _}}, Result),
     ok.
 
 t_nasty_sql_string(Config) ->
@@ -544,7 +565,156 @@ t_nasty_sql_string(Config) ->
         connect_and_get_payload(Config)
     ).
 
+t_simple_insert(Config) ->
+    connect_and_clear_table(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    Request = {send_message, SentData},
+    {_, {ok, #{result := _Result}}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    ?assertMatch(
+        ?PAYLOAD,
+        connect_and_get_payload(Config)
+    ).
+
+t_batch_insert(Config) ->
+    connect_and_clear_table(Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    Size = 5,
+    Ts = erlang:system_time(millisecond),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            lists:foreach(
+                fun(Idx) ->
+                    SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx},
+                    Request = {send_message, SentData},
+                    query_resource(Config, Request)
+                end,
+                lists:seq(1, Size)
+            ),
+
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+
+    timer:sleep(200),
+
+    ?assertMatch(
+        [[Size]],
+        connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
+    ).
+
+t_auto_create_simple_insert(Config0) ->
+    ClientId = to_str(?FUNCTION_NAME),
+    Config = get_auto_create_config(Config0),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    SentData = #{
+        payload => ?PAYLOAD,
+        timestamp => 1668602148000,
+        clientid => ClientId
+    },
+    Request = {send_message, SentData},
+    {_, {ok, #{result := _Result}}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    ?assertMatch(
+        [[?PAYLOAD]],
+        connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
+    ),
+
+    ?assertMatch(
+        [[0]],
+        connect_and_query(Config, "DROP TABLE " ++ ClientId)
+    ).
+
+t_auto_create_batch_insert(Config0) ->
+    ClientId1 = "client1",
+    ClientId2 = "client2",
+    Config = get_auto_create_config(Config0),
+
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+
+    Size1 = 2,
+    Size2 = 3,
+
+    Ts = erlang:system_time(millisecond),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            lists:foreach(
+                fun({Offset, ClientId, Size}) ->
+                    lists:foreach(
+                        fun(Idx) ->
+                            SentData = #{
+                                payload => ?PAYLOAD,
+                                timestamp => Ts + Idx + Offset,
+                                clientid => ClientId
+                            },
+                            Request = {send_message, SentData},
+                            query_resource(Config, Request)
+                        end,
+                        lists:seq(1, Size)
+                    )
+                end,
+                [{0, ClientId1, Size1}, {100, ClientId2, Size2}]
+            ),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+
+    timer:sleep(200),
+
+    ?assertMatch(
+        [[Size1]],
+        connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
+    ),
+
+    ?assertMatch(
+        [[Size2]],
+        connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
+    ),
+
+    ?assertMatch(
+        [[0]],
+        connect_and_query(Config, "DROP TABLE " ++ ClientId1)
+    ),
+
+    ?assertMatch(
+        [[0]],
+        connect_and_query(Config, "DROP TABLE " ++ ClientId2)
+    ).
+
 to_bin(List) when is_list(List) ->
     unicode:characters_to_binary(List, utf8);
 to_bin(Bin) when is_binary(Bin) ->
     Bin.
+
+to_str(Atom) when is_atom(Atom) ->
+    erlang:atom_to_list(Atom).
+
+get_auto_create_config(Config0) ->
+    Config = lists:keyreplace(template, 1, Config0, {template, ?AUTO_CREATE_BRIDGE}),
+    BridgeType = proplists:get_value(bridge_type, Config, <<"tdengine">>),
+    {_Name, TDConf} = tdengine_config(BridgeType, Config),
+    lists:keyreplace(tdengine_config, 1, Config, {tdengine_config, TDConf}).