| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- %% @doc Gateway Interface Module for HTTP-APIs
- -module(emqx_gateway_http).
- -include("emqx_gateway.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("emqx_auth/include/emqx_authn_chains.hrl").
- -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
- -import(emqx_gateway_utils, [listener_id/3]).
- %% Mgmt APIs - gateway
- -export([gateways/1]).
- %% Mgmt APIs
- -export([
- add_listener/2,
- remove_listener/1,
- update_listener/2
- ]).
- -export([
- authn/1,
- authn/2,
- add_authn/2,
- add_authn/3,
- update_authn/2,
- update_authn/3,
- remove_authn/1,
- remove_authn/2
- ]).
- %% Mgmt APIs - clients
- -export([
- lookup_client/3,
- kickout_client/2,
- list_client_subscriptions/2,
- client_subscribe/4,
- client_unsubscribe/3
- ]).
- %% Utils for http, swagger, etc.
- -export([
- return_http_error/2,
- with_gateway/2,
- with_authn/2,
- with_listener_authn/3,
- checks/2,
- reason2resp/1,
- reason2msg/1,
- sum_cluster_connections/1
- ]).
- %% RPC
- -export([gateway_status/1, cluster_gateway_status/1]).
- -type gateway_summary() ::
- #{
- name := binary(),
- status := running | stopped | unloaded,
- created_at => binary(),
- started_at => binary(),
- stopped_at => binary(),
- max_connections => integer(),
- current_connections => integer(),
- listeners => []
- }.
- -elvis([
- {elvis_style, god_modules, disable},
- {elvis_style, no_nested_try_catch, disable},
- {elvis_style, invalid_dynamic_call, disable}
- ]).
- -define(DEFAULT_CALL_TIMEOUT, 15000).
- %%--------------------------------------------------------------------
- %% Mgmt APIs - gateway
- %%--------------------------------------------------------------------
- -spec gateways(Status :: all | running | stopped | unloaded) ->
- [gateway_summary()].
- gateways(Status) ->
- Gateways = lists:map(
- fun({GwName, _}) ->
- case emqx_gateway:lookup(GwName) of
- undefined ->
- #{name => GwName, status => unloaded};
- GwInfo = #{config := Config} ->
- GwInfo0 = emqx_gateway_utils:unix_ts_to_rfc3339(
- [created_at, started_at, stopped_at],
- GwInfo
- ),
- GwInfo1 = maps:with(
- [
- name,
- status,
- created_at,
- started_at,
- stopped_at
- ],
- GwInfo0
- ),
- NodeStatus = cluster_gateway_status(GwName),
- {MaxCons, CurrCons} = sum_cluster_connections(NodeStatus),
- GwInfo1#{
- max_connections => MaxCons,
- current_connections => CurrCons,
- listeners => get_listeners_status(GwName, Config),
- node_status => NodeStatus
- }
- end
- end,
- emqx_gateway_registry:list()
- ),
- case Status of
- all -> Gateways;
- _ -> [Gw || Gw = #{status := S} <- Gateways, S == Status]
- end.
- gateway_status(GwName) ->
- case emqx_gateway:lookup(GwName) of
- undefined ->
- #{node => node(), status => unloaded};
- #{status := Status, config := Config} ->
- #{
- node => node(),
- status => Status,
- max_connections => max_connections_count(Config),
- current_connections => current_connections_count(GwName)
- }
- end.
- cluster_gateway_status(GwName) ->
- Nodes = mria:running_nodes(),
- case emqx_gateway_http_proto_v1:get_cluster_status(Nodes, GwName) of
- {Results, []} ->
- Results;
- {_, _BadNodes} ->
- error(badrpc)
- end.
- %% @private
- max_connections_count(Config) ->
- Listeners = emqx_gateway_utils:normalize_config(Config),
- lists:foldl(
- fun({_, _, _, Conf0}, Acc) ->
- emqx_gateway_utils:plus_max_connections(
- Acc,
- maps:get(max_connections, Conf0, 0)
- )
- end,
- 0,
- Listeners
- ).
- %% @private
- current_connections_count(GwName) ->
- try
- InfoTab = emqx_gateway_cm:tabname(info, GwName),
- ets:info(InfoTab, size)
- catch
- _:_ ->
- 0
- end.
- %% @private
- get_listeners_status(GwName, Config) ->
- Listeners = emqx_gateway_utils:normalize_config(Config),
- lists:map(
- fun({Type, LisName, ListenOn, _}) ->
- Name0 = listener_id(GwName, Type, LisName),
- Name = {Name0, ListenOn},
- LisO = #{id => Name0, type => Type, name => LisName},
- case catch esockd:listener(Name) of
- _Pid when is_pid(_Pid) ->
- LisO#{running => true};
- _ ->
- LisO#{running => false}
- end
- end,
- Listeners
- ).
- %%--------------------------------------------------------------------
- %% Mgmt APIs - listeners
- %%--------------------------------------------------------------------
- -spec add_listener(atom() | binary(), map()) -> {ok, map()}.
- add_listener(ListenerId, NewConf0) ->
- {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
- NewConf = maps:without(
- [
- <<"id">>,
- <<"name">>,
- <<"type">>,
- <<"running">>
- ],
- NewConf0
- ),
- confexp(emqx_gateway_conf:add_listener(GwName, {Type, Name}, NewConf)).
- -spec update_listener(atom() | binary(), map()) -> {ok, map()}.
- update_listener(ListenerId, NewConf0) ->
- {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
- NewConf = maps:without(
- [
- <<"id">>,
- <<"name">>,
- <<"type">>,
- <<"running">>
- ],
- NewConf0
- ),
- confexp(emqx_gateway_conf:update_listener(GwName, {Type, Name}, NewConf)).
- -spec remove_listener(binary()) -> ok.
- remove_listener(ListenerId) ->
- {GwName, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
- confexp(emqx_gateway_conf:remove_listener(GwName, {Type, Name})).
- -spec authn(gateway_name()) -> map().
- authn(GwName) ->
- %% XXX: Need append chain-nanme, authenticator-id?
- Path = [gateway, GwName, ?AUTHN],
- ChainName = emqx_gateway_utils:global_chain(GwName),
- wrap_chain_name(
- ChainName,
- emqx_utils_maps:jsonable_map(emqx:get_raw_config(Path))
- ).
- -spec authn(gateway_name(), binary()) -> map().
- authn(GwName, ListenerId) ->
- {_, Type, Name} = emqx_gateway_utils:parse_listener_id(ListenerId),
- Path = [gateway, GwName, listeners, Type, Name, ?AUTHN],
- ChainName = emqx_gateway_utils:listener_chain(GwName, Type, Name),
- wrap_chain_name(
- ChainName,
- emqx_utils_maps:jsonable_map(emqx:get_raw_config(Path))
- ).
- wrap_chain_name(ChainName, Conf) ->
- case emqx_authn_chains:list_authenticators(ChainName) of
- {ok, [#{id := Id} | _]} ->
- Conf#{chain_name => ChainName, id => Id};
- _ ->
- Conf
- end.
- -spec add_authn(gateway_name(), map()) -> {ok, map()}.
- add_authn(GwName, AuthConf) ->
- confexp(emqx_gateway_conf:add_authn(GwName, AuthConf)).
- -spec add_authn(gateway_name(), binary(), map()) -> {ok, map()}.
- add_authn(GwName, ListenerId, AuthConf) ->
- {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
- confexp(emqx_gateway_conf:add_authn(GwName, {LType, LName}, AuthConf)).
- -spec update_authn(gateway_name(), map()) -> {ok, map()}.
- update_authn(GwName, AuthConf) ->
- confexp(emqx_gateway_conf:update_authn(GwName, AuthConf)).
- -spec update_authn(gateway_name(), binary(), map()) -> {ok, map()}.
- update_authn(GwName, ListenerId, AuthConf) ->
- {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
- confexp(emqx_gateway_conf:update_authn(GwName, {LType, LName}, AuthConf)).
- -spec remove_authn(gateway_name()) -> ok.
- remove_authn(GwName) ->
- confexp(emqx_gateway_conf:remove_authn(GwName)).
- -spec remove_authn(gateway_name(), binary()) -> ok.
- remove_authn(GwName, ListenerId) ->
- {_, LType, LName} = emqx_gateway_utils:parse_listener_id(ListenerId),
- confexp(emqx_gateway_conf:remove_authn(GwName, {LType, LName})).
- confexp(ok) -> ok;
- confexp({ok, Res}) -> {ok, Res};
- confexp({error, Reason}) -> error(Reason).
- %%--------------------------------------------------------------------
- %% Mgmt APIs - clients
- %%--------------------------------------------------------------------
- -spec lookup_client(
- gateway_name(),
- emqx_types:clientid(),
- {module(), atom()}
- ) -> list().
- lookup_client(GwName, ClientId, {M, F}) ->
- [
- begin
- Info = emqx_gateway_cm:get_chan_info(GwName, ClientId, Pid),
- Stats = emqx_gateway_cm:get_chan_stats(GwName, ClientId, Pid),
- M:F({{ClientId, Pid}, Info, Stats})
- end
- || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)
- ].
- -spec kickout_client(gateway_name(), emqx_types:clientid()) ->
- {error, any()}
- | ok.
- kickout_client(GwName, ClientId) ->
- Results = [
- emqx_gateway_cm:kick_session(GwName, ClientId, Pid)
- || Pid <- emqx_gateway_cm:lookup_by_clientid(GwName, ClientId)
- ],
- IsOk = lists:any(fun(Item) -> Item =:= ok end, Results),
- case {IsOk, Results} of
- {true, _} -> ok;
- {_, []} -> {error, not_found};
- {false, _} -> lists:last(Results)
- end.
- -spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) ->
- {error, any()}
- | {ok, list()}.
- list_client_subscriptions(GwName, ClientId) ->
- case client_call(GwName, ClientId, subscriptions) of
- {error, Reason} ->
- {error, Reason};
- {ok, Subs} ->
- {ok,
- lists:map(
- fun({Topic, SubOpts}) ->
- SubOpts#{topic => Topic}
- end,
- Subs
- )}
- end.
- -spec client_subscribe(
- gateway_name(),
- emqx_types:clientid(),
- emqx_types:topic(),
- emqx_types:subopts()
- ) ->
- {error, any()}
- | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
- client_subscribe(GwName, ClientId, Topic, SubOpts) ->
- client_call(GwName, ClientId, {subscribe, Topic, SubOpts}).
- -spec client_unsubscribe(
- gateway_name(),
- emqx_types:clientid(),
- emqx_types:topic()
- ) ->
- {error, any()}
- | ok.
- client_unsubscribe(GwName, ClientId, Topic) ->
- client_call(GwName, ClientId, {unsubscribe, Topic}).
- client_call(GwName, ClientId, Req) ->
- try
- emqx_gateway_cm:call(
- GwName,
- ClientId,
- Req,
- ?DEFAULT_CALL_TIMEOUT
- )
- of
- undefined ->
- {error, not_found};
- ignored ->
- {error, ignored};
- Res ->
- Res
- catch
- throw:noproc ->
- {error, not_found};
- throw:{badrpc, Reason} ->
- {error, {badrpc, Reason}}
- end.
- %%--------------------------------------------------------------------
- %% Utils
- %%--------------------------------------------------------------------
- -spec reason2resp({atom(), map()} | any()) -> binary() | any().
- reason2resp(R) ->
- case reason2msg(R) of
- error ->
- return_http_error(500, R);
- Msg ->
- return_http_error(400, Msg)
- end.
- -spec return_http_error(integer(), any()) -> {integer(), atom(), binary()}.
- return_http_error(Code, Msg) ->
- {Code, codestr(Code), emqx_gateway_utils:stringfy(Msg)}.
- -spec reason2msg({atom(), map()} | any()) -> error | string().
- reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
- NValue =
- case emqx_utils_json:safe_encode(Value) of
- {ok, Str} -> Str;
- {error, _} -> emqx_gateway_utils:stringfy(Value)
- end,
- fmtstr(
- "Bad config value '~s' for '~s', reason: ~s",
- [NValue, Key, emqx_gateway_utils:stringfy(Reason)]
- );
- reason2msg(
- {badres, #{
- resource := gateway,
- gateway := GwName,
- reason := not_found
- }}
- ) ->
- fmtstr("The ~s gateway is unloaded", [GwName]);
- reason2msg(
- {badres, #{
- resource := gateway,
- gateway := GwName,
- reason := already_exist
- }}
- ) ->
- fmtstr("The ~s gateway already loaded", [GwName]);
- reason2msg(
- {badres, #{
- resource := listener,
- listener := {GwName, LType, LName},
- reason := not_found
- }}
- ) ->
- fmtstr("Listener ~s not found", [listener_id(GwName, LType, LName)]);
- reason2msg(
- {badres, #{
- resource := listener,
- listener := {GwName, LType, LName},
- reason := already_exist
- }}
- ) ->
- fmtstr(
- "The listener ~s of ~s already exist",
- [listener_id(GwName, LType, LName), GwName]
- );
- reason2msg(
- {badres, #{
- resource := authn,
- gateway := GwName,
- reason := not_found
- }}
- ) ->
- fmtstr("The authentication not found on ~s", [GwName]);
- reason2msg(
- {badres, #{
- resource := authn,
- gateway := GwName,
- reason := already_exist
- }}
- ) ->
- fmtstr("The authentication already exist on ~s", [GwName]);
- reason2msg(
- {badres, #{
- resource := listener_authn,
- listener := {GwName, LType, LName},
- reason := not_found
- }}
- ) ->
- fmtstr(
- "The authentication not found on ~s",
- [listener_id(GwName, LType, LName)]
- );
- reason2msg(
- {badres, #{
- resource := listener_authn,
- listener := {GwName, LType, LName},
- reason := already_exist
- }}
- ) ->
- fmtstr(
- "The authentication already exist on ~s",
- [listener_id(GwName, LType, LName)]
- );
- reason2msg(
- {bad_ssl_config, #{
- reason := Reason,
- which_options := Options
- }}
- ) ->
- fmtstr("Bad TLS configuration for ~p, reason: ~s", [Options, Reason]);
- reason2msg(
- {#{roots := [{gateway, _}]}, [_ | _]} = Error
- ) ->
- Bin = emqx_utils:readable_error_msg(Error),
- <<"Invalid configurations: ", Bin/binary>>;
- reason2msg(_) ->
- error.
- codestr(400) -> 'BAD_REQUEST';
- codestr(404) -> 'RESOURCE_NOT_FOUND';
- codestr(405) -> 'METHOD_NOT_ALLOWED';
- codestr(409) -> 'NOT_SUPPORT';
- codestr(500) -> 'UNKNOWN_ERROR';
- codestr(501) -> 'NOT_IMPLEMENTED'.
- fmtstr(Fmt, Args) ->
- lists:flatten(io_lib:format(Fmt, Args)).
- -spec with_authn(atom(), function()) -> any().
- with_authn(GwName0, Fun) ->
- with_gateway(GwName0, fun(GwName, _GwConf) ->
- Authn = emqx_gateway_http:authn(GwName),
- Fun(GwName, Authn)
- end).
- -spec with_listener_authn(atom(), binary(), function()) -> any().
- with_listener_authn(GwName0, Id, Fun) ->
- with_gateway(GwName0, fun(GwName, _GwConf) ->
- Authn = emqx_gateway_http:authn(GwName, Id),
- Fun(GwName, Authn)
- end).
- -spec with_gateway(atom(), function()) -> any().
- with_gateway(GwName, Fun) ->
- try
- case emqx_gateway:lookup(GwName) of
- undefined ->
- return_http_error(404, "Gateway not loaded");
- Gateway ->
- Fun(GwName, Gateway)
- end
- catch
- error:badname ->
- return_http_error(404, "Bad gateway name");
- %% Exceptions from: checks/2
- error:{miss_param, K} ->
- return_http_error(400, [K, " is required"]);
- %% Exceptions from emqx_gateway_utils:parse_listener_id/1
- error:{invalid_listener_id, Id} ->
- return_http_error(404, ["Listener not found: ", Id]);
- %% Exceptions from emqx:get_config/1
- error:{config_not_found, Path0} ->
- Path = lists:concat(
- lists:join(".", lists:map(fun to_list/1, Path0))
- ),
- return_http_error(404, "Resource not found. path: " ++ Path);
- error:{badmatch, {error, einval}} ->
- return_http_error(400, "Invalid bind address");
- error:{badauth, Reason} ->
- Reason1 = emqx_gateway_utils:stringfy(Reason),
- return_http_error(400, ["Bad authentication config: ", Reason1]);
- Class:Reason:Stk ->
- ?SLOG(error, #{
- msg => "uncaught_exception",
- exception => Class,
- reason => Reason,
- stacktrace => Stk
- }),
- reason2resp(Reason)
- end.
- -spec checks(list(), map()) -> ok.
- checks([], _) ->
- ok;
- checks([K | Ks], Map) ->
- case maps:is_key(K, Map) of
- true -> checks(Ks, Map);
- false -> error({miss_param, K})
- end.
- to_list(A) when is_atom(A) ->
- atom_to_list(A);
- to_list(B) when is_binary(B) ->
- binary_to_list(B).
- sum_cluster_connections(List) ->
- sum_cluster_connections(List, 0, 0).
- %%--------------------------------------------------------------------
- %% Internal funcs
- sum_cluster_connections(
- [#{max_connections := Max, current_connections := Current} | T], MaxAcc, CurrAcc
- ) ->
- NMaxAcc = emqx_gateway_utils:plus_max_connections(MaxAcc, Max),
- sum_cluster_connections(T, NMaxAcc, Current + CurrAcc);
- sum_cluster_connections([_ | T], MaxAcc, CurrAcc) ->
- sum_cluster_connections(T, MaxAcc, CurrAcc);
- sum_cluster_connections([], MaxAcc, CurrAcc) ->
- {MaxAcc, CurrAcc}.
|