|
@@ -170,9 +170,9 @@ destroy(SessPid, ClientId) ->
|
|
|
subscribe(SessPid, TopicTable) ->
|
|
subscribe(SessPid, TopicTable) ->
|
|
|
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
|
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
|
|
|
|
|
|
|
--spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok.
|
|
|
|
|
-subscribe(SessPid, TopicTable, Callback) ->
|
|
|
|
|
- gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}).
|
|
|
|
|
|
|
+-spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok.
|
|
|
|
|
+subscribe(SessPid, TopicTable, AckFun) ->
|
|
|
|
|
+ gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% @doc Publish message
|
|
%% @doc Publish message
|
|
@@ -296,20 +296,20 @@ handle_call(Req, _From, State) ->
|
|
|
lager:error("Unexpected Request: ~p", [Req]),
|
|
lager:error("Unexpected Request: ~p", [Req]),
|
|
|
{reply, ok, State}.
|
|
{reply, ok, State}.
|
|
|
|
|
|
|
|
-handle_cast({subscribe, TopicTable0, Callback}, Session = #session{
|
|
|
|
|
|
|
+handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
|
|
|
client_id = ClientId, subscriptions = Subscriptions}) ->
|
|
client_id = ClientId, subscriptions = Subscriptions}) ->
|
|
|
|
|
|
|
|
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
|
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
|
|
|
|
|
|
|
case TopicTable -- Subscriptions of
|
|
case TopicTable -- Subscriptions of
|
|
|
[] ->
|
|
[] ->
|
|
|
- catch Callback([Qos || {_, Qos} <- TopicTable]),
|
|
|
|
|
|
|
+ catch AckFun([Qos || {_, Qos} <- TopicTable]),
|
|
|
noreply(Session);
|
|
noreply(Session);
|
|
|
_ ->
|
|
_ ->
|
|
|
%% subscribe first and don't care if the subscriptions have been existed
|
|
%% subscribe first and don't care if the subscriptions have been existed
|
|
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
|
|
|
|
|
|
|
- catch Callback(GrantedQos),
|
|
|
|
|
|
|
+ catch AckFun(GrantedQos),
|
|
|
|
|
|
|
|
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
|
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
|
|
|
|
|