| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(prop_exhook_hooks).
- -include_lib("proper/include/proper.hrl").
- -include_lib("eunit/include/eunit.hrl").
- -include_lib("emqx/include/emqx_access_control.hrl").
- -import(
- emqx_proper_types,
- [
- conninfo/0,
- clientinfo/0,
- sessioninfo/0,
- message/0,
- connack_return_code/0,
- topictab/0,
- topic/0,
- subopts/0,
- pubsub/0
- ]
- ).
- -define(CONF_DEFAULT, <<
- "\n"
- "exhook {\n"
- " servers =\n"
- " [ { name = default,\n"
- " url = \"http://127.0.0.1:9000\"\n"
- " }\n"
- " ]\n"
- "}\n"
- >>).
- -define(ALL(Vars, Types, Exprs),
- ?SETUP(
- fun() ->
- State = do_setup(),
- fun() -> do_teardown(State) end
- end,
- ?FORALL(Vars, Types, Exprs)
- )
- ).
- -define(DEFAULT_CLUSTER_NAME_ATOM, emqxcl).
- -define(DEFAULT_CLUSTER_NAME_BIN, <<"emqxcl">>).
- %%--------------------------------------------------------------------
- %% Properties
- %%--------------------------------------------------------------------
- prop_client_connect() ->
- ?ALL(
- {ConnInfo, ConnProps, Meta},
- {conninfo(), conn_properties(), request_meta()},
- begin
- ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
- {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- props => properties(ConnProps),
- conninfo => from_conninfo(ConnInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_connack() ->
- ?ALL(
- {ConnInfo, Rc, AckProps, Meta},
- {conninfo(), connack_return_code(), ack_properties(), request_meta()},
- begin
- ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
- {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- props => properties(AckProps),
- result_code => atom_to_binary(Rc, utf8),
- conninfo => from_conninfo(ConnInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_authenticate() ->
- ?ALL(
- {ClientInfo0, AuthResult, Meta},
- {clientinfo(), authresult(), request_meta()},
- begin
- ClientInfo = inject_magic_into(username, ClientInfo0),
- OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
- ExpectedAuthResult =
- case maps:get(username, ClientInfo) of
- <<"baduser">> ->
- {error, not_authorized};
- <<"gooduser">> ->
- ok;
- <<"normaluser">> ->
- ok;
- _ ->
- case AuthResult of
- ok -> ok;
- _ -> {error, not_authorized}
- end
- end,
- ?assertEqual(ExpectedAuthResult, OutAuthResult),
- {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- result => authresult_to_bool(AuthResult),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_authorize() ->
- MkResult = fun(Result) -> #{result => Result, from => exhook} end,
- ?ALL(
- {ClientInfo0, PubSub, Topic, Result, Meta},
- {
- clientinfo(),
- pubsub(),
- topic(),
- oneof([MkResult(allow), MkResult(deny)]),
- request_meta()
- },
- begin
- ClientInfo = inject_magic_into(username, ClientInfo0),
- OutResult = emqx_hooks:run_fold(
- 'client.authorize',
- [ClientInfo, PubSub, Topic],
- Result
- ),
- ExpectedOutResult =
- case maps:get(username, ClientInfo) of
- <<"baduser">> -> MkResult(deny);
- <<"gooduser">> -> MkResult(allow);
- <<"normaluser">> -> MkResult(allow);
- _ -> Result
- end,
- ?assertEqual(ExpectedOutResult, OutResult),
- {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- result => aclresult_to_bool(Result),
- type => pubsub_to_enum(PubSub),
- topic => Topic,
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_connected() ->
- ?ALL(
- {ClientInfo, ConnInfo, Meta},
- {clientinfo(), conninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
- {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_disconnected() ->
- ?ALL(
- {ClientInfo, Reason, ConnInfo, Meta},
- {clientinfo(), shutdown_reason(), conninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
- {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- reason => stringfy(Reason),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_subscribe() ->
- ?ALL(
- {ClientInfo, SubProps, TopicTab, Meta},
- {clientinfo(), sub_properties(), topictab(), request_meta()},
- begin
- ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
- {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- props => properties(SubProps),
- topic_filters => topicfilters(TopicTab),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_client_unsubscribe() ->
- ?ALL(
- {ClientInfo, UnSubProps, TopicTab, Meta},
- {clientinfo(), unsub_properties(), topictab(), request_meta()},
- begin
- ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
- {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- props => properties(UnSubProps),
- topic_filters => topicfilters(TopicTab),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_created() ->
- ?ALL(
- {ClientInfo, SessInfo, Meta},
- {clientinfo(), sessioninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
- {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_subscribed() ->
- ?ALL(
- {ClientInfo, Topic, SubOpts, Meta},
- {clientinfo(), topic(), subopts(), request_meta()},
- begin
- ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
- {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- topic => Topic,
- subopts => subopts(SubOpts),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_unsubscribed() ->
- ?ALL(
- {ClientInfo, Topic, SubOpts, Meta},
- {clientinfo(), topic(), subopts(), request_meta()},
- begin
- ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
- {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- topic => Topic,
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_resumed() ->
- ?ALL(
- {ClientInfo, SessInfo, Meta},
- {clientinfo(), sessioninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
- {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_discared() ->
- ?ALL(
- {ClientInfo, SessInfo, Meta},
- {clientinfo(), sessioninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
- {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_takenover() ->
- ?ALL(
- {ClientInfo, SessInfo, Meta},
- {clientinfo(), sessioninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('session.takenover', [ClientInfo, SessInfo]),
- {'on_session_takenover', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{clientinfo => from_clientinfo(ClientInfo), meta => Meta},
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_session_terminated() ->
- ?ALL(
- {ClientInfo, Reason, SessInfo, Meta},
- {clientinfo(), shutdown_reason(), sessioninfo(), request_meta()},
- begin
- ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
- {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- reason => stringfy(Reason),
- clientinfo => from_clientinfo(ClientInfo),
- meta => Meta
- },
- ?assertEqual(Expected, Resp),
- true
- end
- ).
- prop_message_publish() ->
- ?ALL(
- {Msg0, Meta},
- {message(), request_meta()},
- begin
- Msg = emqx_message:from_map(
- inject_magic_into(from, emqx_message:to_map(Msg0))
- ),
- OutMsg = emqx_hooks:run_fold('message.publish', [], Msg),
- case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
- true ->
- ?assertEqual(Msg, OutMsg),
- skip;
- _ ->
- ExpectedOutMsg =
- case emqx_message:from(Msg) of
- <<"baduser">> ->
- MsgMap =
- #{headers := Headers} =
- emqx_message:to_map(Msg),
- emqx_message:from_map(
- MsgMap#{
- qos => 0,
- topic => <<"">>,
- payload => <<"">>,
- headers => maps:put(allow_publish, false, Headers)
- }
- );
- <<"gooduser">> = From ->
- MsgMap =
- #{headers := Headers} =
- emqx_message:to_map(Msg),
- emqx_message:from_map(
- MsgMap#{
- topic => From,
- payload => From,
- headers => maps:put(allow_publish, true, Headers)
- }
- );
- _ ->
- Msg
- end,
- ?assertEqual(ExpectedOutMsg, OutMsg),
- {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- message => from_message(Msg),
- meta => Meta
- },
- ?assertEqual(Expected, Resp)
- end,
- true
- end
- ).
- prop_message_dropped() ->
- ?ALL(
- {Msg, By, Reason, Meta},
- {message(), hardcoded, shutdown_reason(), request_meta()},
- begin
- ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
- case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
- true ->
- skip;
- _ ->
- {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- reason => stringfy(Reason),
- message => from_message(Msg),
- meta => Meta
- },
- ?assertEqual(Expected, Resp)
- end,
- true
- end
- ).
- prop_message_delivered() ->
- ?ALL(
- {ClientInfo, Msg, Meta},
- {clientinfo(), message(), request_meta()},
- begin
- ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
- case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
- true ->
- skip;
- _ ->
- {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- clientinfo => from_clientinfo(ClientInfo),
- message => from_message(Msg),
- meta => Meta
- },
- ?assertEqual(Expected, Resp)
- end,
- true
- end
- ).
- prop_message_acked() ->
- ?ALL(
- {ClientInfo, Msg, Meta},
- {clientinfo(), message(), request_meta()},
- begin
- ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
- case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
- true ->
- skip;
- _ ->
- {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
- Expected =
- #{
- clientinfo => from_clientinfo(ClientInfo),
- message => from_message(Msg),
- meta => Meta
- },
- ?assertEqual(Expected, Resp)
- end,
- true
- end
- ).
- nodestr() ->
- stringfy(node()).
- peerhost(#{peername := {Host, _}}) ->
- ntoa(Host).
- peerport(#{peername := {_, Port}}) ->
- Port.
- sockport(#{sockname := {_, Port}}) ->
- Port.
- %% copied from emqx_exhook
- ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
- list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
- ntoa(IP) ->
- list_to_binary(inet_parse:ntoa(IP)).
- option(undefined) -> <<>>;
- option(B) -> B.
- properties(undefined) ->
- [];
- properties(M) when is_map(M) ->
- maps:fold(
- fun(K, V, Acc) ->
- [
- #{
- name => stringfy(K),
- value => stringfy(V)
- }
- | Acc
- ]
- end,
- [],
- M
- ).
- topicfilters(Tfs) when is_list(Tfs) ->
- [
- #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
- || {Topic, SubOpts} <- Tfs
- ].
- %% @private
- stringfy(Term) when is_binary(Term) ->
- Term;
- stringfy(Term) when is_integer(Term) ->
- integer_to_binary(Term);
- stringfy(Term) when is_atom(Term) ->
- atom_to_binary(Term, utf8);
- stringfy(Term) when is_list(Term) ->
- list_to_binary(Term);
- stringfy(Term) ->
- unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
- subopts(SubOpts) ->
- #{
- qos => maps:get(qos, SubOpts, 0),
- rh => maps:get(rh, SubOpts, 0),
- rap => maps:get(rap, SubOpts, 0),
- nl => maps:get(nl, SubOpts, 0)
- }.
- authresult_to_bool(AuthResult) ->
- AuthResult == ok.
- aclresult_to_bool(#{result := Result}) ->
- Result == allow.
- pubsub_to_enum(?authz_action(publish)) -> 'PUBLISH';
- pubsub_to_enum(?authz_action(subscribe)) -> 'SUBSCRIBE'.
- from_conninfo(ConnInfo) ->
- #{
- node => nodestr(),
- clientid => maps:get(clientid, ConnInfo),
- username => option(maps:get(username, ConnInfo, <<>>)),
- peerhost => peerhost(ConnInfo),
- peerport => peerport(ConnInfo),
- sockport => sockport(ConnInfo),
- proto_name => maps:get(proto_name, ConnInfo),
- proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
- keepalive => maps:get(keepalive, ConnInfo)
- }.
- from_clientinfo(ClientInfo) ->
- #{
- node => nodestr(),
- clientid => maps:get(clientid, ClientInfo),
- username => option(maps:get(username, ClientInfo, <<>>)),
- password => option(maps:get(password, ClientInfo, <<>>)),
- peerhost => ntoa(maps:get(peerhost, ClientInfo)),
- peerport => maps:get(peerport, ClientInfo),
- sockport => maps:get(sockport, ClientInfo),
- protocol => stringfy(maps:get(protocol, ClientInfo)),
- mountpoint => option(maps:get(mountpoint, ClientInfo, <<>>)),
- is_superuser => maps:get(is_superuser, ClientInfo, false),
- anonymous => maps:get(anonymous, ClientInfo, true),
- cn => option(maps:get(cn, ClientInfo, <<>>)),
- dn => option(maps:get(dn, ClientInfo, <<>>))
- }.
- from_message(Msg) ->
- #{
- node => nodestr(),
- id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
- qos => emqx_message:qos(Msg),
- from => stringfy(emqx_message:from(Msg)),
- topic => emqx_message:topic(Msg),
- payload => emqx_message:payload(Msg),
- timestamp => emqx_message:timestamp(Msg),
- headers => emqx_exhook_handler:headers(
- emqx_message:get_headers(Msg)
- )
- }.
- %%--------------------------------------------------------------------
- %% Helper
- %%--------------------------------------------------------------------
- do_setup() ->
- logger:set_primary_config(#{level => warning}),
- ok = ekka:start(),
- application:set_env(ekka, cluster_name, ?DEFAULT_CLUSTER_NAME_ATOM),
- _ = emqx_exhook_demo_svr:start(),
- ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
- emqx_common_test_helpers:start_apps([emqx_exhook]),
- %% waiting first loaded event
- {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
- ok.
- do_teardown(_) ->
- emqx_common_test_helpers:stop_apps([emqx_exhook]),
- %% waiting last unloaded event
- {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
- _ = emqx_exhook_demo_svr:stop(),
- logger:set_primary_config(#{level => notice}),
- timer:sleep(2000),
- ok.
- %%--------------------------------------------------------------------
- %% Generators
- %%--------------------------------------------------------------------
- conn_properties() ->
- #{}.
- ack_properties() ->
- #{}.
- sub_properties() ->
- #{}.
- unsub_properties() ->
- #{}.
- shutdown_reason() ->
- oneof([utf8(), {shutdown, emqx_proper_types:limited_latin_atom()}]).
- authresult() ->
- ?LET(
- RC,
- connack_return_code(),
- case RC of
- success -> ok;
- _ -> {error, RC}
- end
- ).
- inject_magic_into(Key, Object) ->
- case castspell() of
- muggles -> Object;
- Spell -> Object#{Key => Spell}
- end.
- castspell() ->
- L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
- lists:nth(rand:uniform(length(L)), L).
- request_meta() ->
- #{
- node => nodestr(),
- version => stringfy(emqx_sys:version()),
- sysdescr => stringfy(emqx_sys:sysdescr()),
- cluster_name => ?DEFAULT_CLUSTER_NAME_BIN
- }.
|