|
|
@@ -24,6 +24,8 @@
|
|
|
|
|
|
-include_lib("common_test/include/ct.hrl").
|
|
|
|
|
|
+-define(APP, emqttd).
|
|
|
+
|
|
|
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
|
|
|
|
|
|
-define(MQTT_SSL_TWOWAY, [{cacertfile, "certs/cacert.pem"},
|
|
|
@@ -45,7 +47,6 @@ all() ->
|
|
|
{group, stats},
|
|
|
{group, hook},
|
|
|
{group, http},
|
|
|
- {group, cluster},
|
|
|
{group, alarms},
|
|
|
{group, cli},
|
|
|
{group, cleanSession}].
|
|
|
@@ -53,8 +54,9 @@ all() ->
|
|
|
groups() ->
|
|
|
[{protocol, [sequence],
|
|
|
[mqtt_connect,
|
|
|
- mqtt_ssl_oneway,
|
|
|
- mqtt_ssl_twoway]},
|
|
|
+ mqtt_ssl_twoway,
|
|
|
+ mqtt_ssl_oneway
|
|
|
+ ]},
|
|
|
{pubsub, [sequence],
|
|
|
[subscribe_unsubscribe,
|
|
|
publish, pubsub,
|
|
|
@@ -81,14 +83,6 @@ groups() ->
|
|
|
request_publish
|
|
|
% websocket_test
|
|
|
]},
|
|
|
- {cluster, [sequence],
|
|
|
- [cluster_test,
|
|
|
- cluster_join,
|
|
|
- cluster_leave,
|
|
|
- cluster_remove,
|
|
|
- cluster_remove2,
|
|
|
- cluster_node_down
|
|
|
- ]},
|
|
|
{alarms, [sequence],
|
|
|
[set_alarms]
|
|
|
},
|
|
|
@@ -109,24 +103,17 @@ groups() ->
|
|
|
]},
|
|
|
cli_vm]},
|
|
|
{cleanSession, [sequence],
|
|
|
- [cleanSession_validate,
|
|
|
- cleanSession_validate1
|
|
|
+ [cleanSession_validate
|
|
|
]}].
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- application:start(lager),
|
|
|
- DataDir = proplists:get_value(data_dir, Config),
|
|
|
- NewConfig = emqttd_config(DataDir),
|
|
|
- Vals = change_opts(ssl_oneway, DataDir, proplists:get_value(emqttd, NewConfig)),
|
|
|
- [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals],
|
|
|
- application:ensure_all_started(emqttd),
|
|
|
- [{config, NewConfig} | Config].
|
|
|
+ NewConfig = generate_config(),
|
|
|
+ lists:foreach(fun set_app_env/1, NewConfig),
|
|
|
+ application:ensure_all_started(?APP),
|
|
|
+ Config.
|
|
|
|
|
|
end_per_suite(_Config) ->
|
|
|
- application:stop(emqttd),
|
|
|
- application:stop(esockd),
|
|
|
- application:stop(gproc),
|
|
|
- emqttd_mnesia:ensure_stopped().
|
|
|
+ emqttd:shutdown().
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Protocol Test
|
|
|
@@ -147,31 +134,32 @@ connect_broker_(Packet, RecvSize) ->
|
|
|
Data.
|
|
|
|
|
|
mqtt_ssl_oneway(_) ->
|
|
|
+ emqttd:stop(),
|
|
|
+ change_opts(ssl_oneway),
|
|
|
+ emqttd:start(),
|
|
|
{ok, SslOneWay} = emqttc:start_link([{host, "localhost"},
|
|
|
{port, 8883},
|
|
|
{client_id, <<"ssloneway">>}, ssl]),
|
|
|
- timer:sleep(10),
|
|
|
+ timer:sleep(100),
|
|
|
emqttc:subscribe(SslOneWay, <<"topic">>, qos1),
|
|
|
{ok, Pub} = emqttc:start_link([{host, "localhost"},
|
|
|
{client_id, <<"pub">>}]),
|
|
|
emqttc:publish(Pub, <<"topic">>, <<"SSL oneWay test">>, [{qos, 1}]),
|
|
|
- timer:sleep(10),
|
|
|
+ timer:sleep(100),
|
|
|
receive {publish, _Topic, RM} ->
|
|
|
?assertEqual(<<"SSL oneWay test">>, RM)
|
|
|
after 1000 -> false
|
|
|
end,
|
|
|
+ timer:sleep(100),
|
|
|
emqttc:disconnect(SslOneWay),
|
|
|
emqttc:disconnect(Pub).
|
|
|
|
|
|
-mqtt_ssl_twoway(Config) ->
|
|
|
- emqttd_cluster:prepare(),
|
|
|
- DataDir = proplists:get_value(data_dir, Config),
|
|
|
- EmqConfig = proplists:get_value(config, Config),
|
|
|
- Vals = change_opts(ssl_twoway, DataDir, proplists:get_value(emqttd, EmqConfig)),
|
|
|
- [application:set_env(emqttd, Par, Value) || {Par, Value} <- Vals],
|
|
|
- emqttd_cluster:reboot(),
|
|
|
- ClientSSl = [{Key, filename:join([DataDir, File])} ||
|
|
|
- {Key, File} <- ?MQTT_SSL_CLIENT ],
|
|
|
+mqtt_ssl_twoway(_) ->
|
|
|
+ emqttd:stop(),
|
|
|
+ change_opts(ssl_twoway),
|
|
|
+ emqttd:start(),
|
|
|
+ ClientSSl = [{Key, local_path(["etc", File])} ||
|
|
|
+ {Key, File} <- ?MQTT_SSL_CLIENT],
|
|
|
{ok, SslTwoWay} = emqttc:start_link([{host, "localhost"},
|
|
|
{port, 8883},
|
|
|
{client_id, <<"ssltwoway">>},
|
|
|
@@ -427,7 +415,7 @@ hook_fun8(arg, initArg) -> stop.
|
|
|
request_status(_) ->
|
|
|
{InternalStatus, _ProvidedStatus} = init:get_status(),
|
|
|
AppStatus =
|
|
|
- case lists:keysearch(emqttd, 1, application:which_applications()) of
|
|
|
+ case lists:keysearch(?APP, 1, application:which_applications()) of
|
|
|
false -> not_running;
|
|
|
{value, _Val} -> running
|
|
|
end,
|
|
|
@@ -469,79 +457,6 @@ websocket_test(_) ->
|
|
|
|
|
|
ct:log("Req:~p", [Req]),
|
|
|
emqttd_http:handle_request(Req).
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% cluster group
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-cluster_test(_Config) ->
|
|
|
- Z = slave(emqttd, cluster_test_z),
|
|
|
- wait_running(Z),
|
|
|
- true = emqttd:is_running(Z),
|
|
|
- Node = node(),
|
|
|
- ok = rpc:call(Z, emqttd_cluster, join, [Node]),
|
|
|
- [Z, Node] = lists:sort(mnesia:system_info(running_db_nodes)),
|
|
|
- ct:log("Z:~p, Node:~p", [Z, Node]),
|
|
|
- ok = rpc:call(Z, emqttd_cluster, leave, []),
|
|
|
- [Node] = lists:sort(mnesia:system_info(running_db_nodes)),
|
|
|
- ok = slave:stop(Z).
|
|
|
-
|
|
|
-cluster_join(_) ->
|
|
|
- Z = slave(emqttd, cluster_join_z),
|
|
|
- N = slave(node, cluster_join_n),
|
|
|
- wait_running(Z),
|
|
|
- true = emqttd:is_running(Z),
|
|
|
- Node = node(),
|
|
|
- {error, {cannot_join_with_self, Node}} = emqttd_cluster:join(Node),
|
|
|
- {error, {node_not_running, N}} = emqttd_cluster:join(N),
|
|
|
- ok = emqttd_cluster:join(Z),
|
|
|
- slave:stop(Z),
|
|
|
- slave:stop(N).
|
|
|
-
|
|
|
-cluster_leave(_) ->
|
|
|
- Z = slave(emqttd, cluster_leave_z),
|
|
|
- wait_running(Z),
|
|
|
- {error, node_not_in_cluster} = emqttd_cluster:leave(),
|
|
|
- ok = emqttd_cluster:join(Z),
|
|
|
- Node = node(),
|
|
|
- [Z, Node] = emqttd_mnesia:running_nodes(),
|
|
|
- ok = emqttd_cluster:leave(),
|
|
|
- [Node] = emqttd_mnesia:running_nodes(),
|
|
|
- slave:stop(Z).
|
|
|
-
|
|
|
-cluster_remove(_) ->
|
|
|
- Z = slave(emqttd, cluster_remove_z),
|
|
|
- wait_running(Z),
|
|
|
- Node = node(),
|
|
|
- {error, {cannot_remove_self, Node}} = emqttd_cluster:remove(Node),
|
|
|
- ok = emqttd_cluster:join(Z),
|
|
|
- [Z, Node] = emqttd_mnesia:running_nodes(),
|
|
|
- ok = emqttd_cluster:remove(Z),
|
|
|
- [Node] = emqttd_mnesia:running_nodes(),
|
|
|
- slave:stop(Z).
|
|
|
-
|
|
|
-cluster_remove2(_) ->
|
|
|
- Z = slave(emqttd, cluster_remove2_z),
|
|
|
- wait_running(Z),
|
|
|
- ok = emqttd_cluster:join(Z),
|
|
|
- Node = node(),
|
|
|
- [Z, Node] = emqttd_mnesia:running_nodes(),
|
|
|
- ok = emqttd_cluster:remove(Z),
|
|
|
- ok = rpc:call(Z, emqttd_mnesia, ensure_stopped, []),
|
|
|
- [Node] = emqttd_mnesia:running_nodes(),
|
|
|
- slave:stop(Z).
|
|
|
-
|
|
|
-cluster_node_down(_) ->
|
|
|
- Z = slave(emqttd, cluster_node_down),
|
|
|
- timer:sleep(1000),
|
|
|
- wait_running(Z),
|
|
|
- ok = emqttd_cluster:join(Z),
|
|
|
- ok = rpc:call(Z, emqttd_router, add_route, [<<"a/b/c">>]),
|
|
|
- ok = rpc:call(Z, emqttd_router, add_route, [<<"#">>]),
|
|
|
- Routes = lists:sort(emqttd_router:match(<<"a/b/c">>)),
|
|
|
- ct:log("Routes: ~p~n", [Routes]),
|
|
|
- [<<"#">>, <<"a/b/c">>] = [Topic || #mqtt_route{topic = Topic} <- Routes],
|
|
|
- slave:stop(Z),
|
|
|
- timer:sleep(1000),
|
|
|
- [] = lists:sort(emqttd_router:match(<<"a/b/c">>)).
|
|
|
|
|
|
set_alarms(_) ->
|
|
|
AlarmTest = #mqtt_alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
|
|
|
@@ -551,8 +466,6 @@ set_alarms(_) ->
|
|
|
emqttd_alarm:clear_alarm(<<"1">>),
|
|
|
[] = emqttd_alarm:get_alarms().
|
|
|
|
|
|
-
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Cli group
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -680,87 +593,56 @@ cleanSession_validate(_) ->
|
|
|
emqttc:disconnect(Pub),
|
|
|
emqttc:disconnect(C11).
|
|
|
|
|
|
-cleanSession_validate1(_) ->
|
|
|
- {ok, C1} = emqttc:start_link([{host, "localhost"},
|
|
|
- {port, 1883},
|
|
|
- {client_id, <<"c1">>},
|
|
|
- {clean_sess, true}]),
|
|
|
- timer:sleep(10),
|
|
|
- emqttc:subscribe(C1, <<"topic">>, qos1),
|
|
|
- emqttc:disconnect(C1),
|
|
|
- {ok, Pub} = emqttc:start_link([{host, "localhost"},
|
|
|
- {port, 1883},
|
|
|
- {client_id, <<"pub">>}]),
|
|
|
-
|
|
|
- emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]),
|
|
|
- timer:sleep(10),
|
|
|
- {ok, C11} = emqttc:start_link([{host, "localhost"},
|
|
|
- {port, 1883},
|
|
|
- {client_id, <<"c1">>},
|
|
|
- {clean_sess, false}]),
|
|
|
- timer:sleep(100),
|
|
|
- Metrics = emqttd_metrics:all(),
|
|
|
- ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)),
|
|
|
- ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)),
|
|
|
- emqttc:disconnect(Pub),
|
|
|
- emqttc:disconnect(C11).
|
|
|
-
|
|
|
-
|
|
|
-ensure_ok(ok) -> ok;
|
|
|
-ensure_ok({error, {already_started, _}}) -> ok.
|
|
|
-
|
|
|
-host() -> [_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
|
|
|
-
|
|
|
-wait_running(Node) ->
|
|
|
- wait_running(Node, 30000).
|
|
|
-
|
|
|
-wait_running(Node, Timeout) when Timeout < 0 ->
|
|
|
- throw({wait_timeout, Node});
|
|
|
-
|
|
|
-wait_running(Node, Timeout) ->
|
|
|
- case rpc:call(Node, emqttd, is_running, [Node]) of
|
|
|
- true -> ok;
|
|
|
- false -> timer:sleep(100),
|
|
|
- wait_running(Node, Timeout - 100)
|
|
|
- end.
|
|
|
-
|
|
|
-slave(emqttd, Node) ->
|
|
|
- {ok, Emq} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
|
|
|
- rpc:call(Emq, application, ensure_all_started, [emqttd]),
|
|
|
- Emq;
|
|
|
-
|
|
|
-slave(node, Node) ->
|
|
|
- {ok, N} = slave:start(host(), Node, "-pa ../../ebin -pa ../../deps/*/ebin"),
|
|
|
- N.
|
|
|
-
|
|
|
-emqttd_config(DataDir) ->
|
|
|
- Schema = cuttlefish_schema:files([filename:join([DataDir, "emqttd.schema"])]),
|
|
|
- Conf = conf_parse:file(filename:join([DataDir, "emqttd.conf"])),
|
|
|
- cuttlefish_generator:map(Schema, Conf).
|
|
|
-
|
|
|
-change_opts(SslType, DataDir, Vals) ->
|
|
|
- Listeners = proplists:get_value(listeners, Vals),
|
|
|
+change_opts(SslType) ->
|
|
|
+ {ok, Listeners} = application:get_env(?APP, listeners),
|
|
|
NewListeners =
|
|
|
lists:foldl(fun({Protocol, Port, Opts} = Listener, Acc) ->
|
|
|
case Protocol of
|
|
|
ssl ->
|
|
|
SslOpts = proplists:get_value(sslopts, Opts),
|
|
|
- Keyfile = filename:join([DataDir, proplists:get_value(keyfile, SslOpts)]),
|
|
|
- Certfile = filename:join([DataDir, proplists:get_value(certfile, SslOpts)]),
|
|
|
+ Keyfile = local_path(["etc/certs", "key.pem"]),
|
|
|
+ Certfile = local_path(["etc/certs", "cert.pem"]),
|
|
|
TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
|
|
|
TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
|
|
|
TupleList3 =
|
|
|
case SslType of
|
|
|
ssl_twoway->
|
|
|
- CAfile = filename:join([DataDir, proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
|
|
|
+ CAfile = local_path(["etc", proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)]),
|
|
|
MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
|
|
|
lists:merge(TupleList2, MutSslList);
|
|
|
_ ->
|
|
|
- TupleList2
|
|
|
+ lists:filter(fun ({cacertfile, _}) -> false;
|
|
|
+ ({verify, _}) -> false;
|
|
|
+ ({fail_if_no_peer_cert, _}) -> false;
|
|
|
+ (_) -> true
|
|
|
+ end, TupleList2)
|
|
|
end,
|
|
|
- [{Protocol, Port, [{ssl, TupleList3}]} | Acc];
|
|
|
+ [{Protocol, Port, lists:keyreplace(sslopts, 1, Opts, {sslopts, TupleList3})} | Acc];
|
|
|
_ ->
|
|
|
[Listener | Acc]
|
|
|
end
|
|
|
end, [], Listeners),
|
|
|
- lists:keyreplace(listeners, 1, Vals, {listeners, NewListeners}).
|
|
|
+ application:set_env(?APP, listeners, NewListeners).
|
|
|
+
|
|
|
+generate_config() ->
|
|
|
+ Schema = cuttlefish_schema:files([local_path(["priv", "emq.schema"])]),
|
|
|
+ Conf = conf_parse:file([local_path(["etc", "emq.conf"])]),
|
|
|
+ cuttlefish_generator:map(Schema, Conf).
|
|
|
+
|
|
|
+get_base_dir(Module) ->
|
|
|
+ {file, Here} = code:is_loaded(Module),
|
|
|
+ filename:dirname(filename:dirname(Here)).
|
|
|
+
|
|
|
+get_base_dir() ->
|
|
|
+ get_base_dir(?MODULE).
|
|
|
+
|
|
|
+local_path(Components, Module) ->
|
|
|
+ filename:join([get_base_dir(Module) | Components]).
|
|
|
+
|
|
|
+local_path(Components) ->
|
|
|
+ local_path(Components, ?MODULE).
|
|
|
+
|
|
|
+set_app_env({App, Lists}) ->
|
|
|
+ lists:foreach(fun({Par, Var}) ->
|
|
|
+ application:set_env(App, Par, Var)
|
|
|
+ end, Lists).
|