Просмотр исходного кода

Merge pull request #12114 from JimMoen/EMQX-10793-exhook-clientinfo-peerport

EMQX-10793 exhook clientinfo peerport
JimMoen 2 лет назад
Родитель
Сommit
19bc3c141f

+ 2 - 1
apps/emqx/src/emqx_channel.erl

@@ -213,7 +213,7 @@ caps(#channel{clientinfo = #{zone := Zone}}) ->
 -spec init(emqx_types:conninfo(), opts()) -> channel().
 -spec init(emqx_types:conninfo(), opts()) -> channel().
 init(
 init(
     ConnInfo = #{
     ConnInfo = #{
-        peername := {PeerHost, _Port},
+        peername := {PeerHost, PeerPort},
         sockname := {_Host, SockPort}
         sockname := {_Host, SockPort}
     },
     },
     #{
     #{
@@ -237,6 +237,7 @@ init(
             listener => ListenerId,
             listener => ListenerId,
             protocol => Protocol,
             protocol => Protocol,
             peerhost => PeerHost,
             peerhost => PeerHost,
+            peerport => PeerPort,
             sockport => SockPort,
             sockport => SockPort,
             clientid => undefined,
             clientid => undefined,
             username => undefined,
             username => undefined,

+ 18 - 1
apps/emqx/test/emqx_channel_SUITE.erl

@@ -72,6 +72,22 @@ t_chan_info(_) ->
         conn_state := connected,
         conn_state := connected,
         clientinfo := ClientInfo
         clientinfo := ClientInfo
     } = emqx_channel:info(channel()),
     } = emqx_channel:info(channel()),
+    ?assertMatch(
+        #{
+            zone := default,
+            listener := {tcp, default},
+            protocol := mqtt,
+            peerhost := {127, 0, 0, 1},
+            peerport := 3456,
+            sockport := 1883,
+            clientid := <<"clientid">>,
+            username := <<"username">>,
+            is_superuser := false,
+            is_bridge := false,
+            mountpoint := undefined
+        },
+        ClientInfo
+    ),
     ?assertEqual(clientinfo(), ClientInfo).
     ?assertEqual(clientinfo(), ClientInfo).
 
 
 t_chan_caps(_) ->
 t_chan_caps(_) ->
@@ -1063,7 +1079,8 @@ clientinfo(InitProps) ->
             listener => {tcp, default},
             listener => {tcp, default},
             protocol => mqtt,
             protocol => mqtt,
             peerhost => {127, 0, 0, 1},
             peerhost => {127, 0, 0, 1},
-            sockport => 3456,
+            peerport => 3456,
+            sockport => 1883,
             clientid => <<"clientid">>,
             clientid => <<"clientid">>,
             username => <<"username">>,
             username => <<"username">>,
             is_superuser => false,
             is_superuser => false,

+ 1 - 0
apps/emqx/test/emqx_proper_types.erl

@@ -108,6 +108,7 @@ clientinfo() ->
         {zone, zone()},
         {zone, zone()},
         {protocol, protocol()},
         {protocol, protocol()},
         {peerhost, ip()},
         {peerhost, ip()},
+        {peerport, port()},
         {sockport, port()},
         {sockport, port()},
         {clientid, clientid()},
         {clientid, clientid()},
         {username, username()},
         {username, username()},

+ 27 - 4
apps/emqx_exhook/priv/protos/exhook.proto

