|
|
@@ -75,7 +75,8 @@
|
|
|
username := binary(),
|
|
|
password := emqx_schema_secret:secret(),
|
|
|
dsn := binary(),
|
|
|
- pool_size := pos_integer()
|
|
|
+ pool_size := pos_integer(),
|
|
|
+ proxy := none | proxy_config()
|
|
|
}.
|
|
|
-type connector_state() :: #{
|
|
|
account := account(),
|
|
|
@@ -106,6 +107,11 @@
|
|
|
-type stage() :: binary().
|
|
|
-type pipe() :: binary().
|
|
|
|
|
|
+-type proxy_config() :: #{
|
|
|
+ host := binary(),
|
|
|
+ port := emqx_schema:port_number()
|
|
|
+}.
|
|
|
+
|
|
|
-type odbc_pool() :: connector_resource_id().
|
|
|
-type http_pool() :: action_resource_id().
|
|
|
-type http_client_config() :: #{
|
|
|
@@ -192,7 +198,8 @@ on_start(ConnResId, ConnConfig) ->
|
|
|
username := Username,
|
|
|
password := Password,
|
|
|
dsn := DSN,
|
|
|
- pool_size := PoolSize
|
|
|
+ pool_size := PoolSize,
|
|
|
+ proxy := ProxyConfig
|
|
|
} = ConnConfig,
|
|
|
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_OPTS),
|
|
|
PoolOpts = [
|
|
|
@@ -202,6 +209,7 @@ on_start(ConnResId, ConnConfig) ->
|
|
|
{server, Server},
|
|
|
{username, Username},
|
|
|
{password, Password},
|
|
|
+ {proxy, ProxyConfig},
|
|
|
{on_disconnect, {?MODULE, disconnect, []}}
|
|
|
],
|
|
|
case emqx_resource_pool:start(ConnResId, ?MODULE, PoolOpts) of
|
|
|
@@ -643,7 +651,8 @@ start_http_pool(ActionResId, ActionConfig, ConnState) ->
|
|
|
connect_timeout := ConnectTimeout,
|
|
|
pipelining := Pipelining,
|
|
|
pool_size := PoolSize,
|
|
|
- max_retries := MaxRetries
|
|
|
+ max_retries := MaxRetries,
|
|
|
+ proxy := ProxyConfig0
|
|
|
},
|
|
|
resource_opts := #{request_ttl := RequestTTL}
|
|
|
} = ActionConfig,
|
|
|
@@ -668,17 +677,31 @@ start_http_pool(ActionResId, ActionConfig, ConnState) ->
|
|
|
]),
|
|
|
JWTConfig = jwt_config(ActionResId, ActionConfig, ConnState),
|
|
|
TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
|
|
|
- PoolOpts = [
|
|
|
- {host, Host},
|
|
|
- {port, Port},
|
|
|
- {connect_timeout, ConnectTimeout},
|
|
|
- {keepalive, 30_000},
|
|
|
- {pool_type, random},
|
|
|
- {pool_size, PoolSize},
|
|
|
- {transport, tls},
|
|
|
- {transport_opts, TransportOpts},
|
|
|
- {enable_pipelining, Pipelining}
|
|
|
- ],
|
|
|
+ ProxyConfig =
|
|
|
+ case ProxyConfig0 of
|
|
|
+ none ->
|
|
|
+ [];
|
|
|
+ #{host := ProxyHost, port := ProxyPort} ->
|
|
|
+ [
|
|
|
+ {proxy, #{
|
|
|
+ host => str(ProxyHost),
|
|
|
+ port => ProxyPort
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+ end,
|
|
|
+ PoolOpts =
|
|
|
+ ProxyConfig ++
|
|
|
+ [
|
|
|
+ {host, Host},
|
|
|
+ {port, Port},
|
|
|
+ {connect_timeout, ConnectTimeout},
|
|
|
+ {keepalive, 30_000},
|
|
|
+ {pool_type, random},
|
|
|
+ {pool_size, PoolSize},
|
|
|
+ {transport, tls},
|
|
|
+ {transport_opts, TransportOpts},
|
|
|
+ {enable_pipelining, Pipelining}
|
|
|
+ ],
|
|
|
case ehttpc_sup:start_pool(ActionResId, PoolOpts) of
|
|
|
{ok, _} ->
|
|
|
{ok, #{
|
|
|
@@ -691,6 +714,9 @@ start_http_pool(ActionResId, ActionConfig, ConnState) ->
|
|
|
request_ttl => RequestTTL
|
|
|
}
|
|
|
}};
|
|
|
+ {error, {already_started, _}} ->
|
|
|
+ _ = ehttpc_sup:stop_pool(ActionResId),
|
|
|
+ start_http_pool(ActionResId, ActionConfig, ConnState);
|
|
|
{error, Reason} ->
|
|
|
{error, Reason}
|
|
|
end.
|
|
|
@@ -803,6 +829,10 @@ conn_str([{username, Username} | Opts], Acc) ->
|
|
|
conn_str(Opts, ["uid=" ++ str(Username) | Acc]);
|
|
|
conn_str([{password, Password} | Opts], Acc) ->
|
|
|
conn_str(Opts, ["pwd=" ++ str(emqx_secret:unwrap(Password)) | Acc]);
|
|
|
+conn_str([{proxy, none} | Opts], Acc) ->
|
|
|
+ conn_str(Opts, Acc);
|
|
|
+conn_str([{proxy, #{host := Host, port := Port}} | Opts], Acc) ->
|
|
|
+ conn_str(Opts, ["proxy=" ++ str(Host) ++ ":" ++ str(Port) | Acc]);
|
|
|
conn_str([{_, _} | Opts], Acc) ->
|
|
|
conn_str(Opts, Acc).
|
|
|
|