Explorar el Código

Improve mountpoint (#2363)

turtleDeng hace 7 años
padre
commit
e6d90d5758
Se han modificado 1 ficheros con 46 adiciones y 54 borrados
  1. 46 54
      src/emqx_protocol.erl

+ 46 - 54
src/emqx_protocol.erl

@@ -57,7 +57,6 @@
           will_topic,
           will_msg,
           keepalive,
-          mountpoint,
           is_bridge,
           enable_ban,
           enable_acl,
@@ -101,7 +100,6 @@ init(SocketOpts = #{peername := Peername, peercert := Peercert, sendfun := SendF
             clean_start         = false,
             topic_aliases       = #{},
             packet_size         = emqx_zone:get_env(Zone, max_packet_size),
-            mountpoint          = emqx_zone:get_env(Zone, mountpoint),
             is_bridge           = false,
             enable_ban          = emqx_zone:get_env(Zone, enable_ban, false),
             enable_acl          = emqx_zone:get_env(Zone, enable_acl),
@@ -153,7 +151,6 @@ attrs(#pstate{zone         = Zone,
               proto_ver    = ProtoVer,
               proto_name   = ProtoName,
               keepalive    = Keepalive,
-              mountpoint   = Mountpoint,
               is_bridge    = IsBridge,
               connected_at = ConnectedAt,
               conn_mod     = ConnMod,
@@ -167,7 +164,6 @@ attrs(#pstate{zone         = Zone,
      {proto_name, ProtoName},
      {clean_start, CleanStart},
      {keepalive, Keepalive},
-     {mountpoint, Mountpoint},
      {is_bridge, IsBridge},
      {connected_at, ConnectedAt},
      {conn_mod, ConnMod},
@@ -202,8 +198,6 @@ caps(#pstate{zone = Zone}) ->
 client_id(#pstate{client_id = ClientId}) ->
     ClientId.
 
-credentials(#pstate{credentials = Credentials}) when map_size(Credentials) =/= 0 ->
-    Credentials;
 credentials(#pstate{zone      = Zone,
                     client_id = ClientId,
                     username  = Username,
@@ -212,7 +206,8 @@ credentials(#pstate{zone      = Zone,
     with_cert(#{zone => Zone,
                 client_id => ClientId,
                 username => Username,
-                peername => Peername}, Peercert).
+                peername => Peername,
+                mountpoint => emqx_zone:get_env(Zone, mountpoint)}, Peercert).
 
 with_cert(Credentials, undefined) -> Credentials;
 with_cert(Credentials, Peercert) ->
@@ -481,24 +476,12 @@ process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid})
     {ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
 
 process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
-        PState = #pstate{session = SPid, mountpoint = Mountpoint,
-                         proto_ver = ProtoVer, is_bridge = IsBridge,
-                         ignore_loop = IgnoreLoop}) ->
-    RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
-                           IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
-                           case IsBridge of
-                               true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
-                               false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
-                           end;
-                          true ->
-                              RawTopicFilters
-                       end,
-    case check_subscribe(
-           parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
+        PState = #pstate{session = SPid, credentials = Credentials}) ->
+    case check_subscribe(parse_topic_filters(?SUBSCRIBE, raw_topic_filters(PState, RawTopicFilters)), PState) of
         {ok, TopicFilters} ->
-            TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [credentials(PState)], TopicFilters),
-            ok = emqx_session:subscribe(SPid, PacketId, Properties,
-                                        emqx_mountpoint:mount(Mountpoint, TopicFilters0)),
+            TopicFilters0 = emqx_hooks:run_fold('client.subscribe', [Credentials], TopicFilters),
+            TopicFilters1 = emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters0),
+            ok = emqx_session:subscribe(SPid, PacketId, Properties, TopicFilters1),
             {ok, PState};
         {error, TopicFilters} ->
             {SubTopics, ReasonCodes} =
@@ -518,11 +501,11 @@ process(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
     end;
 
 process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
-        PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
-    TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [credentials(PState)],
+        PState = #pstate{session = SPid, credentials = Credentials}) ->
+    TopicFilters = emqx_hooks:run_fold('client.unsubscribe', [Credentials],
                                        parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)),
     ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
-                                  emqx_mountpoint:mount(MountPoint, TopicFilters)),
+                                  emqx_mountpoint:mount(mountpoint(Credentials), TopicFilters)),
     {ok, PState};
 
 process(?PACKET(?PINGREQ), PState) ->
@@ -550,12 +533,12 @@ process(?DISCONNECT_PACKET(_), PState) ->
 %% ConnAck --> Client
 %%------------------------------------------------------------------------------
 