@@ -148,6 +148,9 @@ message ClientAuthorizeRequest {
 
 
   AuthorizeReqType type = 2;
   AuthorizeReqType type = 2;
 
 
+  // In ClientAuthorizeRequest.
+  // Only "real-topic" will be serialized in gRPC request when shared-sub.
+  // For example, when client subscribes to `$share/group/t/1`, the real topic is `t/1`.
   string topic = 3;
   string topic = 3;
 
 
   bool result = 4;
   bool result = 4;
@@ -368,6 +371,8 @@ message ConnInfo {
   string proto_ver = 7;
   string proto_ver = 7;
 
 
   uint32 keepalive = 8;
   uint32 keepalive = 8;
+
+  uint32 peerport = 9;
 }
 }
 
 
 message ClientInfo {
 message ClientInfo {
@@ -397,6 +402,8 @@ message ClientInfo {
 
 
   // subject of client TLS cert
   // subject of client TLS cert
   string dn = 12;
   string dn = 12;
+
+  uint32 peerport = 13;
 }
 }
 
 
 message Message {
 message Message {
@@ -452,7 +459,14 @@ message TopicFilter {
 
 
   string name = 1;
   string name = 1;
 
 
-  uint32 qos = 2;
+  // Deprecated
+  // Since EMQX 5.4.0, we have deprecated the 'qos' field in the `TopicFilter` structure.
+  // A new field named 'subopts,' has been added to encompass all subscription options.
+  // Please see the `SubOpts` structure for details.
+  reserved 2;
+  reserved "qos";
+
+  SubOpts subopts = 3;
 }
 }
 
 
 message SubOpts {
 message SubOpts {
@@ -460,11 +474,20 @@ message SubOpts {
   // The QoS level
   // The QoS level
   uint32 qos = 1;
   uint32 qos = 1;
 
 
-  // deprecated
+  // Deprecated
   reserved 2;
   reserved 2;
   reserved "share";
   reserved "share";
-  // The group name for shared subscription
-  // string share = 2;
+  // Since EMQX 5.4.0, we have deprecated the 'share' field in the `SubOpts` structure.
+  // The group name of shared subscription will be serialized with topic.
+  // hooks:
+  //     "client.subscribe":
+  //         ClientSubscribeRequest.TopicFilter.name = "$share/group/topic/1"
+  //     "client.unsubscribe":
+  //         ClientUnsubscribeRequest.TopicFilter.name = "$share/group/topic/1"
+  //     "session.subscribed":
+  //         SessionSubscribedRequest.topic = "$share/group/topic/1"
+  //     "session.unsubscribed":
+  //         SessionUnsubscribedRequest.topic = "$share/group/topic/1"
 
 
   // The Retain Handling option (MQTT v5.0)
   // The Retain Handling option (MQTT v5.0)
   //
   //

+ 16 - 7
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -143,7 +143,7 @@ on_client_authorize(ClientInfo, Action, Topic, Result) ->
     Req = #{
     Req = #{
         clientinfo => clientinfo(ClientInfo),
         clientinfo => clientinfo(ClientInfo),
         type => Type,
         type => Type,
-        topic => emqx_topic:maybe_format_share(Topic),
+        topic => emqx_topic:get_shared_real_topic(Topic),
         result => Bool
         result => Bool
     },
     },
     case
     case
@@ -192,7 +192,7 @@ on_session_subscribed(ClientInfo, Topic, SubOpts) ->
     Req = #{
     Req = #{
         clientinfo => clientinfo(ClientInfo),
         clientinfo => clientinfo(ClientInfo),
         topic => emqx_topic:maybe_format_share(Topic),
         topic => emqx_topic:maybe_format_share(Topic),
-        subopts => maps:with([qos, rh, rap, nl], SubOpts)
+        subopts => subopts(SubOpts)
     },
     },
     cast('session.subscribed', Req).
     cast('session.subscribed', Req).
 
 
@@ -200,6 +200,7 @@ on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
     Req = #{
     Req = #{
         clientinfo => clientinfo(ClientInfo),
         clientinfo => clientinfo(ClientInfo),
         topic => emqx_topic:maybe_format_share(Topic)
         topic => emqx_topic:maybe_format_share(Topic)
+        %% no subopts when unsub
     },
     },
     cast('session.unsubscribed', Req).
     cast('session.unsubscribed', Req).
 
 
@@ -294,7 +295,7 @@ conninfo(
     ConnInfo =
     ConnInfo =
         #{
         #{
             clientid := ClientId,
             clientid := ClientId,
-            peername := {Peerhost, _},
+            peername := {Peerhost, PeerPort},
             sockname := {_, SockPort}
             sockname := {_, SockPort}
         }
         }
 ) ->
 ) ->
