Bladeren bron

feat(emqx_bridge_mongodb): port mongodb to shared connector and actions

Stefan Strigler 2 jaren geleden
bovenliggende
commit
4e077c951b

+ 9 - 3
apps/emqx_bridge/src/emqx_action_info.erl

@@ -77,6 +77,7 @@ hard_coded_action_info_modules_ee() ->
         emqx_bridge_confluent_producer_action_info,
         emqx_bridge_gcp_pubsub_producer_action_info,
         emqx_bridge_kafka_action_info,
+        emqx_bridge_mongodb_action_info,
         emqx_bridge_syskeeper_action_info
     ].
 -else.
@@ -116,14 +117,17 @@ bridge_v1_type_to_action_type(Type) ->
 
 action_type_to_bridge_v1_type(Bin, Conf) when is_binary(Bin) ->
     action_type_to_bridge_v1_type(binary_to_existing_atom(Bin), Conf);
-action_type_to_bridge_v1_type(ActionType, Conf) ->
+action_type_to_bridge_v1_type(ActionType, ActionConf) ->
     ActionInfoMap = info_map(),
     ActionTypeToBridgeV1Type = maps:get(action_type_to_bridge_v1_type, ActionInfoMap),
     case maps:get(ActionType, ActionTypeToBridgeV1Type, undefined) of
         undefined ->
             ActionType;
         BridgeV1TypeFun when is_function(BridgeV1TypeFun) ->
-            BridgeV1TypeFun(get_confs(ActionType, Conf));
+            case get_confs(ActionType, ActionConf) of
+                {ConnectorConfig, ActionConfig} -> BridgeV1TypeFun({ConnectorConfig, ActionConfig});
+                undefined -> ActionType
+            end;
         BridgeV1Type ->
             BridgeV1Type
     end.
@@ -131,7 +135,9 @@ action_type_to_bridge_v1_type(ActionType, Conf) ->
 get_confs(ActionType, #{<<"connector">> := ConnectorName} = ActionConfig) ->
     ConnectorType = action_type_to_connector_type(ActionType),
     ConnectorConfig = emqx_conf:get_raw([connectors, ConnectorType, ConnectorName]),
-    {ActionConfig, ConnectorConfig}.
+    {ConnectorConfig, ActionConfig};
+get_confs(_, _) ->
+    undefined.
 
 %% This function should return true for all inputs that are bridge V1 types for
 %% bridges that have been refactored to bridge V2s, and for all all bridge V2

+ 12 - 6
apps/emqx_bridge/src/emqx_bridge.erl

@@ -237,9 +237,15 @@ send_to_matched_egress_bridges_loop(Topic, Msg, [Id | Ids]) ->
     send_to_matched_egress_bridges_loop(Topic, Msg, Ids).
 
 send_message(BridgeId, Message) ->
-    {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
-    ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
-    send_message(BridgeType, BridgeName, ResId, Message, #{}).
+    {BridgeV1Type, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
+    case emqx_bridge_v2:is_bridge_v2_type(BridgeV1Type) of
+        true ->
+            BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
+            emqx_bridge_v2:send_message(BridgeV2Type, BridgeName, Message, #{});
+        false ->
+            ResId = emqx_bridge_resource:resource_id(BridgeV1Type, BridgeName),
+            send_message(BridgeV1Type, BridgeName, ResId, Message, #{})
+    end.
 
 send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
     case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
@@ -377,8 +383,8 @@ disable_enable(Action, BridgeType0, BridgeName) when
             )
     end.
 
-create(BridgeType0, BridgeName, RawConf) ->
-    BridgeType = upgrade_type(BridgeType0),
+create(BridgeV1Type, BridgeName, RawConf) ->
+    BridgeType = upgrade_type(BridgeV1Type),
     ?SLOG(debug, #{
         bridge_action => create,
         bridge_type => BridgeType,
@@ -387,7 +393,7 @@ create(BridgeType0, BridgeName, RawConf) ->
     }),
     case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
         true ->
-            emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeType, BridgeName, RawConf);
+            emqx_bridge_v2:bridge_v1_split_config_and_create(BridgeV1Type, BridgeName, RawConf);
         false ->
             emqx_conf:update(
                 emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],

+ 8 - 6
apps/emqx_bridge/src/emqx_bridge_lib.erl

@@ -78,6 +78,14 @@ external_ids(Type, Name) ->
             [external_id(Type0, Name), external_id(Type, Name)]
     end.
 
+get_conf(BridgeType, BridgeName) ->
+    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
+        true ->
+            emqx_conf:get_raw([actions, BridgeType, BridgeName]);
+        false ->
+            undefined
+    end.
+
 %% Creates the external id for the bridge_v2 that is used by the rule actions
 %% to refer to the bridge_v2
 external_id(BridgeType, BridgeName) ->
@@ -87,9 +95,3 @@ external_id(BridgeType, BridgeName) ->
 
 bin(Bin) when is_binary(Bin) -> Bin;
 bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
-
-get_conf(BridgeType, BridgeName) ->
-    case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
-        true -> emqx_conf:get_raw([actions, BridgeType, BridgeName]);
-        false -> emqx_conf:get_raw([bridges, BridgeType, BridgeName])
-    end.

+ 6 - 4
apps/emqx_bridge/src/emqx_bridge_v2.erl

@@ -1053,8 +1053,8 @@ bridge_v1_is_valid(BridgeV1Type, BridgeName) ->
 bridge_v1_type_to_bridge_v2_type(Type) ->
     emqx_action_info:bridge_v1_type_to_action_type(Type).
 
-bridge_v2_type_to_bridge_v1_type(Type, Conf) ->
-    emqx_action_info:action_type_to_bridge_v1_type(Type, Conf).
+bridge_v2_type_to_bridge_v1_type(ActionType, ActionConf) ->
+    emqx_action_info:action_type_to_bridge_v1_type(ActionType, ActionConf).
 
 is_bridge_v2_type(Type) ->
     emqx_action_info:is_action_type(Type).
@@ -1065,8 +1065,8 @@ bridge_v1_list_and_transform() ->
 
 bridge_v1_lookup_and_transform(ActionType, Name) ->
     case lookup(ActionType, Name) of
-        {ok, #{raw_config := #{<<"connector">> := ConnectorName}} = ActionConfig} ->
-            BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, ActionConfig),
+        {ok, #{raw_config := #{<<"connector">> := ConnectorName} = RawConfig} = ActionConfig} ->
+            BridgeV1Type = ?MODULE:bridge_v2_type_to_bridge_v1_type(ActionType, RawConfig),
             case ?MODULE:bridge_v1_is_valid(BridgeV1Type, Name) of
                 true ->
                     ConnectorType = connector_type(ActionType),
@@ -1244,6 +1244,8 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR
             #{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}},
             PreviousRawConf =/= undefined
         ),
+    %% [FIXME] this will loop through all connector types, instead pass the
+    %% connector type and just do it for that one
     Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
         FakeGlobalConfig
     ),

+ 20 - 14
apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl

@@ -552,18 +552,24 @@ t_on_get_status(Config, Opts) ->
         _Attempts = 20,
         ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
     ),
-    emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ct:sleep(500),
-        ?retry(
-            _Interval0 = 200,
-            _Attempts0 = 10,
-            ?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
-        )
-    end),
-    %% Check that it recovers itself.
-    ?retry(
-        _Sleep = 1_000,
-        _Attempts = 20,
-        ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
-    ),
+    case ProxyHost of
+        undefined ->
+            ok;
+        _ ->
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                ?retry(
+                    _Interval0 = 100,
+                    _Attempts0 = 20,
+                    ?assertEqual(
+                        {ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId)
+                    )
+                )
+            end),
+            %% Check that it recovers itself.
+            ?retry(
+                _Sleep = 1_000,
+                _Attempts = 20,
+                ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
+            )
+    end,
     ok.

