Просмотр исходного кода

Merge pull request #1416 from emqtt/emq24

Version 2.3.3 - Enhancements and bug fix
Feng Lee 8 лет назад
Родитель
Сommit
b98d67a6b4
12 измененных файлов с 1079 добавлено и 248 удалено
  1. 3 3
      Makefile
  2. 3 5
      README.md
  3. 992 209
      etc/emq.conf
  4. 33 4
      priv/emq.schema
  5. 1 1
      src/emqttd.app.src
  6. 1 1
      src/emqttd_bridge.erl
  7. 2 2
      src/emqttd_broker.erl
  8. 1 1
      src/emqttd_mgmt.erl
  9. 13 4
      src/emqttd_session.erl
  10. 4 4
      src/emqttd_topic.erl
  11. 16 8
      src/emqttd_ws.erl
  12. 10 6
      test/emqttd_topic_SUITE.erl

+ 3 - 3
Makefile

@@ -1,6 +1,6 @@
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.3.2
+PROJECT_VERSION = 2.3.3
 
 DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt clique jsx
 
@@ -9,8 +9,8 @@ dep_gproc        = git https://github.com/uwiger/gproc
 dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
 dep_esockd       = git https://github.com/emqtt/esockd v5.2
-dep_ekka         = git https://github.com/emqtt/ekka master
-dep_mochiweb     = git https://github.com/emqtt/mochiweb v4.2.0
+dep_ekka         = git https://github.com/emqtt/ekka v0.2.2
+dep_mochiweb     = git https://github.com/emqtt/mochiweb v4.2.1
 dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_lager_syslog = git https://github.com/basho/lager_syslog
 dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master

+ 3 - 5
README.md

