|
|
@@ -173,17 +173,31 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
|
|
|
%%------------------------------------------------------------------------------
|
|
|
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
|
|
|
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
|
|
|
- Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
|
|
|
- case Resubs of
|
|
|
- [] -> ok;
|
|
|
- _ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
|
|
|
- end,
|
|
|
- SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
|
|
+
|
|
|
+ %% subscribe first and don't care if the subscriptions have been existed
|
|
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
|
|
+
|
|
|
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
|
|
|
- [ClientId, Topics, GrantedQos]),
|
|
|
- %%TODO: should be gen_event and notification...
|
|
|
- [emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
|
|
|
+ [ClientId, Topics, GrantedQos]),
|
|
|
+
|
|
|
+
|
|
|
+ %% <MQTT V3.1.1>: 3.8.4
|
|
|
+ %% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
|
|
+ %% a new Subscription is created and all matching retained messages are sent.
|
|
|
+ lists:foreach(fun({Name, _Qos}) ->
|
|
|
+ case maps:is_key(Name, SubMap) of
|
|
|
+ true ->
|
|
|
+ lager:warning("~s resubscribe ~p", [ClientId, Name]);
|
|
|
+ false ->
|
|
|
+ %%TODO: this is not right, rewrite later...
|
|
|
+ emqttd_msg_store:redeliver(Name, self())
|
|
|
+ end
|
|
|
+ end, Topics),
|
|
|
+
|
|
|
+ SubMap1 = lists:foldl(fun({Name, Qos}, Acc) ->
|
|
|
+ maps:put(Name, Qos, Acc)
|
|
|
+ end, SubMap, Topics),
|
|
|
+
|
|
|
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
|
|
|
|
|
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|