|
|
@@ -480,9 +480,7 @@ handle_in(?SUBSCRIBE_PACKET(_PacketId, _Properties, _TopicFilters0) = Packet, Ch
|
|
|
?EXT_TRACE_WITH_PROCESS_FUN(
|
|
|
client_subscribe,
|
|
|
Packet,
|
|
|
- (basic_trace_attrs(Channel))#{
|
|
|
- 'client.subscribe.topics' => serialize_topic_filters(Packet)
|
|
|
- },
|
|
|
+ maps:merge(basic_trace_attrs(Channel), topic_filters_attrs(Packet)),
|
|
|
fun(PacketWithTrace) -> process_subscribe(PacketWithTrace, Channel) end
|
|
|
);
|
|
|
handle_in(
|
|
|
@@ -492,9 +490,7 @@ handle_in(
|
|
|
?EXT_TRACE_WITH_PROCESS_FUN(
|
|
|
client_unsubscribe,
|
|
|
Packet,
|
|
|
- (basic_trace_attrs(Channel))#{
|
|
|
- 'client.unsubscribe.topics' => serialize_topic_filters(Packet)
|
|
|
- },
|
|
|
+ maps:merge(basic_trace_attrs(Channel), topic_filters_attrs(Packet)),
|
|
|
fun(PacketWithTrace) -> process_unsubscribe(PacketWithTrace, Channel) end
|
|
|
);
|
|
|
handle_in(?PACKET(?PINGREQ), Channel = #channel{keepalive = Keepalive}) ->
|
|
|
@@ -2382,16 +2378,18 @@ do_check_sub_authzs(
|
|
|
DenyAction = emqx:get_config([authorization, deny_action], ignore),
|
|
|
case DenyAction =:= disconnect andalso HasAuthzDeny of
|
|
|
true ->
|
|
|
- ?EXT_TRACE_ADD_ATTRS(#{
|
|
|
- 'authz.deny_action' => disconnect,
|
|
|
- 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
|
|
|
- }),
|
|
|
+ ?EXT_TRACE_ADD_ATTRS(
|
|
|
+ (subscribe_authz_result_attrs(CheckResult))#{
|
|
|
+ 'authz.deny_action' => disconnect
|
|
|
+ }
|
|
|
+ ),
|
|
|
{error, {disconnect, ?RC_NOT_AUTHORIZED}, Channel};
|
|
|
false ->
|
|
|
- ?EXT_TRACE_ADD_ATTRS(#{
|
|
|
- 'authz.deny_action' => ignore,
|
|
|
- 'authz.subscribe.result' => trace_authz_result_attrs(CheckResult)
|
|
|
- }),
|
|
|
+ ?EXT_TRACE_ADD_ATTRS(
|
|
|
+ (subscribe_authz_result_attrs(CheckResult))#{
|
|
|
+ 'authz.deny_action' => ignore
|
|
|
+ }
|
|
|
+ ),
|
|
|
{ok, ?SUBSCRIBE_PACKET(PacketId, SubProps, CheckResult), Channel}
|
|
|
end.
|
|
|
|
|
|
@@ -3136,18 +3134,24 @@ basic_trace_attrs(Channel) ->
|
|
|
'client.username' => info(username, Channel)
|
|
|
}.
|
|
|
|
|
|
-serialize_topic_filters(?PACKET(?SUBSCRIBE, PktVar)) ->
|
|
|
- TFs = [
|
|
|
- SubOpts#{topic => emqx_topic:maybe_format_share(Name)}
|
|
|
- || {Name, SubOpts} <- emqx_packet:info(topic_filters, PktVar)
|
|
|
- ],
|
|
|
- emqx_utils_json:encode(TFs);
|
|
|
-serialize_topic_filters(?PACKET(?UNSUBSCRIBE, PktVar)) ->
|
|
|
+topic_filters_attrs(?PACKET(?SUBSCRIBE, PktVar)) ->
|
|
|
+ {TFs, SubOpts} = lists:foldl(
|
|
|
+ fun({Topic, SubOpts}, {AccTFs, AccSubOpts}) ->
|
|
|
+ {[emqx_topic:maybe_format_share(Topic) | AccTFs], [SubOpts | AccSubOpts]}
|
|
|
+ end,
|
|
|
+ {[], []},
|
|
|
+ emqx_packet:info(topic_filters, PktVar)
|
|
|
+ ),
|
|
|
+ #{
|
|
|
+ 'client.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
|
|
|
+ 'client.subscribe.sub_opts' => emqx_utils_json:encode(lists:reverse(SubOpts))
|
|
|
+ };
|
|
|
+topic_filters_attrs(?PACKET(?UNSUBSCRIBE, PktVar)) ->
|
|
|
TFs = [
|
|
|
emqx_topic:maybe_format_share(Name)
|
|
|
|| Name <- emqx_packet:info(topic_filters, PktVar)
|
|
|
],
|
|
|
- emqx_utils_json:encode(TFs).
|
|
|
+ #{'client.unsubscribe.topics' => emqx_utils_json:encode(TFs)}.
|
|
|
|
|
|
authn_attrs({continue, _Properties, _Channel}) ->
|
|
|
%% TODO
|
|
|
@@ -3164,11 +3168,18 @@ authn_attrs({error, _Reason}) ->
|
|
|
'client.connect.authn.failure_reason' => emqx_utils:readable_error_msg(_Reason)
|
|
|
}.
|
|
|
|
|
|
-trace_authz_result_attrs(CheckResult) ->
|
|
|
- emqx_utils_json:encode([
|
|
|
- #{topic => emqx_topic:maybe_format_share(TopicFilter), reason_code => RC}
|
|
|
- || {{TopicFilter, _SubOpts}, RC} <- CheckResult
|
|
|
- ]).
|
|
|
+subscribe_authz_result_attrs(CheckResult) ->
|
|
|
+ {TFs, AuthZRCs} = lists:foldl(
|
|
|
+ fun({{TopicFilter, _SubOpts}, RC}, {AccTFs, AccRCs}) ->
|
|
|
+ {[emqx_topic:maybe_format_share(TopicFilter) | AccTFs], [RC | AccRCs]}
|
|
|
+ end,
|
|
|
+ {[], []},
|
|
|
+ CheckResult
|
|
|
+ ),
|
|
|
+ #{
|
|
|
+ 'authz.subscribe.topics' => emqx_utils_json:encode(lists:reverse(TFs)),
|
|
|
+ 'authz.subscribe.reason_codes' => emqx_utils_json:encode(lists:reverse(AuthZRCs))
|
|
|
+ }.
|
|
|
|
|
|
-else.
|
|
|
-endif.
|