+ 1 - 1
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.app.src

@@ -9,7 +9,7 @@
         emqx_resource,
         emqx_mongodb
     ]},
-    {env, []},
+    {env, [{emqx_action_info_modules, [emqx_bridge_mongodb_action_info]}]},
     {modules, []},
     {links, []}
 ]}.

+ 178 - 33
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb.erl

@@ -12,7 +12,9 @@
 
 %% emqx_bridge_enterprise "callbacks"
 -export([
-    conn_bridge_examples/1
+    bridge_v2_examples/1,
+    conn_bridge_examples/1,
+    connector_examples/1
 ]).
 
 %% hocon_schema callbacks
@@ -27,10 +29,13 @@
 %% hocon_schema API
 %%=================================================================================================
 
+%% [TODO] Namespace should be different depending on whether this is used for a
+%% connector, an action or a legacy bridge type.
 namespace() ->
     "bridge_mongodb".
 
 roots() ->
+    %% ???
     [].
 
 fields("config") ->
@@ -44,6 +49,18 @@ fields("config") ->
                 #{required => true, desc => ?DESC(emqx_resource_schema, "creation_opts")}
             )}
     ];
+fields("config_connector") ->
+    emqx_connector_schema:common_fields() ++
+        [
+            {parameters,
+                mk(
+                    hoconsc:union([
+                        ref(emqx_mongodb, "connector_" ++ T)
+                     || T <- ["single", "sharded", "rs"]
+                    ]),
+                    #{required => true, desc => ?DESC("mongodb_parameters")}
+                )}
+        ] ++ emqx_mongodb:fields(mongodb);
 fields("creation_opts") ->
     %% so far, mongodb connector does not support batching
     %% but we cannot delete this field due to compatibility reasons
@@ -55,12 +72,47 @@ fields("creation_opts") ->
             desc => ?DESC("batch_size")
         }}
     ]);
+fields(action) ->
+    {mongodb,
+        mk(
+            hoconsc:map(name, ref(?MODULE, mongodb_action)),
+            #{desc => <<"MongoDB Action Config">>, required => false}
+        )};
+fields(mongodb_action) ->
+    emqx_bridge_v2_schema:make_producer_action_schema(
+        mk(ref(?MODULE, action_parameters), #{
+            required => true, desc => ?DESC(action_parameters)
+        })
+    );
+fields(action_parameters) ->
+    [
+        {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
+        {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
+    ];
+fields(resource_opts) ->
+    fields("creation_opts");
 fields(mongodb_rs) ->
     emqx_mongodb:fields(rs) ++ fields("config");
 fields(mongodb_sharded) ->
     emqx_mongodb:fields(sharded) ++ fields("config");
 fields(mongodb_single) ->
     emqx_mongodb:fields(single) ++ fields("config");
+fields("post_connector") ->
+    type_and_name_fields(mongodb) ++
+        fields("config_connector");
+fields("put_connector") ->
+    fields("config_connector");
+fields("get_connector") ->
+    emqx_bridge_schema:status_fields() ++
+        fields("post_connector");
+fields("get_bridge_v2") ->
+    emqx_bridge_schema:status_fields() ++
+        fields("post_bridge_v2");
+fields("post_bridge_v2") ->
+    type_and_name_fields(mongodb) ++
+        fields(mongodb_action);
+fields("put_bridge_v2") ->
+    fields(mongodb_action);
 fields("post_rs") ->
     fields(mongodb_rs) ++ type_and_name_fields(mongodb_rs);
 fields("post_sharded") ->
@@ -86,6 +138,16 @@ fields("get_single") ->
         fields(mongodb_single) ++
         type_and_name_fields(mongodb_single).
 
+bridge_v2_examples(Method) ->
+    [
+        #{
+            <<"mongodb">> => #{
+                summary => <<"MongoDB Action">>,
+                value => action_values(Method)
+            }
+        }
+    ].
+
 conn_bridge_examples(Method) ->
     [
         #{
@@ -108,16 +170,46 @@ conn_bridge_examples(Method) ->
         }
     ].
 
