|
|
@@ -23,20 +23,6 @@
|
|
|
-include("emqx_mqtt.hrl").
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
|
|
--define(DEFAULT_CONNINFO,
|
|
|
- #{peername => {{127,0,0,1}, 3456},
|
|
|
- sockname => {{127,0,0,1}, 1883},
|
|
|
- conn_mod => emqx_connection,
|
|
|
- proto_name => <<"MQTT">>,
|
|
|
- proto_ver => ?MQTT_PROTO_V5,
|
|
|
- clean_start => true,
|
|
|
- keepalive => 30,
|
|
|
- clientid => <<"clientid">>,
|
|
|
- username => <<"username">>,
|
|
|
- conn_props => #{},
|
|
|
- receive_maximum => 100,
|
|
|
- expiry_interval => 0
|
|
|
- }).
|
|
|
|
|
|
all() -> emqx_ct:all(?MODULE).
|
|
|
|
|
|
@@ -45,40 +31,40 @@ all() -> emqx_ct:all(?MODULE).
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
init_per_suite(Config) ->
|
|
|
- Config.
|
|
|
-
|
|
|
-end_per_suite(_Config) ->
|
|
|
- ok.
|
|
|
-
|
|
|
-init_per_testcase(_TestCase, Config) ->
|
|
|
%% CM Meck
|
|
|
- ok = meck:new(emqx_cm, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_cm, [passthrough, no_history, no_link]),
|
|
|
%% Access Control Meck
|
|
|
- ok = meck:new(emqx_access_control, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_access_control, [passthrough, no_history, no_link]),
|
|
|
ok = meck:expect(emqx_access_control, authenticate,
|
|
|
fun(_) -> {ok, #{auth_result => success}} end),
|
|
|
ok = meck:expect(emqx_access_control, check_acl, fun(_, _, _) -> allow end),
|
|
|
%% Broker Meck
|
|
|
- ok = meck:new(emqx_broker, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_broker, [passthrough, no_history, no_link]),
|
|
|
%% Hooks Meck
|
|
|
- ok = meck:new(emqx_hooks, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]),
|
|
|
ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end),
|
|
|
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
|
|
|
%% Session Meck
|
|
|
- ok = meck:new(emqx_session, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_session, [passthrough, no_history, no_link]),
|
|
|
%% Metrics
|
|
|
- ok = meck:new(emqx_metrics, [passthrough, no_history]),
|
|
|
+ ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
|
|
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
|
|
ok = meck:expect(emqx_metrics, inc, fun(_, _) -> ok end),
|
|
|
Config.
|
|
|
|
|
|
-end_per_testcase(_TestCase, Config) ->
|
|
|
+end_per_suite(_Config) ->
|
|
|
ok = meck:unload(emqx_access_control),
|
|
|
ok = meck:unload(emqx_metrics),
|
|
|
ok = meck:unload(emqx_session),
|
|
|
ok = meck:unload(emqx_broker),
|
|
|
ok = meck:unload(emqx_hooks),
|
|
|
ok = meck:unload(emqx_cm),
|
|
|
+ ok.
|
|
|
+
|
|
|
+init_per_testcase(_TestCase, Config) ->
|
|
|
+ Config.
|
|
|
+
|
|
|
+end_per_testcase(_TestCase, Config) ->
|
|
|
Config.
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -92,17 +78,15 @@ t_chan_info(_) ->
|
|
|
?assertEqual(clientinfo(), ClientInfo).
|
|
|
|
|
|
t_chan_caps(_) ->
|
|
|
- Caps = emqx_mqtt_caps:default(),
|
|
|
- ?assertEqual(Caps#{max_packet_size => 1048576},
|
|
|
- emqx_channel:caps(channel())).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Test cases for channel init
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-%% TODO:
|
|
|
-t_chan_init(_) ->
|
|
|
- _Channel = channel().
|
|
|
+ #{max_clientid_len := 65535,
|
|
|
+ max_qos_allowed := 2,
|
|
|
+ max_topic_alias := 65535,
|
|
|
+ max_topic_levels := 0,
|
|
|
+ retain_available := true,
|
|
|
+ shared_subscription := true,
|
|
|
+ subscription_identifiers := true,
|
|
|
+ wildcard_subscription := true
|
|
|
+ } = emqx_channel:caps(channel()).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test cases for channel handle_in
|
|
|
@@ -114,8 +98,8 @@ t_handle_in_connect_packet_sucess(_) ->
|
|
|
{ok, #{session => session(), present => false}}
|
|
|
end),
|
|
|
IdleChannel = channel(#{conn_state => idle}),
|
|
|
- {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
|
|
|
- = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
|
|
|
+ {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
|
|
|
+ emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
|
|
|
ClientInfo = emqx_channel:info(clientinfo, Channel),
|
|
|
?assertMatch(#{clientid := <<"clientid">>,
|
|
|
username := <<"username">>
|
|
|
@@ -125,32 +109,47 @@ t_handle_in_connect_packet_sucess(_) ->
|
|
|
t_handle_in_unexpected_connect_packet(_) ->
|
|
|
Channel = emqx_channel:set_field(conn_state, connected, channel()),
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
|
|
|
- {ok, [{outgoing, Packet}, {close, protocol_error}], Channel}
|
|
|
- = emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
|
|
|
+ {ok, [{outgoing, Packet}, {close, protocol_error}], Channel} =
|
|
|
+ emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
|
|
|
|
|
|
t_handle_in_qos0_publish(_) ->
|
|
|
- ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
|
|
+ ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
|
|
{ok, _NChannel} = emqx_channel:handle_in(Publish, Channel).
|
|
|
- % ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_in_qos1_publish(_) ->
|
|
|
- ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
|
|
- Channel = channel(#{conn_state => connected}),
|
|
|
+ ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
|
|
- {ok, ?PUBACK_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
- ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
|
|
|
- % ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
|
|
|
+ emqx_channel:handle_in(Publish, channel(#{conn_state => connected})).
|
|
|
|
|
|
t_handle_in_qos2_publish(_) ->
|
|
|
- ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
|
|
|
- ok = meck:expect(emqx_session, info, fun(await_rel_timeout, _Session) -> 300 end),
|
|
|
- Channel = channel(#{conn_state => connected}),
|
|
|
- Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
|
|
- {ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
- ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
|
|
|
- % ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, 1}] end),
|
|
|
+ Channel = channel(#{conn_state => connected, session => session()}),
|
|
|
+ Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
|
|
+ {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), Channel1} =
|
|
|
+ emqx_channel:handle_in(Publish1, Channel),
|
|
|
+ ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
+ Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
|
|
|
+ {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel2} =
|
|
|
+ emqx_channel:handle_in(Publish2, Channel1),
|
|
|
+ ?assertEqual(2, proplists:get_value(awaiting_rel_cnt, emqx_channel:stats(Channel2))).
|
|
|
+
|
|
|
+t_handle_in_qos2_publish_with_error_return(_) ->
|
|
|
+ ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
|
|
+ ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
+ Session = session(#{max_awaiting_rel => 2, awaiting_rel => #{1 => 1}}),
|
|
|
+ Channel = channel(#{conn_state => connected, session => Session}),
|
|
|
+ Publish1 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
|
|
+ {ok, ?PUBREC_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel} =
|
|
|
+ emqx_channel:handle_in(Publish1, Channel),
|
|
|
+ Publish2 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 2, <<"payload">>),
|
|
|
+ {ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel1} =
|
|
|
+ emqx_channel:handle_in(Publish2, Channel),
|
|
|
+ Publish3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
|
|
|
+ {ok, ?PUBREC_PACKET(3, ?RC_RECEIVE_MAXIMUM_EXCEEDED), Channel1} =
|
|
|
+ emqx_channel:handle_in(Publish3, Channel1).
|
|
|
|
|
|
t_handle_in_puback_ok(_) ->
|
|
|
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
|
|
@@ -180,46 +179,38 @@ t_handle_in_pubrec_ok(_) ->
|
|
|
Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
|
|
|
ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
- % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- % emqx_channel:info(pub_stats, Channel1)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1} =
|
|
|
+ emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
|
|
|
t_handle_in_pubrec_id_in_use(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrec,
|
|
|
fun(_, _Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
|
|
end),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
- % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- % emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel} =
|
|
|
+ emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
|
|
|
t_handle_in_pubrec_id_not_found(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrec,
|
|
|
fun(_, _Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
- % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- % emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
|
|
|
+ emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
|
|
|
t_handle_in_pubrel_ok(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1}
|
|
|
- = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
- % ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
|
|
|
- % emqx_channel:info(pub_stats, Channel1)).
|
|
|
+ {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1} =
|
|
|
+ emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
|
|
|
t_handle_in_pubrel_not_found_error(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrel,
|
|
|
fun(_PacketId, _Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end),
|
|
|
- {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
|
|
|
- = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ {ok, ?PUBCOMP_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel} =
|
|
|
+ emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
|
|
|
t_handle_in_pubcomp_ok(_) ->
|
|
|
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
|
|
|
@@ -233,7 +224,6 @@ t_handle_in_pubcomp_not_found_error(_) ->
|
|
|
end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
{ok, _Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
- % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
|
|
|
|
|
|
t_handle_in_subscribe(_) ->
|
|
|
ok = meck:expect(emqx_session, subscribe,
|
|
|
@@ -250,12 +240,12 @@ t_handle_in_unsubscribe(_) ->
|
|
|
{ok, Session}
|
|
|
end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan}
|
|
|
- = emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
|
|
|
+ {ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan} =
|
|
|
+ emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
|
|
|
|
|
|
t_handle_in_pingreq(_) ->
|
|
|
- {ok, ?PACKET(?PINGRESP), _Channel}
|
|
|
- = emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
|
|
|
+ {ok, ?PACKET(?PINGRESP), _Channel} =
|
|
|
+ emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
|
|
|
|
|
|
t_handle_in_disconnect(_) ->
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
|
|
|
@@ -266,38 +256,37 @@ t_handle_in_disconnect(_) ->
|
|
|
t_handle_in_auth(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
|
|
|
- {ok, [{outgoing, Packet},
|
|
|
- {close, implementation_specific_error}], Channel}
|
|
|
- = emqx_channel:handle_in(?AUTH_PACKET(), Channel).
|
|
|
+ {ok, [{outgoing, Packet}, {close, implementation_specific_error}], Channel} =
|
|
|
+ emqx_channel:handle_in(?AUTH_PACKET(), Channel).
|
|
|
|
|
|
t_handle_in_frame_error(_) ->
|
|
|
IdleChannel = channel(#{conn_state => idle}),
|
|
|
- {shutdown, frame_too_large, _}
|
|
|
- = emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
|
|
|
+ {shutdown, frame_too_large, _Chan} =
|
|
|
+ emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
|
|
|
ConnectingChan = channel(#{conn_state => connecting}),
|
|
|
- ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
|
|
|
- {shutdown, frame_too_large, ConnackPacket, _}
|
|
|
- = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
|
|
|
- DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET),
|
|
|
+ ConnackPacket = ?CONNACK_PACKET(?RC_PACKET_TOO_LARGE),
|
|
|
+ {shutdown, frame_too_large, ConnackPacket, _} =
|
|
|
+ emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
|
|
|
+ DisconnectPacket = ?DISCONNECT_PACKET(?RC_PACKET_TOO_LARGE),
|
|
|
ConnectedChan = channel(#{conn_state => connected}),
|
|
|
- {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _}
|
|
|
- = emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
|
|
|
+ {ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _} =
|
|
|
+ emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
|
|
|
DisconnectedChan = channel(#{conn_state => disconnected}),
|
|
|
- {ok, DisconnectedChan}
|
|
|
- = emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
|
|
|
+ {ok, DisconnectedChan} =
|
|
|
+ emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
|
|
|
|
|
|
t_handle_in_expected_packet(_) ->
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
|
|
|
- {ok, [{outgoing, Packet}, {close, protocol_error}], _Chan}
|
|
|
- = emqx_channel:handle_in(packet, channel()).
|
|
|
+ {ok, [{outgoing, Packet}, {close, protocol_error}], _Chan} =
|
|
|
+ emqx_channel:handle_in(packet, channel()).
|
|
|
|
|
|
t_process_connect(_) ->
|
|
|
ok = meck:expect(emqx_cm, open_session,
|
|
|
fun(true, _ClientInfo, _ConnInfo) ->
|
|
|
{ok, #{session => session(), present => false}}
|
|
|
end),
|
|
|
- {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan}
|
|
|
- = emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
|
|
|
+ {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan} =
|
|
|
+ emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
|
|
|
|
|
|
t_process_publish_qos0(_) ->
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
@@ -307,8 +296,8 @@ t_process_publish_qos0(_) ->
|
|
|
t_process_publish_qos1(_) ->
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
|
|
|
- {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
|
|
|
- = emqx_channel:process_publish(Publish, channel()).
|
|
|
+ {ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel} =
|
|
|
+ emqx_channel:process_publish(Publish, channel()).
|
|
|
|
|
|
t_process_subscribe(_) ->
|
|
|
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
|
|
@@ -325,15 +314,6 @@ t_process_unsubscribe(_) ->
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
t_handle_deliver(_) ->
|
|
|
- WithPacketId = fun(Msgs) ->
|
|
|
- lists:zip(lists:seq(1, length(Msgs)), Msgs)
|
|
|
- end,
|
|
|
- ok = meck:expect(emqx_session, deliver,
|
|
|
- fun(Delivers, Session) ->
|
|
|
- Publishes = WithPacketId([Msg || {deliver, _, Msg} <- Delivers]),
|
|
|
- {ok, Publishes, Session}
|
|
|
- end),
|
|
|
- ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20 end),
|
|
|
Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
|
|
|
Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
|
|
|
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
|
|
|
@@ -348,14 +328,14 @@ t_handle_out_publish(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Pub0 = {undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
|
|
|
Pub1 = {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
|
|
|
- {ok, {outgoing, Packets}, _NChannel}
|
|
|
- = emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
|
|
|
+ {ok, {outgoing, Packets}, _NChannel} =
|
|
|
+ emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
|
|
|
?assertEqual(2, length(Packets)).
|
|
|
|
|
|
t_handle_out_publish_1(_) ->
|
|
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
|
|
|
- {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
|
|
|
- = emqx_channel:handle_out(publish, [{1, Msg}], channel()).
|
|
|
+ {ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan} =
|
|
|
+ emqx_channel:handle_out(publish, [{1, Msg}], channel()).
|
|
|
|
|
|
t_handle_out_publish_nl(_) ->
|
|
|
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
|
|
|
@@ -365,50 +345,47 @@ t_handle_out_publish_nl(_) ->
|
|
|
{ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
|
|
|
|
|
|
t_handle_out_connack_sucess(_) ->
|
|
|
- {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
|
|
|
- = emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
|
|
|
+ {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel} =
|
|
|
+ emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
|
|
|
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
|
|
|
|
|
|
t_handle_out_connack_failure(_) ->
|
|
|
- {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
|
|
|
- = emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
|
|
|
+ {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} =
|
|
|
+ emqx_channel:handle_out(connack, {?RC_NOT_AUTHORIZED, connpkt()}, channel()).
|
|
|
|
|
|
t_handle_out_puback(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel}
|
|
|
- = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel).
|
|
|
- % ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel} =
|
|
|
+ emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel).
|
|
|
|
|
|
t_handle_out_pubrec(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel}
|
|
|
- = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
|
|
|
+ {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel} =
|
|
|
+ emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
|
|
|
|
|
|
t_handle_out_pubrel(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBREL_PACKET(1), Channel1}
|
|
|
- = emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
|
|
|
- {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2}
|
|
|
- = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
|
|
|
+ {ok, ?PUBREL_PACKET(1), Channel1} =
|
|
|
+ emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
|
|
|
+ {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2} =
|
|
|
+ emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
|
|
|
|
|
|
t_handle_out_pubcomp(_) ->
|
|
|
- {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel}
|
|
|
- = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
|
|
|
+ {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel} =
|
|
|
+ emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
|
|
|
|
|
|
t_handle_out_suback(_) ->
|
|
|
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_2])}, {event, updated}],
|
|
|
- {ok, Replies, _Channel}
|
|
|
- = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
|
|
|
+ {ok, Replies, _Chan} = emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
|
|
|
|
|
|
t_handle_out_unsuback(_) ->
|
|
|
Replies = [{outgoing, ?UNSUBACK_PACKET(1, [?RC_SUCCESS])}, {event, updated}],
|
|
|
- {ok, Replies, _Channel}
|
|
|
- = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
|
|
|
+ {ok, Replies, _Chan} = emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
|
|
|
|
|
|
t_handle_out_disconnect(_) ->
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
|
|
|
- {ok, [{outgoing, Packet}, {close, normal}], _Chan}
|
|
|
- = emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
|
|
|
+ {ok, [{outgoing, Packet}, {close, normal}], _Chan} =
|
|
|
+ emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
|
|
|
|
|
|
t_handle_out_unexpected(_) ->
|
|
|
{ok, _Channel} = emqx_channel:handle_out(unexpected, <<"data">>, channel()).
|
|
|
@@ -422,20 +399,19 @@ t_handle_call_kick(_) ->
|
|
|
|
|
|
t_handle_call_discard(_) ->
|
|
|
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
|
|
- {shutdown, discarded, ok, Packet, _Channel}
|
|
|
- = emqx_channel:handle_call(discard, channel()).
|
|
|
+ {shutdown, discarded, ok, Packet, _Channel} =
|
|
|
+ emqx_channel:handle_call(discard, channel()).
|
|
|
|
|
|
t_handle_call_takeover_begin(_) ->
|
|
|
- {reply, undefined, _Channel}
|
|
|
- = emqx_channel:handle_call({takeover, 'begin'}, channel()).
|
|
|
+ {reply, _Session, _Chan} = emqx_channel:handle_call({takeover, 'begin'}, channel()).
|
|
|
|
|
|
t_handle_call_takeover_end(_) ->
|
|
|
ok = meck:expect(emqx_session, takeover, fun(_) -> ok end),
|
|
|
- {shutdown, takeovered, [], _Channel}
|
|
|
- = emqx_channel:handle_call({takeover, 'end'}, channel()).
|
|
|
+ {shutdown, takeovered, [], _Chan} =
|
|
|
+ emqx_channel:handle_call({takeover, 'end'}, channel()).
|
|
|
|
|
|
t_handle_call_unexpected(_) ->
|
|
|
- {reply, ignored, _Channel} = emqx_channel:handle_call(unexpected_req, channel()).
|
|
|
+ {reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test cases for handle_info
|
|
|
@@ -507,8 +483,8 @@ t_auth_connect(_) ->
|
|
|
t_process_alias(_) ->
|
|
|
Publish = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}},
|
|
|
Channel = emqx_channel:set_field(topic_aliases, #{1 => <<"t">>}, channel()),
|
|
|
- {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan}
|
|
|
- = emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
|
|
|
+ {ok, #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"t">>}}, _Chan} =
|
|
|
+ emqx_channel:process_alias(#mqtt_packet{variable = Publish}, Channel).
|
|
|
|
|
|
t_check_pub_acl(_) ->
|
|
|
ok = meck:new(emqx_zone, [passthrough, no_history]),
|
|
|
@@ -566,14 +542,27 @@ t_terminate(_) ->
|
|
|
|
|
|
channel() -> channel(#{}).
|
|
|
channel(InitFields) ->
|
|
|
+ ConnInfo = #{peername => {{127,0,0,1}, 3456},
|
|
|
+ sockname => {{127,0,0,1}, 1883},
|
|
|
+ conn_mod => emqx_connection,
|
|
|
+ proto_name => <<"MQTT">>,
|
|
|
+ proto_ver => ?MQTT_PROTO_V5,
|
|
|
+ clean_start => true,
|
|
|
+ keepalive => 30,
|
|
|
+ clientid => <<"clientid">>,
|
|
|
+ username => <<"username">>,
|
|
|
+ conn_props => #{},
|
|
|
+ receive_maximum => 100,
|
|
|
+ expiry_interval => 0
|
|
|
+ },
|
|
|
maps:fold(fun(Field, Value, Channel) ->
|
|
|
emqx_channel:set_field(Field, Value, Channel)
|
|
|
- end, default_channel(), InitFields).
|
|
|
-
|
|
|
-default_channel() ->
|
|
|
- Channel = emqx_channel:init(?DEFAULT_CONNINFO, [{zone, zone}]),
|
|
|
- Channel1 = emqx_channel:set_field(conn_state, connected, Channel),
|
|
|
- emqx_channel:set_field(clientinfo, clientinfo(), Channel1).
|
|
|
+ end,
|
|
|
+ emqx_channel:init(ConnInfo, [{zone, zone}]),
|
|
|
+ maps:merge(#{clientinfo => clientinfo(),
|
|
|
+ session => session(),
|
|
|
+ conn_state => connected
|
|
|
+ }, InitFields)).
|
|
|
|
|
|
clientinfo() -> clientinfo(#{}).
|
|
|
clientinfo(InitProps) ->
|
|
|
@@ -608,6 +597,6 @@ session(InitFields) when is_map(InitFields) ->
|
|
|
maps:fold(fun(Field, Value, Session) ->
|
|
|
emqx_session:set_field(Field, Value, Session)
|
|
|
end,
|
|
|
- emqx_session:init(#{zone => zone}, #{receive_maximum => 0}),
|
|
|
+ emqx_session:init(#{zone => channel}, #{receive_maximum => 0}),
|
|
|
InitFields).
|
|
|
|