-connack({?RC_SUCCESS, SP, PState}) ->
-    ok = emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
-    deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
+connack({?RC_SUCCESS, SP, PState = #pstate{credentials = Credentials}}) ->
+    ok = emqx_hooks:run('client.connected', [Credentials, ?RC_SUCCESS, attrs(PState)]),
+    deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
 
-connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
-    ok = emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
+connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer, credentials = Credentials}}) ->
+    ok = emqx_hooks:run('client.connected', [Credentials, ReasonCode, attrs(PState)]),
     [ReasonCode1] = reason_codes_compat(connack, [ReasonCode], ProtoVer),
     _ = deliver({connack, ReasonCode1}, PState),
     {error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
@@ -565,9 +548,9 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
 %%------------------------------------------------------------------------------
 
 do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
-           PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
-    Msg = emqx_mountpoint:mount(MountPoint,
-                                emqx_packet:to_message(credentials(PState), Packet)),
+           PState = #pstate{session = SPid, credentials = Credentials}) ->
+    Msg = emqx_mountpoint:mount(mountpoint(Credentials),
+                                emqx_packet:to_message(Credentials, Packet)),
     puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, emqx_message:set_flag(dup, false, Msg)), PState).
 
 %%------------------------------------------------------------------------------
@@ -663,10 +646,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
 deliver({connack, ReasonCode, SP}, PState) ->
     send(?CONNACK_PACKET(ReasonCode, SP), PState);
 
-deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
-    Msg0 = emqx_hooks:run_fold('message.deliver', [credentials(PState)], Msg),
+deliver({publish, PacketId, Msg}, PState = #pstate{credentials = Credentials}) ->
+    Msg0 = emqx_hooks:run_fold('message.deliver', [Credentials], Msg),
     Msg1 = emqx_message:update_expiry(Msg0),
-    Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
+    Msg2 = emqx_mountpoint:unmount(mountpoint(Credentials), Msg1),
     send(emqx_packet:from_message(PacketId, Msg2), PState);
 
 deliver({puback, PacketId, ReasonCode}, PState) ->
@@ -830,8 +813,8 @@ check_will_topic(#mqtt_packet_connect{will_topic = WillTopic} = ConnPkt, PState)
 
 check_will_acl(_ConnPkt, #pstate{enable_acl = EnableAcl}) when not EnableAcl ->
     ok;
-check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
-    case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
+check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, #pstate{credentials = Credentials}) ->
+    case emqx_access_control:check_acl(Credentials, publish, WillTopic) of
         allow -> ok;
         deny ->
             ?LOG(warning, "Cannot publish will message to ~p for acl denied", [WillTopic]),
@@ -850,8 +833,8 @@ check_pub_caps(#mqtt_packet{header = #mqtt_packet_header{qos = QoS, retain = Ret
 check_pub_acl(_Packet, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
         when IsSuper orelse (not EnableAcl) ->
     ok;
-check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, PState) ->
-    case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
+check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #pstate{credentials = Credentials}) ->
+    case emqx_access_control:check_acl(Credentials, publish, Topic) of
         allow -> ok;
         deny -> {error, ?RC_NOT_AUTHORIZED}
     end.
@@ -877,10 +860,10 @@ check_subscribe(TopicFilters, PState = #pstate{zone = Zone}) ->
 check_sub_acl(TopicFilters, #pstate{credentials = #{is_superuser := IsSuper}, enable_acl = EnableAcl})
         when IsSuper orelse (not EnableAcl) ->
     {ok, TopicFilters};
-check_sub_acl(TopicFilters, PState) ->
+check_sub_acl(TopicFilters, #pstate{credentials = Credentials}) ->
     lists:foldr(
       fun({Topic, SubOpts}, {Ok, Acc}) ->
-              case emqx_access_control:check_acl(credentials(PState), publish, Topic) of
+              case emqx_access_control:check_acl(Credentials, publish, Topic) of
                   allow -> {Ok, [{Topic, SubOpts}|Acc]};
                   deny  ->
                       {error, [{Topic, SubOpts#{rc := ?RC_NOT_AUTHORIZED}}|Acc]}
@@ -912,9 +895,9 @@ terminate(conflict, _PState) ->
     ok;
 terminate(discard, _PState) ->
     ok;
-terminate(Reason, PState) ->
+terminate(Reason, #pstate{credentials = Credentials}) ->
     ?LOG(info, "Shutdown for ~p", [Reason]),
-    ok = emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
+    ok = emqx_hooks:run('client.disconnected', [Credentials, Reason]).
 
 start_keepalive(0, _PState) ->
     ignore;
@@ -932,14 +915,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
 parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
     lists:map(fun emqx_topic:parse/1, RawTopicFilters).
 
-%%------------------------------------------------------------------------------
-%% Update mountpoint
-
-update_mountpoint(PState = #pstate{mountpoint = undefined}) ->
-    PState;
-update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
-    PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}.
-
 sp(true)  -> 1;
 sp(false) -> 0.
 
@@ -986,3 +961,20 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
     undefined;
 reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
     [emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].
+
+raw_topic_filters(#pstate{proto_ver = ProtoVer,
+                          is_bridge = IsBridge,
+                          ignore_loop = IgnoreLoop}, RawTopicFilters) ->
+    case ProtoVer < ?MQTT_PROTO_V5 of
+        true ->
+            IfIgnoreLoop = case IgnoreLoop of true -> 1; false -> 0 end,
+            case IsBridge of
+               true -> [{RawTopic, SubOpts#{rap => 1, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters];
+               false -> [{RawTopic, SubOpts#{rap => 0, nl => IfIgnoreLoop}} || {RawTopic, SubOpts} <- RawTopicFilters]
+            end;
+        false ->
+            RawTopicFilters
+    end.
+
+mountpoint(Credentials) ->
+    maps:get(mountpoint, Credentials, undefined).