+connector_examples(Method) ->
+    [
+        #{
+            <<"mongodb_rs">> => #{
+                summary => <<"MongoDB Replica Set Connector">>,
+                value => connector_values(mongodb_rs, Method)
+            }
+        },
+        #{
+            <<"mongodb_sharded">> => #{
+                summary => <<"MongoDB Sharded Connector">>,
+                value => connector_values(mongodb_sharded, Method)
+            }
+        },
+        #{
+            <<"mongodb_single">> => #{
+                summary => <<"MongoDB Standalone Connector">>,
+                value => connector_values(mongodb_single, Method)
+            }
+        }
+    ].
+
+desc("config_connector") ->
+    ?DESC("desc_config");
 desc("config") ->
     ?DESC("desc_config");
 desc("creation_opts") ->
     ?DESC(emqx_resource_schema, "creation_opts");
+desc(resource_opts) ->
+    ?DESC(emqx_resource_schema, "resource_opts");
 desc(mongodb_rs) ->
     ?DESC(mongodb_rs_conf);
 desc(mongodb_sharded) ->
     ?DESC(mongodb_sharded_conf);
 desc(mongodb_single) ->
     ?DESC(mongodb_single_conf);
+desc(mongodb_action) ->
+    ?DESC(mongodb_action);
+desc(action_parameters) ->
+    ?DESC(action_parameters);
 desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
     ["Configuration for MongoDB using `", string:to_upper(Method), "` method."];
 desc(_) ->
@@ -133,49 +225,102 @@ type_and_name_fields(MongoType) ->
         {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
     ].
 
-values(mongodb_rs = MongoType, Method) ->
-    TypeOpts = #{
+connector_values(Type, Method) ->
+    lists:foldl(
+        fun(M1, M2) ->
+            maps:merge(M1, M2)
+        end,
+        #{
+            description => <<"My example connector">>,
+            parameters => mongo_type_opts(Type)
+        },
+        [
+            common_values(),
+            method_values(mongodb, Method)
+        ]
+    ).
+
+action_values(Method) ->
+    maps:merge(
+        method_values(mongodb, Method),
+        #{
+            description => <<"My example action">>,
+            enable => true,
+            connector => <<"my_mongodb_connector">>,
+            parameters => #{
+                collection => <<"mycol">>
+            }
+        }
+    ).
+
+values(MongoType, Method) ->
+    maps:merge(
+        mongo_type_opts(MongoType),
+        bridge_values(MongoType, Method)
+    ).
+
+mongo_type_opts(mongodb_rs) ->
+    #{
+        mongo_type => <<"rs">>,
         servers => <<"localhost:27017, localhost:27018">>,
         w_mode => <<"safe">>,
         r_mode => <<"safe">>,
         replica_set_name => <<"rs">>
-    },
-    values(common, MongoType, Method, TypeOpts);
-values(mongodb_sharded = MongoType, Method) ->
-    TypeOpts = #{
+    };
+mongo_type_opts(mongodb_sharded) ->
+    #{
+        mongo_type => <<"sharded">>,
         servers => <<"localhost:27017, localhost:27018">>,
         w_mode => <<"safe">>
-    },
-    values(common, MongoType, Method, TypeOpts);
-values(mongodb_single = MongoType, Method) ->
-    TypeOpts = #{
+    };
+mongo_type_opts(mongodb_single) ->
+    #{
+        mongo_type => <<"single">>,
         server => <<"localhost:27017">>,
         w_mode => <<"safe">>
-    },
-    values(common, MongoType, Method, TypeOpts).
-
-values(common, MongoType, Method, TypeOpts) ->
-    MongoTypeBin = atom_to_binary(MongoType),
-    Common = #{
-        name => <<MongoTypeBin/binary, "_demo">>,
-        type => MongoTypeBin,
+    }.
+
+bridge_values(Type, _Method) ->
+    %% [FIXME] _Method makes a difference since PUT doesn't allow name and type
+    %% for connectors.
+    TypeBin = atom_to_binary(Type),
+    maps:merge(
+        #{
+            name => <<TypeBin/binary, "_demo">>,
+            type => TypeBin,
+            collection => <<"mycol">>
+        },
+        common_values()
+    ).
+
+common_values() ->
+    #{
         enable => true,
-        collection => <<"mycol">>,
         database => <<"mqtt">>,
         srv_record => false,
         pool_size => 8,
         username => <<"myuser">>,
         password => <<"******">>
