|
|
@@ -295,7 +295,7 @@ client_publish(Packet, Attrs, ProcessFun) ->
|
|
|
Ctx = otel_tracer:set_current_span(RootCtx, SpanCtx),
|
|
|
%% Otel attach for next spans (client_authz, msg_route... etc )
|
|
|
_ = otel_ctx:attach(Ctx),
|
|
|
- %% Attach in process dictionary for outgoing/pending packets
|
|
|
+ %% Attach in process dictionary for outgoing/awaiting packets
|
|
|
attach_outgoing(Packet, Ctx),
|
|
|
try
|
|
|
ProcessFun(Packet)
|
|
|
@@ -563,9 +563,9 @@ start_outgoing_trace(Packet, Attrs, ParentCtx) ->
|
|
|
stop_outgoing_trace(Anys, Attrs) when is_list(Anys) ->
|
|
|
lists:foreach(fun(Any) -> stop_outgoing_trace(Any, Attrs) end, Anys);
|
|
|
stop_outgoing_trace(Packet, _Attrs) when is_record(Packet, mqtt_packet) ->
|
|
|
- %% Maybe pending for next Packet
|
|
|
+ %% Maybe awaiting for next Packet
|
|
|
%% The current outgoing Packet SHOULD NOT be modified
|
|
|
- ok = outgoing_maybe_pending_next(Packet),
|
|
|
+ ok = outgoing_maybe_awaiting_next(Packet),
|
|
|
end_span(get_ctx(Packet));
|
|
|
stop_outgoing_trace(Any, _Attrs) ->
|
|
|
end_span(get_ctx(Any)).
|
|
|
@@ -580,74 +580,74 @@ end_span(_) ->
|
|
|
|
|
|
%% Note:
|
|
|
%% When replying PUBACK/PUBREC/PUBCOMP to the `Publisher' or sending PUBLISH/PUBREL to the `Subscriber',
|
|
|
-%% the current `PacketId' is stored and pending-trace begins.
|
|
|
+%% the current `PacketId' is stored and awaiting-trace begins.
|
|
|
%% At this time, a new span begins and the span ends after receiving and processing the reply
|
|
|
%% from Client(might be Publisher or Subscriber).
|
|
|
|
|
|
--compile({inline, [outgoing_maybe_pending_next/1]}).
|
|
|
+-compile({inline, [outgoing_maybe_awaiting_next/1]}).
|
|
|
|
|
|
%% ====================
|
|
|
%% Broker -> Client(`Publisher'):
|
|
|
-outgoing_maybe_pending_next(?PACKET(?PUBACK)) ->
|
|
|
+outgoing_maybe_awaiting_next(?PACKET(?PUBACK)) ->
|
|
|
%% PUBACK (QoS=1), Ignore
|
|
|
ok;
|
|
|
-outgoing_maybe_pending_next(?PUBREC_PACKET(PacketId) = OutgoingPubrecPacket) ->
|
|
|
- %% PUBREC (QoS=2), Pending PUBREL
|
|
|
- start_pending_trace(?PUBREL, PacketId, get_ctx(OutgoingPubrecPacket));
|
|
|
-outgoing_maybe_pending_next(?PACKET(?PUBCOMP)) ->
|
|
|
+outgoing_maybe_awaiting_next(?PUBREC_PACKET(PacketId) = OutgoingPubrecPacket) ->
|
|
|
+ %% PUBREC (QoS=2), Awaiting PUBREL
|
|
|
+ start_awaiting_trace(?PUBREL, PacketId, get_ctx(OutgoingPubrecPacket));
|
|
|
+outgoing_maybe_awaiting_next(?PACKET(?PUBCOMP)) ->
|
|
|
%% PUBCOMP (QoS=2), Ignore
|
|
|
ok;
|
|
|
%% ====================
|
|
|
%% Broker -> Client(`Subscriber'):
|
|
|
-outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_0)) ->
|
|
|
+outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_0)) ->
|
|
|
%% PUBLISH (QoS=0), Ignore
|
|
|
ok;
|
|
|
-outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_1, PacketId) = Packet) ->
|
|
|
- %% PUBLISH (QoS=1), Pending PUBACK
|
|
|
- start_pending_trace(?PUBACK, PacketId, get_ctx(Packet));
|
|
|
-outgoing_maybe_pending_next(?PUBLISH_PACKET(?QOS_2, PacketId) = Packet) ->
|
|
|
- %% PUBLISH (QoS=2), Pending PUBREC
|
|
|
- start_pending_trace(?PUBREC, PacketId, get_ctx(Packet));
|
|
|
-outgoing_maybe_pending_next(?PACKET(?PUBREL, PacketId) = Packet) ->
|
|
|
- %% PUBREL (QoS=2), Pending PUBCOMP
|
|
|
- start_pending_trace(?PUBCOMP, PacketId, get_ctx(Packet));
|
|
|
+outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_1, PacketId) = Packet) ->
|
|
|
+ %% PUBLISH (QoS=1), Awaiting PUBACK
|
|
|
+ start_awaiting_trace(?PUBACK, PacketId, get_ctx(Packet));
|
|
|
+outgoing_maybe_awaiting_next(?PUBLISH_PACKET(?QOS_2, PacketId) = Packet) ->
|
|
|
+ %% PUBLISH (QoS=2), Awaiting PUBREC
|
|
|
+ start_awaiting_trace(?PUBREC, PacketId, get_ctx(Packet));
|
|
|
+outgoing_maybe_awaiting_next(?PACKET(?PUBREL, PacketId) = Packet) ->
|
|
|
+ %% PUBREL (QoS=2), Awaiting PUBCOMP
|
|
|
+ start_awaiting_trace(?PUBCOMP, PacketId, get_ctx(Packet));
|
|
|
%% ====================
|
|
|
-outgoing_maybe_pending_next(_) ->
|
|
|
- %% TODO: Pending AUTH
|
|
|
+outgoing_maybe_awaiting_next(_) ->
|
|
|
+ %% TODO: Awaiting AUTH
|
|
|
ok.
|
|
|
|
|
|
-start_pending_trace(PendingType, PacketId, ParentCtx) ->
|
|
|
- PendingCtxKey = internal_extra_key(PendingType, PacketId),
|
|
|
- PendingSpanCtx = otel_tracer:start_span(
|
|
|
+start_awaiting_trace(AwaitingType, PacketId, ParentCtx) ->
|
|
|
+ AwaitingCtxKey = internal_extra_key(AwaitingType, PacketId),
|
|
|
+ AwaitingSpanCtx = otel_tracer:start_span(
|
|
|
ParentCtx,
|
|
|
?current_tracer,
|
|
|
- pending_span_name(PendingType),
|
|
|
+ awaiting_span_name(AwaitingType),
|
|
|
#{}
|
|
|
),
|
|
|
- NCtx = otel_tracer:set_current_span(ParentCtx, PendingSpanCtx),
|
|
|
- _ = attach_internal_ctx(PendingCtxKey, NCtx),
|
|
|
+ NCtx = otel_tracer:set_current_span(ParentCtx, AwaitingSpanCtx),
|
|
|
+ _ = attach_internal_ctx(AwaitingCtxKey, NCtx),
|
|
|
ok.
|
|
|
|
|
|
-client_incoming(?PACKET(PendingType, PktVar) = Packet, Attrs, ProcessFun) ->
|
|
|
+client_incoming(?PACKET(AwaitingType, PktVar) = Packet, Attrs, ProcessFun) ->
|
|
|
try
|
|
|
ProcessFun(Packet)
|
|
|
after
|
|
|
- end_pending_client_packet(
|
|
|
- internal_extra_key(PendingType, PktVar), Attrs
|
|
|
+ end_awaiting_client_packet(
|
|
|
+ internal_extra_key(AwaitingType, PktVar), Attrs
|
|
|
)
|
|
|
end.
|
|
|
|
|
|
-end_pending_client_packet(PendingCtxKey, Attrs) ->
|
|
|
- case detach_internal_ctx(PendingCtxKey) of
|
|
|
+end_awaiting_client_packet(AwaitingCtxKey, Attrs) ->
|
|
|
+ case detach_internal_ctx(AwaitingCtxKey) of
|
|
|
Ctx when is_map(Ctx) ->
|
|
|
ok = add_span_attrs(Attrs, Ctx),
|
|
|
_ = end_span(Ctx),
|
|
|
- earse_internal_ctx(PendingCtxKey),
|
|
|
+ earse_internal_ctx(AwaitingCtxKey),
|
|
|
ok;
|
|
|
_ ->
|
|
|
%% TODO:
|
|
|
- %% erlang:get() for all internal_extra_key and erase all unknown PendingType
|
|
|
- earse_internal_ctx(PendingCtxKey),
|
|
|
+ %% erlang:get() for all internal_extra_key and erase all unknown Awaiting Type
|
|
|
+ earse_internal_ctx(AwaitingCtxKey),
|
|
|
ok
|
|
|
end,
|
|
|
ok.
|
|
|
@@ -673,7 +673,7 @@ earse_internal_ctx(Key) ->
|
|
|
{inline, [
|
|
|
internal_extra_key/2,
|
|
|
outgoing_span_name/1,
|
|
|
- pending_span_name/1
|
|
|
+ awaiting_span_name/1
|
|
|
]}
|
|
|
).
|
|
|
|
|
|
@@ -707,17 +707,17 @@ outgoing_span_name(?PACKET(?PUBCOMP)) ->
|
|
|
|
|
|
%% Incoming Packet Span Name
|
|
|
%% Client(`Publisher') -> Broker:
|
|
|
-pending_span_name(?PUBREL) ->
|
|
|
+awaiting_span_name(?PUBREL) ->
|
|
|
%% PUBREL (QoS=2)
|
|
|
?CLIENT_PUBREL_SPAN_NAME;
|
|
|
%% Client(`Subscriber') -> Broker:
|
|
|
-pending_span_name(?PUBACK) ->
|
|
|
+awaiting_span_name(?PUBACK) ->
|
|
|
%% PUBACK (QoS=1)
|
|
|
?CLIENT_PUBACK_SPAN_NAME;
|
|
|
-pending_span_name(?PUBREC) ->
|
|
|
+awaiting_span_name(?PUBREC) ->
|
|
|
%% PUBREC (QoS=2)
|
|
|
?CLIENT_PUBREC_SPAN_NAME;
|
|
|
-pending_span_name(?PUBCOMP) ->
|
|
|
+awaiting_span_name(?PUBCOMP) ->
|
|
|
%% PUBCOMP (QoS=2)
|
|
|
?CLIENT_PUBCOMP_SPAN_NAME.
|
|
|
|