Просмотр исходного кода

Merge pull request #8844 from thalesmg/mongodb-bridge

feat: add mongodb bridge (e5.0)
Xinyu Liu 3 лет назад
Родитель
Сommit
23b6ff399d

+ 4 - 4
.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml

@@ -18,7 +18,7 @@ services:
       --ipv6
       --ipv6
       --bind_ip_all
       --bind_ip_all
       --replSet rs0
       --replSet rs0
-      
+
   mongo2:
   mongo2:
     hostname: mongo2
     hostname: mongo2
     container_name: mongo2
     container_name: mongo2
@@ -54,10 +54,10 @@ services:
       --ipv6
       --ipv6
       --bind_ip_all
       --bind_ip_all
       --replSet rs0
       --replSet rs0
-      
-  mongo_client:
+
+  mongo_rs_client:
     image: mongo:${MONGO_TAG}
     image: mongo:${MONGO_TAG}
-    container_name: mongo_client
+    container_name: mongo_rs_client
     networks:
     networks:
       - emqx_bridge
       - emqx_bridge
     depends_on:
     depends_on:

+ 90 - 0
.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml

@@ -0,0 +1,90 @@
+version: "3"
+
+services:
+  mongosharded1:
+    hostname: mongosharded1
+    container_name: mongosharded1
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27014:27017
+    restart: always
+    command:
+      --configsvr
+      --replSet cfg0
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded2:
+    hostname: mongosharded2
+    container_name: mongosharded2
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27015:27017
+    restart: always
+    command:
+      --shardsvr
+      --replSet rs0
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded3:
+    hostname: mongosharded3
+    container_name: mongosharded3
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27016:27017
+    restart: always
+    entrypoint: mongos
+    command:
+      --configdb cfg0/mongosharded1:27017
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded_client:
+    image: mongo:${MONGO_TAG}
+    container_name: mongosharded_client
+    networks:
+      - emqx_bridge
+    depends_on:
+      - mongosharded1
+      - mongosharded2
+      - mongosharded3
+    command:
+      - /bin/bash
+      - -c
+      - |
+        while ! mongo --host mongosharded1 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do
+            sleep 1
+        done
+        mongo --host mongosharded1 --eval "rs.initiate( { _id : 'cfg0', configsvr: true, members: [ { _id : 0, host : 'mongosharded1:27017' } ] })"
+        while ! mongo --host mongosharded2 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1  ; do
+            sleep 1
+        done
+        mongo --host mongosharded2 --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongosharded2:27017' } ] })"
+        mongo --host mongosharded2 --eval "rs.status()"
+        while ! mongo --host mongosharded3 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1  ; do
+            sleep 1
+        done
+        mongo --host mongosharded3 --eval "sh.addShard('rs0/mongosharded2:27017')"
+        mongo --host mongosharded3 --eval "sh.enableSharding('mqtt')"

+ 15 - 4
.github/workflows/elixir_release.yml

@@ -12,7 +12,18 @@ on:
 jobs:
 jobs:
   elixir_release_build:
   elixir_release_build:
     runs-on: ubuntu-latest
     runs-on: ubuntu-latest
-    container: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04
+    strategy:
+      matrix:
+        otp:
+          - 24.2.1-1
+        elixir:
+          - 1.13.4
+        os:
+          - ubuntu20.04
+        profile:
+          - emqx
+          - emqx-enterprise
+    container: ghcr.io/emqx/emqx-builder/5.0-17:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}
 
 
     steps:
     steps:
       - name: Checkout
       - name: Checkout
@@ -23,15 +34,15 @@ jobs:
         run: |
         run: |
           git config --global --add safe.directory "$GITHUB_WORKSPACE"
           git config --global --add safe.directory "$GITHUB_WORKSPACE"
       - name: elixir release
       - name: elixir release
-        run: make emqx-elixir
+        run: make ${{ matrix.profile }}-elixir
       - name: start release
       - name: start release
         run: |
         run: |
-          cd _build/emqx/rel/emqx
+          cd _build/${{ matrix.profile }}/rel/emqx
           bin/emqx start
           bin/emqx start
       - name: check if started
       - name: check if started
         run: |
         run: |
           sleep 10
           sleep 10
           nc -zv localhost 1883
           nc -zv localhost 1883
