Browse Source

Merge branch 'emqx30' into improve_connect

tigercl 7 years ago
parent
commit
64cb920b3b

+ 3 - 4
Makefile

@@ -35,13 +35,12 @@ EUNIT_OPTS = verbose
 # CT_SUITES = emqx_frame
 ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
 
-
-CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_connection emqx_session \
+CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight emqx_json \
-			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mqtt_caps \
+			emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mod_sup emqx_mqtt_caps \
 			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
-		 	emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
+		 	emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge emqx_hooks
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

+ 3 - 3
etc/emqx.conf

@@ -1159,10 +1159,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
 ## Value: on | off
 ## listener.ssl.external.honor_cipher_order = on
 
-## Use the CN field from the client certificate as a username.
+## Use the CN, EN or CRT field from the client certificate as a username.
 ## Notice that 'verify' should be set as 'verify_peer'.
 ##
-## Value: cn | en
+## Value: cn | en | crt
 ## listener.ssl.external.peer_cert_as_username = cn
 
 ## TCP backlog for the SSL connection.
@@ -1522,7 +1522,7 @@ listener.wss.external.certfile = {{ platform_etc_dir }}/certs/cert.pem
 
 ## See: listener.ssl.$name.peer_cert_as_username
 ##
-## Value: cn | dn
+## Value: cn | dn | crt
 ## listener.wss.external.peer_cert_as_username = cn
 
 ## TCP backlog for the WebSocket/SSL connection.

+ 3 - 3
priv/emqx.schema

@@ -949,7 +949,7 @@ end}.
 ]}.
 
 {mapping, "listener.tcp.$name.peer_cert_as_username", "emqx.listeners", [
-  {datatype, {enum, [cn, dn]}}
+  {datatype, {enum, [cn, dn, crt]}}
 ]}.
 
 {mapping, "listener.tcp.$name.backlog", "emqx.listeners", [
@@ -1139,7 +1139,7 @@ end}.
 ]}.
 
 {mapping, "listener.ssl.$name.peer_cert_as_username", "emqx.listeners", [
-  {datatype, {enum, [cn, dn]}}
+  {datatype, {enum, [cn, dn, crt]}}
 ]}.
 
 %%--------------------------------------------------------------------