-    },
-    MethodVals = method_values(MongoType, Method),
-    Vals0 = maps:merge(MethodVals, Common),
-    maps:merge(Vals0, TypeOpts).
-
-method_values(MongoType, _) ->
-    ConnectorType =
-        case MongoType of
-            mongodb_rs -> <<"rs">>;
-            mongodb_sharded -> <<"sharded">>;
-            mongodb_single -> <<"single">>
-        end,
-    #{mongo_type => ConnectorType}.
+    }.
+
+method_values(Type, post) ->
+    TypeBin = atom_to_binary(Type),
+    #{
+        name => <<TypeBin/binary, "_demo">>,
+        type => TypeBin
+    };
+method_values(Type, get) ->
+    maps:merge(
+        method_values(Type, post),
+        #{
+            status => <<"connected">>,
+            node_status => [
+                #{
+                    node => <<"emqx@localhost">>,
+                    status => <<"connected">>
+                }
+            ]
+        }
+    );
+method_values(_Type, put) ->
+    #{}.

+ 95 - 0
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_action_info.erl

@@ -0,0 +1,95 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_mongodb_action_info).
+
+-behaviour(emqx_action_info).
+
+%% behaviour callbacks
+-export([
+    bridge_v1_config_to_action_config/2,
+    bridge_v1_config_to_connector_config/1,
+    connector_action_config_to_bridge_v1_config/2,
+    action_type_name/0,
+    bridge_v1_type_name/0,
+    connector_type_name/0,
+    schema_module/0
+]).
+
+%% dynamic callback
+-export([
+    bridge_v1_type_name_fun/1
+]).
+
+-import(emqx_utils_conv, [bin/1]).
+
+-define(SCHEMA_MODULE, emqx_bridge_mongodb).
+
+connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
+    fix_v1_type(
+        maps:merge(
+            maps:without(
+                [<<"connector">>],
+                map_unindent(<<"parameters">>, ActionConfig)
+            ),
+            map_unindent(<<"parameters">>, ConnectorConfig)
+        )
+    ).
+
+fix_v1_type(#{<<"mongo_type">> := MongoType} = Conf) ->
+    Conf#{<<"type">> => v1_type(MongoType)}.
+
+bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
+    ActionTopLevelKeys = schema_keys(mongodb_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
+    ActionConfig#{<<"connector">> => ConnectorName}.
+
+bridge_v1_config_to_connector_config(BridgeV1Config) ->
+    ActionTopLevelKeys = schema_keys(mongodb_action),
+    ActionParametersKeys = schema_keys(action_parameters),
+    ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
+    ConnectorTopLevelKeys = schema_keys("config_connector"),
+    ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
+    ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
+    make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config).
+
+make_config_map(PickKeys, IndentKeys, Config) ->
+    Conf0 = maps:with(PickKeys, Config),
+    map_indent(<<"parameters">>, IndentKeys, Conf0).
+
+bridge_v1_type_name() ->
+    {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
+
+action_type_name() -> mongodb.
+
+connector_type_name() -> mongodb.
+
+schema_module() -> ?SCHEMA_MODULE.
+
+bridge_v1_type_names() -> [mongodb_rs, mongodb_sharded, mongodb_single].
+
+bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"mongo_type">> := MongoType}}, _}) ->
+    v1_type(MongoType).
+
+v1_type(<<"rs">>) -> mongodb_rs;
+v1_type(<<"sharded">>) -> mongodb_sharded;
+v1_type(<<"single">>) -> mongodb_single.
+
+map_unindent(Key, Map) ->
+    maps:merge(
+        maps:get(Key, Map),
+        maps:remove(Key, Map)
+    ).
+
+map_indent(IndentKey, PickKeys, Map) ->
+    maps:put(
+        IndentKey,
+        maps:with(PickKeys, Map),
+        maps:without(PickKeys, Map)
+    ).
+
+schema_keys(Name) ->
+    [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].

+ 83 - 30
apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl

@@ -6,16 +6,19 @@
 
 -behaviour(emqx_resource).
 
