|
|
@@ -35,8 +35,11 @@ on_start(InstanceId, Config) ->
|
|
|
{ok, ConnectorState} ->
|
|
|
PayloadTemplate0 = maps:get(payload_template, Config, undefined),
|
|
|
PayloadTemplate = preprocess_template(PayloadTemplate0),
|
|
|
+ CollectionTemplateSource = maps:get(collection, Config),
|
|
|
+ CollectionTemplate = preprocess_template(CollectionTemplateSource),
|
|
|
State = #{
|
|
|
payload_template => PayloadTemplate,
|
|
|
+ collection_template => CollectionTemplate,
|
|
|
connector_state => ConnectorState
|
|
|
},
|
|
|
{ok, State};
|
|
|
@@ -50,10 +53,14 @@ on_stop(InstanceId, _State = #{connector_state := ConnectorState}) ->
|
|
|
on_query(InstanceId, {send_message, Message0}, State) ->
|
|
|
#{
|
|
|
payload_template := PayloadTemplate,
|
|
|
+ collection_template := CollectionTemplate,
|
|
|
connector_state := ConnectorState
|
|
|
} = State,
|
|
|
+ NewConnectorState = ConnectorState#{
|
|
|
+ collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
|
|
|
+ },
|
|
|
Message = render_message(PayloadTemplate, Message0),
|
|
|
- emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, ConnectorState);
|
|
|
+ emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState);
|
|
|
on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
|
|
|
emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).
|
|
|
|