Sfoglia il codice sorgente

feat: add mongodb bridge (e5.0)

Thales Macedo Garitezi 3 anni fa
parent
commit
3d4afd65df

+ 4 - 4
.ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml

@@ -18,7 +18,7 @@ services:
       --ipv6
       --bind_ip_all
       --replSet rs0
-      
+
   mongo2:
     hostname: mongo2
     container_name: mongo2
@@ -54,10 +54,10 @@ services:
       --ipv6
       --bind_ip_all
       --replSet rs0
-      
-  mongo_client:
+
+  mongo_rs_client:
     image: mongo:${MONGO_TAG}
-    container_name: mongo_client
+    container_name: mongo_rs_client
     networks:
       - emqx_bridge
     depends_on:

+ 90 - 0
.ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml

@@ -0,0 +1,90 @@
+version: "3"
+
+services:
+  mongosharded1:
+    hostname: mongosharded1
+    container_name: mongosharded1
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27014:27017
+    restart: always
+    command:
+      --configsvr
+      --replSet cfg0
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded2:
+    hostname: mongosharded2
+    container_name: mongosharded2
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27015:27017
+    restart: always
+    command:
+      --shardsvr
+      --replSet rs0
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded3:
+    hostname: mongosharded3
+    container_name: mongosharded3
+    image: mongo:${MONGO_TAG}
+    environment:
+      MONGO_INITDB_DATABASE: mqtt
+    networks:
+      - emqx_bridge
+    expose:
+      - 27017
+    ports:
+      - 27016:27017
+    restart: always
+    entrypoint: mongos
+    command:
+      --configdb cfg0/mongosharded1:27017
+      --port 27017
+      --ipv6
+      --bind_ip_all
+
+  mongosharded_client:
+    image: mongo:${MONGO_TAG}
+    container_name: mongosharded_client
+    networks:
+      - emqx_bridge
+    depends_on:
+      - mongosharded1
+      - mongosharded2
+      - mongosharded3
+    command:
+      - /bin/bash
+      - -c
+      - |
+        while ! mongo --host mongosharded1 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1 ; do
+            sleep 1
+        done
+        mongo --host mongosharded1 --eval "rs.initiate( { _id : 'cfg0', configsvr: true, members: [ { _id : 0, host : 'mongosharded1:27017' } ] })"
+        while ! mongo --host mongosharded2 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1  ; do
+            sleep 1
+        done
+        mongo --host mongosharded2 --eval "rs.initiate( { _id : 'rs0', members: [ { _id : 0, host : 'mongosharded2:27017' } ] })"
+        mongo --host mongosharded2 --eval "rs.status()"
+        while ! mongo --host mongosharded3 --eval 'db.runCommand("ping").ok' --quiet >/dev/null 2>&1  ; do
+            sleep 1
+        done
+        mongo --host mongosharded3 --eval "sh.addShard('rs0/mongosharded2:27017')"
+        mongo --host mongosharded3 --eval "sh.enableSharding('mqtt')"

+ 15 - 4
.github/workflows/elixir_release.yml

@@ -12,7 +12,18 @@ on:
 jobs:
   elixir_release_build:
     runs-on: ubuntu-latest
-    container: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04
+    strategy:
+      matrix:
+        otp:
+          - 24.2.1-1
+        elixir:
+          - 1.13.4
+        os:
+          - ubuntu20.04
+        profile:
+          - emqx
+          - emqx-enterprise
+    container: ghcr.io/emqx/emqx-builder/5.0-17:${{ matrix.elixir }}-${{ matrix.otp }}-${{ matrix.os }}
 
     steps:
       - name: Checkout
@@ -23,15 +34,15 @@ jobs:
         run: |
           git config --global --add safe.directory "$GITHUB_WORKSPACE"
       - name: elixir release
-        run: make emqx-elixir
+        run: make ${{ matrix.profile }}-elixir
       - name: start release
         run: |
