Procházet zdrojové kódy

Merge pull request #261 from emqtt/dev

Fix issue #259, add emqttd_client:subscribe/2 api
Feng Lee před 10 roky
rodič
revize
dd21b4d3b5
5 změnil soubory, kde provedl 27 přidání a 28 odebrání
  1. 1 1
      src/emqttd.app.src
  2. 4 12
      src/emqttd_app.erl
  3. 20 14
      src/emqttd_client.erl
  4. 1 0
      src/emqttd_ctl.erl
  5. 1 1
      src/emqttd_mod_autosub.erl

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
 {application, emqttd,
  [
  [
   {description, "Erlang MQTT Broker"},
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.10.0"},
+  {vsn, "0.10.1"},
   {modules, []},
   {modules, []},
   {registered, []},
   {registered, []},
   {applications, [kernel,
   {applications, [kernel,

+ 4 - 12
src/emqttd_app.erl

@@ -31,7 +31,7 @@
 -behaviour(application).
 -behaviour(application).
 
 
 %% Application callbacks
 %% Application callbacks
--export([start/2, prep_stop/1, stop/1]).
+-export([start/2, stop/1]).
 
 
 -define(PRINT_MSG(Msg), io:format(Msg)).
 -define(PRINT_MSG(Msg), io:format(Msg)).
 
 
@@ -132,13 +132,9 @@ worker_spec(Name, Opts) ->
         {Name, start_link, [Opts]},
         {Name, start_link, [Opts]},
             permanent, 10000, worker, [Name]}.
             permanent, 10000, worker, [Name]}.
 
 
-%% close all listeners first...
-prep_stop(State) ->
-    stop_listeners(), 
-    timer:sleep(2),
-    emqttd_plugins:unload(),
-    timer:sleep(2),
-    State.
+-spec stop(State :: term()) -> term().
+stop(_State) ->
+    stop_listeners().
 
 
 stop_listeners() ->
 stop_listeners() ->
     %% ensure that esockd applications is started?
     %% ensure that esockd applications is started?
@@ -150,7 +146,3 @@ stop_listeners() ->
             emqttd:close_listeners(Listeners)
             emqttd:close_listeners(Listeners)
     end.
     end.
 
 
--spec stop(State :: term()) -> term().
-stop(_State) ->
-    ok.
-

+ 20 - 14
src/emqttd_client.erl

@@ -34,7 +34,7 @@
 -include("emqttd_protocol.hrl").
 -include("emqttd_protocol.hrl").
 
 
 %% API Function Exports
 %% API Function Exports
--export([start_link/2, info/1, kick/1]).
+-export([start_link/2, info/1, kick/1, subscribe/2]).
 
 
 -behaviour(gen_server).
 -behaviour(gen_server).
 
 
@@ -58,11 +58,14 @@
 start_link(SockArgs, PktOpts) ->
 start_link(SockArgs, PktOpts) ->
     {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}.
     {ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, PktOpts]])}.
 
 
-info(Pid) ->
-    gen_server:call(Pid, info, infinity).
+info(CPid) ->
+    gen_server:call(CPid, info, infinity).
 
 
-kick(Pid) ->
-    gen_server:call(Pid, kick).
+kick(CPid) ->
+    gen_server:call(CPid, kick).
+
+subscribe(CPid, TopicTable) ->
+    gen_server:cast(CPid, {subscribe, TopicTable}).
 
 
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
 init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
     % Transform if ssl.
     % Transform if ssl.
@@ -95,6 +98,10 @@ handle_call(Req, _From, State = #state{peername = Peername}) ->
     lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     lager:critical("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     {reply, {error, unsupported_request}, State}.    
     {reply, {error, unsupported_request}, State}.    
 
 
+handle_cast({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
+    {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
+    noreply(State#state{proto_state = ProtoState1});
+
 handle_cast(Msg, State = #state{peername = Peername}) ->
 handle_cast(Msg, State = #state{peername = Peername}) ->
     lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
     lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
     {noreply, State}.
     {noreply, State}.
@@ -110,18 +117,14 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
 
 
 handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
 handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
     {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}, hibernate};
+    noreply(State#state{proto_state = ProtoState1});
 
 
 handle_info({redeliver, {?PUBREL, PacketId}},  State = #state{proto_state = ProtoState}) ->
 handle_info({redeliver, {?PUBREL, PacketId}},  State = #state{proto_state = ProtoState}) ->
     {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
     {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}, hibernate};
-
-handle_info({subscribe, TopicTable}, State = #state{proto_state = ProtoState}) ->
-    {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
-    {noreply, State#state{proto_state = ProtoState1}};
+    noreply(State#state{proto_state = ProtoState1});
 
 
 handle_info({inet_reply, _Ref, ok}, State) ->
 handle_info({inet_reply, _Ref, ok}, State) ->
-    {noreply, State, hibernate};
+    noreply(State);
 
 
 handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
 handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
     lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
     lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
@@ -138,7 +141,7 @@ handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peer
 handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
 handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
     lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
     lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
     KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
     KeepAlive = emqttd_keepalive:new({Transport, Socket}, TimeoutSec, {keepalive, timeout}),
-    {noreply, State#state{ keepalive = KeepAlive }};
+    noreply(State#state{keepalive = KeepAlive});
 
 
 handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
 handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
     case emqttd_keepalive:resume(KeepAlive) of
     case emqttd_keepalive:resume(KeepAlive) of
@@ -147,7 +150,7 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive
         stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
         stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
     {resumed, KeepAlive1} ->
     {resumed, KeepAlive1} ->
         lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
         lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
-        {noreply, State#state{keepalive = KeepAlive1}}
+        noreply(State#state{keepalive = KeepAlive1})
     end;
     end;
 
 
 handle_info(Info, State = #state{peername = Peername}) ->
 handle_info(Info, State = #state{peername = Peername}) ->
@@ -167,6 +170,9 @@ terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state
 
 
 code_change(_OldVsn, State, _Extra) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
     {ok, State}.
+
+noreply(State) ->
+    {noreply, State, hibernate}.
     
     
 %-------------------------------------------------------
 %-------------------------------------------------------
 % receive and parse tcp data
 % receive and parse tcp data

+ 1 - 0
src/emqttd_ctl.erl

@@ -79,6 +79,7 @@ cluster([SNode]) ->
     pong ->
     pong ->
         case emqttd:is_running(Node) of
         case emqttd:is_running(Node) of
             true ->
             true ->
+                emqttd_plugins:unload(),
                 application:stop(emqttd),
                 application:stop(emqttd),
                 application:stop(esockd),
                 application:stop(esockd),
                 application:stop(gproc),
                 application:stop(gproc),

+ 1 - 1
src/emqttd_mod_autosub.erl

@@ -47,7 +47,7 @@ load(Opts) ->
 
 
 client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) ->
 client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) ->
     F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
     F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
-    ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
+    emqttd_client:subscribe(ClientPid, [{F(Topic), Qos} || {Topic, Qos} <- Topics]);
 
 
 client_connected(_ConnAck, _Client, _Topics) ->
 client_connected(_ConnAck, _Client, _Topics) ->
     ignore.
     ignore.