-          cd _build/emqx/rel/emqx
+          cd _build/${{ matrix.profile }}/rel/emqx
           bin/emqx ping
           bin/emqx ping
           bin/emqx ctl status
           bin/emqx ctl status

+ 7 - 0
.github/workflows/run_test_cases.yaml

@@ -124,6 +124,8 @@ jobs:
             docker-compose \
             docker-compose \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \
+                -f .ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml \
+                -f .ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \
                 -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
@@ -135,6 +137,11 @@ jobs:
                 -f .ci/docker-compose-file/docker-compose.yaml \
                 -f .ci/docker-compose-file/docker-compose.yaml \
                 up -d --build
                 up -d --build
 
 
+        - name: wait some services to be fully up
+          run: |
+            docker wait mongosharded_client
+            docker wait mongo_rs_client
+
           # produces <app-name>.coverdata
           # produces <app-name>.coverdata
         - name: run common test
         - name: run common test
           working-directory: source
           working-directory: source

+ 18 - 0
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -41,12 +41,16 @@ api_schema(Method) ->
     hoconsc:union(Broker ++ EE).
     hoconsc:union(Broker ++ EE).
 
 
 ee_api_schemas(Method) ->
 ee_api_schemas(Method) ->
+    %% must ensure the app is loaded before checking if fn is defined.
+    ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
     case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
     case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
         true -> emqx_ee_bridge:api_schemas(Method);
         true -> emqx_ee_bridge:api_schemas(Method);
         false -> []
         false -> []
     end.
     end.
 
 
 ee_fields_bridges() ->
 ee_fields_bridges() ->
+    %% must ensure the app is loaded before checking if fn is defined.
+    ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
     case erlang:function_exported(emqx_ee_bridge, fields, 1) of
     case erlang:function_exported(emqx_ee_bridge, fields, 1) of
         true -> emqx_ee_bridge:fields(bridges);
         true -> emqx_ee_bridge:fields(bridges);
         false -> []
         false -> []
@@ -154,3 +158,17 @@ status() ->
 
 
 node_name() ->
 node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
+%%=================================================================================================
+%% Internal fns
+%%=================================================================================================
+
+ensure_loaded(App, Mod) ->
+    try
+        _ = application:load(App),
+        _ = Mod:module_info(),
+        ok
+    catch
+        _:_ ->
+            ok
+    end.

+ 72 - 5
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -37,7 +37,7 @@
 
 
 -export([roots/0, fields/1, desc/1]).
 -export([roots/0, fields/1, desc/1]).
 
 
--export([mongo_query/5, check_worker_health/1]).
+-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
 
 
 -define(HEALTH_CHECK_TIMEOUT, 30000).
 -define(HEALTH_CHECK_TIMEOUT, 30000).
 
 
@@ -47,6 +47,10 @@
     default_port => ?MONGO_DEFAULT_PORT
     default_port => ?MONGO_DEFAULT_PORT
 }).
 }).
 
 
