Kaynağa Gözat

Import emqtt library into emqx for test cases

GilbertWong 6 yıl önce
ebeveyn
işleme
4e0aeee93d

+ 2 - 1
rebar.config

@@ -31,7 +31,8 @@
         [{deps,
             [{meck, "0.8.13"}, % hex
              {bbmustache, "1.7.0"}, % hex
-             {emqx_ct_helpers, "1.1.3"} % hex
+             {emqx_ct_helpers, "1.1.3"}, % hex
+             {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "v1.0.1"}}}
             ]}
         ]}
     ]}.

+ 0 - 97
test/emqx_request_handler.erl

@@ -1,97 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc This module implements a request handler based on emqx_client.
-%% A request handler is a MQTT client which subscribes to a request topic,
-%% processes the requests then send response to another topic which is
-%% subscribed by the request sender.
-%% This code is in test directory because request and response are pure
-%% client-side behaviours.
-
--module(emqx_request_handler).
-
--export([start_link/4, stop/1]).
-
--type qos() :: emqx_mqtt_types:qos_name() | emqx_mqtt_types:qos().
--type topic() :: emqx_topic:topic().
--type handler() :: fun((CorrData :: binary(), ReqPayload :: binary()) -> RspPayload :: binary()).
-
--spec start_link(topic(), qos(), handler(), emqx_client:options()) ->
-        {ok, pid()} | {error, any()}.
-start_link(RequestTopic, QoS, RequestHandler, Options0) ->
-    Parent = self(),
-    MsgHandler = make_msg_handler(RequestHandler, Parent),
-    Options = [{msg_handler, MsgHandler} | Options0],
-    case emqx_client:start_link(Options) of
-        {ok, Pid} ->
-            {ok, _} = emqx_client:connect(Pid),
-            try subscribe(Pid, RequestTopic, QoS) of
-                ok -> {ok, Pid};
-                {error, _} = Error -> Error
-            catch
-                C : E : S ->
-                    emqx_client:stop(Pid),
-                    {error, {C, E, S}}
-            end;
-        {error, _} = Error -> Error
-    end.
-
-stop(Pid) ->
-    emqx_client:disconnect(Pid).
-
-make_msg_handler(RequestHandler, Parent) ->
-    #{publish => fun(Msg) -> handle_msg(Msg, RequestHandler, Parent) end,
-      puback => fun(_Ack) -> ok end,
-      disconnected => fun(_Reason) -> ok end
-     }.
-
-handle_msg(ReqMsg, RequestHandler, Parent) ->
-    #{qos := QoS, properties := Props, payload := ReqPayload} = ReqMsg,
-    case maps:find('Response-Topic', Props) of
-        {ok, RspTopic} when RspTopic =/= <<>> ->
-            CorrData = maps:get('Correlation-Data', Props),
-            RspProps = maps:without(['Response-Topic'], Props),
-            RspPayload = RequestHandler(CorrData, ReqPayload),
-            emqx_logger:debug("~p sending response msg to topic ~s with~n"
-                              "corr-data=~p~npayload=~p",
-                              [?MODULE, RspTopic, CorrData, RspPayload]),
-            ok = send_response(RspTopic, RspProps, RspPayload, QoS);
-        _ ->
-            Parent ! {discarded, ReqPayload},
-            ok
-    end.
-
-send_response(Topic, Properties, Payload, QoS) ->
-    %% This function is evaluated by emqx_client itself.
-    %% hence delegate to another temp process for the loopback gen_statem call.
-    Client = self(),
-    _ = spawn_link(fun() ->
-                           case emqx_client:publish(Client, Topic, Properties, Payload, [{qos, QoS}]) of
-                               ok -> ok;
-                               {ok, _} -> ok;
-                               {error, Reason} -> exit({failed_to_publish_response, Reason})
-                           end
-                   end),
-    ok.
-
-subscribe(Client, Topic, QoS) ->
-    {ok, _Props, _QoS} =
-        emqx_client:subscribe(Client, [{Topic, [{rh, 2}, {rap, false},
-                                       {nl, true}, {qos, QoS}]}]),
-    ok.
-
-
-

+ 0 - 71
test/emqx_request_response_SUITE.erl

