Преглед изворни кода

Handle the retained flag correctly (#2811)

Handle the retained flag correctly
Mousse пре 6 година
родитељ
комит
bf942e4bec
3 измењених фајлова са 11 додато и 8 уклоњено
  1. 2 2
      src/emqx_channel.erl
  2. 3 2
      src/emqx_packet.erl
  3. 6 4
      src/emqx_session.erl

+ 2 - 2
src/emqx_channel.erl

@@ -407,10 +407,10 @@ process_connect(ConnPkt, Channel) ->
 
 %% Process Publish
 process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId),
-                Channel = #channel{client = Client}) ->
+                Channel = #channel{client = Client, proto_ver = ProtoVer}) ->
     Msg = emqx_packet:to_message(Client, Packet),
     %%TODO: Improve later.
-    Msg1 = emqx_message:set_flag(dup, false, Msg),
+    Msg1 = emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, ProtoVer, Msg)),
     process_publish(PacketId, mount(Client, Msg1), Channel).
 
 process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->

+ 3 - 2
src/emqx_packet.erl

@@ -165,10 +165,11 @@ will_msg(#mqtt_packet_connect{client_id    = ClientId,
                               will_qos     = QoS,
                               will_topic   = Topic,
                               will_props   = Properties,
-                              will_payload = Payload}) ->
+                              will_payload = Payload,
+                              proto_ver    = ProtoVer}) ->
     Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
     Msg#message{flags = #{dup => false, retain => Retain},
-                headers = merge_props(#{username => Username}, Properties)}.
+                headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}.
 
 merge_props(Headers, undefined) ->
     Headers;

+ 6 - 4
src/emqx_session.erl

@@ -567,12 +567,14 @@ enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{up
     enrich(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
 enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) ->
     enrich(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
-enrich([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, Session = #session{}) ->
-    enrich(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, Session);
-enrich([{rap, 0}|Opts], Msg = #message{flags = Flags}, Session) ->
+enrich([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
     enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
-enrich([{rap, _}|Opts], Msg, Session) ->
+enrich([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) ->
     enrich(Opts, Msg, Session);
+enrich([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) ->
+    enrich(Opts, Msg, Session);
+enrich([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) ->
+    enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session);
 enrich([{subid, SubId}|Opts], Msg, Session) ->
     enrich(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), Session).