Browse Source

Fix conflicts

turtled 6 years ago
parent
commit
91a8dbda56
12 changed files with 150 additions and 62 deletions
  1. 32 0
      etc/emqx.conf
  2. 2 6
      include/emqx.hrl
  3. 40 0
      priv/emqx.schema
  4. 1 1
      rebar.config
  5. 1 1
      src/emqx.erl
  6. 50 38
      src/emqx_broker.erl
  7. 2 2
      src/emqx_packet.erl
  8. 2 1
      src/emqx_rpc.erl
  9. 2 2
      src/emqx_session.erl
  10. 4 4
      src/emqx_shared_sub.erl
  11. 11 5
      src/emqx_types.erl
  12. 3 2
      test/emqx_broker_SUITE.erl

+ 32 - 0
etc/emqx.conf

@@ -294,6 +294,18 @@ node.dist_listen_max = 6369
 ##--------------------------------------------------------------------
 ## RPC
 ##--------------------------------------------------------------------
+## RPC Mode.
+##
+## Value: sync | async
+rpc.mode = async
+
+## Max batch size of async RPC requests.
+##
+## Value: Integer
+## Zero or negative value disables rpc batching.
+##
+## NOTE: RPC batch won't work when rpc.mode = sync
+rpc.async_batch_size = 256
 
 ## TCP server port for RPC.
 ##
@@ -305,6 +317,11 @@ rpc.tcp_server_port = 5369
 ## Value: Port [1024-65535]
 rpc.tcp_client_port = 5369
 
+## Number of utgoing RPC connections.
+##
+## Value: Interger [1-256]
+rpc.tcp_client_num = 32
+
 ## RCP Client connect timeout.
 ##
 ## Value: Seconds
@@ -340,6 +357,21 @@ rpc.socket_keepalive_interval = 75s
 ## Value: Integer
 rpc.socket_keepalive_count = 9
 
+## Size of TCP send buffer.
+##
+## Value: Bytes
+rpc.socket_sndbuf = 1MB
+
+## Size of TCP receive buffer.
+##
+## Value: Seconds
+rpc.socket_recbuf = 1MB
+
+## Size of user-level software socket buffer.
+##
+## Value: Seconds
+rpc.socket_buffer = 1MB
+
 ##--------------------------------------------------------------------
 ## Log
 ##--------------------------------------------------------------------

+ 2 - 6
include/emqx.hrl

@@ -74,12 +74,8 @@
         }).
 
 -record(delivery, {
-          %% Sender of the delivery
-          sender  :: pid(),
-          %% The message delivered
-          message :: #message{},
-          %% Dispatches of the message
-          results :: list()
+          sender  :: pid(),      %% Sender of the delivery
+          message :: #message{}  %% The message delivered
         }).
 
 %%--------------------------------------------------------------------

+ 40 - 0
priv/emqx.schema

@@ -340,6 +340,17 @@ end}.
 %% RPC
 %%--------------------------------------------------------------------
 
+%% RPC Mode.
+{mapping, "rpc.mode", "emqx.rpc_mode", [
+  {default, async},
+  {datatype, {enum, [sync, async]}}
+]}.
+
+{mapping, "rpc.async_batch_size", "gen_rpc.max_batch_size", [
+  {default, 256},
+  {datatype, integer}
+]}.
+
 %% RPC server port.
 {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [
   {default, 5369},
@@ -352,6 +363,13 @@ end}.
   {datatype, integer}
 ]}.
 
+%% Default TCP port for outgoing connections
+{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
+  {default, 32},
+  {datatype, integer},
+  {validators, ["range:gt_0_lt_256"]}
+]}.
+
 %% Client connect timeout
 {mapping, "rpc.connect_timeout", "gen_rpc.connect_timeout", [
   {default, "5s"},
@@ -394,6 +412,28 @@ end}.
   {datatype, integer}
 ]}.
 
+%% Size of TCP send buffer
+{mapping, "rpc.socket_sndbuf", "gen_rpc.socket_sndbuf", [
+  {default, "1MB"},
+  {datatype, bytesize}
+]}.
+
+%% Size of TCP receive buffer
+{mapping, "rpc.socket_recbuf", "gen_rpc.socket_recbuf", [
+  {default, "1MB"},
+  {datatype, bytesize}
+]}.
+
+%% Size of TCP receive buffer
+{mapping, "rpc.socket_buffer", "gen_rpc.socket_buffer", [
+  {default, "1MB"},
+  {datatype, bytesize}
+]}.
+
+{validator, "range:gt_0_lt_256", "must greater than 0 and less than 256",
+  fun(X) -> X > 0 andalso X < 256 end
+}.
+
 %%--------------------------------------------------------------------
 %% Log
 %%--------------------------------------------------------------------

