Browse Source

Ordered messaging via multiple gen_rpc clients

terry-xiaoyu 6 years ago
parent
commit
3405dbaf5c
3 changed files with 57 additions and 16 deletions
  1. 2 2
      src/emqx_broker.erl
  2. 17 2
      src/emqx_rpc.erl
  3. 38 12
      test/emqx_rpc_SUITE.erl

+ 2 - 2
src/emqx_broker.erl

@@ -258,7 +258,7 @@ aggre(Routes) ->
 -spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RPCMode::sync|async)
     -> emqx_types:deliver_result()).
 forward(Node, To, Delivery, async) ->
-    case emqx_rpc:cast(Node, ?BROKER, dispatch, [To, Delivery]) of
+    case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
         true -> ok;
         {badrpc, Reason} ->
             ?LOG(error, "Ansync forward msg to ~s failed: ~p", [Node, Reason]),
@@ -266,7 +266,7 @@ forward(Node, To, Delivery, async) ->
     end;
 
 forward(Node, To, Delivery, sync) ->
-    case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
+    case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
         {badrpc, Reason} ->
             ?LOG(error, "Sync forward msg to ~s failed: ~p", [Node, Reason]),
             {error, badrpc};

+ 17 - 2
src/emqx_rpc.erl

@@ -18,8 +18,11 @@
 -module(emqx_rpc).
 
 -export([ call/4
+        , call/5
         , cast/4
+        , cast/5
         , multicall/4
+        , multicall/5
         ]).
 
 -compile({inline,
@@ -34,15 +37,27 @@
 call(Node, Mod, Fun, Args) ->
     filter_result(?RPC:call(rpc_node(Node), Mod, Fun, Args)).
 
+call(Key, Node, Mod, Fun, Args) ->
+    filter_result(?RPC:call(rpc_node({Key, Node}), Mod, Fun, Args)).
+
 multicall(Nodes, Mod, Fun, Args) ->
     filter_result(?RPC:multicall(rpc_nodes(Nodes), Mod, Fun, Args)).
 
+multicall(Key, Nodes, Mod, Fun, Args) ->
+    filter_result(?RPC:multicall(rpc_nodes([{Key, Node} || Node <- Nodes]), Mod, Fun, Args)).
+
 cast(Node, Mod, Fun, Args) ->
     filter_result(?RPC:cast(rpc_node(Node), Mod, Fun, Args)).
 
-rpc_node(Node) ->
+cast(Key, Node, Mod, Fun, Args) ->
+    filter_result(?RPC:cast(rpc_node({Key, Node}), Mod, Fun, Args)).
+
+rpc_node(Node) when is_atom(Node) ->
+    ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
+    {Node, rand:uniform(ClientNum)};
+rpc_node({Key, Node}) when is_atom(Node) ->
     ClientNum = application:get_env(gen_rpc, tcp_client_num, ?DefaultClientNum),
-    {Node, rand:uniform(ClientNum)}.
+    {Node, erlang:phash2(Key, ClientNum) + 1}.
 
 rpc_nodes(Nodes) ->
     rpc_nodes(Nodes, []).

+ 38 - 12
test/emqx_rpc_SUITE.erl

@@ -24,17 +24,6 @@
 
 all() -> emqx_ct:all(?MODULE).
 
-t_multicall(_) ->
-    error('TODO').
-
-t_cast(_) ->
-    error('TODO').
-
-t_call(_) ->
-    error('TODO').
-
-
-
 t_prop_rpc(_) ->
     ok = load(),
     Opts = [{to_file, user}, {numtests, 10}],
@@ -42,7 +31,9 @@ t_prop_rpc(_) ->
     ok = application:set_env(gen_rpc, call_receive_timeout, 1),
     ok = emqx_logger:set_log_level(emergency),
     ?assert(proper:quickcheck(prop_node(), Opts)),
+    ?assert(proper:quickcheck(prop_node_with_key(), Opts)),
     ?assert(proper:quickcheck(prop_nodes(), Opts)),
+    ?assert(proper:quickcheck(prop_nodes_with_key(), Opts)),
     ok = application:stop(gen_rpc),
     ok = unload().
 
@@ -57,6 +48,17 @@ prop_node() ->
                 end
             end).
 
+prop_node_with_key() ->
+    ?FORALL({Node, Key}, nodename_with_key(),
+            begin
+                ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])),
+                case emqx_rpc:call(Key, Node, erlang, system_time, []) of
+                    {badrpc, _Reason} -> true;
+                    Delivery when is_integer(Delivery) -> true;
+                    _Other -> false
+                end
+            end).
+
 prop_nodes() ->
     ?FORALL(Nodes, nodesname(),
             begin
@@ -70,6 +72,19 @@ prop_nodes() ->
                 end
             end).
 
+prop_nodes_with_key() ->
+    ?FORALL({Nodes, Key}, nodesname_with_key(),
+            begin
+                case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of
+                    {badrpc, _Reason} -> true;
+                    {RealResults, RealBadNodes}
+                      when is_list(RealResults);
+                           is_list(RealBadNodes) ->
+                        true;
+                    _Other -> false
+                end
+            end).
+
 %%--------------------------------------------------------------------
 %%  helper
 %%--------------------------------------------------------------------
@@ -96,8 +111,19 @@ nodename() ->
              list_to_atom(Node)
          end).
 
+nodename_with_key() ->
+    ?LET({NodePrefix, HostName, Key},
+         {node_prefix(), hostname(), choose(0, 10)},
+         begin
+             Node = NodePrefix ++ "@" ++ HostName,
+             {list_to_atom(Node), Key}
+         end).
+
 nodesname() ->
-    oneof([list(nodename()), ["emqxct@127.0.0.1"]]).
+    oneof([list(nodename()), ['emqxct@127.0.0.1']]).
+
+nodesname_with_key() ->
+    oneof([{list(nodename()), choose(0, 10)}, {['emqxct@127.0.0.1'], 1}]).
 
 node_prefix() ->
     oneof(["emqxct", text_like()]).