Sfoglia il codice sorgente

Merge pull request #12072 from killme2008/feature/greptimedb-async-write

feat: implements async query mode for GreptimeDB data bridge
JianBo He 2 anni fa
parent
commit
bd985e52d7

+ 1 - 1
.ci/docker-compose-file/docker-compose-greptimedb.yaml

@@ -4,7 +4,7 @@ services:
   greptimedb:
     container_name: greptimedb
     hostname: greptimedb
-    image: greptime/greptimedb:0.3.2
+    image: greptime/greptimedb:v0.4.4
     expose:
       - "4000"
       - "4001"

+ 1 - 1
apps/emqx_bridge_greptimedb/rebar.config

@@ -6,7 +6,7 @@
        {emqx_connector, {path, "../../apps/emqx_connector"}},
        {emqx_resource, {path, "../../apps/emqx_resource"}},
        {emqx_bridge, {path, "../../apps/emqx_bridge"}},
-       {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.2"}}}
+       {greptimedb, {git, "https://github.com/GreptimeTeam/greptimedb-client-erl", {tag, "v0.1.6"}}}
 ]}.
 {plugins, [rebar3_path_deps]}.
 {project_plugins, [erlfmt]}.

+ 72 - 1
apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl

@@ -21,8 +21,11 @@
     on_stop/2,
     on_query/3,
     on_batch_query/3,
+    on_query_async/4,
+    on_batch_query_async/4,
     on_get_status/2
 ]).
+-export([reply_callback/2]).
 
 -export([
     roots/0,
@@ -57,7 +60,7 @@
 
 %% -------------------------------------------------------------------------------------------------
 %% resource callback
-callback_mode() -> always_sync.
+callback_mode() -> async_if_possible.
 
 on_start(InstId, Config) ->
     %% InstID as pool would be handled by greptimedb client
@@ -110,6 +113,49 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
             {error, {unrecoverable_error, Reason}}
     end.
 
+on_query_async(
+    InstId,
+    {send_message, Data},
+    {ReplyFun, Args},
+    _State = #{write_syntax := SyntaxLines, client := Client}
+) ->
+    case data_to_points(Data, SyntaxLines) of
+        {ok, Points} ->
+            ?tp(
+                greptimedb_connector_send_query,
+                #{points => Points, batch => false, mode => async}
+            ),
+            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+        {error, ErrorPoints} = Err ->
+            ?tp(
+                greptimedb_connector_send_query_error,
+                #{batch => false, mode => async, error => ErrorPoints}
+            ),
+            log_error_points(InstId, ErrorPoints),
+            Err
+    end.
+
+on_batch_query_async(
+    InstId,
+    BatchData,
+    {ReplyFun, Args},
+    #{write_syntax := SyntaxLines, client := Client}
+) ->
+    case parse_batch_data(InstId, BatchData, SyntaxLines) of
+        {ok, Points} ->
+            ?tp(
+                greptimedb_connector_send_query,
+                #{points => Points, batch => true, mode => async}
+            ),
+            do_async_query(InstId, Client, Points, {ReplyFun, Args});
+        {error, Reason} ->
+            ?tp(
+                greptimedb_connector_send_query_error,
+                #{batch => true, mode => async, error => Reason}
+            ),
+            {error, {unrecoverable_error, Reason}}
+    end.
+
 on_get_status(_InstId, #{client := Client}) ->
     case greptimedb:is_alive(Client) of
         true ->
@@ -344,6 +390,31 @@ do_query(InstId, Client, Points) ->
             end
     end.
 
+do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
+    ?SLOG(info, #{
+        msg => "greptimedb_write_point_async",
+        connector => InstId,
+        points => Points
+    }),
+    WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
+    ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs).
+
+reply_callback(ReplyFunAndArgs, {error, {unauth, _, _}}) ->
+    ?tp(greptimedb_connector_do_query_failure, #{error => <<"authorization failure">>}),
+    Result = {error, {unrecoverable_error, <<"authorization failure">>}},
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
+    case is_unrecoverable_error(Error) of
+        true ->
+            Result = {error, {unrecoverable_error, Reason}},
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
+        false ->
+            Result = {error, {recoverable_error, Reason}},
+            emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
+    end;
+reply_callback(ReplyFunAndArgs, Result) ->
+    emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
+
 %% -------------------------------------------------------------------------------------------------
 %% Tags & Fields Config Trans
 

+ 80 - 10
apps/emqx_bridge_greptimedb/test/emqx_bridge_greptimedb_SUITE.erl

@@ -25,16 +25,23 @@ groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     [
         {with_batch, [
-            {group, sync_query}
+            {group, sync_query},
+            {group, async_query}
         ]},
         {without_batch, [
-            {group, sync_query}
+            {group, sync_query},
+            {group, async_query}
         ]},
         {sync_query, [
             {group, grpcv1_tcp}
             %% uncomment tls when we are ready
             %% {group, grpcv1_tls}
         ]},
+        {async_query, [
+            {group, grpcv1_tcp}
+            %% uncomment tls when we are ready
+            %% {group, grpcv1_tls}
+        ]},
         {grpcv1_tcp, TCs}
         %%{grpcv1_tls, TCs}
     ].
@@ -130,6 +137,8 @@ init_per_group(GreptimedbType, Config0) when
     end;
 init_per_group(sync_query, Config) ->
     [{query_mode, sync} | Config];
+init_per_group(async_query, Config) ->
+    [{query_mode, async} | Config];
 init_per_group(with_batch, Config) ->
     [{batch_size, 100} | Config];
 init_per_group(without_batch, Config) ->
@@ -420,6 +429,9 @@ t_start_ok(Config) ->
     ?check_trace(
         begin
             case QueryMode of
+                async ->
+                    ?assertMatch(ok, send_message(Config, SentData)),
+                    ct:sleep(500);
                 sync ->
                     ?assertMatch({ok, _}, send_message(Config, SentData))
             end,
@@ -666,6 +678,9 @@ t_const_timestamp(Config) ->
         <<"timestamp">> => erlang:system_time(millisecond)
     },
     case QueryMode of
+        async ->
+            ?assertMatch(ok, send_message(Config, SentData)),
+            ct:sleep(500);
         sync ->
             ?assertMatch({ok, _}, send_message(Config, SentData))
     end,
@@ -709,9 +724,12 @@ t_boolean_variants(Config) ->
             },
             case QueryMode of
                 sync ->
-                    ?assertMatch({ok, _}, send_message(Config, SentData))
+                    ?assertMatch({ok, _}, send_message(Config, SentData));
+                async ->
+                    ?assertMatch(ok, send_message(Config, SentData))
             end,
             case QueryMode of
+                async -> ct:sleep(500);
                 sync -> ok
             end,
             PersistedData = query_by_clientid(atom_to_binary(?FUNCTION_NAME), ClientId, Config),
@@ -779,11 +797,29 @@ t_bad_timestamp(Config) ->
             #{?snk_kind := greptimedb_connector_send_query_error},
             10_000
         ),