+ 1 - 1
rebar.config

@@ -4,7 +4,7 @@
      {gproc, "0.8.0"}, % hex
      {esockd, "5.5.0"}, %hex
      {ekka, {git, "https://github.com/emqx/ekka", {tag, "v0.5.8"}}},
-     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.0"}}},
+     {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
      {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
     ]}.
 

+ 1 - 1
src/emqx.erl

@@ -120,7 +120,7 @@ subscribe(Topic, SubOpts) when is_map(SubOpts) ->
 subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
     emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
 
--spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
+-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
 publish(Msg) ->
     emqx_broker:publish(Msg).
 

+ 50 - 38
src/emqx_broker.erl

@@ -193,7 +193,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
 %% Publish
 %%------------------------------------------------------------------------------
 
--spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
+-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
 publish(Msg) when is_record(Msg, message) ->
     _ = emqx_tracer:trace(publish, Msg),
     Headers = Msg#message.headers,
@@ -202,8 +202,7 @@ publish(Msg) when is_record(Msg, message) ->
             ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]),
             [];
         #message{topic = Topic} = Msg1 ->
-            Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
-            Delivery#delivery.results
+            route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
 
 %% Called internally
@@ -219,27 +218,27 @@ safe_publish(Msg) when is_record(Msg, message) ->
     end.
 
 delivery(Msg) ->
-    #delivery{sender = self(), message = Msg, results = []}.
+    #delivery{sender = self(), message = Msg}.
 
 %%------------------------------------------------------------------------------
 %% Route
 %%------------------------------------------------------------------------------
-
-route([], Delivery = #delivery{message = Msg}) ->
+-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()).
+route([], #delivery{message = Msg}) ->
     emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
-    inc_dropped_cnt(Msg#message.topic), Delivery;
-
-route([{To, Node}], Delivery) when Node =:= node() ->
-    dispatch(To, Delivery);
-
-route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) ->
-    forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]});
-
-route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
-    emqx_shared_sub:dispatch(Group, To, Delivery);
-
+    inc_dropped_cnt(Msg#message.topic),
+    [];
 route(Routes, Delivery) ->
-    lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
+    lists:foldl(fun(Route, Acc) ->
+            [do_route(Route, Delivery) | Acc]
+        end, [], Routes).
+
+do_route({To, Node}, Delivery) when Node =:= node() ->
+    {Node, To, dispatch(To, Delivery)};
+do_route({To, Node}, Delivery) when is_atom(Node) ->
+    {Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))};
+do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
+    {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->
     [];
@@ -256,45 +255,58 @@ aggre(Routes) ->
       end, [], Routes).
 
 %% @doc Forward message to another node.
-forward(Node, To, Delivery) ->
-    %% rpc:call to ensure the delivery, but the latency:(
+-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
+    -> emqx_types:deliver_result()).
+forward(Node, To, Delivery, async) ->
+    case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
+        true -> ok;
+        {badrpc, Reason} ->
+            ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
+            {error, badrpc}
+    end;
+
+forward(Node, To, Delivery, sync) ->
     case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
         {badrpc, Reason} ->
-            ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]),
-            Delivery;
-        Delivery1 -> Delivery1
+            ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
+            {error, badrpc};
+        Result -> Result
     end.
 
--spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()).
-dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
+-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
+dispatch(Topic, #delivery{message = Msg}) ->
     case subscribers(Topic) of
         [] ->
             emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
             inc_dropped_cnt(Topic),
-            Delivery;
+            {error, no_subscribers};
         [Sub] -> %% optimize?
-            Cnt = dispatch(Sub, Topic, Msg),
-            Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]};
+            dispatch(Sub, Topic, Msg);
         Subs ->
-            Cnt = lists:foldl(
-                    fun(Sub, Acc) ->
-                            dispatch(Sub, Topic, Msg) + Acc
-                    end, 0, Subs),
-            Delivery#delivery{results = [{deliver, Topic, Cnt}|Results]}
+            lists:foldl(
+                fun(Sub, Res) ->
+                    case dispatch(Sub, Topic, Msg) of
+                        ok -> Res;
+                        Err -> Err
+                    end
+                end, ok, Subs)
     end.
 
 dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
         true ->
             SubPid ! {deliver, Topic, Msg},
