Procházet zdrojové kódy

Merge emq24 to X branch

Feng Lee před 8 roky
rodič
revize
f1640f5b85

+ 4 - 4
Makefile

@@ -1,6 +1,6 @@
 PROJECT = emqx
 PROJECT_DESCRIPTION = EMQ X Broker
-PROJECT_VERSION = 2.3.0
+PROJECT_VERSION = 2.3.2
 
 NO_AUTOPATCH = cuttlefish
 
@@ -12,9 +12,9 @@ dep_getopt       = git https://github.com/jcomellas/getopt v0.8.2
 dep_lager        = git https://github.com/basho/lager master
 dep_lager_syslog = git https://github.com/basho/lager_syslog
 dep_jsx          = git https://github.com/talentdeficit/jsx
-dep_esockd       = git https://github.com/emqtt/esockd master
+dep_esockd       = git https://github.com/emqtt/esockd v5.1
 dep_ekka         = git https://github.com/emqtt/ekka master
-dep_mochiweb     = git https://github.com/emqtt/mochiweb master
+dep_mochiweb     = git https://github.com/emqtt/mochiweb v4.2.0
 dep_pbkdf2       = git https://github.com/emqtt/pbkdf2 2.0.1
 dep_bcrypt       = git https://github.com/smarkets/erlang-bcrypt master
 dep_clique       = git https://github.com/emqtt/clique
@@ -27,7 +27,7 @@ dep_cuttlefish = git https://github.com/emqtt/cuttlefish
 
 TEST_DEPS = emqttc emq_dashboard
 dep_emqttc = git https://github.com/emqtt/emqttc
-dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard
+dep_emq_dashboard = git https://github.com/emqtt/emq_dashboard develop
 
 TEST_ERLC_OPTS += +debug_info
 TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'

+ 31 - 1
etc/emqx.conf

@@ -346,6 +346,10 @@ listener.tcp.external.access.2 = allow all
 ## TCP Socket Options
 listener.tcp.external.backlog = 1024
 
+listener.tcp.external.send_timeout = 15s
+
+listener.tcp.external.send_timeout_close = on
+
 #listener.tcp.external.recbuf = 4KB
 
 #listener.tcp.external.sndbuf = 4KB
@@ -376,6 +380,10 @@ listener.tcp.internal.max_clients = 102400
 ## TCP Socket Options
 listener.tcp.internal.backlog = 512
 
+listener.tcp.internal.send_timeout = 15s
+
+listener.tcp.external.send_timeout_close = on
+
 listener.tcp.internal.tune_buffer = on
 
 listener.tcp.internal.buffer = 1MB
@@ -485,6 +493,10 @@ listener.ssl.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 ## SSL Socket Options
 ## listener.ssl.external.backlog = 1024
 
+## listener.ssl.external.send_timeout = 15s
+
+## listener.ssl.external.send_timeout_close = on
+
 ## listener.ssl.external.recbuf = 4KB
 
 ## listener.ssl.external.sndbuf = 4KB
@@ -509,6 +521,10 @@ listener.ws.external.access.1 = allow all
 ## TCP Options
 listener.ws.external.backlog = 1024
 
+listener.ws.external.send_timeout = 15s
+
+listener.ws.external.send_timeout_close = on
+
 listener.ws.external.recbuf = 4KB
 
 listener.ws.external.sndbuf = 4KB
@@ -528,7 +544,21 @@ listener.wss.external.max_clients = 64
 
 ## listener.wss.external.zone = external
 
-listener.wss.external.access.1 = allow all
+## listener.wss.external.access.1 = allow all
+
+listener.wss.external.backlog = 1024
+
+listener.wss.external.send_timeout = 15s
+
+listener.wss.external.send_timeout_close = on
+
+## listener.wss.external.recbuf = 4KB
+
+## listener.wss.external.sndbuf = 4KB
+
+## listener.wss.external.buffer = 4KB
+
+## listener.wss.external.nodelay = true
 
 ## SSL Options
 listener.wss.external.handshake_timeout = 15s

+ 54 - 2
priv/emqx.schema