@@ -1400,7 +1400,7 @@ end}.
 ]}.
 
 {mapping, "listener.wss.$name.peer_cert_as_username", "emqx.listeners", [
-  {datatype, {enum, [cn, dn]}}
+  {datatype, {enum, [cn, dn, crt]}}
 ]}.
 
 {translation, "emqx.listeners", fun(Conf) ->

+ 5 - 0
src/emqx_banned.erl

@@ -102,8 +102,13 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
+-ifdef(TEST).
+ensure_expiry_timer(State) ->
+    State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
+-else.
 ensure_expiry_timer(State) ->
     State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
+-endif.
 
 expire_banned_items(Now) ->
     mnesia:foldl(fun

+ 12 - 2
src/emqx_broker.erl

@@ -260,9 +260,19 @@ subscription(Topic, Subscriber) ->
 
 -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
 subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
+    case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of
+        {Match, _} ->
+            length(Match) >= 1;
+        '$end_of_table' ->
+            false
+    end;
 subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
+    case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of
+        {Match, _} ->
+            length(Match) >= 1;
+    '$end_of_table' ->
+            false
+    end;
 subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
     ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
 

+ 0 - 1
src/emqx_mod_rewrite.erl

@@ -75,4 +75,3 @@ compile(Rules) ->
                   {ok, MP} = re:compile(Re),
                   {rewrite, Topic, MP, Dest}
               end, Rules).
-

+ 37 - 34
src/emqx_protocol.erl

@@ -33,34 +33,34 @@
 -export([shutdown/2]).
 
 -record(pstate, {
-         zone,
-         sendfun,
-         peername,
-         peercert,
-         proto_ver,
-         proto_name,
-         ackprops,
-         client_id,
-         is_assigned,
-         conn_pid,
-         conn_props,
-         ack_props,
-         username,
-         session,
-         clean_start,
-         topic_aliases,
-         packet_size,
-         will_msg,
-         keepalive,
-         mountpoint,
-         is_super,
-         is_bridge,
-         enable_ban,
-         enable_acl,
-         recv_stats,
-         send_stats,
-         connected,
-         connected_at
+          zone,
+          sendfun,
+          peername,
+          peercert,
+          proto_ver,
+          proto_name,
+          client_id,
+          is_assigned,
+          conn_pid,
+          conn_props,
+          ack_props,
+          username,
+          session,
+          clean_start,
+          topic_aliases,
+          packet_size,
+          will_topic,
+          will_msg,
+          keepalive,
+          mountpoint,
+          is_super,
+          is_bridge,
+          enable_ban,
+          enable_acl,
+          recv_stats,
+          send_stats,
+          connected,
+          connected_at
         }).
 
 -type(state() :: #pstate{}).
@@ -70,6 +70,8 @@
 -compile(export_all).
 -endif.
 
+-define(NO_PROPS, undefined).
+
 -define(LOG(Level, Format, Args, PState),
         emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
                           [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
@@ -105,9 +107,10 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
 
 init_username(Peercert, Options) ->
     case proplists:get_value(peer_cert_as_username, Options) of
-        cn -> esockd_peercert:common_name(Peercert);
-        dn -> esockd_peercert:subject(Peercert);
-        _  -> undefined
+        cn  -> esockd_peercert:common_name(Peercert);
+        dn  -> esockd_peercert:subject(Peercert);
+        crt -> Peercert;
+        _   -> undefined
     end.
 
 set_username(Username, PState = #pstate{username = undefined}) ->
@@ -597,10 +600,10 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
 %%------------------------------------------------------------------------------
 %% Assign a clientid
 
-maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
+maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps}) ->
     ClientId = emqx_guid:to_base62(emqx_guid:gen()),
     AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
-    PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1};
+    PState#pstate{client_id = ClientId, is_assigned = true, ack_props = AckProps1};
 maybe_assign_client_id(PState) ->
     PState.
 
@@ -667,7 +670,7 @@ authenticate(Credentials, Password) ->
             {error, Error}
     end.
 
-set_property(Name, Value, undefined) ->
+set_property(Name, Value, ?NO_PROPS) ->
     #{Name => Value};
 set_property(Name, Value, Props) ->
     Props#{Name => Value}.

+ 10 - 6
test/emqx_banned_SUITE.erl

@@ -29,13 +29,17 @@ t_banned_all(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     emqx_banned:start_link(),
     TimeNow = erlang:system_time(second),
-    ok = emqx_banned:add(#banned{who = {client_id, <<"TestClient">>},
-                                 reason = <<"test">>,
-                                 by = <<"banned suite">>,
-                                 desc = <<"test">>,
-                                 until = TimeNow + 10}),
+    Banned = #banned{who    = {client_id, <<"TestClient">>},
+                     reason = <<"test">>,
+                     by     = <<"banned suite">>,
+                     desc   = <<"test">>,
+                     until  = TimeNow + 1},
+    ok = emqx_banned:add(Banned),
     % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
-    timer:sleep(100),
+    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    timer:sleep(2500),
+    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    ok = emqx_banned:add(Banned),
     ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
     emqx_banned:del({client_id, <<"TestClient">>}),
     ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),

+ 57 - 0
test/emqx_bridge_SUITE.erl

@@ -0,0 +1,57 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_bridge_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    [bridge_test].
+
+init_per_suite(Config) ->
+    emqx_ct_broker_helpers:run_setup_steps(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_broker_helpers:run_teardown_steps().
+
+bridge_test(_) ->
+    {ok, _Pid} = emqx_bridge:start_link(emqx, []),
+    #{msg := <<"start bridge successfully">>}
+        = emqx_bridge:start_bridge(emqx),
+    test_forwards(),
+    test_subscriptions(0),
+    test_subscriptions(1),
+    test_subscriptions(2),
+    #{msg := <<"stop bridge successfully">>}
+        = emqx_bridge:stop_bridge(emqx),
+    ok.
+
+test_forwards() ->
+    emqx_bridge:add_forward(emqx, <<"test_forwards">>),
+    [<<"test_forwards">>] = emqx_bridge:show_forwards(emqx),
+    emqx_bridge:del_forward(emqx, <<"test_forwards">>),
+    [] = emqx_bridge:show_forwards(emqx),
+    ok.
+
+test_subscriptions(QoS) ->
+    emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS),
+    [{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx),
+    emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>),
+    [] = emqx_bridge:show_subscriptions(emqx),
+    ok.

+ 11 - 2
test/emqx_broker_SUITE.erl

@@ -60,6 +60,11 @@ subscribe_unsubscribe(_) ->
     ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
     ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
     ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