-            1;
-        false -> 0
+            ok;
+        false -> {error, subscriber_die}
     end;
 dispatch({shard, I}, Topic, Msg) ->
     lists:foldl(
-      fun(SubPid, Cnt) ->
-              dispatch(SubPid, Topic, Msg) + Cnt
-      end, 0, subscribers({shard, Topic, I})).
+        fun(SubPid, Res) ->
+            case dispatch(SubPid, Topic, Msg) of
+                ok -> Res;
+                Err -> Err
+            end
+        end, ok, subscribers({shard, Topic, I})).
 
 inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
     ok;

+ 2 - 2
src/emqx_packet.erl

@@ -237,11 +237,11 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
 
 format_variable(#mqtt_packet_subscribe{packet_id     = PacketId,
                                        topic_filters = TopicFilters}) ->
-    io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]);
+    io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
 
 format_variable(#mqtt_packet_unsubscribe{packet_id     = PacketId,
                                          topic_filters = Topics}) ->
-    io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, Topics]);
+    io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
 
 format_variable(#mqtt_packet_suback{packet_id = PacketId,
                                     reason_codes = ReasonCodes}) ->

+ 2 - 1
src/emqx_rpc.erl

@@ -39,7 +39,8 @@ cast(Node, Mod, Fun, Args) ->
     filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
 
 rpc_node(Node) ->
-    {Node, erlang:system_info(scheduler_id)}.
+    {ok, ClientNum} = application:get_env(gen_rpc, tcp_client_num),
+    {Node, rand:uniform(ClientNum)}.
 
 rpc_nodes(Nodes) ->
     rpc_nodes(Nodes, []).

+ 2 - 2
src/emqx_session.erl

@@ -332,8 +332,8 @@ unsubscribe(Client, TopicFilter, Session = #session{subscriptions = Subs}) ->
 %%--------------------------------------------------------------------
 
 -spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-      -> {ok, emqx_types:deliver_results()} |
-         {ok, emqx_types:deliver_results(), session()} |
+      -> {ok, emqx_types:publish_result()} |
+         {ok, emqx_types:publish_result(), session()} |
          {error, emqx_types:reason_code()}).
 publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
     case is_awaiting_full(Session) of

+ 4 - 4
src/emqx_shared_sub.erl

@@ -106,19 +106,19 @@ record(Group, Topic, SubPid) ->
     #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
 
 -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-      -> emqx_types:delivery()).
+      -> emqx_types:deliver_result()).
 dispatch(Group, Topic, Delivery) ->
     dispatch(Group, Topic, Delivery, _FailedSubs = []).
 
-dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, FailedSubs) ->
+dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
     #message{from = ClientId} = Msg,
     case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
         false ->
-            Delivery;
+            {error, no_subscribers};
         {Type, SubPid} ->
             case do_dispatch(SubPid, Topic, Msg, Type) of
                 ok ->
-                    Delivery#delivery{results = [{deliver, {Group, Topic}, 1} | Results]};
+                    ok;
                 {error, _Reason} ->
                     %% Failed to dispatch to this sub, try next.
                     dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])

+ 11 - 5
src/emqx_types.erl

@@ -61,11 +61,15 @@
              ]).
 
 -export_type([ delivery/0
-             , deliver_results/0
+             , publish_result/0
+             , deliver_result/0
              ]).
 
 -export_type([ route/0
-             , alarm/0
+             , route_entry/0
+             ]).
+
+-export_type([ alarm/0
              , plugin/0
              , banned/0
              , command/0
@@ -148,10 +152,12 @@
 -type(message() :: #message{}).
 -type(banned() :: #banned{}).
 -type(delivery() :: #delivery{}).
--type(deliver_results() :: [{route, node(), topic()} |
-                            {deliver, topic(), non_neg_integer()}
-                           ]).
+-type(deliver_result() :: ok | {error, term()}).
+-type(publish_result() :: [ {node(), topic(), deliver_result()}
+                          | {share, topic(), deliver_result()}]).
 -type(route() :: #route{}).
+-type(sub_group() :: tuple() | binary()).
+-type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
 -type(alarm() :: #alarm{}).
 -type(plugin() :: #plugin{}).
 -type(command() :: #command{}).

+ 3 - 2
test/emqx_broker_SUITE.erl

@@ -81,8 +81,9 @@ t_publish(_) ->
 
 t_dispatch_with_no_sub(_) ->
     Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
-    Delivery = #delivery{sender = self(), message = Msg, results = []},
-    ?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
+    Delivery = #delivery{sender = self(), message = Msg},
+    ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
+                 emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
 
 t_pubsub(_) ->
     true = emqx:is_running(node()),