Procházet zdrojové kódy

Merge pull request #11547 from SergeTupchiy/EMQX-10180-fix-function_clause-bridge-errors

fix function_clause errors in data bridges
SergeTupchiy před 2 roky
rodič
revize
cf655f25ca

+ 0 - 4
.ci/docker-compose-file/cassandra/Dockerfile

@@ -1,4 +0,0 @@
-ARG CASSANDRA_TAG=3.11.6
-FROM cassandra:${CASSANDRA_TAG}
-COPY cassandra.yaml /etc/cassandra/cassandra.yaml
-CMD ["cassandra", "-f"]

Rozdílová data souboru nebyla zobrazena, protože soubor je příliš velký
+ 1236 - 0
.ci/docker-compose-file/cassandra/cassandra_noauth.yaml


+ 31 - 25
.ci/docker-compose-file/docker-compose-cassandra.yaml

@@ -1,32 +1,38 @@
 version: '3.9'
 
+x-cassandra: &cassandra
+  restart: always
+  image: cassandra:${CASSANDRA_TAG:-3.11.6}
+  environment:
+    CASSANDRA_BROADCAST_ADDRESS: "1.2.3.4"
+    CASSANDRA_RPC_ADDRESS: "0.0.0.0"
+    HEAP_NEWSIZE: "128M"
+    MAX_HEAP_SIZE: "2048M"
+  #ports:
+  #  - "9042:9042"
+  #  - "9142:9142"
+  command:
+    - /bin/bash
+    - -c
+    - |
+      /opt/cassandra/bin/cassandra -f -R > /cassandra.log &
+      /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"
+      while [[ $$? -ne 0 ]];do sleep 5; /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"; done
+      /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "describe keyspaces;"
+      tail -f /cassandra.log
+  networks:
+    - emqx_bridge
+
 services:
   cassandra_server:
+    <<: *cassandra
     container_name: cassandra
-    build:
-      context: ./cassandra
-      args:
-        CASSANDRA_TAG: ${CASSANDRA_TAG}
-    image: emqx-cassandra
-    restart: always
-    environment:
-      CASSANDRA_BROADCAST_ADDRESS: "1.2.3.4"
-      CASSANDRA_RPC_ADDRESS: "0.0.0.0"
-      HEAP_NEWSIZE: "128M"
-      MAX_HEAP_SIZE: "2048M"
     volumes:
       - ./certs:/certs
-    #ports:
-    #  - "9042:9042"
-    #  - "9142:9142"
-    command:
-      - /bin/bash
-      - -c
-      - |
-        /opt/cassandra/bin/cassandra -f -R > /cassandra.log &
-        /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"
-        while [[ $$? -ne 0 ]];do sleep 5; /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "CREATE KEYSPACE mqtt WITH REPLICATION = { 'class':'SimpleStrategy','replication_factor':1};"; done
-        /opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e "describe keyspaces;"
-        tail -f /cassandra.log
-    networks:
-      - emqx_bridge
+      - ./cassandra/cassandra.yaml:/etc/cassandra/cassandra.yaml
+  cassandra_noauth_server:
+    <<: *cassandra
+    container_name: cassandra_noauth
+    volumes:
+      - ./certs:/certs
+      - ./cassandra/cassandra_noauth.yaml:/etc/cassandra/cassandra.yaml

+ 1 - 1
apps/emqx/rebar.config

@@ -31,7 +31,7 @@
     {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.10"}}},
     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
     {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}},