+-ifdef(TEST).
+-export([to_servers_raw/1]).
+-endif.
+
 %%=====================================================================
 %%=====================================================================
 roots() ->
 roots() ->
     [
     [
@@ -177,9 +181,16 @@ on_start(
         {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
         {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
     ],
     ],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    Collection = maps:get(collection, Config, <<"mqtt">>),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of
-        ok -> {ok, #{poolname => PoolName, type => Type}};
-        {error, Reason} -> {error, Reason}
+        ok ->
+            {ok, #{
+                poolname => PoolName,
+                type => Type,
+                collection => Collection
+            }};
+        {error, Reason} ->
+            {error, Reason}
     end.
     end.
 
 
 on_stop(InstId, #{poolname := PoolName}) ->
 on_stop(InstId, #{poolname := PoolName}) ->
@@ -189,6 +200,35 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
 
+on_query(
+    InstId,
+    {send_message, Document},
+    #{poolname := PoolName, collection := Collection} = State
+) ->
+    Request = {insert, Collection, Document},
+    ?TRACE(
+        "QUERY",
+        "mongodb_connector_received",
+        #{request => Request, connector => InstId, state => State}
+    ),
+    case
+        ecpool:pick_and_do(
+            PoolName,
+            {?MODULE, mongo_insert, [Collection, Document]},
+            no_handover
+        )
+    of
+        {{false, Reason}, _Document} ->
+            ?SLOG(error, #{
+                msg => "mongodb_connector_do_query_failed",
+                request => Request,
+                reason => Reason,
+                connector => InstId
+            }),
+            {error, Reason};
+        {{true, _Info}, _Document} ->
+            ok
+    end;
 on_query(
 on_query(
     InstId,
     InstId,
     {Action, Collection, Filter, Projector},
     {Action, Collection, Filter, Projector},
@@ -292,6 +332,9 @@ mongo_query(Conn, find_one, Collection, Filter, Projector) ->
 mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) ->
 mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) ->
     ok.
     ok.
 
 
+mongo_insert(Conn, Collection, Documents) ->
+    mongo_api:insert(Conn, Collection, Documents).
+
 init_type(#{mongo_type := rs, replica_set_name := ReplicaSetName}) ->
 init_type(#{mongo_type := rs, replica_set_name := ReplicaSetName}) ->
     {rs, ReplicaSetName};
     {rs, ReplicaSetName};
 init_type(#{mongo_type := Type}) ->
 init_type(#{mongo_type := Type}) ->
@@ -408,7 +451,7 @@ may_parse_srv_and_txt_records_(
         true ->
         true ->
             error({missing_parameter, replica_set_name});
             error({missing_parameter, replica_set_name});
         false ->
         false ->
-            Config#{hosts => servers_to_bin(Servers)}
+            Config#{hosts => servers_to_bin(lists:flatten(Servers))}
     end;
     end;
 may_parse_srv_and_txt_records_(
 may_parse_srv_and_txt_records_(
     #{
     #{
@@ -518,9 +561,33 @@ to_servers_raw(Servers) ->
         fun(Server) ->
         fun(Server) ->
             emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
             emqx_connector_schema_lib:parse_server(Server, ?MONGO_HOST_OPTIONS)
         end,
         end,
-        string:tokens(str(Servers), ", ")
+        split_servers(Servers)
     ).
     ).
 
 
+split_servers(L) when is_list(L) ->
+    PossibleTypes = [
+        list(binary()),
+        list(string()),
+        string()
+    ],
+    TypeChecks = lists:map(fun(T) -> typerefl:typecheck(T, L) end, PossibleTypes),
+    case TypeChecks of
+        [ok, _, _] ->
+            %% list(binary())
+            lists:map(fun binary_to_list/1, L);
+        [_, ok, _] ->
+            %% list(string())
+            L;
+        [_, _, ok] ->
+            %% string()
+            string:tokens(L, ", ");
+        [_, _, _] ->
+            %% invalid input
+            throw("List of servers must contain only strings")
+    end;
+split_servers(B) when is_binary(B) ->
+    string:tokens(str(B), ", ").
+
 str(A) when is_atom(A) ->
 str(A) when is_atom(A) ->
     atom_to_list(A);
     atom_to_list(A);
 str(B) when is_binary(B) ->
 str(B) when is_binary(B) ->

+ 168 - 0
apps/emqx_connector/test/emqx_connector_mongo_tests.erl

@@ -0,0 +1,168 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_connector_mongo_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+-define(DEFAULT_MONGO_PORT, 27017).
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+%%------------------------------------------------------------------------------
+%% Test cases
+%%------------------------------------------------------------------------------
+
+to_servers_raw_test_() ->
+    [
+        {"single server, binary, no port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw(<<"localhost">>)
+                )
+            )},
+        {"single server, string, no port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw("localhost")
+                )
+            )},
+        {"single server, list(binary), no port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw([<<"localhost">>])
+                )
+            )},
+        {"single server, list(string), no port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw(["localhost"])
+                )
+            )},
+        %%%%%%%%%
+        {"single server, binary, with port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(<<"localhost:9999">>)
+                )
+            )},
+        {"single server, string, with port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw("localhost:9999")
+                )
+            )},
+        {"single server, list(binary), with port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", 9999}],
+                    emqx_connector_mongo:to_servers_raw([<<"localhost:9999">>])
+                )
+            )},
+        {"single server, list(string), with port",
+            ?_test(
+                ?assertEqual(
+                    [{"localhost", 9999}], emqx_connector_mongo:to_servers_raw(["localhost:9999"])
+                )
+            )},
+        %%%%%%%%%
+        {"multiple servers, string, no port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw("host1, host2")
+                )
+            )},
+        {"multiple servers, binary, no port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw(<<"host1, host2">>)
+                )
+            )},
+        {"multiple servers, list(string), no port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw(["host1", "host2"])
+                )
+            )},
+        {"multiple servers, list(binary), no port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", ?DEFAULT_MONGO_PORT}, {"host2", ?DEFAULT_MONGO_PORT}],
+                    emqx_connector_mongo:to_servers_raw([<<"host1">>, <<"host2">>])
+                )
+            )},
+        %%%%%%%%%
+        {"multiple servers, string, with port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", 1234}, {"host2", 2345}],
+                    emqx_connector_mongo:to_servers_raw("host1:1234, host2:2345")
+                )
+            )},
+        {"multiple servers, binary, with port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", 1234}, {"host2", 2345}],
+                    emqx_connector_mongo:to_servers_raw(<<"host1:1234, host2:2345">>)
+                )
+            )},
+        {"multiple servers, list(string), with port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", 1234}, {"host2", 2345}],
+                    emqx_connector_mongo:to_servers_raw(["host1:1234", "host2:2345"])
+                )
+            )},
+        {"multiple servers, list(binary), with port",
+            ?_test(
+                ?assertEqual(
+                    [{"host1", 1234}, {"host2", 2345}],
+                    emqx_connector_mongo:to_servers_raw([<<"host1:1234">>, <<"host2:2345">>])
+                )
+            )},
+        %%%%%%%%
+        {"multiple servers, invalid list(string)",
+            ?_test(
+                ?assertThrow(
+                    _,
+                    emqx_connector_mongo:to_servers_raw(["host1, host2"])
+                )
+            )},
+        {"multiple servers, invalid list(binary)",
+            ?_test(
+                ?assertThrow(
+                    _,
+                    emqx_connector_mongo:to_servers_raw([<<"host1, host2">>])
+                )
+            )},
+        %% TODO: handle this case??
+        {"multiple servers, mixed list(binary|string)",
+            ?_test(
+                ?assertThrow(
+                    _,
+                    emqx_connector_mongo:to_servers_raw([<<"host1">>, "host2"])
+                )
+            )}
+    ].

