|
|
@@ -352,7 +352,10 @@ route(Routes, Delivery = #delivery{message = Msg}, PersistRes) ->
|
|
|
Delivery,
|
|
|
TraceRouteAttrs,
|
|
|
fun(DeliveryWithTrace) ->
|
|
|
- add_route_attrs(Msg),
|
|
|
+ add_route_attrs(
|
|
|
+ (not emqx_message:is_sys(Msg)) andalso
|
|
|
+ emqx_external_trace:msg_attrs(Msg)
|
|
|
+ ),
|
|
|
do_route(Routes, DeliveryWithTrace, PersistRes)
|
|
|
end
|
|
|
).
|
|
|
@@ -367,10 +370,13 @@ route_result({TF, Group}) ->
|
|
|
do_route([], #delivery{message = Msg}, _PersistRes = []) ->
|
|
|
ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]),
|
|
|
ok = inc_dropped_cnt(Msg),
|
|
|
- ?ext_trace_add_attrs(#{
|
|
|
- 'route.dropped.node' => node(),
|
|
|
- 'route.dropped.reason' => no_subscribers
|
|
|
- }),
|
|
|
+ add_route_attrs(
|
|
|
+ (not emqx_message:is_sys(Msg)) andalso
|
|
|
+ #{
|
|
|
+ 'route.dropped.node' => node(),
|
|
|
+ 'route.dropped.reason' => no_subscribers
|
|
|
+ }
|
|
|
+ ),
|
|
|
[];
|
|
|
do_route([], _Delivery, PersistRes = [_ | _]) ->
|
|
|
PersistRes;
|
|
|
@@ -490,15 +496,10 @@ inc_dropped_cnt(Msg) ->
|
|
|
end.
|
|
|
|
|
|
-compile({inline, [add_route_attrs/1]}).
|
|
|
-add_route_attrs(Msg) ->
|
|
|
- %% TODO? : maybe a switch for sys messages
|
|
|
- case emqx_message:is_sys(Msg) of
|
|
|
- true ->
|
|
|
- ok;
|
|
|
- false ->
|
|
|
- ?ext_trace_add_attrs(emqx_external_trace:msg_attrs(Msg)),
|
|
|
- ok
|
|
|
- end.
|
|
|
+add_route_attrs(false) ->
|
|
|
+ ok;
|
|
|
+add_route_attrs(Attrs) ->
|
|
|
+ ?ext_trace_add_attrs(Attrs).
|
|
|
|
|
|
-compile({inline, [subscribers/1]}).
|
|
|
-spec subscribers(
|