huangdan 9 سال پیش
والد
کامیت
c5bb1e031e
1فایلهای تغییر یافته به همراه113 افزوده شده و 0 حذف شده
  1. 113 0
      test/emqttd_SUITE.erl

+ 113 - 0
test/emqttd_SUITE.erl

@@ -37,6 +37,7 @@ all() ->
      {group, stats},
      {group, hook},
      {group, http},
+     {group, cluster},
      %%{group, backend},
      {group, cli}].
 
@@ -70,6 +71,14 @@ groups() ->
      [request_status,
       request_publish
      ]},
+    {cluster, [sequence],
+     [cluster_test,
+      cluster_join,
+      cluster_leave,
+      cluster_remove,
+      cluster_remove2,
+      cluster_node_down
+     ]},
      {cli, [sequence],
       [ctl_register_cmd,
        cli_status,
@@ -378,6 +387,81 @@ auth_header_(User, Pass) ->
     Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
     {"Authorization","Basic " ++ Encoded}.
 
+%%--------------------------------------------------------------------
+%% cluster group
+%%--------------------------------------------------------------------
+cluster_test(_Config) ->
+    Z = slave(emqttd, cluster_test_z),
+    wait_running(Z),
+    true = emqttd:is_running(Z),
+    Node = node(),
+    ok = rpc:call(Z, emqttd_cluster, join, [Node]),
+    [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)),
+    ct:log("Z:~p, Node:~p", [Z, Node]),
+    ok = rpc:call(Z, emqttd_cluster, leave, []),
+    [Node] = lists:sort(mnesia:system_info(running_db_nodes)),
+    ok = slave:stop(Z).
+
+cluster_join(_) ->
+    Z = slave(emqttd, cluster_join_z),
+    N = slave(node, cluster_join_n),
+    wait_running(Z),
+    true = emqttd:is_running(Z),
+    Node = node(),
+    {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node),
+    {error, {node_not_running, N}} = emqttd_cluster:join(N),
+    ok = emqttd_cluster:join(Z),
+    slave:stop(Z),
+    slave:stop(N).
+ 
+cluster_leave(_) ->
+    Z = slave(emqttd, cluster_leave_z),
+    wait_running(Z),
+    {error, node_not_in_cluster} = emqttd_cluster:leave(),
+    ok = emqttd_cluster:join(Z),
+    Node = node(),
+    [Z, Node] = emqttd_mnesia:running_nodes(),
+    ok = emqttd_cluster:leave(),
+    [Node] = emqttd_mnesia:running_nodes(),
+    slave:stop(Z).
+
+cluster_remove(_) ->
+    Z = slave(emqttd, cluster_remove_z),
+    wait_running(Z),
+    Node = node(),
+    {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node),
+    ok = emqttd_cluster:join(Z),
+    [Z, Node] = emqttd_mnesia:running_nodes(),
+    ok = emqttd_cluster:remove(Z),
+    [Node] = emqttd_mnesia:running_nodes(),
+    slave:stop(Z).
+
+cluster_remove2(_) ->
+    Z = slave(emqttd, cluster_remove2_z),
+    wait_running(Z),
+    ok = emqttd_cluster:join(Z),
+    Node = node(),
+    [Z, Node] = emqttd_mnesia:running_nodes(),
+    ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []),
+    ok = emqttd_cluster:remove(Z),
+    [Node] = emqttd_mnesia:running_nodes(),
+    slave:stop(Z).
+
+cluster_node_down(_) ->
+    Z = slave(emqttd, cluster_node_down),
+    timer:sleep(1000),
+    wait_running(Z),
+    ok = emqttd_cluster:join(Z),
+    ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]),
+    ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]),
+    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
+    ct:log("Routes: ~p~n", [Routes]),
+    [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes],
+    slave:stop(Z),
+    timer:sleep(1000),
+    Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)).
+
+ 
 %%--------------------------------------------------------------------
 %% Cli group
 %%--------------------------------------------------------------------
@@ -451,3 +535,32 @@ cli_vm(_) ->
     emqttd_cli:vm([]),
     emqttd_cli:vm(["ports"]).
 
+
+ensure_ok(ok) -> ok;
+ensure_ok({error, {already_started, _}}) -> ok.
+
+host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
+
+wait_running(Node) ->
+    wait_running(Node, 30000).
+
+wait_running(Node, Timeout) when Timeout < 0 ->
+    throw({wait_timeout, Node});
+
+wait_running(Node, Timeout) ->
+    case rpc:call(Node, emqttd, is_running, [Node]) of
+        true  -> ok;
+        false -> timer:sleep(100),
+                 wait_running(Node, Timeout - 100)
+    end.
+
+slave(emqttd, Node) ->
+    {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
+    rpc:call(Emq, application, ensure_all_started, [emqttd]),
+    Emq;
+
+slave(node, Node) ->
+    {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
+    N.
+
+