|
@@ -282,36 +282,40 @@ prioritise_info(Msg, _Len, _State) ->
|
|
|
handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
|
|
handle_call({subscribe, TopicTable0}, _From, Session = #session{client_id = ClientId,
|
|
|
subscriptions = Subscriptions}) ->
|
|
subscriptions = Subscriptions}) ->
|
|
|
|
|
|
|
|
- TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
|
|
|
|
-
|
|
|
|
|
- %% subscribe first and don't care if the subscriptions have been existed
|
|
|
|
|
- {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
|
|
|
|
-
|
|
|
|
|
- emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
|
|
|
|
-
|
|
|
|
|
- lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
|
|
|
|
|
- [ClientId, TopicTable, GrantedQos]),
|
|
|
|
|
-
|
|
|
|
|
- Subscriptions1 =
|
|
|
|
|
- lists:foldl(fun({Topic, Qos}, Acc) ->
|
|
|
|
|
- case lists:keyfind(Topic, 1, Acc) of
|
|
|
|
|
- {Topic, Qos} ->
|
|
|
|
|
- lager:warning([{client, ClientId}], "Session(~s): "
|
|
|
|
|
- "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc;
|
|
|
|
|
- {Topic, OldQos} ->
|
|
|
|
|
- lager:warning([{client, ClientId}], "Session(~s): "
|
|
|
|
|
- "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]),
|
|
|
|
|
- lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
|
|
|
|
|
- false ->
|
|
|
|
|
- %%TODO: the design is ugly, rewrite later...:(
|
|
|
|
|
- %% <MQTT V3.1.1>: 3.8.4
|
|
|
|
|
- %% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
|
|
|
|
- %% a new Subscription is created and all matching retained messages are sent.
|
|
|
|
|
- emqttd_retained:dispatch(Topic, self()),
|
|
|
|
|
- [{Topic, Qos} | Acc]
|
|
|
|
|
- end
|
|
|
|
|
- end, Subscriptions, TopicTable),
|
|
|
|
|
- {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
|
|
|
|
|
|
|
+ case TopicTable0 -- Subscriptions of
|
|
|
|
|
+ [] ->
|
|
|
|
|
+ {reply, {ok, [Qos || {_, Qos} <- TopicTable0]}, Session};
|
|
|
|
|
+ _ ->
|
|
|
|
|
+ TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
|
|
|
|
+ %% subscribe first and don't care if the subscriptions have been existed
|
|
|
|
|
+ {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
|
|
|
|
+
|
|
|
|
|
+ emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
|
|
|
|
+
|
|
|
|
|
+ lager:info([{client, ClientId}], "Session(~s): subscribe ~p, Granted QoS: ~p",
|
|
|
|
|
+ [ClientId, TopicTable, GrantedQos]),
|
|
|
|
|
+
|
|
|
|
|
+ Subscriptions1 =
|
|
|
|
|
+ lists:foldl(fun({Topic, Qos}, Acc) ->
|
|
|
|
|
+ case lists:keyfind(Topic, 1, Acc) of
|
|
|
|
|
+ {Topic, Qos} ->
|
|
|
|
|
+ lager:warning([{client, ClientId}], "Session(~s): "
|
|
|
|
|
+ "resubscribe ~s, qos = ~w", [ClientId, Topic, Qos]), Acc;
|
|
|
|
|
+ {Topic, OldQos} ->
|
|
|
|
|
+ lager:warning([{client, ClientId}], "Session(~s): "
|
|
|
|
|
+ "resubscribe ~s, old qos=~w, new qos=~w", [ClientId, Topic, OldQos, Qos]),
|
|
|
|
|
+ lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
|
|
|
|
|
+ false ->
|
|
|
|
|
+ %%TODO: the design is ugly, rewrite later...:(
|
|
|
|
|
+ %% <MQTT V3.1.1>: 3.8.4
|
|
|
|
|
+ %% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
|
|
|
|
+ %% a new Subscription is created and all matching retained messages are sent.
|
|
|
|
|
+ emqttd_retained:dispatch(Topic, self()),
|
|
|
|
|
+ [{Topic, Qos} | Acc]
|
|
|
|
|
+ end
|
|
|
|
|
+ end, Subscriptions, TopicTable),
|
|
|
|
|
+ {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}
|
|
|
|
|
+ end;
|
|
|
|
|
|
|
|
handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId,
|
|
handle_call({unsubscribe, Topics0}, _From, Session = #session{client_id = ClientId,
|
|
|
subscriptions = Subscriptions}) ->
|
|
subscriptions = Subscriptions}) ->
|