-        fun(Result, _Trace) ->
+        fun(Result, Trace) ->
             ?assertMatch({_, {ok, _}}, Result),
             {Return, {ok, _}} = Result,
             IsBatch = BatchSize > 1,
             case {QueryMode, IsBatch} of
+                {async, true} ->
+                    ?assertEqual(ok, Return),
+                    ?assertMatch(
+                        [#{error := points_trans_failed}],
+                        ?of_kind(greptimedb_connector_send_query_error, Trace)
+                    );
+                {async, false} ->
+                    ?assertEqual(ok, Return),
+                    ?assertMatch(
+                        [
+                            #{
+                                error := [
+                                    {error, {bad_timestamp, <<"bad_timestamp">>}}
+                                ]
+                            }
+                        ],
+                        ?of_kind(greptimedb_connector_send_query_error, Trace)
+                    );
                 {sync, false} ->
                     ?assertEqual(
                         {error, [
@@ -907,17 +943,34 @@ t_write_failure(Config) ->
                             {error, {resource_error, #{reason := timeout}}},
                             send_message(Config, SentData)
                         ),
-                        #{?snk_kind := greptimedb_connector_do_query_failure, action := nack},
-                        16_000
+                        #{?snk_kind := handle_async_reply, action := nack},
+                        1_000
+                    );
+                async ->
+                    ?wait_async_action(
+                        ?assertEqual(ok, send_message(Config, SentData)),
+                        #{?snk_kind := handle_async_reply},
+                        1_000
                     )
             end
         end),
-        fun(Trace) ->
+        fun(Trace0) ->
             case QueryMode of
                 sync ->
-                    ?assertMatch(
-                        [#{error := _} | _],
-                        ?of_kind(greptimedb_connector_do_query_failure, Trace)
+                    Trace = ?of_kind(handle_async_reply, Trace0),
+                    ?assertMatch([_ | _], Trace),
+                    [#{result := Result} | _] = Trace,
+                    ?assert(
+                        not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
+                        #{got => Result}
+                    );
+                async ->
+                    Trace = ?of_kind(handle_async_reply, Trace0),
+                    ?assertMatch([_ | _], Trace),
+                    [#{result := Result} | _] = Trace,
+                    ?assert(
+                        not emqx_bridge_greptimedb_connector:is_unrecoverable_error(Result),
+                        #{got => Result}
                     )
             end,
             ok
@@ -1029,6 +1082,23 @@ t_authentication_error_on_send_message(Config0) ->
             ?assertMatch(
                 {error, {unrecoverable_error, <<"authorization failure">>}},
                 send_message(Config, SentData)
+            );
+        async ->
+            ?check_trace(
+                begin
+                    ?wait_async_action(
+                        ?assertEqual(ok, send_message(Config, SentData)),
+                        #{?snk_kind := handle_async_reply},
+                        1_000
+                    )
+                end,
+                fun(Trace) ->
+                    ?assertMatch(
+                        [#{error := <<"authorization failure">>} | _],
+                        ?of_kind(greptimedb_connector_do_query_failure, Trace)
+                    ),
+                    ok
+                end
             )
     end,
     ok.

+ 3 - 0
changes/ee/feat-12072.en.md

@@ -0,0 +1,3 @@
+Supports async query mode for GreptimeDB data bridge. It provides better performance.
+
+

+ 1 - 1
mix.exs

@@ -236,7 +236,7 @@ defmodule EMQXUmbrella.MixProject do
       {:crc32cer, "0.1.8", override: true},
       {:supervisor3, "1.1.12", override: true},
       {:opentsdb, github: "emqx/opentsdb-client-erl", tag: "v0.5.1", override: true},
-      {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.2", override: true},
+      {:greptimedb, github: "GreptimeTeam/greptimedb-client-erl", tag: "v0.1.6", override: true},
       # The following two are dependencies of rabbit_common. They are needed here to
       # make mix not complain about conflicting versions
       {:thoas, github: "emqx/thoas", tag: "v1.0.0", override: true},