|
|
@@ -196,6 +196,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) ->
|
|
|
{error, channel_not_found};
|
|
|
{ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} ->
|
|
|
PulsarMessage = render_message(Message, MessageTmpl),
|
|
|
+ emqx_trace:rendered_action_template(ChannelId, #{
|
|
|
+ message => PulsarMessage,
|
|
|
+ sync_timeout => SyncTimeout,
|
|
|
+ is_async => false
|
|
|
+ }),
|
|
|
try
|
|
|
pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout)
|
|
|
catch
|
|
|
@@ -217,12 +222,16 @@ on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) ->
|
|
|
?tp_span(
|
|
|
pulsar_producer_on_query_async,
|
|
|
#{instance_id => _InstanceId, message => Message},
|
|
|
- on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn)
|
|
|
+ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn)
|
|
|
)
|
|
|
end.
|
|
|
|
|
|
-on_query_async2(Producers, Message, MessageTmpl, AsyncReplyFn) ->
|
|
|
+on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) ->
|
|
|
PulsarMessage = render_message(Message, MessageTmpl),
|
|
|
+ emqx_trace:rendered_action_template(ChannelId, #{
|
|
|
+ message => PulsarMessage,
|
|
|
+ is_async => true
|
|
|
+ }),
|
|
|
pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}).
|
|
|
|
|
|
%%-------------------------------------------------------------------------------------
|