+    true = emqx:subscribed(<<"topic">>, <<"clientId">>),
+    Topics = emqx:topics(),
+    lists:foreach(fun(Topic) -> 
+                      ?assert(lists:member(Topic, Topics))
+                  end, Topics),
     ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
     ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
     ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
@@ -72,12 +77,16 @@ publish(_) ->
     ?assert(receive {dispatch, <<"test/+">>, Msg} -> true after 5 -> false end).
 
 pubsub(_) ->
+    true = emqx:is_running(node()),
     Self = self(),
     Subscriber = {Self, <<"clientId">>},
     ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
-    #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 1} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
+    true = emqx:set_subopts(<<"a/b/c">>, Subscriber, #{qos => 0}),
+    #{qos := 0} = emqx:get_subopts(<<"a/b/c">>, Subscriber),
     ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
-    #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
+    #{qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
     %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
     timer:sleep(10),
     [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),

+ 0 - 47
test/emqx_connection_SUITE.erl

@@ -1,47 +0,0 @@
-%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-
--module(emqx_connection_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include_lib("common_test/include/ct.hrl").
-
-all() -> 
-    [{group, connection}].
-
-groups() ->
-    [{connection, [sequence], [t_attrs]}].
-
-init_per_suite(Config) ->
-    emqx_ct_broker_helpers:run_setup_steps(),
-    Config.
-    
-end_per_suite(_Config) ->
-    emqx_ct_broker_helpers:run_teardown_steps().
-
-
-t_attrs(_) ->
-    {ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]),
-    [{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>),
-    Attrs = emqx_connection:attrs(ConnPid),
-    <<"simpleClient">> = proplists:get_value(client_id, Attrs),
-    <<"plain">> = proplists:get_value(username, Attrs),
-    emqx_client:disconnect(C).
-
-%% t_stats() ->
-%%     {ok, C, _ } = emqx_client;
-%% t_stats() ->
-

+ 18 - 1
test/emqx_hooks_SUITE.erl

@@ -43,7 +43,7 @@ add_delete_hook(_) ->
                   {callback, {?MODULE, hook_fun2, []}, undefined, 8}],
     ?assertEqual(Callbacks2, emqx_hooks:lookup(emqx_hook)),
     ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun1, []}),
-    ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2, []}),
+    ok = emqx:unhook(emqx_hook, {?MODULE, hook_fun2}),
     timer:sleep(1000),
     ?assertEqual([], emqx_hooks:lookup(emqx_hook)),
     ok = emqx_hooks:stop().
@@ -62,6 +62,17 @@ run_hooks(_) ->
     ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun7/2, [initArg]),
     ok = emqx:hook(foreach_hook, fun ?MODULE:hook_fun8/2, [initArg]),
     stop = emqx:run_hooks(foreach_hook, [arg]),
+
+    ok = emqx:hook(foldl_hook2, fun ?MODULE:hook_fun9/2),
+    ok = emqx:hook(foldl_hook2, {?MODULE, hook_fun10, []}),
+    {stop, []} = emqx:run_hooks(foldl_hook2, [arg], []),
+
+    ok = emqx:hook(filter1_hook, {?MODULE, hook_fun1, []}, {?MODULE, hook_filter1, []}, 0),
+    ok = emqx:run_hooks(filter1_hook, [arg]),
+
+    ok = emqx:hook(filter2_hook, {?MODULE, hook_fun2, []}, {?MODULE, hook_filter2, []}),
+    {ok, []} = emqx:run_hooks(filter2_hook, [arg], []),
+
     ok = emqx_hooks:stop().
 
 hook_fun1([]) -> ok.
@@ -75,3 +86,9 @@ hook_fun6(arg, initArg) -> ok.
 hook_fun7(arg, initArg) -> any.
 hook_fun8(arg, initArg) -> stop.
 
+hook_fun9(arg, _Acc)  -> any.
+hook_fun10(arg, _Acc)  -> stop.
+
+hook_filter1(arg) -> true.
+hook_filter2(arg, _Acc) -> true.
+

+ 63 - 0
test/emqx_mod_rewrite_tests.erl

