Просмотр исходного кода

Fix issue#1293 - the retained flags should be propagated for bridge.

Feng Lee 8 лет назад
Родитель
Сommit
26fb809dbe
4 измененных файлов с 32 добавлено и 23 удалено
  1. 2 1
      include/emqttd_protocol.hrl
  2. 3 2
      src/emqttd_parser.erl
  3. 16 4
      src/emqttd_protocol.erl
  4. 11 16
      src/emqttd_session.erl

+ 2 - 1
include/emqttd_protocol.hrl

@@ -174,7 +174,8 @@
           will_topic  = undefined      :: undefined | binary(),
           will_msg    = undefined      :: undefined | binary(),
           username    = undefined      :: undefined | binary(),
-          password    = undefined      :: undefined | binary()
+          password    = undefined      :: undefined | binary(),
+          is_bridge   = false          :: boolean()
         }).
 
 -record(mqtt_packet_connack,

+ 3 - 2
src/emqttd_parser.erl

@@ -79,7 +79,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos  = Qos} = Header, Length)
         {?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
             {ProtoName, Rest1} = parse_utf(FrameBin),
             %% Fix mosquitto bridge: 0x83, 0x84
-            <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1,
+            <<BridgeTag:4, ProtoVersion:4, Rest2/binary>> = Rest1,
             <<UsernameFlag : 1,
               PasswordFlag : 1,
               WillRetain   : 1,
@@ -109,7 +109,8 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos  = Qos} = Header, Length)
                            will_topic  = WillTopic,
                            will_msg    = WillMsg,
                            username    = UserName,
-                           password    = PasssWord}, Rest);
+                           password    = PasssWord,
+                           is_bridge   = (BridgeTag =:= 8)}, Rest);
                false ->
                     {error, protocol_header_corrupt}
             end;

+ 16 - 4
src/emqttd_protocol.erl

@@ -44,7 +44,7 @@
                       clean_sess, proto_ver, proto_name, username, is_superuser,
                       will_msg, keepalive, keepalive_backoff, max_clientid_len,
                       session, stats_data, mountpoint, ws_initial_headers,
-                      connected_at}).
+                      is_bridge, connected_at}).
 
 -type(proto_state() :: #proto_state{}).
 
@@ -180,7 +180,8 @@ process(?CONNECT_PACKET(Var), State0) ->
                          password   = Password,
                          clean_sess = CleanSess,
                          keep_alive = KeepAlive,
-                         client_id  = ClientId} = Var,
+                         client_id  = ClientId,
+                         is_bridge  = IsBridge} = Var,
 
     State1 = State0#proto_state{proto_ver    = ProtoVer,
                                 proto_name   = ProtoName,
@@ -189,6 +190,7 @@ process(?CONNECT_PACKET(Var), State0) ->
                                 clean_sess   = CleanSess,
                                 keepalive    = KeepAlive,
                                 will_msg     = willmsg(Var, State0),
+                                is_bridge    = IsBridge,
                                 connected_at = os:timestamp()},
 
     {ReturnCode1, SessPresent, State3} =
@@ -333,10 +335,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
 -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
 send(Msg, State = #proto_state{client_id  = ClientId,
                                username   = Username,
-                               mountpoint = MountPoint})
+                               mountpoint = MountPoint,
+                               is_bridge  = IsBridge})
         when is_record(Msg, mqtt_message) ->
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
-    send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State);
+    send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
 
 send(Packet = ?PACKET(Type),
      State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
@@ -543,6 +546,15 @@ check_acl(subscribe, Topic, Client) ->
 sp(true)  -> 1;
 sp(false) -> 0.
 
+%%--------------------------------------------------------------------
+%% The retained flag should be propagated for bridge.
+%%--------------------------------------------------------------------
+
+clean_retain(false, Msg = #mqtt_message{retain = true}) ->
+    Msg#mqtt_message{retain = false};
+clean_retain(true, Msg) ->
+    Msg.
+
 %%--------------------------------------------------------------------
 %% Mount Point
 %%--------------------------------------------------------------------

+ 11 - 16
src/emqttd_session.erl

@@ -152,9 +152,10 @@
          %% Force GC Count
          force_gc_count :: undefined | integer(),
 
-         created_at :: erlang:timestamp(),
+         %% Ignore loop deliver?
+         ignore_loop_deliver = false :: boolean(),
 
-         ignore_loop_deliver = false :: boolean()
+         created_at :: erlang:timestamp()
         }).
 
 -define(TIMEOUT, 60000).
@@ -529,17 +530,14 @@ handle_cast({destroy, ClientId},
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
-%% Dispatch message from self publish
-handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, 
-             State = #state{client_id = ClientId, 
-                            ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
-    case IgnoreLoopDeliver of
-        true  -> {noreply, State, hibernate};
-        false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate}
-    end;
+%% Ignore Messages delivered by self
+handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
+             State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
+    hibernate(State);
+
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
-    {noreply, handle_dispatch(Topic, Msg, State), hibernate};
+    hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)));
 
 %% Do nothing if the client has been disconnected.
 handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
@@ -552,7 +550,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
     hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
 
 handle_info({timeout, _Timer, expired}, State) ->
-    ?LOG(debug, "Expired, shutdown now.", [], State),
+    ?LOG(info, "Expired, shutdown now.", [], State),
     shutdown(expired, State);
 
 handle_info({'EXIT', ClientPid, _Reason},
@@ -563,7 +561,7 @@ handle_info({'EXIT', ClientPid, Reason},
             State = #state{clean_sess      = false,
                            client_pid      = ClientPid,
                            expiry_interval = Interval}) ->
-    ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
+    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
     ExpireTimer = start_timer(Interval, expired),
     State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
     hibernate(emit_stats(State1));
@@ -687,9 +685,6 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen})
 %% Dispatch Messages
 %%--------------------------------------------------------------------
 
-handle_dispatch(Topic, Msg, State) ->
-    gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)).
-
 %% Enqueue message if the client has been disconnected
 dispatch(Msg, State = #state{client_pid = undefined}) ->
     enqueue_msg(Msg, State);