-          cd _build/emqx/rel/emqx
+          cd _build/${{ matrix.profile }}/rel/emqx
           bin/emqx start
       - name: check if started
         run: |
           sleep 10
           nc -zv localhost 1883
-          cd _build/emqx/rel/emqx
+          cd _build/${{ matrix.profile }}/rel/emqx
           bin/emqx ping
           bin/emqx ctl status

+ 7 - 0
.github/workflows/run_test_cases.yaml

@@ -124,6 +124,8 @@ jobs:
             docker-compose \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \
+                -f .ci/docker-compose-file/docker-compose-mongo-replicaset-tcp.yaml \
+                -f .ci/docker-compose-file/docker-compose-mongo-sharded-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
                 -f .ci/docker-compose-file/docker-compose-mysql-tls.yaml \
                 -f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
@@ -135,6 +137,11 @@ jobs:
                 -f .ci/docker-compose-file/docker-compose.yaml \
                 up -d --build
 
+        - name: wait some services to be fully up
+          run: |
+            docker wait mongosharded_client
+            docker wait mongo_rs_client
+
           # produces <app-name>.coverdata
         - name: run common test
           working-directory: source

+ 18 - 0
apps/emqx_bridge/src/schema/emqx_bridge_schema.erl

@@ -41,12 +41,16 @@ api_schema(Method) ->
     hoconsc:union(Broker ++ EE).
 
 ee_api_schemas(Method) ->
+    %% must ensure the app is loaded before checking if fn is defined.
+    ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
     case erlang:function_exported(emqx_ee_bridge, api_schemas, 1) of
         true -> emqx_ee_bridge:api_schemas(Method);
         false -> []
     end.
 
 ee_fields_bridges() ->
+    %% must ensure the app is loaded before checking if fn is defined.
+    ensure_loaded(emqx_ee_bridge, emqx_ee_bridge),
     case erlang:function_exported(emqx_ee_bridge, fields, 1) of
         true -> emqx_ee_bridge:fields(bridges);
         false -> []
@@ -155,3 +159,17 @@ status() ->
 
 node_name() ->
     {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
+
+%%=================================================================================================
+%% Internal fns
+%%=================================================================================================
+
+ensure_loaded(App, Mod) ->
+    try
+        _ = application:load(App),
+        _ = Mod:module_info(),
+        ok
+    catch
+        _:_ ->
+            ok
+    end.

+ 42 - 3
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -37,7 +37,7 @@
 
 -export([roots/0, fields/1, desc/1]).
 
--export([mongo_query/5, check_worker_health/1]).
+-export([mongo_query/5, mongo_insert/3, check_worker_health/1]).
 
 -define(HEALTH_CHECK_TIMEOUT, 30000).
 
@@ -177,9 +177,16 @@ on_start(
         {worker_options, init_worker_options(maps:to_list(NConfig), SslOpts)}
     ],
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
+    Collection = maps:get(collection, Config, <<"mqtt">>),
     case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts) of
-        ok -> {ok, #{poolname => PoolName, type => Type}};
-        {error, Reason} -> {error, Reason}
+        ok ->
+            {ok, #{
+                poolname => PoolName,
+                type => Type,
+                collection => Collection
+            }};
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 on_stop(InstId, #{poolname := PoolName}) ->
@@ -189,6 +196,35 @@ on_stop(InstId, #{poolname := PoolName}) ->
     }),
     emqx_plugin_libs_pool:stop_pool(PoolName).
 
