|
|
@@ -94,18 +94,9 @@ t_chan_info(_) ->
|
|
|
t_chan_attrs(_) ->
|
|
|
#{conn_state := connected} = emqx_channel:attrs(channel()).
|
|
|
|
|
|
-t_chan_stats(_) ->
|
|
|
- [] = emqx_channel:stats(channel()).
|
|
|
-
|
|
|
t_chan_caps(_) ->
|
|
|
Caps = emqx_channel:caps(channel()).
|
|
|
|
|
|
-t_chan_recvd(_) ->
|
|
|
- _Channel = emqx_channel:recvd(10, channel()).
|
|
|
-
|
|
|
-t_chan_sent(_) ->
|
|
|
- _Channel = emqx_channel:sent(10, channel()).
|
|
|
-
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Test cases for channel init
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -140,86 +131,86 @@ t_handle_in_qos0_publish(_) ->
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
|
|
- {ok, NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
- ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, _NChannel} = emqx_channel:handle_in(Publish, Channel).
|
|
|
+ % ?assertEqual(#{publish_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_in_qos1_publish(_) ->
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> ok end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, <<"payload">>),
|
|
|
- {ok, ?PUBACK_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
- ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
|
|
|
- ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBACK_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
+ ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
|
|
|
+ % ?assertEqual(#{publish_in => 1, puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_in_qos2_publish(_) ->
|
|
|
ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
|
|
|
ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
|
|
|
- {ok, ?PUBREC_PACKET(1, RC), NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
- ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)),
|
|
|
- ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
|
|
|
+ ?assert((RC == ?RC_SUCCESS) orelse (RC == ?RC_NO_MATCHING_SUBSCRIBERS)).
|
|
|
+ % ?assertEqual(#{publish_in => 1, pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_in_puback_ok(_) ->
|
|
|
Msg = emqx_message:make(<<"t">>, <<"payload">>),
|
|
|
ok = meck:expect(emqx_session, puback,
|
|
|
fun(PacketId, Session) -> {ok, Msg, Session} end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel),
|
|
|
- ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
+ % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_in_puback_id_in_use(_) ->
|
|
|
ok = meck:expect(emqx_session, puback,
|
|
|
fun(_, _Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
|
|
end),
|
|
|
- {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
|
|
|
- ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_in_puback_id_not_found(_) ->
|
|
|
ok = meck:expect(emqx_session, puback,
|
|
|
fun(_, _Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end),
|
|
|
- {ok, Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()),
|
|
|
- ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, _Channel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ % ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_in_pubrec_ok(_) ->
|
|
|
Msg = emqx_message:make(test,?QOS_2, <<"t">>, <<"payload">>),
|
|
|
ok = meck:expect(emqx_session, pubrec, fun(_, Session) -> {ok, Msg, Session} end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), Channel1}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel),
|
|
|
- ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- emqx_channel:info(pub_stats, Channel1)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_SUCCESS), _Channel1}
|
|
|
+ = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
+ % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
+ % emqx_channel:info(pub_stats, Channel1)).
|
|
|
|
|
|
t_handle_in_pubrec_id_in_use(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrec,
|
|
|
fun(_, Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
|
|
|
end),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), Channel}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
|
|
|
- ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel}
|
|
|
+ = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
+ % emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_in_pubrec_id_not_found(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrec,
|
|
|
fun(_, Session) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end),
|
|
|
- {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), Channel}
|
|
|
- = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()),
|
|
|
- ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
- emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
|
|
|
+ = emqx_channel:handle_in(?PUBREC_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ % ?assertEqual(#{pubrec_in => 1, pubrel_out => 1},
|
|
|
+ % emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_in_pubrel_ok(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrel, fun(_, Session) -> {ok, Session} end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel1}
|
|
|
- = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel),
|
|
|
- ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
|
|
|
- emqx_channel:info(pub_stats, Channel1)).
|
|
|
+ {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel1}
|
|
|
+ = emqx_channel:handle_in(?PUBREL_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
+ % ?assertEqual(#{pubrel_in => 1, pubcomp_out => 1},
|
|
|
+ % emqx_channel:info(pub_stats, Channel1)).
|
|
|
|
|
|
t_handle_in_pubrel_not_found_error(_) ->
|
|
|
ok = meck:expect(emqx_session, pubrel,
|
|
|
@@ -231,8 +222,8 @@ t_handle_in_pubrel_not_found_error(_) ->
|
|
|
|
|
|
t_handle_in_pubcomp_ok(_) ->
|
|
|
ok = meck:expect(emqx_session, pubcomp, fun(_, Session) -> {ok, Session} end),
|
|
|
- {ok, Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()),
|
|
|
- ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, _Channel} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), channel()).
|
|
|
+ % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_in_pubcomp_not_found_error(_) ->
|
|
|
ok = meck:expect(emqx_session, pubcomp,
|
|
|
@@ -240,8 +231,8 @@ t_handle_in_pubcomp_not_found_error(_) ->
|
|
|
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
|
|
|
end),
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel),
|
|
|
- ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
|
|
|
+ {ok, _Channel1} = emqx_channel:handle_in(?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel).
|
|
|
+ % ?assertEqual(#{pubcomp_in => 1}, emqx_channel:info(pub_stats, Channel1)).
|
|
|
|
|
|
t_handle_in_subscribe(_) ->
|
|
|
ok = meck:expect(emqx_session, subscribe,
|
|
|
@@ -351,10 +342,10 @@ t_handle_out_publishes(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
|
|
|
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
|
|
|
- {ok, {outgoing, Packets}, NChannel}
|
|
|
+ {ok, {outgoing, Packets}, _NChannel}
|
|
|
= emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel),
|
|
|
- ?assertEqual(2, length(Packets)),
|
|
|
- ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ ?assertEqual(2, length(Packets)).
|
|
|
+ % ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_out_publish(_) ->
|
|
|
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
|
|
|
@@ -378,28 +369,28 @@ t_handle_out_connack_failure(_) ->
|
|
|
|
|
|
t_handle_out_puback(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), NChannel}
|
|
|
- = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel),
|
|
|
- ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBACK_PACKET(1, ?RC_SUCCESS), _NChannel}
|
|
|
+ = emqx_channel:handle_out(puback, {1, ?RC_SUCCESS}, Channel).
|
|
|
+ % ?assertEqual(#{puback_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_out_pubrec(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
- {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), NChannel}
|
|
|
- = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel),
|
|
|
- ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
+ {ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel}
|
|
|
+ = emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
|
|
|
+ % ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
|
|
|
|
|
|
t_handle_out_pubrel(_) ->
|
|
|
Channel = channel(#{conn_state => connected}),
|
|
|
{ok, ?PUBREL_PACKET(1), Channel1}
|
|
|
= emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
|
|
|
- {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), Channel2}
|
|
|
- = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1),
|
|
|
- ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
|
|
|
+ {ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2}
|
|
|
+ = emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
|
|
|
+ % ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
|
|
|
|
|
|
t_handle_out_pubcomp(_) ->
|
|
|
- {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), Channel}
|
|
|
- = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()),
|
|
|
- ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
+ {ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel}
|
|
|
+ = emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
|
|
|
+ % ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
|
|
|
|
|
|
t_handle_out_suback(_) ->
|
|
|
{ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
|