|
|
@@ -422,13 +422,13 @@ handle_in(?PACKET(_), Channel = #channel{conn_state = ConnState}) when
|
|
|
ConnState =/= connected andalso ConnState =/= reauthenticating
|
|
|
->
|
|
|
handle_out(disconnect, ?RC_PROTOCOL_ERROR, Channel);
|
|
|
-handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel) ->
|
|
|
+handle_in(?PUBLISH_PACKET(_QoS, _Topic, _PacketId) = Packet, Channel) ->
|
|
|
case emqx_packet:check(Packet) of
|
|
|
ok ->
|
|
|
?EXT_TRACE_WITH_PROCESS_FUN(
|
|
|
client_publish,
|
|
|
Packet,
|
|
|
- (basic_trace_attrs(Channel))#{'message.topic' => Topic},
|
|
|
+ (basic_trace_attrs(Channel))#{'message.topic' => _Topic},
|
|
|
fun(PacketWithTrace) -> process_publish(PacketWithTrace, Channel) end
|
|
|
);
|
|
|
{error, ReasonCode} ->
|
|
|
@@ -500,7 +500,7 @@ handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
|
|
|
NChannel = Channel#channel{keepalive = NKeepalive},
|
|
|
{ok, ?PACKET(?PINGRESP), reset_timer(keepalive, NChannel)};
|
|
|
handle_in(
|
|
|
- ?PACKET(?DISCONNECT, PktVar) = Packet,
|
|
|
+ ?PACKET(?DISCONNECT, _PktVar) = Packet,
|
|
|
Channel
|
|
|
) ->
|
|
|
?EXT_TRACE_WITH_PROCESS_FUN(
|
|
|
@@ -509,7 +509,7 @@ handle_in(
|
|
|
(basic_trace_attrs(Channel))#{
|
|
|
'client.peername' => emqx_utils:ntoa(info(peername, Channel)),
|
|
|
'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
|
|
|
- 'client.disconnect.reason_code' => emqx_packet:info(reason_code, PktVar)
|
|
|
+ 'client.disconnect.reason_code' => emqx_packet:info(reason_code, _PktVar)
|
|
|
},
|
|
|
fun(PacketWithTrace) -> process_disconnect(PacketWithTrace, Channel) end
|
|
|
);
|
|
|
@@ -1320,8 +1320,8 @@ handle_out(Type, Data, Channel) ->
|
|
|
%% Return ConnAck
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
-return_connack(?CONNACK_PACKET(RC, _SessPresent) = AckPacket, Channel) ->
|
|
|
- ?EXT_TRACE_ADD_ATTRS(#{'client.connack.reason_code' => RC}),
|
|
|
+return_connack(?CONNACK_PACKET(_RC, _SessPresent) = AckPacket, Channel) ->
|
|
|
+ ?EXT_TRACE_ADD_ATTRS(#{'client.connack.reason_code' => _RC}),
|
|
|
do_return_connack(AckPacket, Channel).
|
|
|
|
|
|
do_return_connack(AckPacket, Channel) ->
|
|
|
@@ -1742,65 +1742,6 @@ overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
|
|
|
emqx_olp:backoff(Zone),
|
|
|
ok.
|
|
|
|
|
|
-%% Client Channel info not be available before `process_connect/2`
|
|
|
-%% The initial attrs should be extracted from packet and update them during `process_connect/2`
|
|
|
-connect_trace_attrs(
|
|
|
- ?PACKET(?CONNECT, #mqtt_packet_connect{
|
|
|
- proto_name = ProtoName,
|
|
|
- proto_ver = ProtoVer,
|
|
|
- is_bridge = IsBridge,
|
|
|
- clean_start = CleanStart,
|
|
|
- will_flag = WillFlag,
|
|
|
- will_qos = WillQos,
|
|
|
- will_retain = WillRetain,
|
|
|
- keepalive = KeepAlive,
|
|
|
- properties = Properties,
|
|
|
- clientid = ClientId,
|
|
|
- will_props = WillProps,
|
|
|
- will_topic = WillTopic,
|
|
|
- will_payload = _,
|
|
|
- username = Username,
|
|
|
- password = _
|
|
|
- }),
|
|
|
- Channel
|
|
|
-) ->
|
|
|
- #{
|
|
|
- 'client.clientid' => ClientId,
|
|
|
- 'client.username' => Username,
|
|
|
- 'client.proto_name' => ProtoName,
|
|
|
- 'client.proto_ver' => ProtoVer,
|
|
|
- 'client.is_bridge' => IsBridge,
|
|
|
- 'client.clean_start' => CleanStart,
|
|
|
- 'client.will_flag' => WillFlag,
|
|
|
- 'client.will_qos' => WillQos,
|
|
|
- 'client.will_retain' => WillRetain,
|
|
|
- 'client.keepalive' => KeepAlive,
|
|
|
- 'client.conn_props' => emqx_utils_json:encode(Properties),
|
|
|
- 'client.will_props' => emqx_utils_json:encode(WillProps),
|
|
|
- 'client.will_topic' => WillTopic,
|
|
|
- 'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
|
|
|
- 'client.peername' => emqx_utils:ntoa(info(peername, Channel))
|
|
|
- }.
|
|
|
-
|
|
|
-basic_trace_attrs(Channel) ->
|
|
|
- #{
|
|
|
- 'client.clientid' => info(clientid, Channel),
|
|
|
- 'client.username' => info(username, Channel)
|
|
|
- }.
|
|
|
-
|
|
|
-serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
|
|
|
- TFs = [
|
|
|
- SubOpts#{topic => emqx_topic:maybe_format_share(Name)}
|
|
|
- || {Name, SubOpts} <- emqx_packet:info(topic_filters, PktVar)
|
|
|
- ],
|
|
|
- emqx_utils_json:encode(TFs);
|
|
|
-serialize_topic_filters(?PACKET(?UNSUBSCRIBE, PktVar)) ->
|
|
|
- TFs = [
|
|
|
- emqx_topic:maybe_format_share(Name)
|
|
|
- || Name <- emqx_packet:info(topic_filters, PktVar)
|
|
|
- ],
|
|
|
- emqx_utils_json:encode(TFs).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Enrich MQTT Connect Info
|
|
|
|
|
|
@@ -2221,21 +2162,6 @@ merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_ma
|
|
|
),
|
|
|
fix_mountpoint(NewClientInfo).
|
|
|
|
|
|
-authn_attrs({continue, _Properties, _Channel}) ->
|
|
|
- %% TODO
|
|
|
- #{};
|
|
|
-authn_attrs({ok, _Properties, Channel}) ->
|
|
|
- #{
|
|
|
- 'client.connect.authn.result' => ok,
|
|
|
- 'client.connect.authn.is_superuser' => info(is_superuser, Channel),
|
|
|
- 'client.connect.authn.expire_at' => info(expire_at, Channel)
|
|
|
- };
|
|
|
-authn_attrs({error, _Reason}) ->
|
|
|
- #{
|
|
|
- 'client.connect.authn.result' => error,
|
|
|
- 'client.connect.authn.failure_reason' => emqx_utils:readable_error_msg(_Reason)
|
|
|
- }.
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Process Topic Alias
|
|
|
|
|
|
@@ -2487,12 +2413,6 @@ do_check_sub_authzs2(ClientInfo, [TopicFilter = {Topic, _SubOpts} | More], Acc)
|
|
|
do_check_sub_authzs2(ClientInfo, More, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
|
|
end.
|
|
|
|
|
|
-trace_authz_result_attrs(CheckResult) ->
|
|
|
- emqx_utils_json:encode([
|
|
|
- #{topic => emqx_topic:maybe_format_share(TopicFilter), reason_code => RC}
|
|
|
- || {{TopicFilter, _SubOpts}, RC} <- CheckResult
|
|
|
- ]).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Check Sub Caps
|
|
|
|
|
|
@@ -3156,6 +3076,95 @@ proto_ver(_Reason, #{proto_ver := ProtoVer}) ->
|
|
|
proto_ver(_, _) ->
|
|
|
?MQTT_PROTO_V4.
|
|
|
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+%% External Trace Helpers
|
|
|
+%%--------------------------------------------------------------------
|
|
|
+
|
|
|
+%% Client Channel info not be available before `process_connect/2`
|
|
|
+%% The initial attrs should be extracted from packet and update them during `process_connect/2`
|
|
|
+-if(?EMQX_RELEASE_EDITION == ee).
|
|
|
+
|
|
|
+connect_trace_attrs(
|
|
|
+ ?PACKET(?CONNECT, #mqtt_packet_connect{
|
|
|
+ proto_name = ProtoName,
|
|
|
+ proto_ver = ProtoVer,
|
|
|
+ is_bridge = IsBridge,
|
|
|
+ clean_start = CleanStart,
|
|
|
+ will_flag = WillFlag,
|
|
|
+ will_qos = WillQos,
|
|
|
+ will_retain = WillRetain,
|
|
|
+ keepalive = KeepAlive,
|
|
|
+ properties = Properties,
|
|
|
+ clientid = ClientId,
|
|
|
+ will_props = WillProps,
|
|
|
+ will_topic = WillTopic,
|
|
|
+ will_payload = _,
|
|
|
+ username = Username,
|
|
|
+ password = _
|
|
|
+ }),
|
|
|
+ Channel
|
|
|
+) ->
|
|
|
+ #{
|
|
|
+ 'client.clientid' => ClientId,
|
|
|
+ 'client.username' => Username,
|
|
|
+ 'client.proto_name' => ProtoName,
|
|
|
+ 'client.proto_ver' => ProtoVer,
|
|
|
+ 'client.is_bridge' => IsBridge,
|
|
|
+ 'client.clean_start' => CleanStart,
|
|
|
+ 'client.will_flag' => WillFlag,
|
|
|
+ 'client.will_qos' => WillQos,
|
|
|
+ 'client.will_retain' => WillRetain,
|
|
|
+ 'client.keepalive' => KeepAlive,
|
|
|
+ 'client.conn_props' => emqx_utils_json:encode(Properties),
|
|
|
+ 'client.will_props' => emqx_utils_json:encode(WillProps),
|
|
|
+ 'client.will_topic' => WillTopic,
|
|
|
+ 'client.sockname' => emqx_utils:ntoa(info(sockname, Channel)),
|
|
|
+ 'client.peername' => emqx_utils:ntoa(info(peername, Channel))
|
|
|
+ }.
|
|
|
+
|
|
|
+basic_trace_attrs(Channel) ->
|
|
|
+ #{
|
|
|
+ 'client.clientid' => info(clientid, Channel),
|
|
|
+ 'client.username' => info(username, Channel)
|
|
|
+ }.
|
|
|
+
|
|
|
+serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
|
|
|
+ TFs = [
|
|
|
+ SubOpts#{topic => emqx_topic:maybe_format_share(Name)}
|
|
|
+ || {Name, SubOpts} <- emqx_packet:info(topic_filters, PktVar)
|
|
|
+ ],
|
|
|
+ emqx_utils_json:encode(TFs);
|
|
|
+serialize_topic_filters(?PACKET(?UNSUBSCRIBE, PktVar)) ->
|
|
|
+ TFs = [
|
|
|
+ emqx_topic:maybe_format_share(Name)
|
|
|
+ || Name <- emqx_packet:info(topic_filters, PktVar)
|
|
|
+ ],
|
|
|
+ emqx_utils_json:encode(TFs).
|
|
|
+
|
|
|
+authn_attrs({continue, _Properties, _Channel}) ->
|
|
|
+ %% TODO
|
|
|
+ #{};
|
|
|
+authn_attrs({ok, _Properties, Channel}) ->
|
|
|
+ #{
|
|
|
+ 'client.connect.authn.result' => ok,
|
|
|
+ 'client.connect.authn.is_superuser' => info(is_superuser, Channel),
|
|
|
+ 'client.connect.authn.expire_at' => info(expire_at, Channel)
|
|
|
+ };
|
|
|
+authn_attrs({error, _Reason}) ->
|
|
|
+ #{
|
|
|
+ 'client.connect.authn.result' => error,
|
|
|
+ 'client.connect.authn.failure_reason' => emqx_utils:readable_error_msg(_Reason)
|
|
|
+ }.
|
|
|
+
|
|
|
+trace_authz_result_attrs(CheckResult) ->
|
|
|
+ emqx_utils_json:encode([
|
|
|
+ #{topic => emqx_topic:maybe_format_share(TopicFilter), reason_code => RC}
|
|
|
+ || {{TopicFilter, _SubOpts}, RC} <- CheckResult
|
|
|
+ ]).
|
|
|
+
|
|
|
+-else.
|
|
|
+-endif.
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% For CT tests
|
|
|
%%--------------------------------------------------------------------
|