+on_query(
+    InstId,
+    {send_message, Document},
+    #{poolname := PoolName, collection := Collection} = State
+) ->
+    Request = {insert, Collection, Document},
+    ?TRACE(
+        "QUERY",
+        "mongodb_connector_received",
+        #{request => Request, connector => InstId, state => State}
+    ),
+    case
+        ecpool:pick_and_do(
+            PoolName,
+            {?MODULE, mongo_insert, [Collection, Document]},
+            no_handover
+        )
+    of
+        {{false, Reason}, _Document} ->
+            ?SLOG(error, #{
+                msg => "mongodb_connector_do_query_failed",
+                request => Request,
+                reason => Reason,
+                connector => InstId
+            }),
+            {error, Reason};
+        {{true, _Info}, _Document} ->
+            ok
+    end;
 on_query(
     InstId,
     {Action, Collection, Filter, Projector},
@@ -292,6 +328,9 @@ mongo_query(Conn, find_one, Collection, Filter, Projector) ->
 mongo_query(_Conn, _Action, _Collection, _Filter, _Projector) ->
     ok.
 
+mongo_insert(Conn, Collection, Documents) ->
+    mongo_api:insert(Conn, Collection, Documents).
+
 init_type(#{mongo_type := rs, replica_set_name := ReplicaSetName}) ->
     {rs, ReplicaSetName};
 init_type(#{mongo_type := Type}) ->

+ 67 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf

@@ -0,0 +1,67 @@
+emqx_ee_bridge_mongodb {
+  desc_config {
+    desc {
+      en: "Configuration for MongoDB Bridge"
+      zh: "为MongoDB桥配置"
+    }
+    label {
+      en: "MongoDB Bridge Configuration"
+      zh: "MongoDB桥配置"
+    }
+  }
+
+  enable {
+    desc {
+      en: "Enable or disable this MongoDB Bridge"
+      zh: "启用或停用该MongoDB桥"
+    }
+    label {
+      en: "Enable or disable"
+      zh: "启用或禁用"
+    }
+  }
+
+  collection {
+    desc {
+      en: "The collection where data will be stored into"
+      zh: "数据将被存储到的集合"
+    }
+    label {
+      en: "Collection to be used"
+      zh: "将要使用的藏品"
+    }
+  }
+
+  mongodb_rs_conf {
+    desc {
+      en: "MongoDB (Replica Set) configuration"
+      zh: "MongoDB(Replica Set)配置"
+    }
+    label {
+      en: "MongoDB (Replica Set) Configuration"
+      zh: "MongoDB(Replica Set)配置"
+    }
+  }
+
+  mongodb_sharded_conf {
+    desc {
+      en: "MongoDB (Sharded) configuration"
+      zh: "MongoDB (Sharded)配置"
+    }
+    label {
+      en: "MongoDB (Sharded) Configuration"
+      zh: "MongoDB (Sharded)配置"
+    }
+  }
+
+  mongodb_single_conf {
+    desc {
+      en: "MongoDB (Standalone) configuration"
+      zh: "MongoDB(独立)配置"
+    }
+    label {
+      en: "MongoDB (Standalone) Configuration"
+      zh: "MongoDB(独立)配置"
+    }
+  }
+}

+ 5 - 1
lib-ee/emqx_ee_bridge/rebar.config

@@ -1,5 +1,9 @@
 {erl_opts, [debug_info]}.
-{deps, []}.
+{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
+       , {emqx_connector, {path, "../../apps/emqx_connector"}}
+       , {emqx_resource, {path, "../../apps/emqx_resource"}}
+       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+       ]}.
 
 {shell, [
     {apps, [emqx_ee_bridge]}

+ 17 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -15,6 +15,9 @@
 api_schemas(Method) ->
     [
         ref(emqx_ee_bridge_mysql, Method),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
+        ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
         ref(emqx_ee_bridge_hstreamdb, Method),
         ref(emqx_ee_bridge_influxdb, Method ++ "_udp"),
         ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
@@ -25,6 +28,7 @@ schema_modules() ->
     [
         emqx_ee_bridge_hstreamdb,
         emqx_ee_bridge_influxdb,
+        emqx_ee_bridge_mongodb,
         emqx_ee_bridge_mysql
     ].
 
@@ -42,6 +46,9 @@ examples(Method) ->
 
 resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
+resource_type(mongodb_rs) -> emqx_connector_mongo;
+resource_type(mongodb_sharded) -> emqx_connector_mongo;
+resource_type(mongodb_single) -> emqx_connector_mongo;
 resource_type(mysql) -> emqx_connector_mysql;
 resource_type(influxdb_udp) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
@@ -59,7 +66,16 @@ fields(bridges) ->
                 hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
                 #{desc => <<"EMQX Enterprise Config">>}
             )}
-    ] ++ fields(influxdb);
+    ] ++ fields(mongodb) ++ fields(influxdb);
+fields(mongodb) ->
+    [
+        {Type,
+            mk(
+                hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)),
+                #{desc => <<"EMQX Enterprise Config">>}
+            )}
+     || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
+    ];
 fields(influxdb) ->
     [
         {Protocol,

+ 154 - 0
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -0,0 +1,154 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_mongodb).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-behaviour(hocon_schema).
+
+%% emqx_ee_bridge "callbacks"
+-export([
+    conn_bridge_examples/1
+]).
+
+%% hocon_schema callbacks
+-export([
+    namespace/0,
+    roots/0,
+    fields/1,
+    desc/1
+]).
+
+%%=================================================================================================
+%% hocon_schema API
+%%=================================================================================================
+
+namespace() ->
+    "bridge_mongodb".
+
+roots() ->
+    [].
+
+fields("config") ->
+    [
+        {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
+        {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}
+    ];
+fields(mongodb_rs) ->
+    emqx_connector_mongo:fields(rs) ++ fields("config");
+fields(mongodb_sharded) ->
+    emqx_connector_mongo:fields(sharded) ++ fields("config");
+fields(mongodb_single) ->
+    emqx_connector_mongo:fields(single) ++ fields("config");
+fields("post_rs") ->
+    fields(mongodb_rs);
+fields("post_sharded") ->
+    fields(mongodb_sharded);
+fields("post_single") ->
+    fields(mongodb_single);
+fields("put_rs") ->
+    fields(mongodb_rs);
+fields("put_sharded") ->
+    fields(mongodb_sharded);
+fields("put_single") ->
+    fields(mongodb_single);
+fields("get_rs") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_rs);
+fields("get_sharded") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_sharded);
+fields("get_single") ->
+    emqx_bridge_schema:metrics_status_fields() ++ fields(mongodb_single).
+
+conn_bridge_examples(Method) ->
+    [
+        #{
+            <<"mongodb_rs">> => #{
+                summary => <<"MongoDB (Replica Set) Bridge">>,
+                value => values(mongodb_rs, Method)
+            }
+        },
+        #{
+            <<"mongodb_sharded">> => #{
+                summary => <<"MongoDB (Sharded) Bridge">>,
+                value => values(mongodb_sharded, Method)
+            }
+        },
+        #{
+            <<"mongodb_single">> => #{
+                summary => <<"MongoDB (Standalone) Bridge">>,
+                value => values(mongodb_single, Method)
+            }
+        }
+    ].
+
+desc("config") ->
+    ?DESC("desc_config");
+desc(mongodb_rs) ->
+    ?DESC(mongodb_rs_conf);
+desc(mongodb_sharded) ->
+    ?DESC(mongodb_sharded_conf);
+desc(mongodb_single) ->
+    ?DESC(mongodb_single_conf);
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+    ["Configuration for MongoDB using `", string:to_upper(Method), "` method."];
+desc(_) ->
+    undefined.
+
+%%=================================================================================================
+%% Internal fns
+%%=================================================================================================
+
+values(mongodb_rs = MongoType, Method) ->
+    TypeOpts = #{
+        servers => <<"localhost:27017, localhost:27018">>,
+        w_mode => <<"safe">>,
+        r_mode => <<"safe">>,
+        replica_set_name => <<"rs">>
+    },
+    values(common, MongoType, Method, TypeOpts);
+values(mongodb_sharded = MongoType, Method) ->
+    TypeOpts = #{
+        servers => <<"localhost:27017, localhost:27018">>,
+        w_mode => <<"safe">>
+    },
+    values(common, MongoType, Method, TypeOpts);
+values(mongodb_single = MongoType, Method) ->
+    TypeOpts = #{
+        server => <<"localhost:27017">>,
+        w_mode => <<"safe">>
+    },
+    values(common, MongoType, Method, TypeOpts).
+
+values(common, MongoType, Method, TypeOpts) ->
+    MongoTypeBin = atom_to_binary(MongoType),
+    Common = #{
+        name => <<MongoTypeBin/binary, "_demo">>,
+        type => MongoTypeBin,
+        enable => true,
+        collection => <<"mycol">>,
+        database => <<"mqtt">>,
+        srv_record => false,
+        pool_size => 8,
+        username => <<"myuser">>,
+        password => <<"mypass">>
+    },
+    MethodVals = method_values(MongoType, Method),
+    Vals0 = maps:merge(MethodVals, Common),
+    maps:merge(Vals0, TypeOpts).
+
+method_values(MongoType, get) ->
+    Vals = method_values(MongoType, post),
+    maps:merge(?METRICS_EXAMPLE, Vals);
+method_values(MongoType, _) ->
+    ConnectorType =
+        case MongoType of
+            mongodb_rs -> <<"rs">>;
+            mongodb_sharded -> <<"sharded">>;
+            mongodb_single -> <<"single">>
+        end,
+    #{mongo_type => ConnectorType}.