@@ -0,0 +1,63 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_mod_rewrite_tests).
+
+-include_lib("emqx.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+
+rules() ->
+    Rawrules1 = "x/# ^x/y/(.+)$ z/y/$1",
+    Rawrules2 = "y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2",
+    Rawrules = [Rawrules1, Rawrules2],
+    Rules = lists:map(fun(Rule) ->
+                              [Topic, Re, Dest] = string:tokens(Rule, " "),
+                              {rewrite,
+                               list_to_binary(Topic),
+                               list_to_binary(Re),
+                               list_to_binary(Dest)}
+                      end, Rawrules),
+    lists:map(fun({rewrite, Topic, Re, Dest}) ->
+                      {ok, MP} = re:compile(Re),
+                      {rewrite, Topic, MP, Dest}
+              end, Rules).
+
+rewrite_subscribe_test() ->
+    Rules = rules(),
+    io:format("Rules: ~p",[Rules]),
+    ?assertEqual({ok, [{<<"test">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)),
+    ?assertEqual({ok, [{<<"z/y/test">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)),
+    ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)).
+
+rewrite_unsubscribe_test() ->
+    Rules = rules(),
+    ?assertEqual({ok, [{<<"test">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"test">>, opts}], Rules)),
+    ?assertEqual({ok, [{<<"z/y/test">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"x/y/test">>, opts}], Rules)),
+    ?assertEqual({ok, [{<<"y/z/test_topic">>, opts}]},
+                 emqx_mod_rewrite:rewrite_subscribe(credentials, [{<<"y/test/z/test_topic">>, opts}], Rules)).
+
+rewrite_publish_test() ->
+    Rules = rules(),
+    ?assertMatch({ok, #message{topic = <<"test">>}},
+                 emqx_mod_rewrite:rewrite_publish(#message{topic = <<"test">>}, Rules)),
+    ?assertMatch({ok, #message{topic = <<"z/y/test">>}},
+                 emqx_mod_rewrite:rewrite_publish(#message{topic = <<"x/y/test">>}, Rules)),
+    ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}},
+                 emqx_mod_rewrite:rewrite_publish(#message{topic = <<"y/test/z/test_topic">>}, Rules)).

+ 28 - 0
test/emqx_mod_sup_SUITE.erl

@@ -0,0 +1,28 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_mod_sup_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include("emqx.hrl").
+
+all() -> [t_child_all].
+
+t_child_all(_) -> 
+    {ok, _Pid} = emqx_mod_sup:start_link(),
+    {ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker),
+    timer:sleep(10),
+    ok = emqx_mod_sup:stop_child(emqx_banned).

+ 55 - 1
test/emqx_pqueue_SUITE.erl

@@ -22,7 +22,7 @@
 
 -define(PQ, emqx_pqueue).
 
-all() -> [t_priority_queue_plen, t_priority_queue_out2].
+all() -> [t_priority_queue_plen, t_priority_queue_out2, t_priority_queues].
 
 t_priority_queue_plen(_) ->
     Q = ?PQ:new(),
@@ -67,3 +67,57 @@ t_priority_queue_out2(_) ->
     {Val5, Q6} = ?PQ:out(Q5),
     {value, a} = Val5,
     {empty, _Q7} = ?PQ:out(Q6).
+
+t_priority_queues(_) ->
+    Q0 = ?PQ:new(),
+    Q1 = ?PQ:new(),
+    PQueue = {pqueue, [{0, Q0}, {1, Q1}]},
+    ?assert(?PQ:is_queue(PQueue)),
+    [] = ?PQ:to_list(PQueue),
+
+    PQueue1 = ?PQ:in(a, 0, ?PQ:new()),
+    PQueue2 = ?PQ:in(b, 0, PQueue1),
+
+    PQueue3 = ?PQ:in(c, 1, PQueue2),
+    PQueue4 = ?PQ:in(d, 1, PQueue3),
+
+    4 = ?PQ:len(PQueue4),
+
+    [{1, c}, {1, d}, {0, a}, {0, b}] = ?PQ:to_list(PQueue4),
+    PQueue4 = ?PQ:from_list([{1, c}, {1, d}, {0, a}, {0, b}]),
+    
+    empty = ?PQ:highest(?PQ:new()),
+    0 = ?PQ:highest(PQueue1),
+    1 = ?PQ:highest(PQueue4),
+
+    PQueue5 = ?PQ:in(e, infinity, PQueue4),
+    PQueue6 = ?PQ:in(f, 1, PQueue5),
+
+    {{value, e}, PQueue7} = ?PQ:out(PQueue6),
+    {empty, _} = ?PQ:out(0, ?PQ:new()), 
+
+    {empty, Q0} = ?PQ:out_p(Q0),
+
+    Q2 = ?PQ:in(a, Q0),
+    Q3 = ?PQ:in(b, Q2),
+    Q4 = ?PQ:in(c, Q3),
+
+    {{value, a, 0}, _Q5} = ?PQ:out_p(Q4),
+
+    {{value,c,1}, PQueue8} = ?PQ:out_p(PQueue7),
+
+    Q4 = ?PQ:join(Q4, ?PQ:new()),
+    Q4 = ?PQ:join(?PQ:new(), Q4),
+
+    {queue, [a], [a], 2} = ?PQ:join(Q2, Q2),
+
+    {pqueue,[{-1,{queue,[f],[d],2}},
+             {0,{queue,[a],[a,b],3}}]} = ?PQ:join(PQueue8, Q2),
+
+    {pqueue,[{-1,{queue,[f],[d],2}},
+             {0,{queue,[b],[a,a],3}}]} = ?PQ:join(Q2, PQueue8),
+
+    {pqueue,[{-1,{queue,[f],[d,f,d],4}},
+             {0,{queue,[b],[a,b,a],4}}]} = ?PQ:join(PQueue8, PQueue8).
+
+

+ 30 - 0
test/emqx_protocol_tests.erl

@@ -0,0 +1,30 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_protocol_tests).
+
+-include_lib("eunit/include/eunit.hrl").
+
+set_property_test() ->
+    ?assertEqual(#{test => test_property}, emqx_protocol:set_property(test, test_property, undefined)),
+    TestMap = #{test => test_property},
+    ?assertEqual(#{test => test_property, test1 => test_property2},
+                 emqx_protocol:set_property(test1, test_property2, TestMap)),
+    ok.
+
+init_username_test() ->
+    ?assertEqual(<<"Peercert">>,
+                 emqx_protocol:init_username(<<"Peercert">>, [{peer_cert_as_username, crt}])),
+    ?assertEqual(undefined,
+                 emqx_protocol:init_username(undefined, [{peer_cert_as_username, undefined}])).

+ 14 - 6
test/emqx_router_SUITE.erl

@@ -29,7 +29,9 @@ all() ->
 groups() ->
     [{route, [sequence],
       [add_del_route,
-       match_routes]}].
+       match_routes,
+       has_routes,
+       router_add_del]}].
 
 init_per_suite(Config) ->
     emqx_ct_broker_helpers:run_setup_steps(),
@@ -81,6 +83,7 @@ match_routes(_) ->
 has_routes(_) ->
     From = {self(), make_ref()},
     ?R:add_route(From, <<"devices/+/messages">>, node()),
+    timer:sleep(200),
     ?assert(?R:has_routes(<<"devices/+/messages">>)).
 
 clear_tables() ->
@@ -88,28 +91,33 @@ clear_tables() ->
 
 router_add_del(_) ->
     ?R:add_route(<<"#">>),
-    ?R:add_route(<<"a/b/c">>),
+    ?R:add_route(<<"a/b/c">>, node()),
     ?R:add_route(<<"+/#">>),
     Routes = [R1, R2 | _] = [
             #route{topic = <<"#">>,     dest = node()},
             #route{topic = <<"+/#">>,   dest = node()},
             #route{topic = <<"a/b/c">>, dest = node()}],
+    timer:sleep(500),
     ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
 
+    ?R:print_routes(<<"a/b/c">>),
+
     %% Batch Add
     lists:foreach(fun(R) -> ?R:add_route(R) end, Routes),
     ?assertEqual(Routes, lists:sort(?R:match_routes(<<"a/b/c">>))),
 
     %% Del
-    ?R:del_route(<<"a/b/c">>),
-    [R1, R2] = lists:sort(?R:match(<<"a/b/c">>)),
+    ?R:del_route(<<"a/b/c">>, node()),
+    timer:sleep(500),
+    [R1, R2] = lists:sort(?R:match_routes(<<"a/b/c">>)),
     {atomic, []} = mnesia:transaction(fun emqx_trie:lookup/1, [<<"a/b/c">>]),
 
     %% Batch Del
     R3 = #route{topic = <<"#">>, dest = 'a@127.0.0.1'},
     ?R:add_route(R3),
-    ?R:del_route(R1),
+    ?R:del_route(<<"#">>),
     ?R:del_route(R2),
     ?R:del_route(R3),
-    [] = lists:sort(?R:match(<<"a/b/c">>)).
+    timer:sleep(500),
+    [] = lists:sort(?R:match_routes(<<"a/b/c">>)).