瀏覽代碼

Merge pull request #11478 from JimMoen/hstreamdb-tls-support

Hstreamdb tls support
JimMoen 2 年之前
父節點
當前提交
13ebcd6290

+ 1 - 1
.ci/docker-compose-file/.env

@@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11.6
 MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
 OPENTS_TAG=9aa7f88
 KINESIS_TAG=2.1
-HSTREAMDB_TAG=v0.15.0
+HSTREAMDB_TAG=v0.16.1
 HSTREAMDB_ZK_TAG=3.8.1
 
 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server

+ 1 - 1
apps/emqx_bridge/src/emqx_bridge.app.src

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge, [
     {description, "EMQX bridges"},
-    {vsn, "0.1.26"},
+    {vsn, "0.1.27"},
     {registered, [emqx_bridge_sup]},
     {mod, {emqx_bridge_app, []}},
     {applications, [

+ 2 - 4
apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl

@@ -32,8 +32,7 @@ api_schemas(Method) ->
         api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
         api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
         api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"),
-        %% TODO: un-hide for e5.2.0...
-        %%api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method),
+        api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method),
         api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"),
         api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"),
         api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"),
@@ -147,8 +146,7 @@ fields(bridges) ->
                 hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")),
                 #{
                     desc => <<"HStreamDB Bridge Config">>,
-                    required => false,
-                    importance => ?IMPORTANCE_HIDDEN
+                    required => false
                 }
             )},
         {mysql,

+ 1 - 1
apps/emqx_bridge_hstreamdb/rebar.config

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {erl_opts, [debug_info]}.
 {deps, [
-  {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.3.1+v0.12.0"}}},
+  {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.3+v0.16.1"}}},
   {emqx, {path, "../../apps/emqx"}},
   {emqx_utils, {path, "../../apps/emqx_utils"}}
 ]}.

+ 1 - 1
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_hstreamdb, [
     {description, "EMQX Enterprise HStreamDB Bridge"},
-    {vsn, "0.1.1"},
+    {vsn, "0.1.2"},
     {registered, []},
     {applications, [
         kernel,

+ 57 - 51
apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl

@@ -75,7 +75,7 @@ on_query(
     }
 ) ->
     try to_record(PartitionKey, HRecordTemplate, Data) of
-        Record -> append_record(Producer, Record)
+        Record -> append_record(Producer, Record, false)
     catch
         _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
     end.
@@ -88,7 +88,7 @@ on_batch_query(
     }
 ) ->
     try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
-        Records -> append_record(Producer, Records)
+        Records -> append_record(Producer, Records, true)
     catch
         _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
     end.
@@ -156,16 +156,29 @@ start_client(InstId, Config) ->
             {error, Error}
     end.
 
-do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
+do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) ->
     ?SLOG(info, #{
         msg => "starting hstreamdb connector: client",
         connector => InstId,
         config => Config
     }),
     ClientName = client_name(InstId),
+    RpcOpts =
+        case maps:get(enable, SSL) of
+            false ->
+                #{pool_size => PoolSize};
+            true ->
+                #{
+                    pool_size => PoolSize,
+                    gun_opts => #{
+                        transport => tls,
+                        transport_opts => emqx_tls_lib:to_client_opts(SSL)
+                    }
+                }
+        end,
     ClientOptions = [
         {url, binary_to_list(Server)},
-        {rpc_options, #{pool_size => PoolSize}}
+        {rpc_options, RpcOpts}
     ],
     case hstreamdb:start_client(ClientName, ClientOptions) of
         {ok, Client} ->
@@ -206,12 +219,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
     end.
 
 is_alive(Client) ->
-    case hstreamdb:echo(Client) of
-        {ok, _Echo} ->
-            true;
-        _ErrorEcho ->
-            false
-    end.
+    hstreamdb_client:echo(Client) =:= ok.
 
 start_producer(
     InstId,
@@ -280,54 +288,52 @@ to_record(PartitionKey, RawRecord) ->
     hstreamdb:to_record(PartitionKey, raw, RawRecord).
 
 to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
-    Records0 = lists:map(
+    lists:map(
         fun({send_message, Data}) ->
             to_record(PartitionKeyTmpl, HRecordTmpl, Data)
         end,
         BatchList
-    ),
-    PartitionKeys = proplists:get_keys(Records0),
-    [
-        {PartitionKey, proplists:get_all_values(PartitionKey, Records0)}
-     || PartitionKey <- PartitionKeys
-    ].
+    ).
 
-append_record(Producer, MultiPartsRecords) when is_list(MultiPartsRecords) ->
-    lists:foreach(fun(Record) -> append_record(Producer, Record) end, MultiPartsRecords);
-append_record(Producer, Record) when is_tuple(Record) ->
-    do_append_records(false, Producer, Record).
+append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) ->
+    lists:foreach(
+        fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords
+    );
+append_record(Producer, Record, MaybeBatch) when is_tuple(Record) ->
+    do_append_records(Producer, Record, MaybeBatch).
 
 %% TODO: only sync request supported. implement async request later.
-do_append_records(false, Producer, Record) ->
-    case hstreamdb:append_flush(Producer, Record) of
-        {ok, _Result} ->
-            ?tp(
-                hstreamdb_connector_query_return,
-                #{result => _Result}
-            ),
-            ?SLOG(debug, #{
-                msg => "HStreamDB producer sync append success",
-                record => Record
-            });
-        %% the HStream is warming up or buzy, something are not ready yet, retry after a while
-        {error, {unavailable, _} = Reason} ->
-            {error,
-                {recoverable_error, #{
-                    msg => "HStreamDB is warming up or buzy, will retry after a moment",
-                    reason => Reason
-                }}};
-        {error, Reason} = Err ->
-            ?tp(
-                hstreamdb_connector_query_return,
-                #{error => Reason}
-            ),
-            ?SLOG(error, #{
-                msg => "HStreamDB producer sync append failed",
-                reason => Reason,
-                record => Record
-            }),
-            Err
-    end.
+do_append_records(Producer, Record, true = IsBatch) ->
+    Result = hstreamdb:append(Producer, Record),
+    handle_result(Result, Record, IsBatch);
+do_append_records(Producer, Record, false = IsBatch) ->
+    Result = hstreamdb:append_flush(Producer, Record),
+    handle_result(Result, Record, IsBatch).
+
+handle_result(ok = Result, Record, IsBatch) ->
+    handle_result({ok, Result}, Record, IsBatch);
+handle_result({ok, Result}, Record, IsBatch) ->
+    ?tp(
+        hstreamdb_connector_query_append_return,
+        #{result => Result, is_batch => IsBatch}
+    ),
+    ?SLOG(debug, #{
+        msg => "HStreamDB producer sync append success",
+        record => Record,
+        is_batch => IsBatch
+    });
+handle_result({error, Reason} = Err, Record, IsBatch) ->
+    ?tp(
+        hstreamdb_connector_query_append_return,
+        #{error => Reason, is_batch => IsBatch}
+    ),
+    ?SLOG(error, #{
+        msg => "HStreamDB producer sync append failed",
+        reason => Reason,
+        record => Record,
+        is_batch => IsBatch
+    }),
+    Err.
 
 client_name(InstId) ->
     "client:" ++ to_string(InstId).

