Просмотр исходного кода

Merge pull request #9709 from thalesmg/mongodb-bridge-payload-template-v50

feat(mongodb): add `payload_template` field for bridge (e5.0)
Thales Macedo Garitezi 3 лет назад
Родитель
Сommit
9d99d180f9

+ 11 - 0
lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_mongodb.conf

@@ -86,4 +86,15 @@ emqx_ee_bridge_mongodb {
       zh: "桥接名称"
     }
   }
+
+  payload_template {
+    desc {
+      en: "The template for formatting the outgoing messages.  If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."
+      zh: "用于格式化写入 MongoDB 的消息模板。 如果未定义,规则引擎会使用 JSON 格式序列化所有的可见输入,例如 clientid, topic, payload 等。"
+    }
+    label: {
+      en: "Payload template"
+      zh: "有效载荷模板"
+    }
+  }
 }

+ 3 - 3
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl

@@ -61,9 +61,9 @@ resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, u
 resource_type(kafka) -> emqx_bridge_impl_kafka;
 resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
 resource_type(gcp_pubsub) -> emqx_ee_connector_gcp_pubsub;
-resource_type(mongodb_rs) -> emqx_connector_mongo;
-resource_type(mongodb_sharded) -> emqx_connector_mongo;
-resource_type(mongodb_single) -> emqx_connector_mongo;
+resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
+resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
+resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
 resource_type(mysql) -> emqx_connector_mysql;
 resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
 resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;

+ 2 - 1
lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl

@@ -37,7 +37,8 @@ roots() ->
 fields("config") ->
     [
         {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
-        {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}
+        {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
+        {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
     ];
 fields(mongodb_rs) ->
     emqx_connector_mongo:fields(rs) ++ fields("config");

+ 21 - 3
lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mongodb_SUITE.erl

@@ -25,7 +25,8 @@ all() ->
 group_tests() ->
     [
         t_setup_via_config_and_publish,
-        t_setup_via_http_api_and_publish
+        t_setup_via_http_api_and_publish,
+        t_payload_template
     ].
 
 groups() ->
@@ -196,9 +197,14 @@ parse_and_check(ConfigString, Type, Name) ->
     Config.
 
 create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
     Type = mongo_type_bin(?config(mongo_type, Config)),
     Name = ?config(mongo_name, Config),
-    MongoConfig = ?config(mongo_config, Config),
+    MongoConfig0 = ?config(mongo_config, Config),
+    MongoConfig = emqx_map_lib:deep_merge(MongoConfig0, Overrides),
+    ct:pal("creating ~p bridge with config:\n ~p", [Type, MongoConfig]),
     emqx_bridge:create(Type, Name, MongoConfig).
 
 delete_bridge(Config) ->
@@ -219,7 +225,8 @@ clear_db(Config) ->
     Name = ?config(mongo_name, Config),
     #{<<"collection">> := Collection} = ?config(mongo_config, Config),
     ResourceID = emqx_bridge_resource:resource_id(Type, Name),
-    {ok, _, #{state := #{poolname := PoolName}}} = emqx_resource:get_instance(ResourceID),
+    {ok, _, #{state := #{connector_state := #{poolname := PoolName}}}} =
+        emqx_resource:get_instance(ResourceID),
     Selector = #{},
     {true, _} = ecpool:pick_and_do(
         PoolName, {mongo_api, delete, [Collection, Selector]}, no_handover
@@ -275,3 +282,14 @@ t_setup_via_http_api_and_publish(Config) ->
         find_all(Config)
     ),
     ok.
+
+t_payload_template(Config) ->
+    {ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}),
+    Val = erlang:unique_integer(),
+    ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
+    ok = send_message(Config, #{key => Val, clientid => ClientId}),
+    ?assertMatch(
+        {ok, [#{<<"foo">> := ClientId}]},
+        find_all(Config)
+    ),
+    ok.

+ 78 - 0
lib-ee/emqx_ee_connector/src/emqx_ee_connector_mongodb.erl

@@ -0,0 +1,78 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_connector_mongodb).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+%% `emqx_resource' API
+-export([
+    callback_mode/0,
+    is_buffer_supported/0,
+    on_start/2,
+    on_stop/2,
+    on_query/3,
+    on_get_status/2
+]).
+
+%%========================================================================================
+%% `emqx_resource' API
+%%========================================================================================
+
+callback_mode() -> emqx_connector_mongo:callback_mode().
+
+is_buffer_supported() -> false.
+
+on_start(InstanceId, Config) ->
+    case emqx_connector_mongo:on_start(InstanceId, Config) of
+        {ok, ConnectorState} ->
+            PayloadTemplate0 = maps:get(payload_template, Config, undefined),
+            PayloadTemplate = preprocess_template(PayloadTemplate0),
+            State = #{
+                payload_template => PayloadTemplate,
+                connector_state => ConnectorState
+            },
+            {ok, State};
+        Error ->
+            Error
+    end.
+
+on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
+    emqx_connector_mongo:on_stop(InstanceId, ConnectorState).
+
+on_query(InstanceId, {send_message, Message0}, State) ->
+    #{
+        payload_template := PayloadTemplate,
+        connector_state := ConnectorState
+    } = State,
+    Message = render_message(PayloadTemplate, Message0),
+    emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, ConnectorState);
+on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
+    emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).
+
+on_get_status(InstanceId, _State = #{connector_state := ConnectorState}) ->
+    emqx_connector_mongo:on_get_status(InstanceId, ConnectorState).
+
+%%========================================================================================
+%% Helper fns
+%%========================================================================================
+
+preprocess_template(undefined = _PayloadTemplate) ->
+    undefined;
+preprocess_template(PayloadTemplate) ->
+    emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate).
+
+render_message(undefined = _PayloadTemplate, Message) ->
+    Message;
+render_message(PayloadTemplate, Message) ->
+    %% Note: mongo expects a map as a document, so the rendered result
+    %% must be JSON-serializable
+    Rendered = emqx_plugin_libs_rule:proc_tmpl(PayloadTemplate, Message),
+    emqx_json:decode(Rendered, [return_maps]).