소스 검색

Support inter-node messages via RPC cast

terry-xiaoyu 6 년 전
부모
커밋
b0e2b7db0c
10개의 변경된 파일82개의 추가작업 그리고 54개의 파일을 삭제
  1. 4 0
      etc/emqx.conf
  2. 1 2
      include/emqx.hrl
  3. 6 0
      priv/emqx.schema
  4. 1 1
      src/emqx.erl
  5. 50 38
      src/emqx_broker.erl
  6. 2 2
      src/emqx_packet.erl
  7. 1 1
      src/emqx_session.erl
  8. 4 4
      src/emqx_shared_sub.erl
  9. 10 4
      src/emqx_types.erl
  10. 3 2
      test/emqx_broker_SUITE.erl

+ 4 - 0
etc/emqx.conf

@@ -292,6 +292,10 @@ node.dist_listen_max = 6369
 ##--------------------------------------------------------------------
 ## RPC
 ##--------------------------------------------------------------------
+## RPC Mode.
+##
+## Value: sync | async
+rpc.mode = async
 
 ## TCP server port for RPC.
 ##

+ 1 - 2
include/emqx.hrl

@@ -73,8 +73,7 @@
 
 -record(delivery, {
           sender  :: pid(),      %% Sender of the delivery
-          message :: #message{}, %% The message delivered
-          results :: list()      %% Dispatches of the message
+          message :: #message{}  %% The message delivered
         }).
 
 %%--------------------------------------------------------------------

+ 6 - 0
priv/emqx.schema

@@ -338,6 +338,12 @@ end}.
 %% RPC
 %%--------------------------------------------------------------------
 
+%% RPC Mode.
+{mapping, "rpc.mode", "emqx.rpc_mode", [
+  {default, async},
+  {datatype, {enum, [sync, async]}}
+]}.
+
 %% RPC server port.
 {mapping, "rpc.tcp_server_port", "gen_rpc.tcp_server_port", [
   {default, 5369},

+ 1 - 1
src/emqx.erl

@@ -114,7 +114,7 @@ subscribe(Topic, SubOpts) when is_map(SubOpts) ->
 subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
     emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
 
--spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
+-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
 publish(Msg) ->
     emqx_broker:publish(Msg).
 

+ 50 - 38
src/emqx_broker.erl

@@ -191,7 +191,7 @@ do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
 %% Publish
 %%------------------------------------------------------------------------------
 
--spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
+-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
 publish(Msg) when is_record(Msg, message) ->
     _ = emqx_tracer:trace(publish, Msg),
     Headers = Msg#message.headers,
@@ -200,8 +200,7 @@ publish(Msg) when is_record(Msg, message) ->
             ?LOG(notice, "Publishing interrupted: ~s", [emqx_message:format(Msg)]),
             [];
         #message{topic = Topic} = Msg1 ->
-            Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
-            Delivery#delivery.results
+            route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
     end.
 
 %% Called internally
@@ -217,27 +216,27 @@ safe_publish(Msg) when is_record(Msg, message) ->
     end.
 
 delivery(Msg) ->
-    #delivery{sender = self(), message = Msg, results = []}.
+    #delivery{sender = self(), message = Msg}.
 
 %%------------------------------------------------------------------------------
 %% Route
 %%------------------------------------------------------------------------------
-
-route([], Delivery = #delivery{message = Msg}) ->
+-spec(route([emqx_types:route_entry()], emqx_types:delivery()) -> emqx_types:publish_result()).
+route([], #delivery{message = Msg}) ->
     emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
-    inc_dropped_cnt(Msg#message.topic), Delivery;
-
-route([{To, Node}], Delivery) when Node =:= node() ->
-    dispatch(To, Delivery);
-
-route([{To, Node}], Delivery = #delivery{results = Results}) when is_atom(Node) ->
-    forward(Node, To, Delivery#delivery{results = [{route, Node, To}|Results]});
-
-route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
-    emqx_shared_sub:dispatch(Group, To, Delivery);
-
+    inc_dropped_cnt(Msg#message.topic),
+    [];
 route(Routes, Delivery) ->
-    lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
+    lists:foldl(fun(Route, Acc) ->
+            [do_route(Route, Delivery) | Acc]
+        end, [], Routes).
+
+do_route({To, Node}, Delivery) when Node =:= node() ->
+    {Node, To, dispatch(To, Delivery)};
+do_route({To, Node}, Delivery) when is_atom(Node) ->
+    {Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))};
+do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
+    {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}.
 
 aggre([]) ->
     [];
@@ -254,45 +253,58 @@ aggre(Routes) ->
       end, [], Routes).
 
 %% @doc Forward message to another node.
-forward(Node, To, Delivery) ->
-    %% rpc:call to ensure the delivery, but the latency:(
+-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
+    -> emqx_types:deliver_result()).
+forward(Node, To, Delivery, async) ->
+    case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
+        true -> ok;
+        {badrpc, Reason} ->
+            ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
+            {error, badrpc}
+    end;
+
+forward(Node, To, Delivery, sync) ->
     case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
         {badrpc, Reason} ->
-            ?LOG(error, "Failed to forward msg to ~s: ~p", [Node, Reason]),
-            Delivery;
-        Delivery1 -> Delivery1
+            ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
+            {error, badrpc};
+        Result -> Result
     end.
 
--spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:delivery()).
-dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
+-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
+dispatch(Topic, #delivery{message = Msg}) ->
     case subscribers(Topic) of
         [] ->
             emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
             inc_dropped_cnt(Topic),
-            Delivery;
+            {error, no_subscribers};
         [Sub] -> %% optimize?
-            Cnt = dispatch(Sub, Topic, Msg),
-            Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]};
+            dispatch(Sub, Topic, Msg);
         Subs ->
-            Cnt = lists:foldl(
-                    fun(Sub, Acc) ->
-                            dispatch(Sub, Topic, Msg) + Acc
-                    end, 0, Subs),
-            Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}
+            lists:foldl(
+                fun(Sub, Res) ->
+                    case dispatch(Sub, Topic, Msg) of
+                        ok -> Res;
+                        Err -> Err
+                    end
+                end, ok, Subs)
     end.
 
 dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
     case erlang:is_process_alive(SubPid) of
         true ->
             SubPid ! {dispatch, Topic, Msg},
-            1;
-        false -> 0
+            ok;
+        false -> {error, subscriber_die}
     end;
 dispatch({shard, I}, Topic, Msg) ->
     lists:foldl(
-      fun(SubPid, Cnt) ->
-              dispatch(SubPid, Topic, Msg) + Cnt
-      end, 0, subscribers({shard, Topic, I})).
+        fun(SubPid, Res) ->
+            case dispatch(SubPid, Topic, Msg) of
+                ok -> Res;
+                Err -> Err
+            end
+        end, ok, subscribers({shard, Topic, I})).
 
 inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
     ok;

+ 2 - 2
src/emqx_packet.erl

@@ -233,11 +233,11 @@ format_variable(#mqtt_packet_puback{packet_id = PacketId}) ->
 
 format_variable(#mqtt_packet_subscribe{packet_id     = PacketId,
                                        topic_filters = TopicFilters}) ->
-    io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, TopicFilters]);
+    io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, TopicFilters]);
 
 format_variable(#mqtt_packet_unsubscribe{packet_id     = PacketId,
                                          topic_filters = Topics}) ->