-    {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
+    {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}},
     {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
     {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
     {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}

+ 1 - 1
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_cassandra, [
     {description, "EMQX Enterprise Cassandra Bridge"},
-    {vsn, "0.1.3"},
+    {vsn, "0.1.4"},
     {registered, []},
     {applications, [
         kernel,

+ 19 - 4
apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl

@@ -94,7 +94,6 @@ on_start(
     #{
         servers := Servers0,
         keyspace := Keyspace,
-        username := Username,
         pool_size := PoolSize,
         ssl := SSL
     } = Config
@@ -114,12 +113,12 @@ on_start(
 
     Options = [
         {nodes, Servers},
-        {username, Username},
-        {password, emqx_secret:wrap(maps:get(password, Config, ""))},
         {keyspace, Keyspace},
         {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
         {pool_size, PoolSize}
     ],
+    Options1 = maybe_add_opt(username, Config, Options),
+    Options2 = maybe_add_opt(password, Config, Options1, _IsSensitive = true),
 
     SslOpts =
         case maps:get(enable, SSL) of
@@ -132,7 +131,7 @@ on_start(
                 []
         end,
     State = parse_prepare_cql(Config),
-    case emqx_resource_pool:start(InstId, ?MODULE, Options ++ SslOpts) of
+    case emqx_resource_pool:start(InstId, ?MODULE, Options2 ++ SslOpts) of
         ok ->
             {ok, init_prepare(State#{pool_name => InstId, prepare_statement => #{}})};
         {error, Reason} ->
@@ -513,3 +512,19 @@ maybe_assign_type(V) when is_integer(V) ->
 maybe_assign_type(V) when is_float(V) -> {double, V};
 maybe_assign_type(V) ->
     V.
+
+maybe_add_opt(Key, Conf, Opts) ->
+    maybe_add_opt(Key, Conf, Opts, _IsSensitive = false).
+
+maybe_add_opt(Key, Conf, Opts, IsSensitive) ->
+    case Conf of
+        #{Key := Val} ->
+            [{Key, maybe_wrap(IsSensitive, Val)} | Opts];
+        _ ->
+            Opts
+    end.
+
+maybe_wrap(false = _IsSensitive, Val) ->
+    Val;
+maybe_wrap(true, Val) ->
+    emqx_secret:wrap(Val).

+ 47 - 24
apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_connector_SUITE.erl

@@ -7,15 +7,17 @@
 -compile(nowarn_export_all).
 -compile(export_all).
 
+-include_lib("common_test/include/ct.hrl").
 -include("emqx_bridge_cassandra.hrl").
 -include("emqx_connector/include/emqx_connector.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("stdlib/include/assert.hrl").
 
-%% Cassandra server defined at `.ci/docker-compose-file/docker-compose-cassandra-tcp.yaml`
+%% Cassandra servers are defined at `.ci/docker-compose-file/docker-compose-cassandra.yaml`
 %% You can change it to `127.0.0.1`, if you run this SUITE locally
 -define(CASSANDRA_HOST, "cassandra").
+-define(CASSANDRA_HOST_NOAUTH, "cassandra_noauth").
 -define(CASSANDRA_RESOURCE_MOD, emqx_bridge_cassandra_connector).
 
 %% This test SUITE requires a running cassandra instance. If you don't want to
@@ -32,40 +34,58 @@
 -define(CASSA_PASSWORD, <<"cassandra">>).
 
 all() ->
-    emqx_common_test_helpers:all(?MODULE).
+    [
+        {group, auth},
+        {group, noauth}
+    ].
 
 groups() ->
-    [].
+    TCs = emqx_common_test_helpers:all(?MODULE),
+    [
+        {auth, TCs},
+        {noauth, TCs}
+    ].
 
-cassandra_servers() ->
+cassandra_servers(CassandraHost) ->
     lists:map(
         fun(#{hostname := Host, port := Port}) ->
             {Host, Port}
         end,
         emqx_schema:parse_servers(
-            iolist_to_binary([?CASSANDRA_HOST, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
+            iolist_to_binary([CassandraHost, ":", erlang:integer_to_list(?CASSANDRA_DEFAULT_PORT)]),
             #{default_port => ?CASSANDRA_DEFAULT_PORT}
         )
     ).
 
 init_per_suite(Config) ->
-    case
-        emqx_common_test_helpers:is_tcp_server_available(?CASSANDRA_HOST, ?CASSANDRA_DEFAULT_PORT)
-    of
+    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+    {ok, _} = application:ensure_all_started(emqx_connector),
+    Config.
+
+init_per_group(Group, Config) ->
+    {CassandraHost, AuthOpts} =
+        case Group of
+            auth ->
+                {?CASSANDRA_HOST, [{username, ?CASSA_USERNAME}, {password, ?CASSA_PASSWORD}]};
+            noauth ->
+                {?CASSANDRA_HOST_NOAUTH, []}
+        end,
+    case emqx_common_test_helpers:is_tcp_server_available(CassandraHost, ?CASSANDRA_DEFAULT_PORT) of
         true ->
-            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
-            {ok, _} = application:ensure_all_started(emqx_connector),
             %% keyspace `mqtt` must be created in advance
             {ok, Conn} =
                 ecql:connect([
-                    {nodes, cassandra_servers()},
-                    {username, ?CASSA_USERNAME},
-                    {password, ?CASSA_PASSWORD},
+                    {nodes, cassandra_servers(CassandraHost)},
                     {keyspace, "mqtt"}
+                    | AuthOpts
                 ]),
             ecql:close(Conn),
-            Config;
+            [
+                {cassa_host, CassandraHost},
+                {cassa_auth_opts, AuthOpts}
+                | Config
+            ];
         false ->
             case os:getenv("IS_CI") of
                 "yes" ->
@@ -75,6 +95,9 @@ init_per_suite(Config) ->
             end
     end.
 
+end_per_group(_Group, _Config) ->
+    ok.
+
 end_per_suite(_Config) ->
     ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
     ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
@@ -90,10 +113,10 @@ end_per_testcase(_, _Config) ->
 %% cases
 %%--------------------------------------------------------------------
 
-t_lifecycle(_Config) ->
+t_lifecycle(Config) ->
     perform_lifecycle_check(
         <<"emqx_connector_cassandra_SUITE">>,
-        cassandra_config()
+        cassandra_config(Config)
     ).
 
 show(X) ->
@@ -168,25 +191,25 @@ perform_lifecycle_check(ResourceId, InitialConfig) ->
 %% utils
 %%--------------------------------------------------------------------
 
-cassandra_config() ->
-    Config =
-        #{
+cassandra_config(Config) ->
+    Host = ?config(cassa_host, Config),
+    AuthOpts = maps:from_list(?config(cassa_auth_opts, Config)),
+    CassConfig =
+        AuthOpts#{
             auto_reconnect => true,
             keyspace => <<"mqtt">>,
-            username => ?CASSA_USERNAME,
-            password => ?CASSA_PASSWORD,
             pool_size => 8,
             servers => iolist_to_binary(
                 io_lib:format(
                     "~s:~b",
                     [
-                        ?CASSANDRA_HOST,
+                        Host,
                         ?CASSANDRA_DEFAULT_PORT
                     ]
                 )
             )
         },
-    #{<<"config">> => Config}.
+    #{<<"config">> => CassConfig}.
 
 test_query_no_params() ->
     {query, <<"SELECT count(1) AS T FROM system.local">>}.

+ 1 - 1
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver.app.src

@@ -1,6 +1,6 @@
 {application, emqx_bridge_sqlserver, [
     {description, "EMQX Enterprise SQL Server Bridge"},
-    {vsn, "0.1.2"},
+    {vsn, "0.1.3"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, odbc]},
     {env, []},

+ 2 - 3
apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl

@@ -173,7 +173,6 @@ on_start(
     #{
         server := Server,
         username := Username,
-        password := Password,
         driver := Driver,
         database := Database,
         pool_size := PoolSize,
@@ -200,7 +199,7 @@ on_start(
     Options = [
         {server, to_bin(Server)},
         {username, Username},
-        {password, Password},
+        {password, emqx_secret:wrap(maps:get(password, Config, ""))},
         {driver, Driver},
         {database, Database},
         {pool_size, PoolSize}
@@ -320,7 +319,7 @@ conn_str([{database, Database} | Opts], Acc) ->
 conn_str([{username, Username} | Opts], Acc) ->
     conn_str(Opts, ["UID=" ++ str(Username) | Acc]);
 conn_str([{password, Password} | Opts], Acc) ->
-    conn_str(Opts, ["PWD=" ++ str(Password) | Acc]);
+    conn_str(Opts, ["PWD=" ++ str(emqx_secret:unwrap(Password)) | Acc]);
 conn_str([{_, _} | Opts], Acc) ->
     conn_str(Opts, Acc).
 

+ 4 - 7
apps/emqx_connector/src/emqx_connector_pgsql.erl

@@ -85,13 +85,10 @@ server() ->
 adjust_fields(Fields) ->
     lists:map(
         fun
-            ({username, OrigUsernameFn}) ->
-                {username, fun
-                    (required) ->
-                        true;
-                    (Any) ->
-                        OrigUsernameFn(Any)
-                end};
+            ({username, Sc}) ->
+                %% to please dialyzer...
+                Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
+                {username, hocon_schema:override(Sc, Override)};
             (Field) ->
                 Field
         end,

+ 1 - 1
apps/emqx_oracle/src/emqx_oracle.app.src

@@ -1,6 +1,6 @@
 {application, emqx_oracle, [
     {description, "EMQX Enterprise Oracle Database Connector"},
-    {vsn, "0.1.5"},
+    {vsn, "0.1.6"},
     {registered, []},
     {applications, [
         kernel,

+ 14 - 1
apps/emqx_oracle/src/emqx_oracle_schema.erl

@@ -21,7 +21,7 @@ roots() ->
 fields(config) ->
     Fields =
         [{server, server()}, {sid, fun sid/1}, {service_name, fun service_name/1}] ++
-            emqx_connector_schema_lib:relational_db_fields() ++
+            adjust_fields(emqx_connector_schema_lib:relational_db_fields()) ++
             emqx_connector_schema_lib:prepare_statement_fields(),
     proplists:delete(database, Fields).
 
@@ -38,3 +38,16 @@ service_name(type) -> binary();
 service_name(desc) -> ?DESC(?REF_MODULE, "service_name");
 service_name(required) -> false;
 service_name(_) -> undefined.
+
+adjust_fields(Fields) ->
+    lists:map(
+        fun
+            ({username, Sc}) ->
+                %% to please dialyzer...
+                Override = #{type => hocon_schema:field_schema(Sc, type), required => true},
+                {username, hocon_schema:override(Sc, Override)};
+            (Field) ->
+                Field
+        end,
+        Fields
+    ).

+ 7 - 0
changes/ee/fix-11547.en.md

@@ -0,0 +1,7 @@
+Fix several emqx_bridge issues:
+
+- fix Cassandra bridge connect error occurring when the bridge is configured without username/password 
+  (Cassandra doesn't require user credentials when it is configured with `authenticator: AllowAllAuthenticator`)
+- fix SQL Server bridge connect error caused by an empty password
+- make `username` a required field in Oracle bridge
+- fix IoTDB bridge error caused by setting base URL without scheme (e.g. `<host>:<port>`)

+ 1 - 1
mix.exs

@@ -73,7 +73,7 @@ defmodule EMQXUmbrella.MixProject do
       {:getopt, "1.0.2", override: true},
       {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
       {:hocon, github: "emqx/hocon", tag: "0.39.16", override: true},
-      {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
+      {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.3", override: true},
       {:esasl, github: "emqx/esasl", tag: "0.2.0"},
       {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
       # in conflict by ehttpc and emqtt

+ 1 - 1
rebar.config

@@ -76,7 +76,7 @@
     , {getopt, "1.0.2"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
     , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.16"}}}
-    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
+    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.3"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
     , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
     , {telemetry, "1.1.0"}