Просмотр исходного кода

Merge pull request #1369 from emqtt/emq24

Version 2.3.1
huangdan 8 лет назад
Родитель
Сommit
37b7e6aee4

+ 1 - 1
Makefile

@@ -1,6 +1,6 @@
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.3.0
+PROJECT_VERSION = 2.3.1
 
 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
 

+ 2 - 2
etc/emq.conf

@@ -242,7 +242,7 @@ mqtt.session.max_awaiting_rel = 100
 mqtt.session.await_rel_timeout = 20s
 
 ## Enable Statistics: on | off
-mqtt.session.enable_stats = off
+mqtt.session.enable_stats = on
 
 ## Expired after 1 day:
 ## w - week
@@ -534,7 +534,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 ##--------------------------------------------------------------------
 ## HTTP Management API Listener
 
-listener.api.mgmt = 127.0.0.1:8080
+listener.api.mgmt = 0.0.0.0:8080
 
 listener.api.mgmt.acceptors = 4
 

+ 2 - 1
include/emqttd_protocol.hrl

@@ -174,7 +174,8 @@
           will_topic  = undefined      :: undefined | binary(),
           will_msg    = undefined      :: undefined | binary(),
           username    = undefined      :: undefined | binary(),
-          password    = undefined      :: undefined | binary()
+          password    = undefined      :: undefined | binary(),
+          is_bridge   = false          :: boolean()
         }).
 
 -record(mqtt_packet_connack,

+ 1 - 1
src/emqttd.app.src

@@ -1,6 +1,6 @@
 {application,emqttd,
              [{description,"Erlang MQTT Broker"},
-              {vsn,"2.3.0"},
+              {vsn,"2.3.1"},
               {modules,[]},
               {registered,[emqttd_sup]},
               {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,

+ 1 - 0
src/emqttd_cm.erl

@@ -152,6 +152,7 @@ monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) ->
     State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}.
 
 erase_monitor(MRef, State = #state{monitors = Monitors}) ->
+    erlang:demonitor(MRef, [flush]),
     State#state{monitors = dict:erase(MRef, Monitors)}.
 
 setstats(State = #state{statsfun = StatsFun}) ->

+ 3 - 2
src/emqttd_parser.erl

@@ -79,7 +79,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos  = Qos} = Header, Length)
         {?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
             {ProtoName, Rest1} = parse_utf(FrameBin),
             %% Fix mosquitto bridge: 0x83, 0x84
-            <<_Bridge:4, ProtoVersion:4, Rest2/binary>> = Rest1,
+            <<BridgeTag:4, ProtoVersion:4, Rest2/binary>> = Rest1,
             <<UsernameFlag : 1,
               PasswordFlag : 1,
               WillRetain   : 1,
@@ -109,7 +109,8 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos  = Qos} = Header, Length)
                            will_topic  = WillTopic,
                            will_msg    = WillMsg,
                            username    = UserName,
-                           password    = PasssWord}, Rest);
+                           password    = PasssWord,
+                           is_bridge   = (BridgeTag =:= 8)}, Rest);
                false ->
                     {error, protocol_header_corrupt}
             end;

+ 21 - 9
src/emqttd_protocol.erl

@@ -44,7 +44,7 @@
                       clean_sess, proto_ver, proto_name, username, is_superuser,
                       will_msg, keepalive, keepalive_backoff, max_clientid_len,
                       session, stats_data, mountpoint, ws_initial_headers,
-                      connected_at}).
+                      is_bridge, connected_at}).
 
 -type(proto_state() :: #proto_state{}).
 
@@ -180,7 +180,8 @@ process(?CONNECT_PACKET(Var), State0) ->
                          password   = Password,
                          clean_sess = CleanSess,
                          keep_alive = KeepAlive,
-                         client_id  = ClientId} = Var,
+                         client_id  = ClientId,
+                         is_bridge  = IsBridge} = Var,
 
     State1 = State0#proto_state{proto_ver    = ProtoVer,
                                 proto_name   = ProtoName,
@@ -189,6 +190,7 @@ process(?CONNECT_PACKET(Var), State0) ->
                                 clean_sess   = CleanSess,
                                 keepalive    = KeepAlive,
                                 will_msg     = willmsg(Var, State0),
+                                is_bridge    = IsBridge,
                                 connected_at = os:timestamp()},
 
     {ReturnCode1, SessPresent, State3} =
