| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_exhook_handler).
- -include_lib("emqx/include/emqx.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("emqx/include/emqx_access_control.hrl").
- -export([
- on_client_connect/2,
- on_client_connack/3,
- on_client_connected/2,
- on_client_disconnected/3,
- on_client_authenticate/2,
- on_client_authorize/4,
- on_client_subscribe/3,
- on_client_unsubscribe/3
- ]).
- %% Session Lifecycle Hooks
- -export([
- on_session_created/2,
- on_session_subscribed/3,
- on_session_unsubscribed/3,
- on_session_resumed/2,
- on_session_discarded/2,
- on_session_takenover/2,
- on_session_terminated/3
- ]).
- -export([
- on_message_publish/1,
- on_message_dropped/3,
- on_message_delivered/2,
- on_message_acked/2
- ]).
- %% Utils
- -export([
- message/1,
- headers/1,
- stringfy/1,
- merge_responsed_bool/2,
- merge_responsed_message/2,
- assign_to_message/2,
- clientinfo/1,
- request_meta/0
- ]).
- -import(
- emqx_exhook,
- [
- cast/2,
- call_fold/3
- ]
- ).
- -elvis([{elvis_style, god_modules, disable}]).
- %%--------------------------------------------------------------------
- %% Clients
- %%--------------------------------------------------------------------
%% @doc Hook callback for 'client.connect'.
%% Converts `ConnInfo' with conninfo/1 and `Props' with properties/1, then
%% passes the resulting request map to emqx_exhook:cast/2.
on_client_connect(ConnInfo, Props) ->
    emqx_exhook:cast(
        'client.connect',
        #{
            conninfo => conninfo(ConnInfo),
            props => properties(Props)
        }
    ).
%% @doc Hook callback for 'client.connack'.
%% The result code `Rc' is turned into a binary with stringfy/1 before the
%% request map is passed to emqx_exhook:cast/2.
on_client_connack(ConnInfo, Rc, Props) ->
    emqx_exhook:cast(
        'client.connack',
        #{
            conninfo => conninfo(ConnInfo),
            result_code => stringfy(Rc),
            props => properties(Props)
        }
    ).
%% @doc Hook callback for 'client.connected'; the connection info argument
%% is not used, only the converted client info is sent.
on_client_connected(ClientInfo, _ConnInfo) ->
    emqx_exhook:cast('client.connected', #{clientinfo => clientinfo(ClientInfo)}).
%% @doc Hook callback for 'client.disconnected'.
%% Sends the converted client info together with the stringified `Reason';
%% the connection info argument is ignored.
on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
    emqx_exhook:cast(
        'client.disconnected',
        #{
            clientinfo => clientinfo(ClientInfo),
            reason => stringfy(Reason)
        }
    ).
