Kaynağa Gözat

Add first CT test for emqx_portal based on rpc

spring2maz 7 yıl önce
ebeveyn
işleme
b9e8bde3b0

+ 7 - 3
src/portal/emqx_portal_msg.erl

@@ -32,8 +32,8 @@
 %% 1. Mount topic to a prefix
 %% 2. fix QoS to 1
 -spec to_export(msg(), undefined | binary()) -> msg().
-to_export(#{topic := Topic} = Msg, Mountpoint) ->
-    Msg#{topic := topic(Mountpoint, Topic), qos => 1}.
+to_export(#message{topic = Topic} = Msg, Mountpoint) ->
+    Msg#message{topic = topic(Mountpoint, Topic), qos = 1}.
 
 %% @doc Make `binary()' in order to make iodata to be persisted on disk.
 -spec to_binary(msg()) -> binary().
@@ -46,15 +46,19 @@ from_binary(Bin) -> binary_to_term(Bin).
 %% @doc Estimate the size of a message.
 %% Count only the topic length + payload size
 -spec estimate_size(msg()) -> integer().
-estimate_size(#{topic := Topic, payload := Payload}) ->
+estimate_size(#message{topic = Topic, payload = Payload}) ->
     size(Topic) + size(Payload).
 
 %% @doc By message/batch receiver, transform received batch into
 %% messages to dispatch to local brokers.
 to_broker_msgs(Batch) -> lists:map(fun to_broker_msg/1, Batch).
 
+to_broker_msg(#message{} = Msg) ->
+    %% internal format from another EMQX node via rpc
+    Msg;
 to_broker_msg(#{qos := QoS, dup := Dup, retain := Retain, topic := Topic,
                 properties := Props, payload := Payload}) ->
+    %% published from remote node over a MQTT connection
     emqx_message:set_headers(Props,
         emqx_message:set_flags(#{dup => Dup, retain => Retain},
             emqx_message:make(portal, QoS, Topic, Payload))).

+ 48 - 1
test/emqx_ct_helpers.erl

@@ -14,9 +14,56 @@
 
 -module(emqx_ct_helpers).
 
--export([ensure_mnesia_stopped/0]).
+-export([ensure_mnesia_stopped/0, wait_for/4]).
 
 ensure_mnesia_stopped() ->
     ekka_mnesia:ensure_stopped(),
     ekka_mnesia:delete_schema().
 
+%% Help function to wait for Fun to yeild 'true'.
+wait_for(Fn, Ln, F, Timeout) ->
+    {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
+    wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
+
+wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
+    receive
+        {'DOWN', Mref, process, Pid, normal} ->
+            ok;
+        {'DOWN', Mref, process, Pid, {unexpected, Result}} ->
+            erlang:error({unexpected, Fn, Ln, Result});
+        {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
+            erlang:raise(C, {Fn, Ln, E}, S)
+    after
+        Timeout ->
+            case Kill of
+                true ->
+                    erlang:demonitor(Mref, [flush]),
+                    erlang:exit(Pid, kill),
+                    erlang:error({Fn, Ln, timeout});
+                false ->
+                    Pid ! stop,
+                    wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
+            end
+    end.
+
+wait_loop(_F, ok) -> exit(normal);
+wait_loop(F, LastRes) ->
+    receive
+        stop -> erlang:exit(LastRes)
+    after
+        100 ->
+            Res = catch_call(F),
+            wait_loop(F, Res)
+    end.
+
+catch_call(F) ->
+    try
+        case F() of
+            true -> ok;
+            Other -> {unexpected, Other}
+        end
+    catch
+        C : E : S ->
+            {crashed, {C, E, S}}
+    end.
+

+ 73 - 0
test/emqx_portal_SUITE.erl

@@ -0,0 +1,73 @@
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_portal_SUITE).
+
+-export([all/0, init_per_suite/1, end_per_suite/1]).
+-export([t_rpc/1,
+         t_mqtt/1
+        ]).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include("emqx_mqtt.hrl").
+-include("emqx.hrl").
+
+-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
+
+all() -> [t_rpc,
+          t_mqtt
+         ].
+
+init_per_suite(Config) ->
+    case node() of
+        nonode@nohost ->
+            net_kernel:start(['emqx@127.0.0.1', longnames]);
+        _ ->
+            ok
+    end,
+    emqx_ct_broker_helpers:run_setup_steps(),
+    Config.
+
+end_per_suite(_Config) ->
+    emqx_ct_broker_helpers:run_teardown_steps().
+
+t_rpc(Config) when is_list(Config) ->
+    Cfg = #{address => node(),
+            forwards => [<<"t_rpc/#">>],
+            connect_module => emqx_portal_rpc,
+            mountpoint => <<"forwarded">>
+           },
+    {ok, Pid} = emqx_portal:start_link(?FUNCTION_NAME, Cfg),
+    ClientId = <<"ClientId">>,
+    try
+        {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
+        {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
+        %% message from a different client, to avoid getting terminated by no-local
+        Msg1 = emqx_message:make(<<"ClientId-2">>, ?QOS_2, <<"t_rpc/one">>, <<"hello">>),
+        ok = emqx_session:subscribe(SPid, [{<<"forwarded/t_rpc/one">>, #{qos => ?QOS_1}}]),
+        PacketId = 1,
+        emqx_session:publish(SPid, PacketId, Msg1),
+        ?wait(case emqx_mock_client:get_last_message(ConnPid) of
+                  {publish, PacketId, #message{topic = <<"forwarded/t_rpc/one">>}} -> true;
+                  Other -> Other
+              end, 4000),
+        emqx_mock_client:close_session(ConnPid)
+    after
+        ok = emqx_portal:stop(Pid)
+    end.
+
+t_mqtt(Config) when is_list(Config) -> ok.
+
+

+ 1 - 47
test/emqx_shared_sub_SUITE.erl

@@ -29,7 +29,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
--define(wait(For, Timeout), wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
+-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
 
 all() -> [t_random_basic,
           t_random,
@@ -259,49 +259,3 @@ ensure_config(Strategy, AckEnabled) ->
 subscribed(Group, Topic, Pid) ->
     lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
 
-wait_for(Fn, Ln, F, Timeout) ->
-    {Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
-    wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
-
-wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
-    receive
-        {'DOWN', Mref, process, Pid, normal} ->
-            ok;
-        {'DOWN', Mref, process, Pid, {unexpected, Result}} ->
-            erlang:error({unexpected, Fn, Ln, Result});
-        {'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
-            erlang:raise(C, {Fn, Ln, E}, S)
-    after
-        Timeout ->
-            case Kill of
-                true ->
-                    erlang:demonitor(Mref, [flush]),
-                    erlang:exit(Pid, kill),
-                    erlang:error({Fn, Ln, timeout});
-                false ->
-                    Pid ! stop,
-                    wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
-            end
-    end.
-
-wait_loop(_F, ok) -> exit(normal);
-wait_loop(F, LastRes) ->
-    receive
-        stop -> erlang:exit(LastRes)
-    after
-        100 ->
-            Res = catch_call(F),
-            wait_loop(F, Res)
-    end.
-
-catch_call(F) ->
-    try
-        case F() of
-            true -> ok;
-            Other -> {unexpected, Other}
-        end
-    catch
-        C : E : S ->
-            {crashed, {C, E, S}}
-    end.
-