@@ -211,7 +213,7 @@ process(?CONNECT_PACKET(Var), State0) ->
                             %% ACCEPT
                             {?CONNACK_ACCEPT, SP, State2#proto_state{session = Session, is_superuser = IsSuperuser}};
                         {error, Error} ->
-                            exit({shutdown, Error})
+                            {stop, {shutdown, Error}, State2}
                     end;
                 {error, Reason}->
                     ?LOG(error, "Username '~s' login failed for ~p", [Username, Reason], State1),
@@ -333,10 +335,11 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
 -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
 send(Msg, State = #proto_state{client_id  = ClientId,
                                username   = Username,
-                               mountpoint = MountPoint})
+                               mountpoint = MountPoint,
+                               is_bridge  = IsBridge})
         when is_record(Msg, mqtt_message) ->
     emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
-    send(emqttd_message:to_packet(unmount(MountPoint, Msg)), State);
+    send(emqttd_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
 
 send(Packet = ?PACKET(Type),
      State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
@@ -378,12 +381,12 @@ stop_if_auth_failure(_RC, State) ->
 
 shutdown(_Error, #proto_state{client_id = undefined}) ->
     ignore;
-
-shutdown(conflict, #proto_state{client_id = _ClientId}) ->
+shutdown(conflict, _State) ->
+    %% let it down
+    ignore;
+shutdown(mnesia_conflict, _State) ->
     %% let it down
-    %% emqttd_cm:unreg(ClientId);
     ignore;
-
 shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
     ?LOG(debug, "Shutdown for ~p", [Error], State),
     Client = client(State),
@@ -543,6 +546,15 @@ check_acl(subscribe, Topic, Client) ->
 sp(true)  -> 1;
 sp(false) -> 0.
 
+%%--------------------------------------------------------------------
+%% The retained flag should be propagated for bridge.
+%%--------------------------------------------------------------------
+
+clean_retain(false, Msg = #mqtt_message{retain = true}) ->
+    Msg#mqtt_message{retain = false};
+clean_retain(_IsBridge, Msg) ->
+    Msg.
+
 %%--------------------------------------------------------------------
 %% Mount Point
 %%--------------------------------------------------------------------

+ 3 - 1
src/emqttd_router.erl

@@ -284,5 +284,7 @@ clean_routes_(Node) ->
     mnesia:transaction(Clean).
 
 update_stats_() ->
-    emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)).
+    Size = mnesia:table_info(mqtt_route, size),
+    emqttd_stats:setstats('routes/count', 'routes/max', Size),
+    emqttd_stats:setstats('topics/count', 'topics/max', Size).
 

+ 11 - 16
src/emqttd_session.erl

@@ -152,9 +152,10 @@
          %% Force GC Count
          force_gc_count :: undefined | integer(),
 
-         created_at :: erlang:timestamp(),
+         %% Ignore loop deliver?
+         ignore_loop_deliver = false :: boolean(),
 
-         ignore_loop_deliver = false :: boolean()
+         created_at :: erlang:timestamp()
         }).
 
 -define(TIMEOUT, 60000).
@@ -529,17 +530,14 @@ handle_cast({destroy, ClientId},
 handle_cast(Msg, State) ->
     ?UNEXPECTED_MSG(Msg, State).
 
-%% Dispatch message from self publish
-handle_info({dispatch, Topic, Msg = #mqtt_message{from = {ClientId, _}}}, 
-             State = #state{client_id = ClientId, 
-                            ignore_loop_deliver = IgnoreLoopDeliver}) when is_record(Msg, mqtt_message) ->
-    case IgnoreLoopDeliver of
-        true  -> {noreply, State, hibernate};
-        false -> {noreply, handle_dispatch(Topic, Msg, State), hibernate}
-    end;
+%% Ignore Messages delivered by self
+handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
+             State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
+    hibernate(State);
+
 %% Dispatch Message
 handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
-    {noreply, handle_dispatch(Topic, Msg, State), hibernate};
+    hibernate(gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)));
 
 %% Do nothing if the client has been disconnected.
 handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