@@ -804,8 +804,18 @@ end}.
 ]}.
 
 {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [
-  {default, 1024},
-  {datatype, integer}
+  {datatype, integer},
+  {default, 1024}
+]}.
+
+{mapping, "listener.tcp.$name.send_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.tcp.$name.send_timeout_close", "emqx.listeners", [
+  {datatype, flag},
+  {default, on}
 ]}.
 
 {mapping, "listener.tcp.$name.recbuf", "emqx.listeners", [
@@ -882,6 +892,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ssl.$name.send_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.ssl.$name.send_timeout_close", "emqx.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.ssl.$name.recbuf", "emqx.listeners", [
   {datatype, bytesize},
   hidden
@@ -995,6 +1015,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ws.$name.send_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.ws.$name.send_timeout_close", "emqx.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.ws.$name.recbuf", "emqx.listeners", [
   {datatype, bytesize},
   hidden
@@ -1058,6 +1088,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.wss.$name.send_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.wss.$name.send_timeout_close", "emqx.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.wss.$name.recbuf", "emqx.listeners", [
   {datatype, bytesize},
   hidden
@@ -1144,6 +1184,8 @@ end}.
               end,
     TcpOpts = fun(Prefix) ->
                    Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
+                           {send_timeout, cuttlefish:conf_get(Prefix ++ ".send_timeout", Conf, undefined)},
+                           {send_timeout_close, cuttlefish:conf_get(Prefix ++ ".send_timeout_close", Conf, undefined)},
                            {recbuf,  cuttlefish:conf_get(Prefix ++ ".recbuf", Conf, undefined)},
                            {sndbuf,  cuttlefish:conf_get(Prefix ++ ".sndbuf", Conf, undefined)},
                            {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
@@ -1251,6 +1293,16 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.api.$name.send_timeout", "emqx.listeners", [
+  {datatype, {duration, ms}},
+  {default, "15s"}
+]}.
+
+{mapping, "listener.api.$name.send_timeout_close", "emqx.listeners", [
+  {datatype, flag},
+  {default, on}
+]}.
+
 {mapping, "listener.api.$name.recbuf", "emqx.listeners", [
   {datatype, bytesize},
   hidden

+ 1 - 1
src/emqx.app.src

@@ -1,6 +1,6 @@
 {application,emqx,
              [{description,"EMQ X Broker"},
-              {vsn,"2.3.0"},
+              {vsn,"2.3.2"},
               {modules,[]},
               {registered,[emqx_sup]},
               {applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]},

+ 3 - 1
src/emqx_client.erl

@@ -140,7 +140,9 @@ send_fun(Conn, Peername) ->
         ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}),
         emqx_metrics:inc('bytes/sent', iolist_size(Data)),
         try Conn:async_send(Data) of
-            true -> ok
+            ok -> ok;
+            true -> ok; %% Compatible with esockd 4.x
+            {error, Reason} -> Self ! {shutdown, Reason}
         catch
             error:Error -> Self ! {shutdown, Error}
         end

+ 3 - 5
src/emqx_protocol.erl

@@ -44,7 +44,7 @@
                       clean_sess, proto_ver, proto_name, username, is_superuser,
                       will_msg, keepalive, keepalive_backoff, max_clientid_len,
                       session, stats_data, mountpoint, ws_initial_headers,
-                      is_bridge, connected_at}).
+                      peercert_username, is_bridge, connected_at}).
 
 -type(proto_state() :: #proto_state{}).
 
@@ -362,13 +362,11 @@ send(Msg, State = #proto_state{client_id  = ClientId,
     emqx_hooks:run('message.delivered', [ClientId, Username], Msg),
     send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
 
-send(Packet = ?PACKET(Type),
-     State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
+send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
     trace(send, Packet, State),
     emqx_metrics:sent(Packet),
     SendFun(Packet),
-    Stats1 = inc_stats(send, Type, Stats),
-    {ok, State#proto_state{stats_data = Stats1}}.
+    {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
 
 trace(recv, Packet, ProtoState) ->
     ?LOG(info, "RECV ~s", [emqx_packet:format(Packet)], ProtoState);

+ 1 - 0
src/emqx_ws.erl

@@ -38,6 +38,7 @@ handle_request(Req) ->
 %%--------------------------------------------------------------------
 %% MQTT Over WebSocket
 %%--------------------------------------------------------------------
+
 handle_request('GET', "/mqtt", Req) ->
     lager:debug("WebSocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),

+ 5 - 1
src/emqx_ws_client.erl

@@ -275,10 +275,14 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 send_fun(ReplyChannel) ->
+    Self = self(),
     fun(Packet) ->
         Data = emqx_serializer:serialize(Packet),
         emqx_metrics:inc('bytes/sent', iolist_size(Data)),
-        ReplyChannel({binary, Data})
+        case ReplyChannel({binary, Data}) of
+            ok -> ok;
+            {error, Reason} -> Self ! {shutdown, Reason}
+        end
     end.
 
 stat_fun(Conn) ->

+ 48 - 6
test/emqttd_router_SUITE.erl

@@ -34,7 +34,8 @@ groups() ->
        t_add_del_route,
        t_match_route,
        t_print,
-       t_has_route]},
+       t_has_route,
+       router_unused]},
      {local_route, [sequence],
       [t_get_local_topics,
        t_add_del_local_route,
@@ -86,11 +87,6 @@ t_match_route(_) ->
                   #mqtt_route{topic = <<"a/b/c">>, node = Node}],
                  lists:sort(?R:match(<<"a/b/c">>))).
 
-t_print(_) ->
-    ?R:add_route(<<"topic">>),
-    ?R:add_route(<<"topic/#">>),
-    ?R:print(<<"topic">>).
-
 t_has_route(_) ->
     ?R:add_route(<<"devices/+/messages">>),
     ?assert(?R:has_route(<<"devices/+/messages">>)).
@@ -130,3 +126,49 @@ clear_tables() ->
     ?R:clean_local_routes(),
     lists:foreach(fun mnesia:clear_table/1, [mqtt_route, mqtt_trie, mqtt_trie_node]).
 
+%%--------------------------------------------------------------------
+%% Router Test
+%%--------------------------------------------------------------------
+
+router_add_del(_) ->
+    %% Add
+    ?R:add_route(<<"#">>),
+    ?R:add_route(<<"a/b/c">>),
+    ?R:add_route(<<"+/#">>),
+    Routes = [R1, R2 | _] = [
+            #mqtt_route{topic = <<"#">>,     node = node()},
+            #mqtt_route{topic = <<"+/#">>,   node = node()},
+            #mqtt_route{topic = <<"a/b/c">>, node = node()}],
+    Routes = lists:sort(?R:match(<<"a/b/c">>)),
+
+    %% Batch Add
+    lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
+    Routes = lists:sort(?R:match(<<"a/b/c">>)),
+
+    %% Del
+    ?R:del_route(<<"a/b/c">>),
+    [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)),
+    {atomic, []} = mnesia:transaction(fun emqttd_trie:lookup/1, [<<"a/b/c">>]),
+
+    %% Batch Del
+    R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
+    ?R:add_route(R3),
+    ?R:del_route(R1),
+    ?R:del_route(R2),
+    ?R:del_route(R3),
+    [] = lists:sort(?R:match(<<"a/b/c">>)).
+
+t_print(_) ->
+    Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()},
+              #mqtt_route{topic = <<"#">>,     node = node()},
+              #mqtt_route{topic = <<"+/#">>,   node = node()}],
+    lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
+    ?R:print(<<"a/b/c">>),
+    ?R:del_route(<<"+/#">>),
+    ?R:del_route(<<"a/b/c">>),
+    ?R:del_route(<<"#">>).
+
+router_unused(_) ->
+    gen_server:call(emqttd_router, bad_call),
+    gen_server:cast(emqttd_router, bad_msg),
+    emqttd_router ! bad_info.

+ 1 - 49
test/emqx_SUITE.erl

@@ -60,7 +60,6 @@
 all() ->
     [{group, protocol},
      {group, pubsub},
-     {group, router},
      {group, session},
      {group, broker},
      {group, metrics},
@@ -83,10 +82,6 @@ groups() ->
        t_local_subscribe,
        t_shared_subscribe,
        'pubsub#', 'pubsub+']},
-     {router, [sequence],
-      [router_add_del,
-       router_print,
-       router_unused]},
      {session, [sequence],
       [start_session]},
      {broker, [sequence],
@@ -297,50 +292,6 @@ loop_recv(Topic, Timeout, Acc) ->
         Timeout -> {ok, Acc}
     end.
 
-%%--------------------------------------------------------------------
-%% Router Test
-%%--------------------------------------------------------------------
-
-router_add_del(_) ->
-    %% Add
-    emqx_router:add_route(<<"#">>),
-    emqx_router:add_route(<<"a/b/c">>),
-    emqx_router:add_route(<<"+/#">>),
-    Routes = [R1, R2 | _] = [
-            #mqtt_route{topic = <<"#">>,     node = node()},
-            #mqtt_route{topic = <<"+/#">>,   node = node()},
-            #mqtt_route{topic = <<"a/b/c">>, node = node()}],
-    Routes = lists:sort(emqx_router:match(<<"a/b/c">>)),
-
-    %% Batch Add
-    lists:foreach(fun(R) -> emqx_router:add_route(R) end, Routes),
-    Routes = lists:sort(emqx_router:match(<<"a/b/c">>)),
-
-    %% Del
-    emqx_router:del_route(<<"a/b/c">>),
-    [R1, R2] = lists:sort(emqx_router:match(<<"a/b/c">>)),
-    {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
-
-    %% Batch Del
-    R3 = #mqtt_route{topic = <<"#">>, node = 'a@127.0.0.1'},
-    emqx_router:add_route(R3),
-    emqx_router:del_route(R1),
-    emqx_router:del_route(R2),
-    emqx_router:del_route(R3),
-    [] = lists:sort(emqx_router:match(<<"a/b/c">>)).
-
-router_print(_) ->
-    Routes = [#mqtt_route{topic = <<"a/b/c">>, node = node()},
-              #mqtt_route{topic = <<"#">>,     node = node()},
-              #mqtt_route{topic = <<"+/#">>,   node = node()}],
-    lists:foreach(fun(R) -> emqx_router:add_route(R) end, Routes),
-    emqx_router:print(<<"a/b/c">>).
-
-router_unused(_) ->
-    gen_server:call(emqx_router, bad_call),
-    gen_server:cast(emqx_router, bad_msg),
-    emqx_router ! bad_info.
-
 recv_loop(Msgs) ->
     receive
         {dispatch, _Topic, Msg} ->
@@ -604,6 +555,7 @@ conflict_listeners(_) ->
     L = proplists:get_value("mqtt:tcp:0.0.0.0:1883", Listeners),
     ?assertEqual(1, proplists:get_value(current_clients, L)),
     ?assertEqual(1, proplists:get_value(conflict, proplists:get_value(shutdown_count, L))),
+    timer:sleep(100),
     emqttc:disconnect(C2).
 
 cli_vm(_) ->