| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_exhook_api).
- -behaviour(minirest_api).
- -include("emqx_exhook.hrl").
- -include_lib("typerefl/include/types.hrl").
- -include_lib("emqx/include/logger.hrl").
- -include_lib("hocon/include/hoconsc.hrl").
- -export([
- api_spec/0,
- paths/0,
- schema/1,
- fields/1,
- namespace/0
- ]).
- -export([
- exhooks/2,
- action_with_name/2,
- move/2,
- server_hooks/2
- ]).
- -import(hoconsc, [mk/1, mk/2, ref/1, enum/1, array/1, map/2]).
- -import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
- -define(TAGS, [<<"ExHook">>]).
- -define(NOT_FOURD, 'NOT_FOUND').
- -define(BAD_REQUEST, 'BAD_REQUEST').
- -define(BAD_RPC, 'BAD_RPC').
- -define(ERR_BADARGS(REASON), begin
- R0 = err_msg(REASON),
- <<"Bad Arguments: ", R0/binary>>
- end).
- -dialyzer([
- {nowarn_function, [
- fill_cluster_server_info/5,
- nodes_server_info/5,
- fill_server_hooks_info/4
- ]}
- ]).
- %%--------------------------------------------------------------------
- %% schema
- %%--------------------------------------------------------------------
- namespace() -> "exhook".
- api_spec() ->
- emqx_dashboard_swagger:spec(?MODULE).
- paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move", "/exhooks/:name/hooks"].
- schema(("/exhooks")) ->
- #{
- 'operationId' => exhooks,
- get => #{
- tags => ?TAGS,
- desc => ?DESC(list_all_servers),
- responses => #{200 => mk(array(ref(detail_server_info)))}
- },
- post => #{
- tags => ?TAGS,
- desc => ?DESC(add_server),
- 'requestBody' => server_conf_schema(),
- responses => #{
- 200 => mk(ref(detail_server_info)),
- 400 => error_codes([?BAD_REQUEST], <<"Already exists">>),
- 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
- }
- }
- };
- schema("/exhooks/:name") ->
- #{
- 'operationId' => action_with_name,
- get => #{
- tags => ?TAGS,
- desc => ?DESC(get_detail),
- parameters => params_server_name_in_path(),
- responses => #{
- 200 => mk(ref(detail_server_info)),
- 404 => error_codes([?NOT_FOURD], <<"Server not found">>)
- }
- },
- put => #{
- tags => ?TAGS,
- desc => ?DESC(update_server),
- parameters => params_server_name_in_path(),
- 'requestBody' => server_conf_schema(),
- responses => #{
- 200 => mk(ref(detail_server_info)),
- 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
- 404 => error_codes([?NOT_FOURD], <<"Server not found">>),
- 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
- }
- },
- delete => #{
- tags => ?TAGS,
- desc => ?DESC(delete_server),
- parameters => params_server_name_in_path(),
- responses => #{
- 204 => <<>>,
- 404 => error_codes([?NOT_FOURD], <<"Server not found">>),
- 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
- }
- }
- };
- schema("/exhooks/:name/hooks") ->
- #{
- 'operationId' => server_hooks,
- get => #{
- tags => ?TAGS,
- desc => ?DESC(get_hooks),
- parameters => params_server_name_in_path(),
- responses => #{
- 200 => mk(array(ref(list_hook_info))),
- 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
- }
- }
- };
- schema("/exhooks/:name/move") ->
- #{
- 'operationId' => move,
- post => #{
- tags => ?TAGS,
- desc => ?DESC(move_api),
- parameters => params_server_name_in_path(),
- 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
- ref(move_req),
- position_example()
- ),
- responses => #{
- 204 => <<"No Content">>,
- 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
- 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
- }
- }
- }.
- fields(move_req) ->
- [
- {position,
- mk(string(), #{
- required => true,
- desc => ?DESC(move_position),
- example => <<"front">>
- })}
- ];
- fields(detail_server_info) ->
- [
- {metrics, mk(ref(metrics), #{desc => ?DESC(server_metrics)})},
- {node_metrics, mk(array(ref(node_metrics)), #{desc => ?DESC(node_metrics)})},
- {node_status, mk(array(ref(node_status)), #{desc => ?DESC(node_status)})},
- {hooks, mk(array(ref(hook_info)))}
- ] ++ emqx_exhook_schema:server_config();
- fields(list_hook_info) ->
- [
- {name, mk(binary(), #{desc => ?DESC(hook_name)})},
- {params, mk(map(name, binary()), #{desc => ?DESC(hook_params)})},
- {metrics, mk(ref(metrics), #{desc => ?DESC(hook_metrics)})},
- {node_metrics, mk(array(ref(node_metrics)), #{desc => ?DESC(node_hook_metrics)})}
- ];
- fields(node_metrics) ->
- [
- {node, mk(string(), #{desc => ?DESC(node)})},
- {metrics, mk(ref(metrics), #{desc => ?DESC(metrics)})}
- ];
- fields(node_status) ->
- [
- {node, mk(string(), #{desc => ?DESC(node)})},
- {status,
- mk(enum([connected, connecting, disconnected, disabled, error]), #{
- desc => ?DESC(status)
- })}
- ];
- fields(hook_info) ->
- [
- {name, mk(binary(), #{desc => ?DESC(hook_name)})},
- {params, mk(map(name, binary()), #{desc => ?DESC(hook_params)})}
- ];
- fields(metrics) ->
- [
- {succeed, mk(integer(), #{desc => ?DESC(metric_succeed)})},
- {failed, mk(integer(), #{desc => ?DESC(metric_failed)})},
- {rate, mk(integer(), #{desc => ?DESC(metric_rate)})},
- {max_rate, mk(integer(), #{desc => ?DESC(metric_max_rate)})}
- ];
- fields(server_config) ->
- emqx_exhook_schema:server_config().
- params_server_name_in_path() ->
- [
- {name,
- mk(string(), #{
- desc => ?DESC(server_name),
- in => path,
- required => true,
- example => <<"default">>
- })}
- ].
- server_conf_schema() ->
- SSL = #{
- enable => false,
- cacertfile => <<"/etc/emqx/certs/cacert.pem">>,
- certfile => <<"/etc/emqx/certs/cert.pem">>,
- keyfile => <<"/etc/emqx/certs/key.pem">>
- },
- schema_with_example(
- ref(server_config),
- #{
- name => "default",
- enable => true,
- url => <<"http://127.0.0.1:8081">>,
- request_timeout => <<"5s">>,
- failed_action => deny,
- auto_reconnect => <<"60s">>,
- pool_size => 8,
- ssl => SSL
- }
- ).
- %%--------------------------------------------------------------------
- %% API
- %%--------------------------------------------------------------------
- exhooks(get, _) ->
- Confs = get_raw_config(),
- Infos = nodes_all_server_info(Confs),
- {200, Infos};
- exhooks(post, #{body := #{<<"name">> := Name} = Body}) ->
- case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of
- {ok, _} ->
- get_nodes_server_info(Name);
- {error, already_exists} ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Already exists">>
- }};
- {error, Reason} ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => ?ERR_BADARGS(Reason)
- }}
- end.
- action_with_name(get, #{bindings := #{name := Name}}) ->
- get_nodes_server_info(Name);
- action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
- case
- emqx_exhook_mgr:update_config(
- [exhook, servers],
- {update, Name, Body}
- )
- of
- {ok, _} ->
- get_nodes_server_info(Name);
- {error, not_found} ->
- {404, #{
- code => <<"NOT_FOUND">>,
- message => <<"Server not found">>
- }};
- {error, Reason} ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => ?ERR_BADARGS(Reason)
- }}
- end;
- action_with_name(delete, #{bindings := #{name := Name}}) ->
- case
- emqx_exhook_mgr:update_config(
- [exhook, servers],
- {delete, Name}
- )
- of
- {ok, _} ->
- {204};
- {error, not_found} ->
- {404, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Server not found">>
- }};
- {error, Reason} ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => ?ERR_BADARGS(Reason)
- }}
- end.
- move(post, #{bindings := #{name := Name}, body := #{<<"position">> := RawPosition}}) ->
- case parse_position(RawPosition) of
- {ok, Position} ->
- case
- emqx_exhook_mgr:update_config(
- [exhook, servers],
- {move, Name, Position}
- )
- of
- {ok, ok} ->
- {204};
- {error, not_found} ->
- {404, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Server not found">>
- }};
- {error, Error} ->
- {500, #{
- code => <<"BAD_RPC">>,
- message => Error
- }}
- end;
- {error, invalid_position} ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Invalid Position">>
- }}
- end.
- server_hooks(get, #{bindings := #{name := Name}}) ->
- Confs = get_raw_config(),
- case lists:search(fun(#{<<"name">> := CfgName}) -> CfgName =:= Name end, Confs) of
- false ->
- {400, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Server not found">>
- }};
- _ ->
- Info = get_nodes_server_hooks_info(Name),
- {200, Info}
- end.
- get_nodes_server_info(Name) ->
- Confs = get_raw_config(),
- case lists:search(fun(#{<<"name">> := CfgName}) -> CfgName =:= Name end, Confs) of
- false ->
- {404, #{
- code => <<"BAD_REQUEST">>,
- message => <<"Server not found">>
- }};
- {value, Conf} ->
- NodeStatus = nodes_server_info(Name),
- {200, maps:merge(Conf, NodeStatus)}
- end.
- %%--------------------------------------------------------------------
- %% GET /exhooks
- %%--------------------------------------------------------------------
- nodes_all_server_info(ConfL) ->
- AllInfos = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:all_servers_info(Nodes) end),
- Default = emqx_exhook_metrics:new_metrics_info(),
- node_all_server_info(ConfL, AllInfos, Default, []).
- node_all_server_info([#{<<"name">> := ServerName} = Conf | T], AllInfos, Default, Acc) ->
- Info = fill_cluster_server_info(AllInfos, [], [], ServerName, Default),
- AllInfo = maps:merge(Conf, Info),
- node_all_server_info(T, AllInfos, Default, [AllInfo | Acc]);
- node_all_server_info([], _, _, Acc) ->
- lists:reverse(Acc).
- fill_cluster_server_info([{Node, {error, _}} | T], StatusL, MetricsL, ServerName, Default) ->
- fill_cluster_server_info(
- T,
- [#{node => Node, status => error} | StatusL],
- [#{node => Node, metrics => Default} | MetricsL],
- ServerName,
- Default
- );
- fill_cluster_server_info([{Node, Result} | T], StatusL, MetricsL, ServerName, Default) ->
- #{status := Status, metrics := Metrics} = Result,
- fill_cluster_server_info(
- T,
- [#{node => Node, status => maps:get(ServerName, Status, error)} | StatusL],
- [#{node => Node, metrics => maps:get(ServerName, Metrics, Default)} | MetricsL],
- ServerName,
- Default
- );
- fill_cluster_server_info([], StatusL, MetricsL, ServerName, _) ->
- Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
- #{
- metrics => Metrics,
- node_metrics => MetricsL,
- node_status => StatusL,
- hooks => emqx_exhook_mgr:hooks(ServerName)
- }.
- %%--------------------------------------------------------------------
- %% GET /exhooks/{name}
- %%--------------------------------------------------------------------
- nodes_server_info(Name) ->
- InfoL = call_cluster(fun(Nodes) -> emqx_exhook_proto_v1:server_info(Nodes, Name) end),
- Default = emqx_exhook_metrics:new_metrics_info(),
- nodes_server_info(InfoL, Name, Default, [], []).
- nodes_server_info([{Node, {error, _}} | T], Name, Default, StatusL, MetricsL) ->
- nodes_server_info(
- T,
- Name,
- Default,
- [#{node => Node, status => error} | StatusL],
- [#{node => Node, metrics => Default} | MetricsL]
- );
- nodes_server_info([{Node, Result} | T], Name, Default, StatusL, MetricsL) ->
- #{status := Status, metrics := Metrics} = Result,
- nodes_server_info(
- T,
- Name,
- Default,
- [#{node => Node, status => Status} | StatusL],
- [#{node => Node, metrics => Metrics} | MetricsL]
- );
- nodes_server_info([], Name, _, StatusL, MetricsL) ->
- #{
- metrics => emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
- node_status => StatusL,
- node_metrics => MetricsL,
- hooks => emqx_exhook_mgr:hooks(Name)
- }.
- %%--------------------------------------------------------------------
- %% GET /exhooks/{name}/hooks
- %%--------------------------------------------------------------------
- get_nodes_server_hooks_info(Name) ->
- case emqx_exhook_mgr:hooks(Name) of
- [] ->
- [];
- Hooks ->
- AllInfos = call_cluster(fun(Nodes) ->
- emqx_exhook_proto_v1:server_hooks_metrics(Nodes, Name)
- end),
- Default = emqx_exhook_metrics:new_metrics_info(),
- get_nodes_server_hooks_info(Hooks, AllInfos, Default, [])
- end.
- get_nodes_server_hooks_info([#{name := Name} = Spec | T], AllInfos, Default, Acc) ->
- Info = fill_server_hooks_info(AllInfos, Name, Default, []),
- AllInfo = maps:merge(Spec, Info),
- get_nodes_server_hooks_info(T, AllInfos, Default, [AllInfo | Acc]);
- get_nodes_server_hooks_info([], _, _, Acc) ->
- Acc.
- fill_server_hooks_info([{_, {error, _}} | T], Name, Default, MetricsL) ->
- fill_server_hooks_info(T, Name, Default, MetricsL);
- fill_server_hooks_info([{Node, MetricsMap} | T], Name, Default, MetricsL) ->
- Metrics = maps:get(Name, MetricsMap, Default),
- NodeMetrics = #{node => Node, metrics => Metrics},
- fill_server_hooks_info(T, Name, Default, [NodeMetrics | MetricsL]);
- fill_server_hooks_info([], _Name, _Default, MetricsL) ->
- Metrics = emqx_exhook_metrics:metrics_aggregate_by_key(metrics, MetricsL),
- #{metrics => Metrics, node_metrics => MetricsL}.
- %%--------------------------------------------------------------------
- %% cluster call
- %%--------------------------------------------------------------------
- -spec call_cluster(fun(([node()]) -> emqx_rpc:erpc_multicall(A))) ->
- [{node(), A | {error, _Err}}].
- call_cluster(Fun) ->
- Nodes = mria:running_nodes(),
- Ret = Fun(Nodes),
- lists:zip(Nodes, lists:map(fun emqx_rpc:unwrap_erpc/1, Ret)).
- %%--------------------------------------------------------------------
- %% Internal Funcs
- %%--------------------------------------------------------------------
- err_msg(Msg) -> emqx_utils:readable_error_msg(Msg).
- get_raw_config() ->
- RawConfig = emqx:get_raw_config([exhook, servers], []),
- Schema = #{roots => emqx_exhook_schema:fields(exhook), fields => #{}},
- Conf = #{<<"servers">> => RawConfig},
- #{<<"servers">> := Servers} = hocon_tconf:make_serializable(Schema, Conf, #{}),
- Servers.
- position_example() ->
- #{
- front =>
- #{
- summary => <<"absolute position 'front'">>,
- value => #{<<"position">> => <<"front">>}
- },
- rear =>
- #{
- summary => <<"absolute position 'rear'">>,
- value => #{<<"position">> => <<"rear">>}
- },
- related_before =>
- #{
- summary => <<"relative position 'before'">>,
- value => #{<<"position">> => <<"before:default">>}
- },
- related_after =>
- #{
- summary => <<"relative position 'after'">>,
- value => #{<<"position">> => <<"after:default">>}
- }
- }.
- parse_position(<<"front">>) ->
- {ok, ?CMD_MOVE_FRONT};
- parse_position(<<"rear">>) ->
- {ok, ?CMD_MOVE_REAR};
- parse_position(<<"before:">>) ->
- {error, invalid_position};
- parse_position(<<"after:">>) ->
- {error, invalid_position};
- parse_position(<<"before:", Related/binary>>) ->
- {ok, ?CMD_MOVE_BEFORE(Related)};
- parse_position(<<"after:", Related/binary>>) ->
- {ok, ?CMD_MOVE_AFTER(Related)};
- parse_position(_) ->
- {error, invalid_position}.
|