%% @doc Hook callback for 'client.authenticate'.
%%
%% The incoming `AuthResult' is reduced to a boolean (`true' only for `ok')
%% and folded through emqx_exhook:call_fold/3 with merge_responsed_bool/2.
%% A boolean answer is mapped back to `ok' or `{error, not_authorized}' and
%% returned with the first element of the fold result; any other fold
%% outcome yields `{ok, AuthResult}' unchanged.
%%
%% XXX: the boolean drops whatever detail the original result carried, so
%% neither the request nor the value returned from here can convey more
%% than pass/fail.
on_client_authenticate(ClientInfo, AuthResult) ->
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        result => (AuthResult =:= ok)
    },
    case emqx_exhook:call_fold('client.authenticate', Req, fun merge_responsed_bool/2) of
        {StopOrOk, #{result := true}} ->
            {StopOrOk, ok};
        {StopOrOk, #{result := false}} ->
            {StopOrOk, {error, not_authorized}};
        _ ->
            {ok, AuthResult}
    end.
%% @doc Hook callback for 'client.authorize'.
%%
%% `Result' is reduced to a boolean (`true' only when its `result' key is
%% `allow'; a missing key counts as `deny') and folded through
%% emqx_exhook:call_fold/3 with merge_responsed_bool/2. A boolean answer is
%% returned as `#{result => allow | deny, from => exhook}' together with the
%% first element of the fold result; any other fold outcome yields
%% `{ok, Result}' unchanged.
on_client_authorize(ClientInfo, Action, Topic, Result) ->
    Allowed = maps:get(result, Result, deny) =:= allow,
    %% TODO: Support full action in major release
    Type =
        case Action of
            ?authz_action(publish) -> 'PUBLISH';
            ?authz_action(subscribe) -> 'SUBSCRIBE'
        end,
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        type => Type,
        topic => emqx_topic:get_shared_real_topic(Topic),
        result => Allowed
    },
    case emqx_exhook:call_fold('client.authorize', Req, fun merge_responsed_bool/2) of
        {StopOrOk, #{result := true}} ->
            {StopOrOk, #{result => allow, from => exhook}};
        {StopOrOk, #{result := false}} ->
            {StopOrOk, #{result => deny, from => exhook}};
        _ ->
            {ok, Result}
    end.
%% @doc Hook callback for 'client.subscribe'.
%% Sends the client info, the converted properties and the topic filter
%% list (see topicfilters/1) through emqx_exhook:cast/2.
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
    emqx_exhook:cast(
        'client.subscribe',
        #{
            clientinfo => clientinfo(ClientInfo),
            props => properties(Props),
            topic_filters => topicfilters(TopicFilters)
        }
    ).
%% @doc Hook callback for 'client.unsubscribe'.
%% Builds the same request shape as the subscribe callback (client info,
%% properties, topic filters) and passes it to emqx_exhook:cast/2.
on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
    emqx_exhook:cast(
        'client.unsubscribe',
        #{
            clientinfo => clientinfo(ClientInfo),
            props => properties(Props),
            topic_filters => topicfilters(TopicFilters)
        }
    ).
- %%--------------------------------------------------------------------
- %% Session
- %%--------------------------------------------------------------------
%% @doc Hook callback for 'session.created'; only the client info is sent,
%% the session info argument is ignored.
on_session_created(ClientInfo, _SessInfo) ->
    emqx_exhook:cast('session.created', #{clientinfo => clientinfo(ClientInfo)}).
%% @doc Hook callback for 'session.subscribed'.
%% The topic is passed through emqx_topic:maybe_format_share/1 and the
%% subscription options through subopts/1 before the request is cast.
on_session_subscribed(ClientInfo, Topic, SubOpts) ->
    emqx_exhook:cast(
        'session.subscribed',
        #{
            clientinfo => clientinfo(ClientInfo),
            topic => emqx_topic:maybe_format_share(Topic),
            subopts => subopts(SubOpts)
        }
    ).
%% @doc Hook callback for 'session.unsubscribed'.
%% Unlike the subscribed callback, the request carries no subopts: the
%% third argument is ignored.
on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
    emqx_exhook:cast(
        'session.unsubscribed',
        #{
            clientinfo => clientinfo(ClientInfo),
            topic => emqx_topic:maybe_format_share(Topic)
        }
    ).
%% @doc Hook callback for 'session.resumed'; only the client info is sent,
%% the session info argument is ignored.
on_session_resumed(ClientInfo, _SessInfo) ->
    emqx_exhook:cast('session.resumed', #{clientinfo => clientinfo(ClientInfo)}).
%% @doc Hook callback for 'session.discarded'; only the client info is
%% sent, the session info argument is ignored.
on_session_discarded(ClientInfo, _SessInfo) ->
    emqx_exhook:cast('session.discarded', #{clientinfo => clientinfo(ClientInfo)}).
%% @doc Hook callback for 'session.takenover'; only the client info is
%% sent, the session info argument is ignored.
on_session_takenover(ClientInfo, _SessInfo) ->
    emqx_exhook:cast('session.takenover', #{clientinfo => clientinfo(ClientInfo)}).
%% @doc Hook callback for 'session.terminated'.
%% Sends the client info and the stringified termination `Reason'; the
%% session info argument is ignored.
on_session_terminated(ClientInfo, Reason, _SessInfo) ->
    emqx_exhook:cast(
        'session.terminated',
        #{
            clientinfo => clientinfo(ClientInfo),
            reason => stringfy(Reason)
        }
    ).
- %%--------------------------------------------------------------------
- %% Message
- %%--------------------------------------------------------------------
%% @doc Hook callback for 'message.publish'.
%%
%% Messages whose topic starts with `$SYS/' are skipped and `ok' is
%% returned. For any other message the converted form is folded through
%% emqx_exhook:call_fold/3 with merge_responsed_message/2; when the fold
%% result carries a `message' key its content is applied to the original
%% record via assign_to_message/2, otherwise `{ok, Message}' is returned.
on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_publish(Message) ->
    Folded = emqx_exhook:call_fold(
        'message.publish',
        #{message => message(Message)},
        fun emqx_exhook_handler:merge_responsed_message/2
    ),
    case Folded of
        {StopOrOk, #{message := NMessage}} ->
            {StopOrOk, assign_to_message(NMessage, Message)};
        _ ->
            {ok, Message}
    end.
%% @doc Hook callback for 'message.dropped'.
%% `$SYS/' messages are skipped with `ok'; otherwise the converted message
%% and the stringified `Reason' are cast. The `By' argument is not used.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
    ok;
on_message_dropped(Message, _By, Reason) ->
    emqx_exhook:cast(
        'message.dropped',
        #{
            message => message(Message),
            reason => stringfy(Reason)
        }
    ).
%% @doc Hook callback for 'message.delivered'.
%% `$SYS/' messages are skipped with `ok'; otherwise the client info and
%% the converted message are cast.
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_delivered(ClientInfo, Message) ->
    emqx_exhook:cast(
        'message.delivered',
        #{
            clientinfo => clientinfo(ClientInfo),
            message => message(Message)
        }
    ).
%% @doc Hook callback for 'message.acked'.
%% `$SYS/' messages are skipped with `ok'; otherwise the client info and
%% the converted message are cast.
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_acked(ClientInfo, Message) ->
    emqx_exhook:cast(
        'message.acked',
        #{
            clientinfo => clientinfo(ClientInfo),
            message => message(Message)
        }
    ).
- %%--------------------------------------------------------------------
- %% Types
%% @doc Converts a properties map into a list of `#{name, value}' maps with
%% both sides stringified by stringfy/1. `undefined' yields the empty list.
%% Entries are prepended while folding, so the list order is the reverse of
%% the map's fold order.
properties(undefined) ->
    [];
properties(M) when is_map(M) ->
    Prepend = fun(Name, Value, Entries) ->
        [#{name => stringfy(Name), value => stringfy(Value)} | Entries]
    end,
    maps:fold(Prepend, [], M).
%% @doc Builds the connection-info request map from a `ConnInfo' map.
%%
%% `clientid', `peername' and `sockname' are required (the clause does not
%% match without them). `username', `proto_name' and `proto_ver' default to
%% `undefined' and `keepalive' to 0 when absent. Only the port of
%% `sockname' is used.
conninfo(
    #{
        clientid := ClientId,
        peername := {Peerhost, PeerPort},
        sockname := {_, SockPort}
    } = ConnInfo
) ->
    #{
        node => stringfy(node()),
        clientid => ClientId,
        %% The helper name is quoted because `maybe' is a reserved word
        %% when the maybe_expr language feature is enabled.
        username => 'maybe'(maps:get(username, ConnInfo, undefined)),
        peerhost => ntoa(Peerhost),
        peerport => PeerPort,
        sockport => SockPort,
        proto_name => maps:get(proto_name, ConnInfo, undefined),
        proto_ver => stringfy(maps:get(proto_ver, ConnInfo, undefined)),
        keepalive => maps:get(keepalive, ConnInfo, 0)
    }.
%% @doc Builds the client-info request map from a `ClientInfo' map.
%%
%% `clientid', `username', `peerhost', `peerport', `sockport', `protocol'
%% and `mountpoint' are required keys. `password', `cn' and `dn' are
%% optional and become `<<>>' when absent or `undefined'; `is_superuser'
%% defaults to `false' and `anonymous' to `true'.
clientinfo(
    #{
        clientid := ClientId,
        username := Username,
        peerhost := PeerHost,
        peerport := PeerPort,
        sockport := SockPort,
        protocol := Protocol,
        mountpoint := Mountpoint
    } = ClientInfo
) ->
    %% The helper name is quoted because `maybe' is a reserved word when the
    %% maybe_expr language feature is enabled.
    Optional = fun(Key) -> 'maybe'(maps:get(Key, ClientInfo, undefined)) end,
    #{
        node => stringfy(node()),
        clientid => ClientId,
        username => 'maybe'(Username),
        password => Optional(password),
        peerhost => ntoa(PeerHost),
        peerport => PeerPort,
        sockport => SockPort,
        protocol => stringfy(Protocol),
        mountpoint => 'maybe'(Mountpoint),
        is_superuser => maps:get(is_superuser, ClientInfo, false),
        anonymous => maps:get(anonymous, ClientInfo, true),
        cn => Optional(cn),
        dn => Optional(dn)
    }.
%% @doc Converts a `#message{}' record into the request map: node name, id
%% rendered by emqx_guid:to_hexstr/1, qos, stringified sender, topic,
%% payload, timestamp and the headers filtered by headers/1.
message(#message{
    id = MsgId,
    qos = QoS,
    from = Sender,
    topic = Topic,
    payload = Payload,
    timestamp = Timestamp,
    headers = MsgHeaders
}) ->
    Base = #{
        node => stringfy(node()),
        id => emqx_guid:to_hexstr(MsgId),
        qos => QoS,
        from => stringfy(Sender),
        topic => Topic,
        payload => Payload,
        timestamp => Timestamp
    },
    Base#{headers => headers(MsgHeaders)}.