+ 67 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf

@@ -0,0 +1,67 @@
+emqx_ee_bridge_mongodb {
+  desc_config {
+    desc {
+      en: "Configuration for MongoDB Bridge"
+      zh: "为MongoDB桥配置"
+    }
+    label {
+      en: "MongoDB Bridge Configuration"
+      zh: "MongoDB桥配置"
+    }
+  }
+
+  enable {
+    desc {
+      en: "Enable or disable this MongoDB Bridge"
+      zh: "启用或停用该MongoDB桥"
+    }
+    label {
+      en: "Enable or disable"
+      zh: "启用或禁用"
+    }
+  }
+
+  collection {
+    desc {
+      en: "The collection where data will be stored into"
+      zh: "数据将被存储到的集合"
+    }
+    label {
+      en: "Collection to be used"
+      zh: "将要使用的藏品"
+    }
+  }
+
+  mongodb_rs_conf {
+    desc {
+      en: "MongoDB (Replica Set) configuration"
+      zh: "MongoDB(Replica Set)配置"
+    }
+    label {
+      en: "MongoDB (Replica Set) Configuration"
+      zh: "MongoDB(Replica Set)配置"
+    }
+  }
+
+  mongodb_sharded_conf {
+    desc {
+      en: "MongoDB (Sharded) configuration"
+      zh: "MongoDB (Sharded)配置"
+    }
+    label {
+      en: "MongoDB (Sharded) Configuration"
+      zh: "MongoDB (Sharded)配置"
+    }
+  }
+
+  mongodb_single_conf {
+    desc {
+      en: "MongoDB (Standalone) configuration"
+      zh: "MongoDB(独立)配置"
+    }
+    label {
+      en: "MongoDB (Standalone) Configuration"
+      zh: "MongoDB(独立)配置"
+    }
+  }
+}