+ 251 - 0
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl

@@ -0,0 +1,251 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_mongodb_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    [
+        {group, rs},
+        {group, sharded},
+        {group, single}
+        | (emqx_common_test_helpers:all(?MODULE) -- group_tests())
+    ].
+
+group_tests() ->
+    [
+        t_setup_via_config_and_publish,
+        t_setup_via_http_api_and_publish
+    ].
+
+groups() ->
+    [
+        {rs, group_tests()},
+        {sharded, group_tests()},
+        {single, group_tests()}
+    ].
+
+init_per_group(Type = rs, Config) ->
+    MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"),
+    MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            MongoConfig = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end;
+init_per_group(Type = sharded, Config) ->
+    MongoHost = os:getenv("MONGO_SHARDED_HOST", "mongosharded3"),
+    MongoPort = list_to_integer(os:getenv("MONGO_SHARDED_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            MongoConfig = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end;
+init_per_group(Type = single, Config) ->
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            _ = application:load(emqx_ee_bridge),
+            _ = emqx_ee_bridge:module_info(),
+            ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+            emqx_mgmt_api_test_util:init_suite(),
+            MongoConfig = mongo_config(MongoHost, MongoPort, Type),
+            [
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort},
+                {mongo_config, MongoConfig}
+                | Config
+            ];
+        false ->
+            {skip, no_mongo}
+    end.
+
+end_per_group(_Type, _Config) ->
+    ok.
+
+init_per_suite(Config) ->
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_mgmt_api_test_util:end_suite(),
+    ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]),
+    ok.
+
+init_per_testcase(_Testcase, Config) ->
+    catch clear_db(Config),
+    delete_bridge(Config),
+    Config.
+
+end_per_testcase(_Testcase, Config) ->
+    catch clear_db(Config),
+    delete_bridge(Config),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+mongo_config(MongoHost0, MongoPort0, rs) ->
+    MongoHost = list_to_binary(MongoHost0),
+    MongoPort = integer_to_binary(MongoPort0),
+    Servers = <<MongoHost/binary, ":", MongoPort/binary>>,
+    Name = atom_to_binary(?MODULE),
+    #{
+        <<"type">> => <<"mongodb_rs">>,
+        <<"name">> => Name,
+        <<"enable">> => true,
+        <<"collection">> => <<"mycol">>,
+        <<"servers">> => Servers,
+        <<"database">> => <<"mqtt">>,
+        <<"w_mode">> => <<"safe">>,
+        <<"replica_set_name">> => <<"rs0">>
+    };
+mongo_config(MongoHost0, MongoPort0, sharded) ->
+    MongoHost = list_to_binary(MongoHost0),
+    MongoPort = integer_to_binary(MongoPort0),
+    Servers = <<MongoHost/binary, ":", MongoPort/binary>>,
+    Name = atom_to_binary(?MODULE),
+    #{
+        <<"type">> => <<"mongodb_sharded">>,
+        <<"name">> => Name,
+        <<"enable">> => true,
+        <<"collection">> => <<"mycol">>,
+        <<"servers">> => Servers,
+        <<"database">> => <<"mqtt">>,
+        <<"w_mode">> => <<"safe">>
+    };
+mongo_config(MongoHost0, MongoPort0, single) ->
+    MongoHost = list_to_binary(MongoHost0),
+    MongoPort = integer_to_binary(MongoPort0),
+    Server = <<MongoHost/binary, ":", MongoPort/binary>>,
+    Name = atom_to_binary(?MODULE),
+    #{
+        <<"type">> => <<"mongodb_single">>,
+        <<"name">> => Name,
+        <<"enable">> => true,
+        <<"collection">> => <<"mycol">>,
+        <<"server">> => Server,
+        <<"database">> => <<"mqtt">>,
+        <<"w_mode">> => <<"safe">>
+    }.
+
+create_bridge(Config0 = #{<<"type">> := Type, <<"name">> := Name}) ->
+    Config = maps:without(
+        [
+            <<"type">>,
+            <<"name">>
+        ],
+        Config0
+    ),
+    emqx_bridge:create(Type, Name, Config).
+
+delete_bridge(Config) ->
+    #{
+        <<"type">> := Type,
+        <<"name">> := Name
+    } = ?config(mongo_config, Config),
+    emqx_bridge:remove(Type, Name).
+
+create_bridge_http(Params) ->
+    Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
+        {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])};
+        Error -> Error
+    end.
+
+clear_db(Config) ->
+    #{
+        <<"name">> := Name,
+        <<"type">> := Type,
+        <<"collection">> := Collection
+    } = ?config(mongo_config, Config),
+    ResourceID = emqx_bridge_resource:resource_id(Type, Name),
+    {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
+    Selector = #{},
+    {true, _} = ecpool:pick_and_do(
+        PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover
+    ),
+    ok.
+
+find_all(Config) ->
+    #{
+        <<"name">> := Name,
+        <<"type">> := Type,
+        <<"collection">> := Collection
+    } = ?config(mongo_config, Config),
+    ResourceID = emqx_bridge_resource:resource_id(Type, Name),
+    emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}).
+
+send_message(Config, Payload) ->
+    #{
+        <<"name">> := Name,
+        <<"type">> := Type
+    } = ?config(mongo_config, Config),
+    BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
+    emqx_bridge:send_message(BridgeID, Payload).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_setup_via_config_and_publish(Config) ->
+    MongoConfig = ?config(mongo_config, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge(MongoConfig)
+    ),
+    Val = erlang:unique_integer(),
+    ok = send_message(Config, #{key => Val}),
+    ?assertMatch(
+        {ok, [#{<<"key">> := Val}]},
+        find_all(Config)
+    ),
+    ok.
+
+t_setup_via_http_api_and_publish(Config) ->
+    MongoConfig = ?config(mongo_config, Config),
+    ?assertMatch(
+        {ok, _},
+        create_bridge_http(MongoConfig)
+    ),
+    Val = erlang:unique_integer(),
+    ok = send_message(Config, #{key => Val}),
+    ?assertMatch(
+        {ok, [#{<<"key">> := Val}]},
+        find_all(Config)
+    ),
+    ok.

+ 2 - 1
lib-ee/emqx_ee_connector/rebar.config

@@ -1,7 +1,8 @@
 {erl_opts, [debug_info]}.
 {deps, [
   {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
-  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}}
+  {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.4"}}},
+  {emqx, {path, "../../apps/emqx"}}
 ]}.
 
 {shell, [

+ 1 - 0
scripts/docker-ct-apps

@@ -2,3 +2,4 @@
 apps/emqx_authn
 apps/emqx_authz
 apps/emqx_connector
+lib-ee/emqx_ee_bridge