Browse Source

do_subscribe_/4, do_unsubscribe_/3

Feng Lee 9 years atrás
parent
commit
0a967df15a
3 changed files with 30 additions and 28 deletions
  1. 4 8
      src/emqttd_pubsub.erl
  2. 11 11
      src/emqttd_server.erl
  3. 15 9
      src/emqttd_session.erl

+ 4 - 8
src/emqttd_pubsub.erl

@@ -162,18 +162,14 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 add_subscriber_(Topic, Subscriber) ->
-    case ets:member(mqtt_subscriber, Topic) of
-        false -> emqttd_router:add_route(Topic, node());
-        true  -> ok
-    end,
+    (not ets:member(mqtt_subscriber, Topic))
+        andalso emqttd_router:add_route(Topic),
     ets:insert(mqtt_subscriber, {Topic, Subscriber}).
 
 del_subscriber_(Topic, Subscriber) ->
     ets:delete_object(mqtt_subscriber, {Topic, Subscriber}),
-    case ets:member(mqtt_subscriber, Topic) of
-        false -> emqttd_router:del_route(Topic, node());
-        true  -> ok
-    end.
+    (not ets:member(mqtt_subscriber, Topic))
+        andalso emqttd_router:del_route(Topic).
 
 setstats(State) ->
     emqttd_stats:setstats('subscribers/count', 'subscribers/max',

+ 11 - 11
src/emqttd_server.erl

@@ -172,13 +172,13 @@ init([Pool, Id, Env]) ->
     {ok, #state{pool = Pool, id = Id, env = Env, submon = emqttd_pmon:new()}}.
 
 handle_call({subscribe, Topic, Subscriber, Options}, _From, State) ->
-    case subscribe_(Topic, Subscriber, Options, State) of
+    case do_subscribe_(Topic, Subscriber, Options, State) of
         {ok, NewState} -> {reply, ok, setstats(NewState)};
         {error, Error} -> {reply, {error, Error}, State}
     end;
 
 handle_call({unsubscribe, Topic, Subscriber}, _From, State) ->
-    case unsubscribe_(Topic, Subscriber, State) of
+    case do_unsubscribe_(Topic, Subscriber, State) of
         {ok, NewState} -> {reply, ok, setstats(NewState), hibernate};
         {error, Error} -> {reply, {error, Error}, State}
     end;
@@ -198,13 +198,13 @@ handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
 handle_cast({subscribe, Topic, Subscriber, Options}, State) ->
-    case subscribe_(Topic, Subscriber, Options, State) of
+    case do_subscribe_(Topic, Subscriber, Options, State) of
         {ok, NewState}  -> {noreply, setstats(NewState)};
         {error, _Error} -> {noreply, State}
     end;
 
 handle_cast({unsubscribe, Topic, Subscriber}, State) ->
-    case unsubscribe_(Topic, Subscriber, State) of
+    case do_unsubscribe_(Topic, Subscriber, State) of
         {ok, NewState}  -> {noreply, setstats(NewState), hibernate};
         {error, _Error} -> {noreply, State}
     end;
@@ -233,7 +233,7 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal Functions
 %%--------------------------------------------------------------------
 
-subscribe_(Topic, Subscriber, Options, State) ->
+do_subscribe_(Topic, Subscriber, Options, State) ->
     case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
         [] ->
             emqttd_pubsub:async_subscribe(Topic, Subscriber),
@@ -244,7 +244,12 @@ subscribe_(Topic, Subscriber, Options, State) ->
             {error, {already_subscribed, Topic}}
     end.
 
-unsubscribe_(Topic, Subscriber, State) ->
+monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
+    State#state{submon = PMon:monitor(SubPid)};
+monitor_subpid(_SubPid, State) ->
+    State.
+
+do_unsubscribe_(Topic, Subscriber, State) ->
     case ets:lookup(mqtt_subproperty, {Topic, Subscriber}) of
         [_] ->
             emqttd_pubsub:async_unsubscribe(Topic, Subscriber),
@@ -258,11 +263,6 @@ unsubscribe_(Topic, Subscriber, State) ->
             {error, {subscription_not_found, Topic}}
     end.
 
-monitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
-    State#state{submon = PMon:monitor(SubPid)};
-monitor_subpid(_SubPid, State) ->
-    State.
-
 demonitor_subpid(SubPid, State = #state{submon = PMon}) when is_pid(SubPid) ->
     State#state{submon = PMon:demonitor(SubPid)};
 demonitor_subpid(_SubPid, State) ->

+ 15 - 9
src/emqttd_session.erl

@@ -284,14 +284,18 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
 handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
-handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     = ClientId,
-                                                                 subscriptions = Subscriptions}) ->
-
+handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id     = ClientId,
+                                                                   subscriptions = Subscriptions}) ->
+    %% TODO: Ugly...
+    TopicTable0 = lists:map(fun({T, Q}) ->
+                                {T1, Opts} = emqttd_topic:strip(T),
+                                {T1, [{qos, Q} | Opts]}
+                            end, RawTopicTable),
     case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of
         {ok, TopicTable} ->
             ?LOG(info, "Subscribe ~p", [TopicTable], Session),
             Subscriptions1 = lists:foldl(
-                fun({Topic, Qos}, SubDict) ->
+                fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) ->
                     case dict:find(Topic, SubDict) of
                         {ok, Qos} ->
                             ?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
@@ -301,7 +305,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
                             ?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
                             dict:store(Topic, Qos, SubDict);
                         error ->
-                            emqttd:subscribe(Topic, ClientId, [{qos, Qos}]),
+                            emqttd:subscribe(Topic, ClientId, Opts),
                             %%TODO: the design is ugly...
                             %% <MQTT V3.1.1>: 3.8.4
                             %% Where the Topic Filter is not identical to any existing Subscription’s filter,
@@ -319,9 +323,11 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id     =
             hibernate(Session)
     end;
 
-handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
-                                                       subscriptions = Subscriptions}) ->
-
+handle_cast({unsubscribe, RawTopics}, Session = #session{client_id     = ClientId,
+                                                         subscriptions = Subscriptions}) ->
+    Topics0 = lists:map(fun(Topic) ->
+                            {T, _Opts} = emqttd_topic:strip(Topic), T
+                        end, RawTopics),
     case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of
         {ok, Topics} ->
             ?LOG(info, "unsubscribe ~p", [Topics], Session),
@@ -329,7 +335,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id     = ClientId,
                 fun(Topic, SubDict) ->
                     case dict:find(Topic, SubDict) of
                         {ok, _Qos} ->
-                            emqttd:unsubscribe(ClientId, Topic),
+                            emqttd:unsubscribe(Topic, ClientId),
                             dict:erase(Topic, SubDict);
                         error ->
                             SubDict