-    io_lib:format("PacketId=~p, TopicFilters=~p", [PacketId, Topics]);
+    io_lib:format("PacketId=~p, TopicFilters=~0p", [PacketId, Topics]);
 
 format_variable(#mqtt_packet_suback{packet_id = PacketId,
                                     reason_codes = ReasonCodes}) ->

+ 1 - 1
src/emqx_session.erl

@@ -257,7 +257,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) ->
 
 %% @doc Called by connection processes when publishing messages
 -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-      -> {ok, emqx_types:deliver_results()} | {error, term()}).
+      -> {ok, emqx_types:publish_result()} | {error, term()}).
 publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
     %% Publish QoS0 message directly
     {ok, emqx_broker:publish(Msg)};

+ 4 - 4
src/emqx_shared_sub.erl

@@ -103,19 +103,19 @@ record(Group, Topic, SubPid) ->
     #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
 
 -spec(dispatch(emqx_topic:group(), emqx_topic:topic(), emqx_types:delivery())
-      -> emqx_types:delivery()).
+      -> emqx_topic:deliver_result()).
 dispatch(Group, Topic, Delivery) ->
     dispatch(Group, Topic, Delivery, _FailedSubs = []).
 
-dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, FailedSubs) ->
+dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
     #message{from = ClientId} = Msg,
     case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
         false ->
-            Delivery;
+            {error, no_subscribers};
         {Type, SubPid} ->
             case do_dispatch(SubPid, Topic, Msg, Type) of
                 ok ->
-                    Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
+                    ok;
                 {error, _Reason} ->
                     %% Failed to dispatch to this sub, try next.
                     dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])

+ 10 - 4
src/emqx_types.erl

@@ -46,10 +46,13 @@
              ]).
 
 -export_type([ delivery/0
-             , deliver_results/0
+             , publish_result/0
+             , deliver_result/0
              ]).
 
--export_type([route/0]).
+-export_type([ route/0
+             , route_entry/0
+             ]).
 
 -export_type([ alarm/0
              , plugin/0
@@ -99,9 +102,12 @@
 -type(message() :: #message{}).
 -type(banned() :: #banned{}).
 -type(delivery() :: #delivery{}).
--type(deliver_results() :: [{route, node(), topic()} |
-                            {dispatch, topic(), pos_integer()}]).
+-type(deliver_result() :: ok | {error, term()}).
+-type(publish_result() :: [ {node(), topic(), deliver_result()}
+                          | {share, topic(), deliver_result()}]).
 -type(route() :: #route{}).
+-type(sub_group() :: tuple() | binary()).
+-type(route_entry() :: {topic(), node()} | {topic, sub_group()}).
 -type(alarm() :: #alarm{}).
 -type(plugin() :: #plugin{}).
 -type(command() :: #command{}).

+ 3 - 2
test/emqx_broker_SUITE.erl

@@ -74,8 +74,9 @@ publish(_) ->
 
 dispatch_with_no_sub(_) ->
     Msg = emqx_message:make(ct, <<"no_subscribers">>, <<"hello">>),
-    Delivery = #delivery{sender = self(), message = Msg, results = []},
-    ?assertEqual(Delivery, emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
+    Delivery = #delivery{sender = self(), message = Msg},
+    ?assertEqual([{node(),<<"no_subscribers">>,{error,no_subscribers}}],
+                 emqx_broker:route([{<<"no_subscribers">>, node()}], Delivery)).
 
 pubsub(_) ->
     true = emqx:is_running(node()),