|
|
@@ -305,43 +305,36 @@ client_publish(Packet, Attrs, ProcessFun) ->
|
|
|
#{attributes => gen_attrs(Packet, Attrs)}
|
|
|
),
|
|
|
Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
|
|
|
- %% XXX: Remove otel_ctx attach
|
|
|
- %% use maybe_attach_for_outgoing/2 only
|
|
|
+ %% Otel attach for next spans (client_authz, msg_route... etc )
|
|
|
_ = otel_ctx:attach(Ctx),
|
|
|
- %% Attach before ProcessFun evaluated
|
|
|
- attach_by_maybe_outgoing(Packet, Ctx),
|
|
|
+ %% Attach in process dictionary for outgoing/pending packets
|
|
|
+ attach_outgoing(Packet, Ctx),
|
|
|
try
|
|
|
ProcessFun(Packet)
|
|
|
after
|
|
|
- %% XXX: Remove otel_ctx detach and `?end_span()`
|
|
|
- %% end current span by
|
|
|
- %% otel_span:end_span(otel_tracer:current_span_ctx(Ctx));
|
|
|
_ = ?end_span(),
|
|
|
+ erase_outgoing(Packet),
|
|
|
clear()
|
|
|
end.
|
|
|
|
|
|
-%% to `Publisher'
|
|
|
-attach_by_maybe_outgoing(?PACKET(?PUBACK), _Ctx) ->
|
|
|
+%% TODO: CONNACK, AUTH, SUBACK, UNSUBACK, DISCONNECT
|
|
|
+-compile({inline, [attach_outgoing/2, erase_outgoing/1]}).
|
|
|
+attach_outgoing(?PUBLISH_PACKET(?QOS_0), _Ctx) ->
|
|
|
ok;
|
|
|
-attach_by_maybe_outgoing(?PUBREC_PACKET(PacketId), Ctx) ->
|
|
|
- attach_internal_ctx({?PUBREC, PacketId}, Ctx);
|
|
|
-attach_by_maybe_outgoing(?PACKET(?PUBCOMP), _Ctx) ->
|
|
|
- ok;
|
|
|
-%% to `Subscriber'
|
|
|
-attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_0), _Ctx) ->
|
|
|
- ok;
|
|
|
-attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId), Ctx) ->
|
|
|
+attach_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId), Ctx) ->
|
|
|
attach_internal_ctx({?PUBACK, PacketId}, Ctx),
|
|
|
ok;
|
|
|
-attach_by_maybe_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId), Ctx) ->
|
|
|
+attach_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId), Ctx) ->
|
|
|
attach_internal_ctx({?PUBREC, PacketId}, Ctx),
|
|
|
+ ok.
|
|
|
+
|
|
|
+erase_outgoing(?PUBLISH_PACKET(?QOS_0)) ->
|
|
|
ok;
|
|
|
-attach_by_maybe_outgoing(?PUBREL_PACKET(PacketId), Ctx) ->
|
|
|
- attach_internal_ctx({?PUBCOMP, PacketId}, Ctx),
|
|
|
+erase_outgoing(?PUBLISH_PACKET(?QOS_1, PacketId)) ->
|
|
|
+ earse_internal_ctx({?PUBACK, PacketId}),
|
|
|
ok;
|
|
|
-attach_by_maybe_outgoing(Packet, Ctx) ->
|
|
|
- %% TODO: SUBACK, UNSUBACK
|
|
|
- ?SLOG(error, #{msg => "Unknown Packet", packet => Packet, ctx => Ctx}),
|
|
|
+erase_outgoing(?PUBLISH_PACKET(?QOS_2, PacketId)) ->
|
|
|
+ earse_internal_ctx({?PUBREC, PacketId}),
|
|
|
ok.
|
|
|
|
|
|
-spec client_puback(
|