|
@@ -168,13 +168,14 @@ init_channel_state(#{parameters := Parameters}) ->
|
|
|
on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
|
|
on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ->
|
|
|
case maps:get(Tag, Channels, undefined) of
|
|
case maps:get(Tag, Channels, undefined) of
|
|
|
ChannelState = #{} ->
|
|
ChannelState = #{} ->
|
|
|
- run_simple_upload(InstId, Data, ChannelState, Config);
|
|
|
|
|
|
|
+ run_simple_upload(InstId, Tag, Data, ChannelState, Config);
|
|
|
undefined ->
|
|
undefined ->
|
|
|
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
|
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
run_simple_upload(
|
|
run_simple_upload(
|
|
|
InstId,
|
|
InstId,
|
|
|
|
|
+ ChannelID,
|
|
|
Data,
|
|
Data,
|
|
|
#{
|
|
#{
|
|
|
bucket := BucketTemplate,
|
|
bucket := BucketTemplate,
|
|
@@ -188,6 +189,11 @@ run_simple_upload(
|
|
|
Client = emqx_s3_client:create(Bucket, Config),
|
|
Client = emqx_s3_client:create(Bucket, Config),
|
|
|
Key = render_key(KeyTemplate, Data),
|
|
Key = render_key(KeyTemplate, Data),
|
|
|
Content = render_content(ContentTemplate, Data),
|
|
Content = render_content(ContentTemplate, Data),
|
|
|
|
|
+ emqx_trace:rendered_action_template(ChannelID, #{
|
|
|
|
|
+ bucket => Bucket,
|
|
|
|
|
+ key => Key,
|
|
|
|
|
+ content => Content
|
|
|
|
|
+ }),
|
|
|
case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
|
|
case emqx_s3_client:put_object(Client, Key, UploadOpts, Content) of
|
|
|
ok ->
|
|
ok ->
|
|
|
?tp(s3_bridge_connector_upload_ok, #{
|
|
?tp(s3_bridge_connector_upload_ok, #{
|