%% @doc Reduces a message header map to the entries that are forwarded.
%% Only `username', `protocol', `peerhost' and `allow_publish' are kept,
%% entries whose value is `undefined' are dropped, keys become binaries and
%% values are converted with bin/2.
headers(Headers) ->
    Forwarded = maps:with([username, protocol, peerhost, allow_publish], Headers),
    maps:from_list([
        {atom_to_binary(Key), bin(Key, Value)}
     || {Key, Value} <- maps:to_list(Forwarded),
        Value =/= undefined
    ]).
%% bin/2: converts a forwarded header value to a binary based on its key.
%% `peerhost' values are rendered with inet:ntoa/1 first; the other three
%% keys are converted directly by bin/1.
bin(username, Value) ->
    bin(Value);
bin(protocol, Value) ->
    bin(Value);
bin(allow_publish, Value) ->
    bin(Value);
bin(peerhost, Addr) ->
    bin(inet:ntoa(Addr)).

%% bin/1: binaries pass through, atoms and iolists are converted to
%% binaries. No other term is accepted.
bin(<<Bin/binary>>) ->
    Bin;
bin(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom);
bin(Chars) when is_list(Chars) ->
    iolist_to_binary(Chars).
%% @doc Applies a returned message map onto the original record.
%% `qos', `topic' and `payload' are required in `InMessage' and overwrite
%% the record fields; an optional `headers' map (default `#{}') is then
%% handed to enrich_header/2.
assign_to_message(
    #{qos := Qos, topic := Topic, payload := Payload} = InMessage,
    Message
) ->
    Updated = Message#message{qos = Qos, topic = Topic, payload = Payload},
    ReturnedHeaders = maps:get(headers, InMessage, #{}),
    enrich_header(ReturnedHeaders, Updated).
%% Copies the `<<"allow_publish">>' entry of `Headers' onto the message.
%% Only the exact binaries `<<"true">>' and `<<"false">>' are honoured and
%% stored as the boolean `allow_publish' header through
%% emqx_message:set_header/3; any other value, or a missing entry, leaves
%% the message untouched.
enrich_header(Headers, Message) ->
    case maps:get(<<"allow_publish">>, Headers, undefined) of
        Flag when Flag =:= <<"false">>; Flag =:= <<"true">> ->
            emqx_message:set_header(allow_publish, Flag =:= <<"true">>, Message);
        _ ->
            Message
    end.
%% Converts a list of `{Topic, SubOpts}' pairs into request maps with a
%% `name' (via emqx_topic:maybe_format_share/1) and `subopts' (via
%% subopts/1). Elements that are not 2-tuples are skipped by the generator
%% pattern.
topicfilters(Tfs) when is_list(Tfs) ->
    [
        #{
            name => emqx_topic:maybe_format_share(Filter),
            subopts => subopts(Opts)
        }
     || {Filter, Opts} <- Tfs
    ].