+ 5 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,5 +1,9 @@
 {erl_opts, [debug_info]}.
 {erl_opts, [debug_info]}.
-{deps, []}.
+{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
 
 
 {shell, [
 {shell, [
     {apps, [emqx_ee_bridge]}
     {apps, [emqx_ee_bridge]}

+ 17 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -15,6 +15,9 @@
 api_schemas(Method) ->
 api_schemas(Method) ->
     [
     [
         ref(emqx_ee_bridge_mysql, Method),
         ref(emqx_ee_bridge_mysql, Method),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
         ref(emqx_ee_bridge_hstreamdb, Method),
         ref(emqx_ee_bridge_hstreamdb, Method),
         ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
@@ -25,6 +28,7 @@ schema_modules() ->
     [
     [
         emqx_ee_bridge_hstreamdb,
         emqx_ee_bridge_hstreamdb,
         emqx_ee_bridge_influxdb,
         emqx_ee_bridge_influxdb,
+        emqx_ee_bridge_mongodb,
         emqx_ee_bridge_mysql
         emqx_ee_bridge_mysql
     ].
     ].
 
 
@@ -42,6 +46,9 @@ examples(Method) ->
 
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
+resource_type(mongodb_rs) -> emqx_connector_mongo;
+resource_type(mongodb_sharded) -> emqx_connector_mongo;
+resource_type(mongodb_single) -> emqx_connector_mongo;
 resource_type(mysql) -> emqx_connector_mysql;
 resource_type(mysql) -> emqx_connector_mysql;
 resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
@@ -59,7 +66,16 @@ fields(bridges) ->
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
                 #{desc => <<"EMQX Enterprise Config">>}
                 #{desc => <<"EMQX Enterprise Config">>}
             )}
             )}
-    ] ++ fields(influxdb);
+    ] ++ fields(mongodb) ++ fields(influxdb);
+fields(mongodb) ->
+    [
+        {Type,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)),
+                #{desc => <<"EMQX Enterprise Config">>}
+            )}
+     || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
+    ];
 fields(influxdb) ->
 fields(influxdb) ->
     [
     [
         {Protocol,
         {Protocol,

+ 154 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_mongodb).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-behaviour(hocon_schema).
+
+%% emqx_ee_bridge "callbacks"
+-export([
+    conn_bridge_examples/1
+]).
+
+%% hocon_schema callbacks
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%%=================================================================================================
+%% hocon_schema API
+%%=================================================================================================
+
+namespace() ->
+    "bridge_mongodb".
+
+roots() ->
+    [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
+        {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}
+    ];
+fields(mongodb_rs) ->
+    emqx_connector_mongo:fields(rs) ++ fields("config");
+fields(mongodb_sharded) ->
+    emqx_connector_mongo:fields(sharded) ++ fields("config");
+fields(mongodb_single) ->
+    emqx_connector_mongo:fields(single) ++ fields("config");
+fields("post_rs") ->
+    fields(mongodb_rs);
+fields("post_sharded") ->
+    fields(mongodb_sharded);
+fields("post_single") ->
+    fields(mongodb_single);
+fields("put_rs") ->
+    fields(mongodb_rs);
+fields("put_sharded") ->
+    fields(mongodb_sharded);
+fields("put_single") ->
+    fields(mongodb_single);
+fields("get_rs") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_rs);
+fields("get_sharded") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_sharded);
+fields("get_single") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_single).
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"mongodb_rs">> => #{
+                summary => <<"MongoDB (Replica Set) Bridge">>,
+                value => values(mongodb_rs, Method)
+            }
+        },
+        #{
+            <<"mongodb_sharded">> => #{
+                summary => <<"MongoDB (Sharded) Bridge">>,
+                value => values(mongodb_sharded, Method)
+            }
+        },
+        #{
+            <<"mongodb_single">> => #{
+                summary => <<"MongoDB (Standalone) Bridge">>,
+                value => values(mongodb_single, Method)
+            }
+        }
+    ].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(mongodb_rs) ->
+    ?DESC(mongodb_rs_conf);
+desc(mongodb_sharded) ->
+    ?DESC(mongodb_sharded_conf);
+desc(mongodb_single) ->
+    ?DESC(mongodb_single_conf);
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for MongoDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+%%=================================================================================================
+%% Internal fns
+%%=================================================================================================
+
+values(mongodb_rs = MongoType, Method) ->
+    TypeOpts = #{
+        servers => <<"localhost:27017, localhost:27018">>,
+        w_mode => <<"safe">>,
+        r_mode => <<"safe">>,
+        replica_set_name => <<"rs">>
+    },
+    values(common, MongoType, Method, TypeOpts);
+values(mongodb_sharded = MongoType, Method) ->
+    TypeOpts = #{
+        servers => <<"localhost:27017, localhost:27018">>,
+        w_mode => <<"safe">>
+    },
+    values(common, MongoType, Method, TypeOpts);
+values(mongodb_single = MongoType, Method) ->
+    TypeOpts = #{
+        server => <<"localhost:27017">>,
+        w_mode => <<"safe">>
+    },
+    values(common, MongoType, Method, TypeOpts).
+
+values(common, MongoType, Method, TypeOpts) ->
+    MongoTypeBin = atom_to_binary(MongoType),
+    Common = #{
+        name => <<MongoTypeBin/binary, "_demo">>,
+        type => MongoTypeBin,
+        enable => true,
+        collection => <<"mycol">>,
+        database => <<"mqtt">>,
+        srv_record => false,
+        pool_size => 8,
+        username => <<"myuser">>,
+        password => <<"mypass">>
+    },
+    MethodVals = method_values(MongoType, Method),
+    Vals0 = maps:merge(MethodVals, Common),
+    maps:merge(Vals0, TypeOpts).
+
+method_values(MongoType, get) ->
+    Vals = method_values(MongoType, post),
+    maps:merge(?METRICS_EXAMPLE, Vals);
+method_values(MongoType, _) ->
+    ConnectorType =
+        case MongoType of
+            mongodb_rs -> <<"rs">>;
+            mongodb_sharded -> <<"sharded">>;
+            mongodb_single -> <<"single">>
+        end,
+    #{mongo_type => ConnectorType}.

