|
|
@@ -133,7 +133,7 @@ t_handle_in_connect_auth_failed(_) ->
|
|
|
clientid = <<"clientid">>,
|
|
|
username = <<"username">>
|
|
|
},
|
|
|
- {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
|
|
+ {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
|
|
emqx_channel:handle_in(?CONNECT_PACKET(ConnPkt), channel(#{conn_state => idle})).
|
|
|
|
|
|
t_handle_in_continue_auth(_) ->
|
|
|
@@ -144,19 +144,33 @@ t_handle_in_continue_auth(_) ->
|
|
|
{shutdown, bad_authentication_method, ?CONNACK_PACKET(?RC_BAD_AUTHENTICATION_METHOD), _} =
|
|
|
emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel()),
|
|
|
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} =
|
|
|
- emqx_channel:handle_in(?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})).
|
|
|
+ emqx_channel:handle_in(
|
|
|
+ ?AUTH_PACKET(?RC_CONTINUE_AUTHENTICATION,Properties),
|
|
|
+ channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})
|
|
|
+ ).
|
|
|
|
|
|
t_handle_in_re_auth(_) ->
|
|
|
Properties = #{
|
|
|
'Authentication-Method' => <<"failed_auth_method">>,
|
|
|
'Authentication-Data' => <<"failed_auth_data">>
|
|
|
},
|
|
|
- {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} =
|
|
|
- emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel()),
|
|
|
- {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} =
|
|
|
- emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => undefined}})),
|
|
|
+ {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)},
|
|
|
+ {close, bad_authentication_method}], _} =
|
|
|
+ emqx_channel:handle_in(
|
|
|
+ ?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties),
|
|
|
+ channel()
|
|
|
+ ),
|
|
|
+ {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)},
|
|
|
+ {close, bad_authentication_method}], _} =
|
|
|
+ emqx_channel:handle_in(
|
|
|
+ ?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties),
|
|
|
+ channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => undefined}})
|
|
|
+ ),
|
|
|
{ok, [{outgoing, ?DISCONNECT_PACKET(?RC_NOT_AUTHORIZED)}, {close, not_authorized}], _} =
|
|
|
- emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})).
|
|
|
+ emqx_channel:handle_in(
|
|
|
+ ?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties),
|
|
|
+ channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})
|
|
|
+ ).
|
|
|
|
|
|
t_handle_in_qos0_publish(_) ->
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
|
|
|
@@ -348,7 +362,8 @@ t_process_publish_qos1(_) ->
|
|
|
t_process_subscribe(_) ->
|
|
|
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
|
|
TopicFilters = [ TopicFilter = {<<"+">>, ?DEFAULT_SUBOPTS}],
|
|
|
- {[{TopicFilter, ?RC_SUCCESS}], _Channel} = emqx_channel:process_subscribe(TopicFilters, #{}, channel()).
|
|
|
+ {[{TopicFilter, ?RC_SUCCESS}], _Channel} =
|
|
|
+ emqx_channel:process_subscribe(TopicFilters, #{}, channel()).
|
|
|
|
|
|
t_process_unsubscribe(_) ->
|
|
|
ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end),
|
|
|
@@ -358,8 +373,16 @@ t_process_unsubscribe(_) ->
|
|
|
t_quota_qos0(_) ->
|
|
|
esockd_limiter:start_link(), Cnter = counters:new(1, []),
|
|
|
ok = meck:expect(emqx_broker, publish, fun(_) -> [{node(), <<"topic">>, {ok, 4}}] end),
|
|
|
- ok = meck:expect(emqx_metrics, inc, fun('packets.publish.dropped') -> counters:add(Cnter, 1, 1) end),
|
|
|
- ok = meck:expect(emqx_metrics, val, fun('packets.publish.dropped') -> counters:get(Cnter, 1) end),
|
|
|
+ ok = meck:expect(
|
|
|
+ emqx_metrics,
|
|
|
+ inc,
|
|
|
+ fun('packets.publish.dropped') -> counters:add(Cnter, 1, 1) end
|
|
|
+ ),
|
|
|
+ ok = meck:expect(
|
|
|
+ emqx_metrics,
|
|
|
+ val,
|
|
|
+ fun('packets.publish.dropped') -> counters:get(Cnter, 1) end
|
|
|
+ ),
|
|
|
Chann = channel(#{conn_state => connected, quota => quota()}),
|
|
|
Pub = ?PUBLISH_PACKET(?QOS_0, <<"topic">>, undefined, <<"payload">>),
|
|
|
|
|
|
@@ -454,8 +477,12 @@ t_handle_out_connack_response_information(_) ->
|
|
|
end),
|
|
|
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end),
|
|
|
IdleChannel = channel(#{conn_state => idle}),
|
|
|
- {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], _} =
|
|
|
- emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})), IdleChannel).
|
|
|
+ {ok, [{event, connected},
|
|
|
+ {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}],
|
|
|
+ _} = emqx_channel:handle_in(
|
|
|
+ ?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})),
|
|
|
+ IdleChannel
|
|
|
+ ).
|
|
|
|
|
|
t_handle_out_connack_not_response_information(_) ->
|
|
|
ok = meck:expect(emqx_cm, open_session,
|
|
|
@@ -465,7 +492,10 @@ t_handle_out_connack_not_response_information(_) ->
|
|
|
ok = meck:expect(emqx_zone, response_information, fun(_) -> test end),
|
|
|
IdleChannel = channel(#{conn_state => idle}),
|
|
|
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} =
|
|
|
- emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})), IdleChannel),
|
|
|
+ emqx_channel:handle_in(
|
|
|
+ ?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})),
|
|
|
+ IdleChannel
|
|
|
+ ),
|
|
|
?assertEqual(false, maps:is_key('Response-Information', AckProps)).
|
|
|
|
|
|
t_handle_out_connack_failure(_) ->
|
|
|
@@ -530,7 +560,10 @@ t_handle_call_takeover_end(_) ->
|
|
|
emqx_channel:handle_call({takeover, 'end'}, channel()).
|
|
|
|
|
|
t_handle_call_quota(_) ->
|
|
|
- {reply, ok, _Chan} = emqx_channel:handle_call({quota, [{conn_messages_routing, {100,1}}]}, channel()).
|
|
|
+ {reply, ok, _Chan} = emqx_channel:handle_call(
|
|
|
+ {quota, [{conn_messages_routing, {100,1}}]},
|
|
|
+ channel()
|
|
|
+ ).
|
|
|
|
|
|
t_handle_call_unexpected(_) ->
|
|
|
{reply, ignored, _Chan} = emqx_channel:handle_call(unexpected_req, channel()).
|
|
|
@@ -634,9 +667,15 @@ t_packing_alias(_) ->
|
|
|
}}}, RePacket2),
|
|
|
|
|
|
{RePacket3, _} = emqx_channel:packing_alias(Packet2, NChannel2),
|
|
|
- ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = #{}}}, RePacket3),
|
|
|
-
|
|
|
- ?assertMatch({#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, _}, emqx_channel:packing_alias(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, channel())).
|
|
|
+ ?assertEqual(
|
|
|
+ #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = #{}}},
|
|
|
+ RePacket3
|
|
|
+ ),
|
|
|
+
|
|
|
+ ?assertMatch({#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, _},
|
|
|
+ emqx_channel:packing_alias(
|
|
|
+ #mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}},
|
|
|
+ channel())).
|
|
|
|
|
|
t_check_pub_acl(_) ->
|
|
|
ok = meck:new(emqx_zone, [passthrough, no_history]),
|