@@ -1,71 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
--module(emqx_request_response_SUITE).
-
--compile(export_all).
--compile(nowarn_export_all).
-
--include("emqx_mqtt.hrl").
--include_lib("eunit/include/eunit.hrl").
--include_lib("common_test/include/ct.hrl").
-
-init_per_suite(Config) ->
-    emqx_ct_helpers:start_apps([]),
-    Config.
-
-end_per_suite(_Config) ->
-    emqx_ct_helpers:stop_apps([]).
-
-all() ->
-    [request_response].
-
-request_response(_Config) ->
-    request_response_per_qos(?QOS_0),
-    request_response_per_qos(?QOS_1),
-    request_response_per_qos(?QOS_2).
-
-request_response_per_qos(QoS) ->
-    ReqTopic = <<"request_topic">>,
-    RspTopic = <<"response_topic">>,
-    {ok, Requester} = emqx_request_sender:start_link(RspTopic, QoS,
-                                                     [{proto_ver, v5},
-                                                      {client_id, <<"requester">>},
-                                                      {properties, #{ 'Request-Response-Information' => 1}}]),
-    %% This is a square service
-    Square = fun(_CorrData, ReqBin) ->
-                     I = b2i(ReqBin),
-                     i2b(I * I)
-              end,
-    {ok, Responser} = emqx_request_handler:start_link(ReqTopic, QoS, Square,
-                                                      [{proto_ver, v5},
-                                                       {client_id, <<"responser">>}
-                                                      ]),
-    ok = emqx_request_sender:send(Requester, ReqTopic, RspTopic, <<"corr-1">>, <<"2">>, QoS),
-    receive
-        {response, <<"corr-1">>, <<"4">>} ->
-            ok;
-        Other ->
-            erlang:error({unexpected, Other})
-    after
-        100 ->
-            erlang:error(timeout)
-    end,
-    ok = emqx_request_sender:stop(Requester),
-    ok = emqx_request_handler:stop(Responser).
-
-b2i(B) -> binary_to_integer(B).
-i2b(I) -> integer_to_binary(I).

+ 0 - 77
test/emqx_request_sender.erl

@@ -1,77 +0,0 @@
-%%--------------------------------------------------------------------
-%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%%     http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%--------------------------------------------------------------------
-
-%% @doc This module implements a request sender based on emqx_client.
-%% A request sender is a MQTT client which sends messages to a request
-%% topic, and subscribes to another topic for responses.
-%% This code is in test directory because request and response are pure
-%% client-side behaviours.
-
--module(emqx_request_sender).
-
--export([start_link/3, stop/1, send/6]).
-
-start_link(ResponseTopic, QoS, Options0) ->
-    Parent = self(),
-    MsgHandler = make_msg_handler(Parent),
-    Options = [{msg_handler, MsgHandler} | Options0],
-    case emqx_client:start_link(Options) of
-        {ok, Pid} ->
-            {ok, _} = emqx_client:connect(Pid),
-            try subscribe(Pid, ResponseTopic, QoS) of
-                ok -> {ok, Pid};
-                {error, _} = Error -> Error
-            catch
-                C : E : S ->
-                    emqx_client:stop(Pid),
-                    {error, {C, E, S}}
-            end;
-        {error, _} = Error -> Error
-    end.
-
-%% @doc Send a message to request topic with correlation-data `CorrData'.
-%% Response should be delivered as a `{response, CorrData, Payload}'
-send(Client, ReqTopic, RspTopic, CorrData, Payload, QoS) ->
-    Props = #{'Response-Topic' => RspTopic,
-              'Correlation-Data' => CorrData
-             },
-    case emqx_client:publish(Client, ReqTopic, Props, Payload, [{qos, QoS}]) of
-        ok -> ok; %% QoS = 0
-        {ok, _} -> ok;
-        {error, _} = E -> E
-    end.
-
-stop(Pid) ->
-    emqx_client:disconnect(Pid).
-
-subscribe(Client, Topic, QoS) ->
-    case emqx_client:subscribe(Client, Topic, QoS) of
-        {ok, _, _} -> ok;
-        {error, _} = Error -> Error
-    end.
-
-make_msg_handler(Parent) ->
-    #{publish => fun(Msg) -> handle_msg(Msg, Parent) end,
-      puback => fun(_Ack) -> ok end,
-      disconnected => fun(_Reason) -> ok end
-     }.
-
-handle_msg(Msg, Parent) ->
-    #{properties := Props, payload := Payload} = Msg,
-    CorrData = maps:get('Correlation-Data', Props),
-    Parent ! {response, CorrData, Payload},
-    ok.
-