@@ -307,6 +308,7 @@ conninfo(
         clientid => ClientId,
         clientid => ClientId,
         username => maybe(Username),
         username => maybe(Username),
         peerhost => ntoa(Peerhost),
         peerhost => ntoa(Peerhost),
+        peerport => PeerPort,
         sockport => SockPort,
         sockport => SockPort,
         proto_name => ProtoName,
         proto_name => ProtoName,
         proto_ver => stringfy(ProtoVer),
         proto_ver => stringfy(ProtoVer),
@@ -319,6 +321,7 @@ clientinfo(
             clientid := ClientId,
             clientid := ClientId,
             username := Username,
             username := Username,
             peerhost := PeerHost,
             peerhost := PeerHost,
+            peerport := PeerPort,
             sockport := SockPort,
             sockport := SockPort,
             protocol := Protocol,
             protocol := Protocol,
             mountpoint := Mountpoiont
             mountpoint := Mountpoiont
@@ -330,6 +333,7 @@ clientinfo(
         username => maybe(Username),
         username => maybe(Username),
         password => maybe(maps:get(password, ClientInfo, undefined)),
         password => maybe(maps:get(password, ClientInfo, undefined)),
         peerhost => ntoa(PeerHost),
         peerhost => ntoa(PeerHost),
+        peerport => PeerPort,
         sockport => SockPort,
         sockport => SockPort,
         protocol => stringfy(Protocol),
         protocol => stringfy(Protocol),
         mountpoint => maybe(Mountpoiont),
         mountpoint => maybe(Mountpoiont),
@@ -413,14 +417,19 @@ enrich_header(Headers, Message) ->
     end.
     end.
 
 
 topicfilters(Tfs) when is_list(Tfs) ->
 topicfilters(Tfs) when is_list(Tfs) ->
-    GetQos = fun(SubOpts) ->
-        maps:get(qos, SubOpts, 0)
-    end,
     [
     [
-        #{name => emqx_topic:maybe_format_share(Topic), qos => GetQos(SubOpts)}
+        #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
      || {Topic, SubOpts} <- Tfs
      || {Topic, SubOpts} <- Tfs
     ].
     ].
 
 
+subopts(SubOpts) ->
+    #{
+        qos => maps:get(qos, SubOpts, 0),
+        rh => maps:get(rh, SubOpts, 0),
+        rap => maps:get(rap, SubOpts, 0),
+        nl => maps:get(nl, SubOpts, 0)
+    }.
+
 ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
 ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
     list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
     list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
 ntoa(IP) ->
 ntoa(IP) ->

+ 2 - 0
apps/emqx_exhook/test/emqx_exhook_SUITE.erl

@@ -118,6 +118,7 @@ t_access_failed_if_no_server_running(Config) ->
         clientid => <<"user-id-1">>,
         clientid => <<"user-id-1">>,
         username => <<"usera">>,
         username => <<"usera">>,
         peerhost => {127, 0, 0, 1},
         peerhost => {127, 0, 0, 1},
+        peerport => 3456,
         sockport => 1883,
         sockport => 1883,
         protocol => mqtt,
         protocol => mqtt,
         mountpoint => undefined
         mountpoint => undefined
@@ -301,6 +302,7 @@ t_simulated_handler(_) ->
         clientid => <<"user-id-1">>,
         clientid => <<"user-id-1">>,
         username => <<"usera">>,
         username => <<"usera">>,
         peerhost => {127, 0, 0, 1},
         peerhost => {127, 0, 0, 1},
+        peerport => 3456,
         sockport => 1883,
         sockport => 1883,
         protocol => mqtt,
         protocol => mqtt,
         mountpoint => undefined
         mountpoint => undefined

+ 9 - 1
apps/emqx_exhook/test/props/prop_exhook_hooks.erl

@@ -496,6 +496,9 @@ nodestr() ->
 peerhost(#{peername := {Host, _}}) ->
 peerhost(#{peername := {Host, _}}) ->
     ntoa(Host).
     ntoa(Host).
 
 
+peerport(#{peername := {_, Port}}) ->
+    Port.
+
 sockport(#{sockname := {_, Port}}) ->
 sockport(#{sockname := {_, Port}}) ->
     Port.
     Port.
 
 
@@ -527,7 +530,10 @@ properties(M) when is_map(M) ->
     ).
     ).
 
 
 topicfilters(Tfs) when is_list(Tfs) ->
 topicfilters(Tfs) when is_list(Tfs) ->
-    [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+    [
+        #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
+     || {Topic, SubOpts} <- Tfs
+    ].
 
 
 %% @private
 %% @private
 stringfy(Term) when is_binary(Term) ->
 stringfy(Term) when is_binary(Term) ->
@@ -564,6 +570,7 @@ from_conninfo(ConnInfo) ->
         clientid => maps:get(clientid, ConnInfo),
         clientid => maps:get(clientid, ConnInfo),
         username => maybe(maps:get(username, ConnInfo, <<>>)),
         username => maybe(maps:get(username, ConnInfo, <<>>)),
         peerhost => peerhost(ConnInfo),
         peerhost => peerhost(ConnInfo),
+        peerport => peerport(ConnInfo),
         sockport => sockport(ConnInfo),
         sockport => sockport(ConnInfo),
         proto_name => maps:get(proto_name, ConnInfo),
         proto_name => maps:get(proto_name, ConnInfo),
         proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
         proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
@@ -577,6 +584,7 @@ from_clientinfo(ClientInfo) ->
         username => maybe(maps:get(username, ClientInfo, <<>>)),
         username => maybe(maps:get(username, ClientInfo, <<>>)),
         password => maybe(maps:get(password, ClientInfo, <<>>)),
         password => maybe(maps:get(password, ClientInfo, <<>>)),
         peerhost => ntoa(maps:get(peerhost, ClientInfo)),
         peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+        peerport => maps:get(peerport, ClientInfo),
         sockport => maps:get(sockport, ClientInfo),
         sockport => maps:get(sockport, ClientInfo),
         protocol => stringfy(maps:get(protocol, ClientInfo)),
         protocol => stringfy(maps:get(protocol, ClientInfo)),
         mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
         mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),

+ 6 - 0
changes/feat-12114.en.md

@@ -0,0 +1,6 @@
+Added the `peerport` field to ClientInfo.
+Added the `peerport` field to the messages `ClientInfo` and `ConnInfo` in ExHook.
+
+## Breaking changes
+* ExHook Proto changed. The `qos` field in message `TopicFilter` was deprecated.
+  ExHook Server will now receive full subscription options: `qos`, `rh`, `rap`, `nl` in message `SubOpts`