%% Extracts the `qos', `rh', `rap' and `nl' options from a subscription
%% options map; each one defaults to 0 when absent. Other keys are dropped.
subopts(SubOpts) ->
    maps:from_list([
        {Opt, maps:get(Opt, SubOpts, 0)}
     || Opt <- [qos, rh, rap, nl]
    ]).
%% Renders an IP address tuple as a binary. An 8-tuple of the form
%% `{0,0,0,0,0,16#ffff,AB,CD}' is first unpacked into the 4-tuple built
%% from the high and low bytes of `AB' and `CD'; every other address is
%% formatted as-is by inet_parse:ntoa/1.
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
    %% The 4-tuple cannot match this clause again, so it is formatted by
    %% the clause below.
    ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
ntoa(IP) ->
    Text = inet_parse:ntoa(IP),
    list_to_binary(Text).
%% Maps `undefined' to the empty binary and returns every other term
%% unchanged.
%%
%% The function name is written as a quoted atom because `maybe' is a
%% reserved word once the maybe_expr language feature is enabled (the
%% default from OTP 27); the unquoted spelling does not compile there.
%% Quoting does not change the function's name or arity. Unquoted call
%% sites need the same quoting to compile under that feature.
'maybe'(undefined) -> <<>>;
'maybe'(B) -> B.
%% @doc Converts an arbitrary term to a binary.
%% Binaries are returned unchanged, integers are rendered in decimal, atoms
%% are converted with UTF-8 encoding, and any other term is printed with
%% the `~0p' format directive.
stringfy(Bin) when is_binary(Bin) ->
    Bin;
stringfy(Int) when is_integer(Int) ->
    integer_to_binary(Int);
stringfy(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
stringfy(Other) ->
    Printed = io_lib:format("~0p", [Other]),
    unicode:characters_to_binary(Printed).
- %%--------------------------------------------------------------------
- %% Acc funcs
- %% see exhook.proto
%% @doc Accumulator function for boolean hook responses.
%% A response of type 'IGNORE' returns `ignore'. A response whose value is
%% `{bool_result, Bool}' with a boolean `Bool' returns the pair of
%% ret/1 applied to the response type and the request with `result'
%% replaced by `Bool'. Any other response is logged at warning level and
%% ignored.
merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
    ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, Verdict}}) when
    is_boolean(Verdict)