@@ -77,7 +77,7 @@ Plugin                                                                 | Descrip
 -----------------------------------------------------------------------|--------------------------------------
 [emq_plugin_template](https://github.com/emqtt/emq_plugin_template)    | Plugin template and demo
 [emq_dashboard](https://github.com/emqtt/emq_dashboard)                | Web Dashboard
-[emq_retainer](https://github.com/emqtt/emq_retainer)                  | Store MQTT Retained Messages
+[emq_retainer](https://github.com/emqtt/emq-retainer)                  | Store MQTT Retained Messages
 [emq_modules](https://github.com/emqtt/emq-modules)                    | Presence, Subscription and Rewrite Modules
 [emq_auth_username](https://github.com/emqtt/emq_auth_username)        | Username/Password Authentication Plugin
 [emq_auth_clientid](https://github.com/emqtt/emq_auth_clientid)        | ClientId Authentication Plugin
@@ -93,7 +93,7 @@ Plugin                                                                 | Descrip
 [emq_sn](https://github.com/emqtt/emq_sn)                              | MQTT-SN Protocol Plugin
 [emq_coap](https://github.com/emqtt/emq_coap)                          | CoAP Protocol Plugin
 [emq_stomp](https://github.com/emqtt/emq_stomp)                        | Stomp Protocol Plugin
-[emq_lwm2m](https://github.com/emqtt/emq-lwm2m)                        | LWM2M Prototol Plugin
+[emq_lwm2m](https://github.com/emqx/emqx-lwm2m)                        | LWM2M Prototol Plugin
 [emq_recon](https://github.com/emqtt/emq_recon)                        | Recon Plugin
 [emq_reloader](https://github.com/emqtt/emq_reloader)                  | Reloader Plugin
 [emq_sockjs](https://github.com/emqtt/emq_sockjs)                      | SockJS(Stomp) Plugin
@@ -109,9 +109,7 @@ Plugin                                                                 | Descrip
 * Issues: https://github.com/emqtt/emqttd/issues
 * QQ Group: 12222225
 
-## Partners
-
-[QingCloud](https://qingcloud.com) is the world’s first IaaS provider that can deliver any number of IT resources in seconds and adopts a second-based billing system. QingCloud is committed to providing a reliable, secure, on-demand and real-time IT resource platform with excellent performance, which includes all components of a complete IT infrastructure system: computing, storage, networking and security.
+## Test Servers
 
 The **q.emqtt.com** hosts a public Four-Node *EMQ* cluster on [QingCloud](https://qingcloud.com):
 

Разница между файлами не показана из-за своего большого размера
+ 992 - 209
etc/emq.conf


+ 33 - 4
priv/emq.schema

@@ -702,8 +702,8 @@ end}.
 %%--------------------------------------------------------------------
 
 {mapping, "mqtt.broker.sys_interval", "emqttd.broker_sys_interval", [
-  {default, 60},
-  {datatype, integer}
+  {datatype, {duration, ms}},
+  {default, "1m"}
 ]}.
 
 %%--------------------------------------------------------------------
@@ -735,8 +735,8 @@ end}.
 ]}.
 
 {mapping, "mqtt.bridge.ping_down_interval", "emqttd.bridge", [
-  {default, 1},
-  {datatype, integer}
+  {datatype, {duration, ms}},
+  {default, "1s"}
 ]}.
 
 {translation, "emqttd.bridge", fun(Conf) ->
@@ -1007,6 +1007,10 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "listener.ws.$name.mountpoint", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
 {mapping, "listener.ws.$name.access.$id", "emqttd.listeners", [
   {datatype, string}
 ]}.
@@ -1140,6 +1144,14 @@ end}.
   hidden
 ]}.
 
+{mapping, "listener.wss.$name.tls_versions", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
+{mapping, "listener.wss.$name.ciphers", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
 {mapping, "listener.wss.$name.handshake_timeout", "emqttd.listeners", [
   {default, "15s"},
   {datatype, {duration, ms}}
@@ -1165,6 +1177,23 @@ end}.
   {datatype, {enum, [true, false]}}
 ]}.
 
+{mapping, "listener.wss.$name.secure_renegotiate", "emqttd.listeners", [
+  {datatype, flag}
+]}.
+
+{mapping, "listener.wss.$name.reuse_sessions", "emqttd.listeners", [
+  {default, on},
+  {datatype, flag}
+]}.
+
+{mapping, "listener.wss.$name.honor_cipher_order", "emqttd.listeners", [
+  {datatype, flag}
+]}.
+
+{mapping, "listener.wss.$name.peer_cert_as_username", "emqttd.listeners", [
+  {datatype, {enum, [cn, dn]}}
+]}.
+
 {translation, "emqttd.listeners", fun(Conf) ->
 
     Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,

+ 1 - 1
src/emqttd.app.src

@@ -1,6 +1,6 @@
 {application,emqttd,
              [{description,"Erlang MQTT Broker"},
-              {vsn,"2.3.2"},
+              {vsn,"2.3.3"},
               {modules,[]},
               {registered,[emqttd_sup]},
               {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,

+ 1 - 1
src/emqttd_bridge.erl

@@ -92,7 +92,7 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) ->
 parse_opts([{max_queue_len, Len} | Opts], State) ->
     parse_opts(Opts, State#state{max_queue_len = Len});
 parse_opts([{ping_down_interval, Interval} | Opts], State) ->
-    parse_opts(Opts, State#state{ping_down_interval = Interval*1000});
+    parse_opts(Opts, State#state{ping_down_interval = Interval});
 parse_opts([_Opt | Opts], State) ->
     parse_opts(Opts, State).
 

+ 2 - 2
src/emqttd_broker.erl

@@ -105,9 +105,9 @@ datetime() ->
         io_lib:format(
             "~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w", [Y, M, D, H, MM, S])).
 
-%% @doc Start a tick timer
+%% @doc Start a tick timer.
 start_tick(Msg) ->
-    start_tick(timer:seconds(emqttd:env(broker_sys_interval, 60)), Msg).
+    start_tick(emqttd:env(broker_sys_interval, 60000), Msg).
 
 start_tick(0, _Msg) ->
     undefined;

+ 1 - 1
src/emqttd_mgmt.erl

@@ -45,7 +45,7 @@
 
 -export([publish/1, subscribe/1, unsubscribe/1]).
 
--export([kick_client/1, clean_acl_cache/2]).
+-export([kick_client/1, kick_client/2, clean_acl_cache/2, clean_acl_cache/3]).
 
 -export([modify_config/2, modify_config/3, modify_config/4, get_configs/0, get_config/1,
          get_plugin_config/1, get_plugin_config/2, modify_plugin_config/2, modify_plugin_config/3]).

+ 13 - 4
src/emqttd_session.erl

@@ -453,6 +453,8 @@ handle_cast({pubrel, PacketId}, State = #state{awaiting_rel = AwaitingRel}) ->
     {noreply,
      case maps:take(PacketId, AwaitingRel) of
          {Msg, AwaitingRel1} ->
+             %% Implement Qos2 by method A [MQTT 4.33]
+             %% Dispatch to subscriber when received PUBREL
              spawn(emqttd_server, publish, [Msg]), %%:)
              gc(State#state{awaiting_rel = AwaitingRel1});
          error ->
@@ -628,8 +630,10 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
                     redeliver(Msg, State),
                     Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
                     retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
-                {pubrel, PacketId} -> %% remove 'pubrel' directly?
-                    retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight:delete(PacketId)})
+                {pubrel, PacketId} ->
+                    redeliver({pubrel, PacketId}, State),
+                    Inflight1 = Inflight:update(PacketId, {pubrel, PacketId, Now}),
+                    retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
             end;
         true ->
             State#state{retry_timer = start_timer(Interval - Diff, retry_delivery)}
@@ -649,11 +653,13 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
 expire_awaiting_rel([], _Now, State) ->
     State#state{await_rel_timer = undefined};
 
-expire_awaiting_rel([{PacketId, #mqtt_message{timestamp = TS}} | Msgs],
+expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
                     Now, State = #state{awaiting_rel      = AwaitingRel,
                                         await_rel_timeout = Timeout}) ->
     case (timer:now_diff(Now, TS) div 1000) of
         Diff when Diff >= Timeout ->
+            ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
+            emqttd_metrics:inc('messages/qos2/dropped'),
             expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
         Diff ->
             State#state{await_rel_timer = start_timer(Timeout - Diff, check_awaiting_rel)}
@@ -714,7 +720,10 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
 %%--------------------------------------------------------------------
 
 redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
-    deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State).
+    deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State);
+
+redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
+    Pid ! {redeliver, {?PUBREL, PacketId}}.
 
 deliver(Msg, #state{client_pid = Pid}) ->
     inc_stats(deliver_msg),

+ 4 - 4
src/emqttd_topic.erl

@@ -61,18 +61,18 @@ wildcard([_H|T]) ->
 -spec(match(Name, Filter) -> boolean() when
       Name   :: topic() | words(),
       Filter :: topic() | words()).
+match(<<$$, _/binary>>, <<$+, _/binary>>) ->
+    false;
+match(<<$$, _/binary>>, <<$#, _/binary>>) ->
+    false;
 match(Name, Filter) when is_binary(Name) and is_binary(Filter) ->
     match(words(Name), words(Filter));
 match([], []) ->
     true;
 match([H|T1], [H|T2]) ->
     match(T1, T2);
-match([<<$$, _/binary>>|_], ['+'|_]) ->
-    false;
 match([_H|T1], ['+'|T2]) ->
     match(T1, T2);
-match([<<$$, _/binary>>|_], ['#']) ->
-    false;
 match(_, ['#']) ->
     true;
 match([_H1|_], [_H2|_]) ->

+ 16 - 8
src/emqttd_ws.erl

@@ -45,14 +45,22 @@ handle_request('GET', "/mqtt", Req) ->
     Proto   = check_protocol_header(Req),
     case {is_websocket(Upgrade), Proto} of
         {true, "mqtt" ++ _Vsn} ->
-            {ok, ProtoEnv} = emqttd:env(protocol),
-            PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
-            Parser = emqttd_parser:initial_state(PacketSize),
-            %% Upgrade WebSocket.
-            {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
-            {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
-            ReentryWs(#wsocket_state{peername = Req:get(peername), parser = Parser,
-                                     max_packet_size = PacketSize, client_pid = ClientPid});
+            case Req:get(peername) of
+                {ok, Peername} ->
+                    {ok, ProtoEnv} = emqttd:env(protocol),
+                    PacketSize = get_value(max_packet_size, ProtoEnv, ?MAX_PACKET_SIZE),
+                    Parser = emqttd_parser:initial_state(PacketSize),
+                    %% Upgrade WebSocket.
+                    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3),
+                    {ok, ClientPid} = emqttd_ws_client_sup:start_client(self(), Req, ReplyChannel),
+                    ReentryWs(#wsocket_state{peername = Peername,
+                                             parser = Parser,
+                                             max_packet_size = PacketSize,
+                                             client_pid = ClientPid});
+                {error, Reason} ->
+                    lager:error("Get peername with error ~s", [Reason]),
+                    Req:respond({400, [], <<"Bad Request">>})
+            end;
         {false, _} ->
             lager:error("Not WebSocket: Upgrade = ~s", [Upgrade]),
             Req:respond({400, [], <<"Bad Request">>});

+ 10 - 6
test/emqttd_topic_SUITE.erl

@@ -73,10 +73,10 @@ t_match2(_) ->
 
 t_match3(_) ->
     true = match(<<"device/60019423a83c/fw">>, <<"device/60019423a83c/#">>),
-    false = match(<<"device/60019423a83c/$fw">>, <<"device/60019423a83c/#">>),
+    true = match(<<"device/60019423a83c/$fw">>, <<"device/60019423a83c/#">>),
     true = match(<<"device/60019423a83c/$fw/fw">>, <<"device/60019423a83c/$fw/#">>),
     true = match(<<"device/60019423a83c/fw/checksum">>, <<"device/60019423a83c/#">>),
-    false = match(<<"device/60019423a83c/$fw/checksum">>, <<"device/60019423a83c/#">>),
+    true = match(<<"device/60019423a83c/$fw/checksum">>, <<"device/60019423a83c/#">>),
     true = match(<<"device/60019423a83c/dust/type">>, <<"device/60019423a83c/#">>).
 
 t_sigle_level_match(_) ->
@@ -86,7 +86,9 @@ t_sigle_level_match(_) ->
     true  = match(<<"sport/">>, <<"sport/+">>),
     true  = match(<<"/finance">>, <<"+/+">>),
     true  = match(<<"/finance">>, <<"/+">>),
-    false = match(<<"/finance">>, <<"+">>).
+    false = match(<<"/finance">>, <<"+">>),
+    true  = match(<<"/devices/$dev1">>, <<"/devices/+">>),
+    true  = match(<<"/devices/$dev1/online">>, <<"/devices/+/online">>).
 
 t_sys_match(_) ->
     true  = match(<<"$SYS/broker/clients/testclient">>, <<"$SYS/#">>),
@@ -95,9 +97,11 @@ t_sys_match(_) ->
     false = match(<<"$SYS/broker">>, <<"#">>).
 
 't_#_match'(_) ->
-    true = match(<<"a/b/c">>, <<"#">>),
-    true = match(<<"a/b/c">>, <<"+/#">>),
-    false = match(<<"$SYS/brokers">>, <<"#">>).
+    true  = match(<<"a/b/c">>, <<"#">>),
+    true  = match(<<"a/b/c">>, <<"+/#">>),
+    false = match(<<"$SYS/brokers">>, <<"#">>),
+    true  = match(<<"a/b/$c">>, <<"a/b/#">>),
+    true  = match(<<"a/b/$c">>, <<"a/#">>).
 
 t_match_perf(_) ->
     true = match(<<"a/b/ccc">>, <<"a/#">>),