Преглед изворни кода

Merge pull request #941 from callbay/issue#935

Fix Issue#935
Feng Lee пре 9 година
родитељ
комит
bb91bc04c6
7 измењених фајлова са 66 додато и 15 уклоњено
  1. 3 0
      .gitignore
  2. 1 1
      include/emqttd_internal.hrl
  3. 32 0
      rebar.lock
  4. 8 0
      src/emqttd.app.src
  5. 1 1
      src/emqttd_broker.erl
  6. 7 2
      src/emqttd_client.erl
  7. 14 11
      src/emqttd_keepalive.erl

+ 3 - 0
.gitignore

@@ -28,3 +28,6 @@ ct.coverdata
 emqttd.iml
 _rel/
 data/
+_build
+.rebar3
+rebar3.crashdump

+ 1 - 1
include/emqttd_internal.hrl

@@ -19,7 +19,7 @@
 -define(GPROC_POOL(JoinOrLeave, Pool, I),
         (begin
             case JoinOrLeave of
-                join  -> gproc_pool:connect_worker(Pool, {Pool, Id});
+                join  -> gproc_pool:connect_worker(Pool, {Pool, I});
                 leave -> gproc_pool:disconnect_worker(Pool, {Pool, I})
             end
         end)).

+ 32 - 0
rebar.lock

@@ -0,0 +1,32 @@
+[{<<"esockd">>,
+  {git,"https://github.com/emqtt/esockd",
+       {ref,"6ef597f16ce242fe37ae019d6ff5214f7a784c80"}},
+  0},
+ {<<"gen_logger">>,
+  {git,"https://github.com/emqtt/gen_logger.git",
+       {ref,"f6e9f2f373d99f41ffe0579ab5a5f3b19472c9c5"}},
+  1},
+ {<<"goldrush">>,
+  {git,"https://github.com/basho/goldrush.git",
+       {ref,"8f1b715d36b650ec1e1f5612c00e28af6ab0de82"}},
+  1},
+ {<<"gproc">>,
+  {git,"https://github.com/uwiger/gproc",
+       {ref,"01c8fbfdd5e4701e8e4b57b0c8279872f9574b0b"}},
+  0},
+ {<<"lager">>,
+  {git,"https://github.com/basho/lager",
+       {ref,"81eaef0ce98fdbf64ab95665e3bc2ec4b24c7dac"}},
+  0},
+ {<<"lager_syslog">>,
+  {git,"https://github.com/basho/lager_syslog",
+       {ref,"126dd0284fcac9b01613189a82facf8d803411a2"}},
+  0},
+ {<<"mochiweb">>,
+  {git,"https://github.com/emqtt/mochiweb",
+       {ref,"af27c0c90bf4c1bfeae0290e4c541264b69f7168"}},
+  0},
+ {<<"syslog">>,
+  {git,"git://github.com/Vagabond/erlang-syslog",
+       {ref,"0e4f0e95c361af298c5d1d17ceccfa831efc036d"}},
+  1}].

+ 8 - 0
src/emqttd.app.src

@@ -0,0 +1,8 @@
+{application, emqttd, [
+	{description, "Erlang MQTT Broker"},
+	{vsn, "2.1.0"},
+	{modules, []},
+	{registered, [emqttd_sup]},
+	{applications, [kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog]},
+	{mod, {emqttd_app, []}}
+]}.

+ 1 - 1
src/emqttd_broker.erl

@@ -106,7 +106,7 @@ start_tick(0, _Msg) ->
 start_tick(Interval, Msg) when Interval > 0 ->
     {ok, TRef} = timer:send_interval(Interval, Msg), TRef.
 
-%% @doc Start tick timer
+%% @doc Stop tick timer
 stop_tick(undefined) ->
     ok;
 stop_tick(TRef) ->

+ 7 - 2
src/emqttd_client.erl

@@ -252,8 +252,13 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
                     {error, Error}              -> {error, Error}
                 end
              end,
-    KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
-    {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
+    case emqttd_keepalive:start(StatFun, Interval, {keepalive, check}) of
+        {ok, KeepAlive} ->
+            {noreply, State#client_state{keepalive = KeepAlive}, hibernate};
+        {error, Error} ->
+            ?LOG(warning, "Keepalive error - ~p", [Error], State),
+            shutdown(Error, State)
+    end;
 
 handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of

+ 14 - 11
src/emqttd_keepalive.erl

@@ -29,14 +29,18 @@
 -export_type([keepalive/0]).
 
 %% @doc Start a keepalive
--spec(start(fun(), integer(), any()) -> undefined | keepalive()).
+-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, any()}).
 start(_, 0, _) ->
-    undefined;
+    {ok, #keepalive{}};
 start(StatFun, TimeoutSec, TimeoutMsg) ->
-    {ok, StatVal} = StatFun(),
-    #keepalive{statfun = StatFun, statval = StatVal,
-               tsec = TimeoutSec, tmsg = TimeoutMsg,
-               tref = timer(TimeoutSec, TimeoutMsg)}.
+    case StatFun() of
+        {ok, StatVal} ->
+            {ok, #keepalive{statfun = StatFun, statval = StatVal,
+                       tsec = TimeoutSec, tmsg = TimeoutMsg,
+                       tref = timer(TimeoutSec, TimeoutMsg)}};
+        {error, Error} ->
+            {error, Error}
+    end.
 
 %% @doc Check keepalive, called when timeout.
 -spec(check(keepalive()) -> {ok, keepalive()} | {error, any()}).
@@ -59,12 +63,11 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
 
 %% @doc Cancel Keepalive
 -spec(cancel(keepalive()) -> ok).
-cancel(#keepalive{tref = TRef}) ->
-    cancel(TRef);
-cancel(undefined) -> 
+cancel(#keepalive{tref = TRef}) when is_reference(TRef) ->
+    erlang:cancel_timer(TRef),
     ok;
-cancel(TRef) ->
-    catch erlang:cancel_timer(TRef).
+cancel(_) ->
+    ok.
 
 timer(Sec, Msg) ->
     erlang:send_after(timer:seconds(Sec), self(), Msg).