--include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %% `emqx_resource' API
 -export([
+    on_remove_channel/3,
     callback_mode/0,
-    on_start/2,
-    on_stop/2,
+    on_add_channel/4,
+    on_get_channel_status/3,
+    on_get_channels/1,
+    on_get_status/2,
     on_query/3,
-    on_get_status/2
+    on_start/2,
+    on_stop/2
 ]).
 
 %%========================================================================================
@@ -24,44 +27,94 @@
 
 callback_mode() -> emqx_mongodb:callback_mode().
 
-on_start(InstanceId, Config) ->
-    case emqx_mongodb:on_start(InstanceId, Config) of
-        {ok, ConnectorState} ->
-            PayloadTemplate0 = maps:get(payload_template, Config, undefined),
-            PayloadTemplate = preprocess_template(PayloadTemplate0),
-            CollectionTemplateSource = maps:get(collection, Config),
-            CollectionTemplate = preprocess_template(CollectionTemplateSource),
-            State = #{
-                payload_template => PayloadTemplate,
-                collection_template => CollectionTemplate,
-                connector_state => ConnectorState
-            },
-            {ok, State};
-        Error ->
-            Error
+on_add_channel(
+    _InstanceId,
+    #{channels := Channels} = OldState,
+    ChannelId,
+    #{parameters := Parameters} = ChannelConfig0
+) ->
+    PayloadTemplate0 = maps:get(payload_template, Parameters, undefined),
+    PayloadTemplate = preprocess_template(PayloadTemplate0),
+    CollectionTemplateSource = maps:get(collection, Parameters),
+    CollectionTemplate = preprocess_template(CollectionTemplateSource),
+    ChannelConfig = maps:merge(
+        Parameters,
+        ChannelConfig0#{
+            payload_template => PayloadTemplate,
+            collection_template => CollectionTemplate
+        }
+    ),
+    NewState = OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)},
+    {ok, NewState}.
+
+on_get_channel_status(InstanceId, _ChannelId, State) ->
+    case on_get_status(InstanceId, State) of
+        connected ->
+            connected;
+        _ ->
+            connecting
     end.
 
-on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
-    emqx_mongodb:on_stop(InstanceId, ConnectorState).
+on_get_channels(InstanceId) ->
+    emqx_bridge_v2:get_channels_for_connector(InstanceId).
 
-on_query(InstanceId, {send_message, Message0}, State) ->
+on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
+    emqx_mongodb:on_get_status(InstanceId, ConnectorState).
+
+on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_state := ConnectorState}) ->
     #{
         payload_template := PayloadTemplate,
-        collection_template := CollectionTemplate,
-        connector_state := ConnectorState
-    } = State,
-    NewConnectorState = ConnectorState#{
+        collection_template := CollectionTemplate
+    } = ChannelState0 = maps:get(Channel, Channels),
+    ChannelState = ChannelState0#{
         collection => emqx_placeholder:proc_tmpl(CollectionTemplate, Message0)
     },
     Message = render_message(PayloadTemplate, Message0),
-    Res = emqx_mongodb:on_query(InstanceId, {send_message, Message}, NewConnectorState),
-    ?tp(mongo_bridge_connector_on_query_return, #{result => Res}),
+    Res = emqx_mongodb:on_query(
+        InstanceId,
+        {Channel, Message},
+        maps:merge(ConnectorState, ChannelState)
+    ),
+    ?tp(mongo_bridge_connector_on_query_return, #{instance_id => InstanceId, result => Res}),
     Res;
 on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
     emqx_mongodb:on_query(InstanceId, Request, ConnectorState).
 
-on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
-    emqx_mongodb:on_get_status(InstanceId, ConnectorState).
+on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
+    NewState = State#{channels => maps:remove(ChannelId, Channels)},
+    {ok, NewState}.
+
+on_start(InstanceId, Config0) ->
+    Config = config_transform(Config0),
+    case emqx_mongodb:on_start(InstanceId, Config) of
+        {ok, ConnectorState} ->
+            State = #{
+                connector_state => ConnectorState,
+                channels => #{}
+            },
+            {ok, State};
+        Error ->
+            Error
+    end.
+
+config_transform(#{parameters := #{mongo_type := MongoType} = Parameters} = Config) ->
+    maps:put(
+        type,
+        connector_type(MongoType),
+        maps:merge(
+            maps:remove(parameters, Config),
+            Parameters
+        )
+    ).
+
+connector_type(rs) -> mongodb_rs;
+connector_type(sharded) -> mongodb_sharded;
+connector_type(single) -> mongodb_single.
+
+on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
+    ok = emqx_mongodb:on_stop(InstanceId, ConnectorState),
+    ?tp(mongodb_stopped, #{instance_id => InstanceId}),
+    ok.
 
 %%========================================================================================
 %% Helper fns

+ 34 - 8
apps/emqx_bridge_mongodb/test/emqx_bridge_mongodb_SUITE.erl

@@ -132,7 +132,17 @@ init_per_suite(Config) ->
 
 end_per_suite(_Config) ->
     emqx_mgmt_api_test_util:end_suite(),
-    ok = emqx_common_test_helpers:stop_apps([emqx_mongodb, emqx_bridge, emqx_rule_engine, emqx_conf]),
+    ok = emqx_common_test_helpers:stop_apps(
+        [
+            emqx_management,
+            emqx_bridge_mongodb,
+            emqx_mongodb,
+            emqx_bridge,
+            emqx_connector,
+            emqx_rule_engine,
+            emqx_conf
+        ]
+    ),
     ok.
 
 init_per_testcase(_Testcase, Config) ->
@@ -144,6 +154,7 @@ init_per_testcase(_Testcase, Config) ->
 end_per_testcase(_Testcase, Config) ->
     clear_db(Config),
     delete_bridge(Config),
+    [] = emqx_connector:list(),
     snabbkaffe:stop(),
     ok.
 
@@ -157,9 +168,17 @@ start_apps() ->
     %% we want to make sure they are loaded before
     %% ekka start in emqx_common_test_helpers:start_apps/1
     emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
-    ok = emqx_common_test_helpers:start_apps([
-        emqx_conf, emqx_rule_engine, emqx_bridge, emqx_mongodb
-    ]).
+    ok = emqx_common_test_helpers:start_apps(
+        [
+            emqx_conf,
+            emqx_rule_engine,
+            emqx_connector,
+            emqx_bridge,
+            emqx_mongodb,
+            emqx_bridge_mongodb,
+            emqx_management
+        ]
+    ).
 
 ensure_loaded() ->
     _ = application:load(emqtt),
@@ -198,6 +217,7 @@ mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
             "\n   w_mode = safe"
             "\n   use_legacy_protocol = auto"
             "\n   database = mqtt"
+            "\n   mongo_type = rs"
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
@@ -224,6 +244,7 @@ mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
             "\n   w_mode = safe"
             "\n   use_legacy_protocol = auto"
             "\n   database = mqtt"
+            "\n   mongo_type = sharded"
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
@@ -253,6 +274,7 @@ mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
             "\n   auth_source = ~s"
             "\n   username = ~s"
             "\n   password = \"file://~s\""
+            "\n   mongo_type = single"
             "\n   resource_opts = {"
             "\n     query_mode = ~s"
             "\n     worker_pool_size = 1"
@@ -290,13 +312,17 @@ create_bridge(Config, Overrides) ->
 delete_bridge(Config) ->
     Type = mongo_type_bin(?config(mongo_type, Config)),
     Name = ?config(mongo_name, Config),
-    emqx_bridge:remove(Type, Name).
+    emqx_bridge:check_deps_and_remove(Type, Name, [connector, rule_actions]).
 
 create_bridge_http(Params) ->
     Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
-    case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
+    case
+        emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, #{
+            return_all => true
+        })
+    of
+        {ok, {{_, 201, _}, _, Body}} -> {ok, emqx_utils_json:decode(Body, [return_maps])};
         Error -> Error
     end.
 
@@ -564,8 +590,8 @@ t_get_status_server_selection_too_short(Config) ->
     ok.
 
 t_use_legacy_protocol_option(Config) ->
-    ResourceID = resource_id(Config),
     {ok, _} = create_bridge(Config, #{<<"use_legacy_protocol">> => <<"true">>}),
+    ResourceID = resource_id(Config),
     ?retry(
         _Interval0 = 200,
         _NAttempts0 = 20,

+ 232 - 0
apps/emqx_bridge_mongodb/test/emqx_bridge_v2_mongodb_SUITE.erl

@@ -0,0 +1,232 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_v2_mongodb_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-define(BRIDGE_TYPE, mongodb).
+-define(BRIDGE_TYPE_BIN, <<"mongodb">>).
+-define(CONNECTOR_TYPE, mongodb).
+-define(CONNECTOR_TYPE_BIN, <<"mongodb">>).
+
+-import(emqx_common_test_helpers, [on_exit/1]).
+-import(emqx_utils_conv, [bin/1]).
+
+%%------------------------------------------------------------------------------
+%% CT boilerplate
+%%------------------------------------------------------------------------------
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+init_per_suite(Config) ->
+    MongoHost = os:getenv("MONGO_SINGLE_HOST", "mongo"),
+    MongoPort = list_to_integer(os:getenv("MONGO_SINGLE_PORT", "27017")),
+    case emqx_common_test_helpers:is_tcp_server_available(MongoHost, MongoPort) of
+        true ->
+            Apps = emqx_cth_suite:start(
+                [
+                    emqx,
+                    emqx_conf,
+                    emqx_connector,
+                    emqx_bridge,
+                    emqx_bridge_mongodb,
+                    emqx_rule_engine,
+                    emqx_management,
+                    {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+                ],
+                #{work_dir => emqx_cth_suite:work_dir(Config)}
+            ),
+            {ok, Api} = emqx_common_test_http:create_default_app(),
+            [
+                {apps, Apps},
+                {api, Api},
+                {mongo_host, MongoHost},
+                {mongo_port, MongoPort}
+                | Config
+            ];
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_mongo);
+                _ ->
+                    {skip, no_mongo}
+            end
+    end.
+
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
+
+init_per_testcase(TestCase, Config) ->
+    common_init_per_testcase(TestCase, Config).
+
+common_init_per_testcase(TestCase, Config) ->
+    ct:timetrap(timer:seconds(60)),
+    emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+    emqx_config:delete_override_conf_files(),
+    UniqueNum = integer_to_binary(erlang:unique_integer()),
+    Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
+    AuthSource = bin(os:getenv("MONGO_AUTHSOURCE", "admin")),
+    Username = bin(os:getenv("MONGO_USERNAME", "")),
+    Password = bin(os:getenv("MONGO_PASSWORD", "")),
+    Passfile = filename:join(?config(priv_dir, Config), "passfile"),
+    ok = file:write_file(Passfile, Password),
+    NConfig = [
+        {mongo_authsource, AuthSource},
+        {mongo_username, Username},
+        {mongo_password, Password},
+        {mongo_passfile, Passfile}
+        | Config
+    ],
+    ConnectorConfig = connector_config(Name, NConfig),
+    BridgeConfig = bridge_config(Name, Name),
+    ok = snabbkaffe:start_trace(),
+    [
+        {connector_type, ?CONNECTOR_TYPE},
+        {connector_name, Name},
+        {connector_config, ConnectorConfig},
+        {bridge_type, ?BRIDGE_TYPE},
+        {bridge_name, Name},
+        {bridge_config, BridgeConfig}
+        | NConfig
+    ].
+
+end_per_testcase(_Testcase, Config) ->
+    case proplists:get_bool(skip_does_not_apply, Config) of
+        true ->
+            ok;
+        false ->
+            emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
+            emqx_common_test_helpers:call_janitor(60_000),
+            ok = snabbkaffe:stop(),
+            ok
+    end.
+
+%%------------------------------------------------------------------------------
+%% Helper fns
+%%------------------------------------------------------------------------------
+
+connector_config(Name, Config) ->
+    MongoHost = ?config(mongo_host, Config),
+    MongoPort = ?config(mongo_port, Config),
+    AuthSource = ?config(mongo_authsource, Config),
+    Username = ?config(mongo_username, Config),
+    PassFile = ?config(mongo_passfile, Config),
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"database">> => <<"mqtt">>,
+            <<"parameters">> =>
+                #{
+                    <<"mongo_type">> => <<"single">>,
+                    <<"server">> => iolist_to_binary([MongoHost, ":", integer_to_binary(MongoPort)]),
+                    <<"w_mode">> => <<"safe">>
+                },
+            <<"pool_size">> => 8,
+            <<"srv_record">> => false,
+            <<"username">> => Username,
+            <<"password">> => iolist_to_binary(["file://", PassFile]),
+            <<"auth_source">> => AuthSource
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    parse_and_check_connector_config(InnerConfigMap, Name).
+
+parse_and_check_connector_config(InnerConfigMap, Name) ->
+    TypeBin = ?CONNECTOR_TYPE_BIN,
+    RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
+    #{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
+        hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
+            required => false, atom_key => false
+        }),
+    ct:pal("parsed config: ~p", [Config]),
+    InnerConfigMap.
+
+bridge_config(Name, ConnectorId) ->
+    InnerConfigMap0 =
+        #{
+            <<"enable">> => true,
+            <<"connector">> => ConnectorId,
+            <<"parameters">> =>
+                #{},
+            <<"local_topic">> => <<"t/aeh">>
+            %%,
+        },
+    InnerConfigMap = serde_roundtrip(InnerConfigMap0),
+    parse_and_check_bridge_config(InnerConfigMap, Name).
+
+%% check it serializes correctly
+serde_roundtrip(InnerConfigMap0) ->
+    IOList = hocon_pp:do(InnerConfigMap0, #{}),
+    {ok, InnerConfigMap} = hocon:binary(IOList),
+    InnerConfigMap.
+
+parse_and_check_bridge_config(InnerConfigMap, Name) ->
+    TypeBin = ?BRIDGE_TYPE_BIN,
+    RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
+    hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
+    InnerConfigMap.
+
+shared_secret_path() ->
+    os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
+
+shared_secret(client_keyfile) ->
+    filename:join([shared_secret_path(), "client.key"]);
+shared_secret(client_certfile) ->
+    filename:join([shared_secret_path(), "client.crt"]);
+shared_secret(client_cacertfile) ->
+    filename:join([shared_secret_path(), "ca.crt"]);
+shared_secret(rig_keytab) ->
+    filename:join([shared_secret_path(), "rig.keytab"]).
+
+make_message() ->
+    Time = erlang:unique_integer(),
+    BinTime = integer_to_binary(Time),
+    Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
+    #{
+        clientid => BinTime,
+        payload => Payload,
+        timestamp => Time
+    }.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_start_stop(Config) ->
+    emqx_bridge_v2_testlib:t_start_stop(Config, mongodb_stopped),
+    ok.
+
+t_create_via_http(Config) ->
+    emqx_bridge_v2_testlib:t_create_via_http(Config),
+    ok.
+
+t_on_get_status(Config) ->
+    emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
+    ok.
+
+t_sync_query(Config) ->
+    ok = emqx_bridge_v2_testlib:t_sync_query(
+        Config,
+        fun make_message/0,
+        fun(Res) -> ?assertEqual(ok, Res) end,
+        mongo_bridge_connector_on_query_return
+    ),
+    ok.

+ 14 - 2
apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl

@@ -20,8 +20,8 @@
 
 resource_type(Type) when is_binary(Type) ->
     resource_type(binary_to_atom(Type, utf8));
-%% We use AEH's Kafka interface.
 resource_type(azure_event_hub_producer) ->
+    %% We use AEH's Kafka interface.
     emqx_bridge_kafka_impl_producer;
 resource_type(confluent_producer) ->
     emqx_bridge_kafka_impl_producer;
@@ -29,6 +29,8 @@ resource_type(gcp_pubsub_producer) ->
     emqx_bridge_gcp_pubsub_impl_producer;
 resource_type(kafka_producer) ->
     emqx_bridge_kafka_impl_producer;
+resource_type(mongodb) ->
+    emqx_bridge_mongodb_connector;
 resource_type(syskeeper_forwarder) ->
     emqx_bridge_syskeeper_connector;
 resource_type(syskeeper_proxy) ->
@@ -83,6 +85,14 @@ connector_structs() ->
                     required => false
                 }
             )},
+        {mongodb,
+            mk(
+                hoconsc:map(name, ref(emqx_bridge_mongodb, "config_connector")),
+                #{
+                    desc => <<"MongoDB Connector Config">>,
+                    required => false
+                }
+            )},
         {syskeeper_forwarder,
             mk(
                 hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)),
@@ -119,6 +129,7 @@ schema_modules() ->
         emqx_bridge_confluent_producer,
         emqx_bridge_gcp_pubsub_producer_schema,
         emqx_bridge_kafka,
+        emqx_bridge_mongodb,
         emqx_bridge_syskeeper_connector,
         emqx_bridge_syskeeper_proxy
     ].
@@ -133,12 +144,13 @@ api_schemas(Method) ->
         api_ref(
             emqx_bridge_confluent_producer, <<"confluent_producer">>, Method ++ "_connector"
         ),
-        api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
         api_ref(
             emqx_bridge_gcp_pubsub_producer_schema,
             <<"gcp_pubsub_producer">>,
             Method ++ "_connector"
         ),
+        api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
+        api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
         api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
         api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
     ].

+ 4 - 8
apps/emqx_connector/src/schema/emqx_connector_schema.erl

@@ -68,8 +68,9 @@ enterprise_fields_connectors() -> [].
 
 connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer];
 connector_type_to_bridge_types(confluent_producer) -> [confluent_producer];
-connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer];
+connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub, gcp_pubsub_producer];
 connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer];
+connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
 connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder];
 connector_type_to_bridge_types(syskeeper_proxy) -> [].
 
@@ -266,8 +267,9 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
                 RawConfigSoFar1
             ),
             %% Add action
+            ActionType = emqx_action_info:bridge_v1_type_to_action_type(to_bin(BridgeType)),
             RawConfigSoFar3 = emqx_utils_maps:deep_put(
-                [actions_config_name(), to_bin(maybe_rename(BridgeType)), BridgeName],
+                [actions_config_name(), to_bin(ActionType), BridgeName],
                 RawConfigSoFar2,
                 ActionMap
             ),
@@ -286,12 +288,6 @@ transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
     ),
     NewRawConf.
 
-%% v1 uses 'kafka' as bridge type v2 uses 'kafka_producer'
-maybe_rename(kafka) ->
-    kafka_producer;
-maybe_rename(Name) ->
-    Name.
-
 %%======================================================================================
 %% HOCON Schema Callbacks
 %%======================================================================================

+ 46 - 36
apps/emqx_mongodb/src/emqx_mongodb.erl

@@ -68,19 +68,10 @@ roots() ->
         }}
     ].
 
-fields(single) ->
-    [
-        {mongo_type, #{
-            type => single,
-            default => single,
-            desc => ?DESC("single_mongo_type")
-        }},
-        {server, server()},
-        {w_mode, fun w_mode/1}
-    ] ++ mongo_fields();
-fields(rs) ->
+fields("connector_rs") ->
     [
         {mongo_type, #{
+            required => true,
             type => rs,
             default => rs,
             desc => ?DESC("rs_mongo_type")
@@ -89,17 +80,51 @@ fields(rs) ->
         {w_mode, fun w_mode/1},
         {r_mode, fun r_mode/1},
         {replica_set_name, fun replica_set_name/1}
-    ] ++ mongo_fields();
-fields(sharded) ->
+    ];
+fields("connector_sharded") ->
     [
         {mongo_type, #{
+            required => true,
             type => sharded,
             default => sharded,
             desc => ?DESC("sharded_mongo_type")
         }},
         {servers, servers()},
         {w_mode, fun w_mode/1}
-    ] ++ mongo_fields();
+    ];
+fields("connector_single") ->
+    [
+        {mongo_type, #{
+            required => true,
+            type => single,
+            default => single,
+            desc => ?DESC("single_mongo_type")
+        }},
+        {server, server()},
+        {w_mode, fun w_mode/1}
+    ];
+fields(Type) when Type =:= rs; Type =:= single; Type =:= sharded ->
+    fields("connector_" ++ atom_to_list(Type)) ++ fields(mongodb);
+fields(mongodb) ->
+    [
+        {srv_record, fun srv_record/1},
+        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
+        {username, fun emqx_connector_schema_lib:username/1},
+        {password, emqx_connector_schema_lib:password_field()},
+        {use_legacy_protocol,
+            hoconsc:mk(hoconsc:enum([auto, true, false]), #{
+                default => auto,
+                desc => ?DESC("use_legacy_protocol")
+            })},
+        {auth_source, #{
+            type => binary(),
+            required => false,
+            desc => ?DESC("auth_source")
+        }},
+        {database, fun emqx_connector_schema_lib:database/1},
+        {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
+    ] ++
+        emqx_connector_schema_lib:ssl_fields();
 fields(topology) ->
     [
         {pool_size,
@@ -129,6 +154,12 @@ fields(topology) ->
         {min_heartbeat_frequency_ms, duration("min_heartbeat_period")}
     ].
 
+desc("connector_single") ->
+    ?DESC("desc_single");
+desc("connector_rs") ->
+    ?DESC("desc_rs");
+desc("connector_sharded") ->
+    ?DESC("desc_sharded");
 desc(single) ->
     ?DESC("desc_single");
 desc(rs) ->
@@ -140,27 +171,6 @@ desc(topology) ->
 desc(_) ->
     undefined.
 
-mongo_fields() ->
-    [
-        {srv_record, fun srv_record/1},
-        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
-        {username, fun emqx_connector_schema_lib:username/1},
-        {password, emqx_connector_schema_lib:password_field()},
-        {use_legacy_protocol,
-            hoconsc:mk(hoconsc:enum([auto, true, false]), #{
-                default => auto,
-                desc => ?DESC("use_legacy_protocol")
-            })},
-        {auth_source, #{
-            type => binary(),
-            required => false,
-            desc => ?DESC("auth_source")
-        }},
-        {database, fun emqx_connector_schema_lib:database/1},
-        {topology, #{type => hoconsc:ref(?MODULE, topology), required => false}}
-    ] ++
-        emqx_connector_schema_lib:ssl_fields().
-
 %% ===================================================================
 
 callback_mode() -> always_sync.
@@ -236,7 +246,7 @@ on_stop(InstId, _State) ->
 
 on_query(
     InstId,
-    {send_message, Document},
+    {_ChannelId, Document},
     #{pool_name := PoolName, collection := Collection} = State
 ) ->
     Request = {insert, Collection, Document},

+ 21 - 0
rel/i18n/emqx_bridge_mongodb.hocon

@@ -48,6 +48,12 @@ mongodb_single_conf.desc:
 mongodb_single_conf.label:
 """MongoDB (Standalone) Configuration"""
 
+mongodb_parameters.label:
+"""MongoDB Type Specific Parameters"""
+
+mongodb_parameters.desc:
+"""Set of parameters specific for the given type of this MongoDB connector, `mongo_type` can be one of `single` (Standalone), `sharded` (Sharded) or `rs` (Replica Set)."""
+
 payload_template.desc:
 """The template for formatting the outgoing messages.  If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."""
 
@@ -59,4 +65,19 @@ batch_size.desc:
 batch_size.label:
 """Batch Size"""
 
+action_parameters.label:
+"""Action Parameters"""
+action_parameters.desc:
+"""Additional parameters specific to this action type"""
+
+mongodb_action.label:
+"""MongoDB Action"""
+mongodb_action.desc:
+"""Action to interact with a MongoDB connector"""
+
+mqtt_topic.desc:
+"""MQTT topic or topic filter as data source (bridge input).  If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in MongoDB."""
+mqtt_topic.label:
+"""Source MQTT Topic"""
+
 }