|
|
@@ -1,5 +1,5 @@
|
|
|
%%--------------------------------------------------------------------
|
|
|
-%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
|
%%
|
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
%% you may not use this file except in compliance with the License.
|
|
|
@@ -16,187 +16,72 @@
|
|
|
|
|
|
-module(emqx_authn_jwks_connector).
|
|
|
|
|
|
--behaviour(gen_server).
|
|
|
+-behaviour(emqx_resource).
|
|
|
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
--include_lib("jose/include/jose_jwk.hrl").
|
|
|
--include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
|
|
+%% callbacks of behaviour emqx_resource
|
|
|
-export([
|
|
|
- start_link/1,
|
|
|
- stop/1
|
|
|
+ on_start/2,
|
|
|
+ on_stop/2,
|
|
|
+ on_query/4,
|
|
|
+ on_health_check/2,
|
|
|
+ connect/1
|
|
|
]).
|
|
|
|
|
|
--export([
|
|
|
- get_jwks/1,
|
|
|
- update/2
|
|
|
-]).
|
|
|
-
|
|
|
-%% gen_server callbacks
|
|
|
--export([
|
|
|
- init/1,
|
|
|
- handle_call/3,
|
|
|
- handle_cast/2,
|
|
|
- handle_info/2,
|
|
|
- terminate/2,
|
|
|
- code_change/3
|
|
|
-]).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% APIs
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-start_link(Opts) ->
|
|
|
- gen_server:start_link(?MODULE, [Opts], []).
|
|
|
-
|
|
|
-stop(Pid) ->
|
|
|
- gen_server:stop(Pid).
|
|
|
-
|
|
|
-get_jwks(Pid) ->
|
|
|
- gen_server:call(Pid, get_cached_jwks, 5000).
|
|
|
-
|
|
|
-update(Pid, Opts) ->
|
|
|
- gen_server:call(Pid, {update, Opts}, 5000).
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% gen_server callbacks
|
|
|
-%%--------------------------------------------------------------------
|
|
|
+-define(DEFAULT_POOL_SIZE, 8).
|
|
|
|
|
|
-init([Opts]) ->
|
|
|
- ok = jose:json_module(jiffy),
|
|
|
- State = handle_options(Opts),
|
|
|
- {ok, refresh_jwks(State)}.
|
|
|
-
|
|
|
-handle_call(get_cached_jwks, _From, #{jwks := Jwks} = State) ->
|
|
|
- {reply, {ok, Jwks}, State};
|
|
|
-handle_call({update, Opts}, _From, _State) ->
|
|
|
- NewState = handle_options(Opts),
|
|
|
- {reply, ok, refresh_jwks(NewState)};
|
|
|
-handle_call(_Req, _From, State) ->
|
|
|
- {reply, ok, State}.
|
|
|
-
|
|
|
-handle_cast(_Msg, State) ->
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-handle_info({refresh_jwks, _TRef, refresh}, #{request_id := RequestID} = State) ->
|
|
|
- case RequestID of
|
|
|
- undefined ->
|
|
|
- ok;
|
|
|
+on_start(InstId, Opts) ->
|
|
|
+ PoolName = emqx_plugin_libs_pool:pool_name(InstId),
|
|
|
+ PoolOpts = [
|
|
|
+ {pool_size, maps:get(pool_size, Opts, ?DEFAULT_POOL_SIZE)},
|
|
|
+ {connector_opts, Opts}
|
|
|
+ ],
|
|
|
+ case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, PoolOpts) of
|
|
|
+ ok -> {ok, #{pool_name => PoolName}};
|
|
|
+ {error, Reason} -> {error, Reason}
|
|
|
+ end.
|
|
|
+
|
|
|
+on_stop(_InstId, #{pool_name := PoolName}) ->
|
|
|
+ emqx_plugin_libs_pool:stop_pool(PoolName).
|
|
|
+
|
|
|
+on_query(InstId, get_jwks, AfterQuery, #{pool_name := PoolName}) ->
|
|
|
+ Result = ecpool:pick_and_do(PoolName, {emqx_authn_jwks_client, get_jwks, []}, no_handover),
|
|
|
+ case Result of
|
|
|
+ {error, Reason} ->
|
|
|
+ ?SLOG(error, #{
|
|
|
+ msg => "emqx_authn_jwks_client_query_failed",
|
|
|
+ connector => InstId,
|
|
|
+ command => get_jwks,
|
|
|
+ reason => Reason
|
|
|
+ }),
|
|
|
+ emqx_resource:query_failed(AfterQuery);
|
|
|
_ ->
|
|
|
- ok = httpc:cancel_request(RequestID),
|
|
|
- receive
|
|
|
- {http, _} -> ok
|
|
|
- after 0 ->
|
|
|
- ok
|
|
|
- end
|
|
|
+ emqx_resource:query_success(AfterQuery)
|
|
|
end,
|
|
|
- {noreply, refresh_jwks(State)};
|
|
|
-handle_info(
|
|
|
- {http, {RequestID, Result}},
|
|
|
- #{request_id := RequestID, endpoint := Endpoint} = State0
|
|
|
-) ->
|
|
|
- ?tp(debug, jwks_endpoint_response, #{request_id => RequestID}),
|
|
|
- State1 = State0#{request_id := undefined},
|
|
|
- NewState =
|
|
|
- case Result of
|
|
|
- {error, Reason} ->
|
|
|
- ?SLOG(warning, #{
|
|
|
- msg => "failed_to_request_jwks_endpoint",
|
|
|
- endpoint => Endpoint,
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- State1;
|
|
|
- {StatusLine, Headers, Body} ->
|
|
|
- try
|
|
|
- JWKS = jose_jwk:from(emqx_json:decode(Body, [return_maps])),
|
|
|
- {_, JWKs} = JWKS#jose_jwk.keys,
|
|
|
- State1#{jwks := JWKs}
|
|
|
- catch
|
|
|
- _:_ ->
|
|
|
- ?SLOG(warning, #{
|
|
|
- msg => "invalid_jwks_returned",
|
|
|
- endpoint => Endpoint,
|
|
|
- status => StatusLine,
|
|
|
- headers => Headers,
|
|
|
- body => Body
|
|
|
- }),
|
|
|
- State1
|
|
|
- end
|
|
|
+ Result;
|
|
|
+on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
|
|
|
+ lists:foreach(
|
|
|
+ fun({_, Worker}) ->
|
|
|
+ ok = ecpool_worker:exec(Worker, {emqx_authn_jwks_client, update, [Opts]}, infinity)
|
|
|
end,
|
|
|
- {noreply, NewState};
|
|
|
-handle_info({http, {_, _}}, State) ->
|
|
|
- %% ignore
|
|
|
- {noreply, State};
|
|
|
-handle_info(_Info, State) ->
|
|
|
- {noreply, State}.
|
|
|
-
|
|
|
-terminate(_Reason, State) ->
|
|
|
- _ = cancel_timer(State),
|
|
|
+ ecpool:workers(PoolName)
|
|
|
+ ),
|
|
|
+ emqx_resource:query_success(AfterQuery),
|
|
|
ok.
|
|
|
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
- {ok, State}.
|
|
|
-
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-%% Internal functions
|
|
|
-%%--------------------------------------------------------------------
|
|
|
-
|
|
|
-handle_options(#{
|
|
|
- endpoint := Endpoint,
|
|
|
- refresh_interval := RefreshInterval0,
|
|
|
- ssl_opts := SSLOpts
|
|
|
-}) ->
|
|
|
- #{
|
|
|
- endpoint => Endpoint,
|
|
|
- refresh_interval => limit_refresh_interval(RefreshInterval0),
|
|
|
- ssl_opts => maps:to_list(SSLOpts),
|
|
|
- jwks => [],
|
|
|
- request_id => undefined
|
|
|
- }.
|
|
|
-
|
|
|
-refresh_jwks(
|
|
|
- #{
|
|
|
- endpoint := Endpoint,
|
|
|
- ssl_opts := SSLOpts
|
|
|
- } = State
|
|
|
-) ->
|
|
|
- HTTPOpts = [
|
|
|
- {timeout, 5000},
|
|
|
- {connect_timeout, 5000},
|
|
|
- {ssl, SSLOpts}
|
|
|
- ],
|
|
|
- NState =
|
|
|
- case
|
|
|
- httpc:request(
|
|
|
- get,
|
|
|
- {Endpoint, [{"Accept", "application/json"}]},
|
|
|
- HTTPOpts,
|
|
|
- [{body_format, binary}, {sync, false}, {receiver, self()}]
|
|
|
- )
|
|
|
- of
|
|
|
- {error, Reason} ->
|
|
|
- ?tp(warning, jwks_endpoint_request_fail, #{
|
|
|
- endpoint => Endpoint,
|
|
|
- http_opts => HTTPOpts,
|
|
|
- reason => Reason
|
|
|
- }),
|
|
|
- State;
|
|
|
- {ok, RequestID} ->
|
|
|
- ?tp(debug, jwks_endpoint_request_ok, #{request_id => RequestID}),
|
|
|
- State#{request_id := RequestID}
|
|
|
+on_health_check(_InstId, State = #{pool_name := PoolName}) ->
|
|
|
+ emqx_plugin_libs_pool:health_check(
|
|
|
+ PoolName,
|
|
|
+ fun(Pid) ->
|
|
|
+ case emqx_authn_jwks_client:get_jwks(Pid) of
|
|
|
+ {ok, _} -> true;
|
|
|
+ _ -> false
|
|
|
+ end
|
|
|
end,
|
|
|
- ensure_expiry_timer(NState).
|
|
|
-
|
|
|
-ensure_expiry_timer(State = #{refresh_interval := Interval}) ->
|
|
|
- State#{refresh_timer => emqx_misc:start_timer(timer:seconds(Interval), refresh_jwks)}.
|
|
|
-
|
|
|
-cancel_timer(State = #{refresh_timer := undefined}) ->
|
|
|
- State;
|
|
|
-cancel_timer(State = #{refresh_timer := TRef}) ->
|
|
|
- _ = emqx_misc:cancel_timer(TRef),
|
|
|
- State#{refresh_timer := undefined}.
|
|
|
+ State
|
|
|
+ ).
|
|
|
|
|
|
-limit_refresh_interval(Interval) when Interval < 10 ->
|
|
|
- 10;
|
|
|
-limit_refresh_interval(Interval) ->
|
|
|
- Interval.
|
|
|
+connect(Opts) ->
|
|
|
+ ConnectorOpts = proplists:get_value(connector_opts, Opts),
|
|
|
+ emqx_authn_jwks_client:start_link(ConnectorOpts).
|