+ 56 - 21
apps/emqx_bridge_hstreamdb/test/emqx_bridge_hstreamdb_SUITE.erl

@@ -13,8 +13,13 @@
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 % SQL definitions
--define(STREAM, "stream").
+
+-define(STREAM, "demo_stream").
+%% could not be "stream" in Production Environment
+%% especially not in hstreamdb_sql CLI client
+
 -define(REPLICATION_FACTOR, 1).
+
 %% in seconds
 -define(BACKLOG_RETENTION_SECOND, (24 * 60 * 60)).
 -define(SHARD_COUNT, 1).
@@ -146,16 +151,23 @@ t_setup_via_config_and_publish(Config) ->
         begin
             ?wait_async_action(
                 ?assertEqual(ok, send_message(Config, Data)),
-                #{?snk_kind := hstreamdb_connector_query_return},
+                #{?snk_kind := hstreamdb_connector_query_append_return},
                 10_000
             ),
             ok
         end,
         fun(Trace0) ->
-            Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
+            Trace = ?of_kind(hstreamdb_connector_query_append_return, Trace0),
             lists:foreach(
                 fun(EachTrace) ->
-                    ?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
+                    case ?config(enable_batch, Config) of
+                        true ->
+                            ?assertMatch(#{result := ok, is_batch := true}, EachTrace);
+                        false ->
+                            ?assertMatch(
+                                #{result := #{'batchId' := _}, is_batch := false}, EachTrace
+                            )
+                    end
                 end,
                 Trace
             ),
@@ -181,16 +193,26 @@ t_setup_via_http_api_and_publish(Config) ->
         begin
             ?wait_async_action(
                 ?assertEqual(ok, send_message(Config, Data)),
-                #{?snk_kind := hstreamdb_connector_query_return},
+                #{?snk_kind := hstreamdb_connector_query_append_return},
                 10_000
             ),
             ok
         end,
         fun(Trace) ->
-            ?assertMatch(
-                [#{result := #{streamName := <<?STREAM>>}}],
-                ?of_kind(hstreamdb_connector_query_return, Trace)
-            )
+            lists:foreach(
+                fun(EachTrace) ->
+                    case ?config(enable_batch, Config) of
+                        true ->
+                            ?assertMatch(#{result := ok, is_batch := true}, EachTrace);
+                        false ->
+                            ?assertMatch(
+                                #{result := #{'batchId' := _}, is_batch := false}, EachTrace
+                            )
+                    end
+                end,
+                ?of_kind(hstreamdb_connector_query_append_return, Trace)
+            ),
+            ok
         end
     ),
     ok.
@@ -240,6 +262,7 @@ t_write_failure(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     QueryMode = ?config(query_mode, Config),
+    EnableBatch = ?config(enable_batch, Config),
     Data = rand_data(),
     {{ok, _}, {ok, _}} =
         ?wait_async_action(
@@ -251,10 +274,16 @@ t_write_failure(Config) ->
         health_check_resource_down(Config),
         case QueryMode of
             sync ->
-                ?assertMatch(
-                    {error, {resource_error, #{msg := "call resource timeout", reason := timeout}}},
-                    send_message(Config, Data)
-                );
+                case EnableBatch of
+                    true ->
+                        %% append to batch always returns ok
+                        ?assertMatch(ok, send_message(Config, Data));
+                    false ->
+                        ?assertMatch(
+                            {error, {cannot_list_shards, {<<?STREAM>>, econnrefused}}},
+                            send_message(Config, Data)
+                        )
+                end;
             async ->
                 %% TODO: async mode is not supported yet,
                 %% but it will return ok if calling emqx_resource_buffer_worker:async_query/3,
@@ -282,17 +311,23 @@ t_simple_query(Config) ->
                     end,
                     Requests
                 ),
-                #{?snk_kind := hstreamdb_connector_query_return},
+                #{?snk_kind := hstreamdb_connector_query_append_return},
                 10_000
             )
         end,
-        fun(Trace0) ->
-            Trace = ?of_kind(hstreamdb_connector_query_return, Trace0),
+        fun(Trace) ->
             lists:foreach(
                 fun(EachTrace) ->
-                    ?assertMatch(#{result := #{streamName := <<?STREAM>>}}, EachTrace)
+                    case ?config(enable_batch, Config) of
+                        true ->
+                            ?assertMatch(#{result := ok, is_batch := true}, EachTrace);
+                        false ->
+                            ?assertMatch(
+                                #{result := #{'batchId' := _}, is_batch := false}, EachTrace
+                            )
+                    end
                 end,
-                Trace
+                ?of_kind(hstreamdb_connector_query_append_return, Trace)
             ),
             ok
         end
@@ -432,7 +467,7 @@ client(Name, Config, N) ->
     try
         _ = hstreamdb:stop_client(Name),
         {ok, Client} = hstreamdb:start_client(Name, default_options(Config)),
-        {ok, echo} = hstreamdb:echo(Client),
+        ok = hstreamdb_client:echo(Client),
         Client
     catch
         Class:Error ->
@@ -509,7 +544,7 @@ health_check_resource_down(Config) ->
 % These funs start and then stop the hstreamdb connection
 connect_and_create_stream(Config) ->
     ?WITH_CLIENT(
-        _ = hstreamdb:create_stream(
+        _ = hstreamdb_client:create_stream(
             Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT
         )
     ),
@@ -531,7 +566,7 @@ connect_and_create_stream(Config) ->
 
 connect_and_delete_stream(Config) ->
     ?WITH_CLIENT(
-        _ = hstreamdb:delete_stream(Client, ?STREAM)
+        _ = hstreamdb_client:delete_stream(Client, ?STREAM)
     ).
 
 %%--------------------------------------------------------------------

+ 1 - 0
changes/ee/feat-11478.en.md

@@ -0,0 +1 @@
+Add HStreamDB bridge support (both TCP and TLS connection allowed), adapted to the HStreamDB `v0.16.1`.

+ 1 - 1
mix.exs

@@ -227,7 +227,7 @@ defmodule EMQXUmbrella.MixProject do
 
   defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
     [
-      {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.3.1+v0.12.0"},
+      {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.3+v0.16.1"},
       {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
       {:wolff, github: "kafka4beam/wolff", tag: "1.7.6"},
       {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},