->
    Tag = ret(Type),
    {Tag, maps:put(result, Verdict, Req)};
merge_responsed_bool(_Req, Unknown) ->
    ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Unknown}),
    ignore.
%% @doc Accumulator function for message hook responses.
%% A response of type 'IGNORE' returns `ignore'. A response whose value is
%% `{message, NMessage}' returns the pair of ret/1 applied to the response
%% type and the request with `message' replaced by `NMessage'. Any other
%% response is logged at warning level and ignored.
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
    ignore;
merge_responsed_message(Req, #{type := Type, value := {message, Replacement}}) ->
    Tag = ret(Type),
    {Tag, maps:put(message, Replacement, Req)};
merge_responsed_message(_Req, Unknown) ->
    ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Unknown}),
    ignore.
%% Translates a response type into the tag returned by the accumulator
%% functions: 'CONTINUE' becomes `ok', 'STOP_AND_RETURN' becomes `stop'.
%% No other type is accepted.
ret('CONTINUE') ->
    ok;
ret('STOP_AND_RETURN') ->
    stop.
%% @doc Returns the metadata map describing this node: the stringified node
%% name plus the version, system description and cluster name reported by
%% emqx_sys.
request_meta() ->
    Node = stringfy(node()),
    Version = emqx_sys:version(),
    SysDescr = emqx_sys:sysdescr(),
    ClusterName = emqx_sys:cluster_name(),
    #{
        node => Node,
        version => Version,
        sysdescr => SysDescr,
        cluster_name => ClusterName
    }.
|