+ 272 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl

@@ -0,0 +1,272 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_mongodb_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, rs},
+        {group, sharded},
+        {group, single}
+        | (emqx_common_test_helpers:all(?MODULE) -- group_tests())
+    ].
+
+group_tests() ->
+    [
+        t_setup_via_config_and_publish,
+        t_setup_via_http_api_and_publish
+    ].
+
+groups() ->
+    [
+        {rs, group_tests()},
+        {sharded, group_tests()},
+        {single, group_tests()}
+    ].
+
+init_per_group(Type = rs, Config) ->
+    MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"),
+    MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            ensure_loaded(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig},
+                {mongo_type, Type},
+                {mongo_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end;
+init_per_group(Type = sharded, Config) ->
+    MongoHost = os:getenv("MONGO_SHARDED_HOST", "mongosharded3"),
+    MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            ensure_loaded(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig},
+                {mongo_type, Type},
+                {mongo_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end;
+init_per_group(Type = single, Config) ->
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            ensure_loaded(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig},
+                {mongo_type, Type},
+                {mongo_name, Name}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end.
+
+end_per_group(_Type, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    catch clear_db(Config),
+    delete_bridge(Config),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    catch clear_db(Config),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+ensure_loaded() ->
+    _ = application:load(emqx_ee_bridge),
+    _ = emqx_ee_bridge:module_info(),
+    ok.
+
+mongo_type_bin(rs) ->
+    <<"mongodb_rs">>;
+mongo_type_bin(sharded) ->
+    <<"mongodb_sharded">>;
+mongo_type_bin(single) ->
+    <<"mongodb_single">>.
+
+mongo_config(MongoHost, MongoPort0, rs = Type) ->
+    MongoPort = integer_to_list(MongoPort0),
+    Servers = MongoHost ++ ":" ++ MongoPort,
+    Name = atom_to_binary(?MODULE),
+    ConfigString =
+        io_lib:format(
+            "bridges.mongodb_rs.~s {\n"
+            "  enable = true\n"
+            "  collection = mycol\n"
+            "  replica_set_name = rs0\n"
+            "  servers = [~p]\n"
+            "  w_mode = safe\n"
+            "  database = mqtt\n"
+            "}",
+            [Name, Servers]
+        ),
+    {Name, parse_and_check(ConfigString, Type, Name)};
+mongo_config(MongoHost, MongoPort0, sharded = Type) ->
+    MongoPort = integer_to_list(MongoPort0),
+    Servers = MongoHost ++ ":" ++ MongoPort,
+    Name = atom_to_binary(?MODULE),
+    ConfigString =
+        io_lib:format(
+            "bridges.mongodb_sharded.~s {\n"
+            "  enable = true\n"
+            "  collection = mycol\n"
+            "  servers = [~p]\n"
+            "  w_mode = safe\n"
+            "  database = mqtt\n"
+            "}",
+            [Name, Servers]
+        ),
+    {Name, parse_and_check(ConfigString, Type, Name)};
+mongo_config(MongoHost, MongoPort0, single = Type) ->
+    MongoPort = integer_to_list(MongoPort0),
+    Server = MongoHost ++ ":" ++ MongoPort,
+    Name = atom_to_binary(?MODULE),
+    ConfigString =
+        io_lib:format(
+            "bridges.mongodb_single.~s {\n"
+            "  enable = true\n"
+            "  collection = mycol\n"
+            "  server = ~p\n"
+            "  w_mode = safe\n"
+            "  database = mqtt\n"
+            "}",
+            [Name, Server]
+        ),
+    {Name, parse_and_check(ConfigString, Type, Name)}.
+
+parse_and_check(ConfigString, Type, Name) ->
+    {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+    TypeBin = mongo_type_bin(Type),
+    hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+    #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
+    Config.
+
+create_bridge(Config) ->
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    Name = ?config(mongo_name, Config),
+    MongoConfig = ?config(mongo_config, Config),
+    emqx_bridge:create(Type, Name, MongoConfig).
+
+delete_bridge(Config) ->
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    Name = ?config(mongo_name, Config),
+    emqx_bridge:remove(Type, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+clear_db(Config) ->
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    Name = ?config(mongo_name, Config),
+    #{<<"collection">> := Collection} = ?config(mongo_config, Config),
+    ResourceID = emqx_bridge_resource:resource_id(Type, Name),
+    {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
+    Selector = #{},
+    {true, _} = ecpool:pick_and_do(
+        PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover
+    ),
+    ok.
+
+find_all(Config) ->
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    Name = ?config(mongo_name, Config),
+    #{<<"collection">> := Collection} = ?config(mongo_config, Config),
+    ResourceID = emqx_bridge_resource:resource_id(Type, Name),
+    emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}).
+
+send_message(Config, Payload) ->
+    Name = ?config(mongo_name, Config),
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    ?assertMatch(
+        {ok, _},
+        create_bridge(Config)
+    ),
+    Val = erlang:unique_integer(),
+    ok = send_message(Config, #{key => Val}),
+    ?assertMatch(
+        {ok, [#{<<"key">> := Val}]},
+        find_all(Config)
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    Type = mongo_type_bin(?config(mongo_type, Config)),
+    Name = ?config(mongo_name, Config),
+    MongoConfig0 = ?config(mongo_config, Config),
+    MongoConfig = MongoConfig0#{
+        <<"name">> => Name,
+        <<"type">> => Type
+    },
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(MongoConfig)
+    ),
+    Val = erlang:unique_integer(),
+    ok = send_message(Config, #{key => Val}),
+    ?assertMatch(
+        {ok, [#{<<"key">> := Val}]},
+        find_all(Config)
+    ),
+    ok.

+ 2 - 1
lib-ee/emqx_ee_connector/rebar.config

@@ -1,7 +1,8 @@
 {erl_opts, [debug_info]}.
 {erl_opts, [debug_info]}.
 {deps, [
 {deps, [
   {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
   {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
-  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}}
+  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}},
+  {emqx, {path, "../../apps/emqx"}}
 ]}.
 ]}.
 
 
 {shell, [
 {shell, [

+ 1 - 0
scripts/docker-ct-apps

@@ -2,3 +2,4 @@
 apps/emqx_authn
 apps/emqx_authn
 apps/emqx_authz
 apps/emqx_authz
 apps/emqx_connector
 apps/emqx_connector
+lib-ee/emqx_ee_bridge