@@ -552,7 +550,7 @@ handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
     hibernate(expire_awaiting_rel(emit_stats(State#state{await_rel_timer = undefined})));
 
 handle_info({timeout, _Timer, expired}, State) ->
-    ?LOG(debug, "Expired, shutdown now.", [], State),
+    ?LOG(info, "Expired, shutdown now.", [], State),
     shutdown(expired, State);
 
 handle_info({'EXIT', ClientPid, _Reason},
@@ -563,7 +561,7 @@ handle_info({'EXIT', ClientPid, Reason},
             State = #state{clean_sess      = false,
                            client_pid      = ClientPid,
                            expiry_interval = Interval}) ->
-    ?LOG(debug, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
+    ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
     ExpireTimer = start_timer(Interval, expired),
     State1 = State#state{client_pid = undefined, expiry_timer = ExpireTimer},
     hibernate(emit_stats(State1));
@@ -687,9 +685,6 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen})
 %% Dispatch Messages
 %%--------------------------------------------------------------------
 
-handle_dispatch(Topic, Msg, State) ->
-    gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State)).
-
 %% Enqueue message if the client has been disconnected
 dispatch(Msg, State = #state{client_pid = undefined}) ->
     enqueue_msg(Msg, State);

+ 14 - 20
src/emqttd_sm.erl

@@ -156,7 +156,7 @@ handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, Stat
                     {reply, {ok, SessPid, true}, State};
                 {error, Erorr} ->
                     {reply, {error, Erorr}, State}
-             end
+            end
     end;
 
 %% Transient Session
@@ -183,16 +183,14 @@ handle_cast(Msg, State) ->
 handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
     case dict:find(MRef, State#state.monitors) of
         {ok, ClientId} ->
-            mnesia:transaction(fun() ->
-                case mnesia:wread({mqtt_session, ClientId}) of
-                    [] ->
-                        ok;
-                    [Sess = #mqtt_session{sess_pid = DownPid}] ->
-                        mnesia:delete_object(mqtt_session, Sess, write);
-                    [_Sess] ->
-                        ok
-                    end
-                end),
+            case mnesia:dirty_read({mqtt_session, ClientId}) of
+                [] ->
+                    ok;
+                [Sess = #mqtt_session{sess_pid = DownPid}] ->
+                    mnesia:dirty_delete_object(Sess);
+                [_Sess] ->
+                    ok
+            end,
             {noreply, erase_monitor(MRef, State), hibernate};
         error ->
             lager:error("MRef of session ~p not found", [DownPid]),
@@ -216,8 +214,7 @@ code_change(_OldVsn, State, _Extra) ->
 create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
     case create_session(CleanSess, {ClientId, Username}, ClientPid) of
         {ok, SessPid} ->
-            {reply, {ok, SessPid, false},
-                monitor_session(ClientId, SessPid, State)};
+            {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)};
         {error, Error} ->
             {reply, {error, Error}, State}
     end.
@@ -284,15 +281,14 @@ destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid  = SessPi
     remove_session(Session);
 
 %% Remote node
-destroy_session(Session = #mqtt_session{client_id = ClientId,
-                                        sess_pid  = SessPid}) ->
+destroy_session(Session = #mqtt_session{client_id = ClientId, sess_pid  = SessPid}) ->
     Node = node(SessPid),
     case rpc:call(Node, emqttd_session, destroy, [SessPid, ClientId]) of
         ok ->
             remove_session(Session);
         {badrpc, nodedown} ->
             ?LOG(error, "Node '~s' down", [Node], Session),
-            remove_session(Session); 
+            remove_session(Session);
         {badrpc, Reason} ->
             ?LOG(error, "Failed to destory ~p on remote node ~p for ~s",
                  [SessPid, Node, Reason], Session),
@@ -300,15 +296,13 @@ destroy_session(Session = #mqtt_session{client_id = ClientId,
      end.
 
 remove_session(Session) ->
-    case mnesia:transaction(fun mnesia:delete_object/1, [Session]) of
-        {atomic, ok}     -> ok;
-        {aborted, Error} -> {error, Error}
-    end.
+    mnesia:dirty_delete_object(Session).
 
 monitor_session(ClientId, SessPid, State = #state{monitors = Monitors}) ->
     MRef = erlang:monitor(process, SessPid),
     State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
 
 erase_monitor(MRef, State = #state{monitors = Monitors}) ->
+    erlang:demonitor(MRef, [flush]),
     State#state{monitors = dict:erase(MRef, Monitors)}.
 

+ 3 - 5
src/emqttd_sm_sup.erl

@@ -24,8 +24,6 @@
 
 -include("emqttd.hrl").
 
--define(SM, emqttd_sm).
-
 -define(HELPER, emqttd_sm_helper).
 
 %% API
@@ -44,11 +42,11 @@ init([]) ->
     %% Helper
     StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
     Helper = {?HELPER, {?HELPER, start_link, [StatsFun]},
-                permanent, 5000, worker, [?HELPER]},
+              permanent, 5000, worker, [?HELPER]},
 
     %% SM Pool Sup
-    MFA = {?SM, start_link, []},
-    PoolSup = emqttd_pool_sup:spec([?SM, hash, erlang:system_info(schedulers), MFA]),
+    MFA = {emqttd_sm, start_link, []},
+    PoolSup = emqttd_pool_sup:spec([emqttd_sm, hash, erlang:system_info(schedulers), MFA]),
 
     {ok, {{one_for_all, 10, 3600}, [Helper, PoolSup]}}.
 

+ 3 - 3
src/emqttd_stats.erl

@@ -57,14 +57,14 @@
 
 %% $SYS Topics for Subscribers
 -define(SYSTOP_PUBSUB, [
-    'routes/count',        % ...
-    'routes/max',          % ...
     'topics/count',        % ...
     'topics/max',          % ...
     'subscribers/count',   % ...
     'subscribers/max',     % ...
     'subscriptions/count', % ...
-    'subscriptions/max'    % ...
+    'subscriptions/max',   % ...
+    'routes/count',        % ...
+    'routes/max'           % ...
 ]).
 
 %% $SYS Topic for retained

+ 27 - 16
src/emqttd_ws_client.erl

@@ -91,23 +91,31 @@ clean_acl_cache(CPid, Topic) ->
 
 init([Env, WsPid, Req, ReplyChannel]) ->
     process_flag(trap_exit, true),
-    true = link(WsPid),
-    {ok, Peername} = Req:get(peername),
-    Headers = mochiweb_headers:to_list(
-                mochiweb_request:get(headers, Req)),
     Conn = Req:get(connection),
-    ProtoState = emqttd_protocol:init(Conn, Peername, send_fun(ReplyChannel),
-                                      [{ws_initial_headers, Headers} | Env]),
-    IdleTimeout = get_value(client_idle_timeout, Env, 30000),
-    EnableStats = get_value(client_enable_stats, Env, false),
-    ForceGcCount = emqttd_gc:conn_max_gc_count(),
-    {ok, #wsclient_state{connection     = Conn,
-                         ws_pid         = WsPid,
-                         peername       = Peername,
-                         proto_state    = ProtoState,
-                         enable_stats   = EnableStats,
-                         force_gc_count = ForceGcCount},
-     IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE}.
+    true = link(WsPid),
+    case Req:get(peername) of
+        {ok, Peername} ->
+            Headers = mochiweb_headers:to_list(
+                        mochiweb_request:get(headers, Req)),
+            ProtoState = emqttd_protocol:init(Conn, Peername, send_fun(ReplyChannel),
+                                              [{ws_initial_headers, Headers} | Env]),
+            IdleTimeout = get_value(client_idle_timeout, Env, 30000),
+            EnableStats = get_value(client_enable_stats, Env, false),
+            ForceGcCount = emqttd_gc:conn_max_gc_count(),
+            {ok, #wsclient_state{connection     = Conn,
+                                 ws_pid         = WsPid,
+                                 peername       = Peername,
+                                 proto_state    = ProtoState,
+                                 enable_stats   = EnableStats,
+                                 force_gc_count = ForceGcCount},
+             IdleTimeout, {backoff, 2000, 2000, 20000}, ?MODULE};
+        {error, enotconn} -> Conn:fast_close(),
+                             exit(WsPid, normal),
+                             exit(normal);
+        {error, Reason}   -> Conn:fast_close(),
+                             exit(WsPid, normal),
+                             exit({shutdown, Reason})
+    end.
 
 prioritise_call(Msg, _From, _Len, _State) ->
     case Msg of info -> 10; stats -> 10; state -> 10; _ -> 5 end.
@@ -203,6 +211,9 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
     ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
     shutdown(conflict, State);
 
+handle_info({shutdown, Reason}, State) ->
+    shutdown(Reason, State);
+
 handle_info({keepalive, start, Interval}, State = #wsclient_state{connection = Conn}) ->
     ?WSLOG(debug, "Keepalive at the interval of ~p", [Interval], State),
     case emqttd_keepalive:start(stat_fun(Conn), Interval, {keepalive, check}) of