Bläddra i källkod

Fix subscribe, unsubscribe

Feng Lee 9 år sedan
förälder
incheckning
6ceb1c6718
4 ändrade filer med 33 tillägg och 22 borttagningar
  1. 6 10
      src/emqttd_client.erl
  2. 0 1
      src/emqttd_mod_subscription.erl
  3. 21 2
      src/emqttd_protocol.erl
  4. 6 9
      src/emqttd_ws_client.erl

+ 6 - 10
src/emqttd_client.erl

@@ -140,14 +140,14 @@ handle_call(Req, _From, State) ->
     ?UNEXPECTED_REQ(Req, State).
 
 handle_cast({subscribe, TopicTable}, State) ->
-    with_session(fun(SessPid) ->
-                   emqttd_session:subscribe(SessPid, TopicTable)
-                 end, State);
+    with_proto_state(fun(ProtoState) ->
+                emqttd_protocol:handle({subscribe, TopicTable}, ProtoState)
+        end, State);
 
 handle_cast({unsubscribe, Topics}, State) ->
-    with_session(fun(SessPid) ->
-                   emqttd_session:unsubscribe(SessPid, Topics)
-                 end, State);
+    with_proto_state(fun(ProtoState) ->
+                emqttd_protocol:handle({unsubscribe, Topics}, ProtoState)
+        end, State);
 
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
@@ -249,10 +249,6 @@ with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = Fun(ProtoState),
     hibernate(State#client_state{proto_state = ProtoState1}).
 
-with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
-    Fun(emqttd_protocol:session(ProtoState)),
-    hibernate(State).
-
 %% Receive and parse tcp data
 received(<<>>, State) ->
     hibernate(State);

+ 0 - 1
src/emqttd_mod_subscription.erl

@@ -14,7 +14,6 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc Subscription from Broker Side
 -module(emqttd_mod_subscription).
 
 -behaviour(emqttd_gen_mod).

+ 21 - 2
src/emqttd_protocol.erl

@@ -14,7 +14,6 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% @doc MQTT Protocol Processor.
 -module(emqttd_protocol).
 
 -include("emqttd.hrl").
@@ -28,7 +27,7 @@
 %% API
 -export([init/3, info/1, clientid/1, client/1, session/1]).
 
--export([received/2, send/2, redeliver/2, shutdown/2]).
+-export([received/2, handle/2, send/2, redeliver/2, shutdown/2]).
 
 -export([process/2]).
 
@@ -116,6 +115,26 @@ received(Packet = ?PACKET(_Type), State) ->
             {error, Reason, State}
     end.
 
+handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId,
+                                                             username  = Username,
+                                                             session   = Session}) ->
+    TopicTable = parse_topic_table(RawTopicTable),
+    case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of
+        {ok, TopicTable1} ->
+            emqttd_session:subscribe(Session, TopicTable1);
+        {stop, _} ->
+            ok
+    end,
+    {ok, ProtoState};
+
+handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId,
+                                                           username  = Username,
+                                                           session   = Session}) ->
+    {ok, TopicTable} = emqttd:run_hooks('client.unsubscribe',
+                                        [ClientId, Username], parse_topics(RawTopics)),
+    emqttd_session:unsubscribe(Session, TopicTable),
+    {ok, ProtoState}.
+
 process(Packet = ?CONNECT_PACKET(Var), State0) ->
 
     #mqtt_packet_connect{proto_ver  = ProtoVer,

+ 6 - 9
src/emqttd_ws_client.erl

@@ -97,14 +97,14 @@ handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
     {reply, {error, unsupported_request}, State}.
 
 handle_cast({subscribe, TopicTable}, State) ->
-    with_session(fun(SessPid) ->
-                   emqttd_session:subscribe(SessPid, TopicTable)
-                 end, State);
+    with_proto_state(fun(ProtoState) ->
+                emqttd_protocol:handle({subscribe, TopicTable}, ProtoState)
+        end, State);
 
 handle_cast({unsubscribe, Topics}, State) ->
-    with_session(fun(SessPid) ->
-                   emqttd_session:unsubscribe(SessPid, Topics)
-                 end, State);
+    with_proto_state(fun(ProtoState) ->
+                emqttd_protocol:handle({unsubscribe, Topics}, ProtoState)
+        end, State);
 
 handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
     case emqttd_protocol:received(Packet, ProtoState) of
@@ -194,9 +194,6 @@ with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = Fun(ProtoState),
     noreply(State#wsclient_state{proto_state = ProtoState1}).
 
-with_session(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
-    Fun(emqttd_protocol:session(ProtoState)), noreply(State).
-
 noreply(State) ->
     {noreply, State, hibernate}.