|
@@ -17,95 +17,95 @@
|
|
|
%% Define the default actions.
|
|
%% Define the default actions.
|
|
|
-module(emqx_web_hook_actions).
|
|
-module(emqx_web_hook_actions).
|
|
|
|
|
|
|
|
|
|
+-export([ on_resource_create/2
|
|
|
|
|
+ , on_get_resource_status/2
|
|
|
|
|
+ , on_resource_destroy/2
|
|
|
|
|
+ ]).
|
|
|
|
|
+
|
|
|
|
|
+-export([ on_action_create_data_to_webserver/2
|
|
|
|
|
+ , on_action_data_to_webserver/2
|
|
|
|
|
+ ]).
|
|
|
|
|
+
|
|
|
|
|
+-export_type([action_fun/0]).
|
|
|
|
|
+
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
|
|
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
|
|
|
--include("emqx_web_hook.hrl").
|
|
|
|
|
|
|
+
|
|
|
|
|
+-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())).
|
|
|
|
|
+
|
|
|
|
|
+-type(url() :: binary()).
|
|
|
|
|
|
|
|
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
|
|
-define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
|
|
|
-define(RESOURCE_CONFIG_SPEC, #{
|
|
-define(RESOURCE_CONFIG_SPEC, #{
|
|
|
- url => #{
|
|
|
|
|
- order => 1,
|
|
|
|
|
- type => string,
|
|
|
|
|
- format => url,
|
|
|
|
|
- required => true,
|
|
|
|
|
- title => #{en => <<"URL">>,
|
|
|
|
|
- zh => <<"URL"/utf8>>},
|
|
|
|
|
- description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
|
|
|
|
|
- zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}
|
|
|
|
|
- },
|
|
|
|
|
- connect_timeout => #{
|
|
|
|
|
- order => 2,
|
|
|
|
|
- type => number,
|
|
|
|
|
- default => 5,
|
|
|
|
|
- title => #{en => <<"Connect Timeout">>,
|
|
|
|
|
- zh => <<"连接超时时间"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Connect timeout in seconds">>,
|
|
|
|
|
- zh => <<"连接超时时间,单位秒"/utf8>>}},
|
|
|
|
|
- request_timeout => #{
|
|
|
|
|
- order => 3,
|
|
|
|
|
- type => number,
|
|
|
|
|
- default => 5,
|
|
|
|
|
- title => #{en => <<"Request Timeout">>,
|
|
|
|
|
- zh => <<"请求超时时间时间"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Request timeout in seconds">>,
|
|
|
|
|
- zh => <<"请求超时时间,单位秒"/utf8>>}},
|
|
|
|
|
- cacertfile => #{
|
|
|
|
|
- order => 4,
|
|
|
|
|
- type => file,
|
|
|
|
|
- default => <<>>,
|
|
|
|
|
- title => #{en => <<"CA Certificate File">>,
|
|
|
|
|
- zh => <<"CA 证书文件"/utf8>>},
|
|
|
|
|
- description => #{en => <<"CA certificate file.">>,
|
|
|
|
|
- zh => <<"CA 证书文件。"/utf8>>}
|
|
|
|
|
- },
|
|
|
|
|
- certfile => #{
|
|
|
|
|
- order => 5,
|
|
|
|
|
- type => file,
|
|
|
|
|
- default => <<>>,
|
|
|
|
|
- title => #{en => <<"Certificate File">>,
|
|
|
|
|
- zh => <<"证书文件"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Certificate file.">>,
|
|
|
|
|
- zh => <<"证书文件。"/utf8>>}
|
|
|
|
|
- },
|
|
|
|
|
- keyfile => #{
|
|
|
|
|
- order => 6,
|
|
|
|
|
- type => file,
|
|
|
|
|
- default => <<>>,
|
|
|
|
|
- title => #{en => <<"Private Key File">>,
|
|
|
|
|
- zh => <<"私钥文件"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Private key file.">>,
|
|
|
|
|
- zh => <<"私钥文件。"/utf8>>}
|
|
|
|
|
- },
|
|
|
|
|
- verify => #{
|
|
|
|
|
- order => 7,
|
|
|
|
|
|
|
+ url => #{order => 1,
|
|
|
|
|
+ type => string,
|
|
|
|
|
+ format => url,
|
|
|
|
|
+ required => true,
|
|
|
|
|
+ title => #{en => <<"Request URL">>,
|
|
|
|
|
+ zh => <<"请求 URL"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
|
|
|
|
|
+ zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}},
|
|
|
|
|
+ connect_timeout => #{order => 2,
|
|
|
|
|
+ type => string,
|
|
|
|
|
+ default => <<"5s">>,
|
|
|
|
|
+ title => #{en => <<"Connect Timeout">>,
|
|
|
|
|
+ zh => <<"连接超时时间"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Connect Timeout In Seconds">>,
|
|
|
|
|
+ zh => <<"连接超时时间"/utf8>>}},
|
|
|
|
|
+ request_timeout => #{order => 3,
|
|
|
|
|
+ type => string,
|
|
|
|
|
+ default => <<"5s">>,
|
|
|
|
|
+ title => #{en => <<"Request Timeout">>,
|
|
|
|
|
+ zh => <<"请求超时时间时间"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Request Timeout In Seconds">>,
|
|
|
|
|
+ zh => <<"请求超时时间"/utf8>>}},
|
|
|
|
|
+ pool_size => #{order => 4,
|
|
|
|
|
+ type => number,
|
|
|
|
|
+ default => 8,
|
|
|
|
|
+ title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Connection Pool">>,
|
|
|
|
|
+ zh => <<"连接池大小"/utf8>>}
|
|
|
|
|
+ },
|
|
|
|
|
+ cacertfile => #{order => 5,
|
|
|
|
|
+ type => file,
|
|
|
|
|
+ default => <<"">>,
|
|
|
|
|
+ title => #{en => <<"CA Certificate File">>,
|
|
|
|
|
+ zh => <<"CA 证书文件"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"CA Certificate file">>,
|
|
|
|
|
+ zh => <<"CA 证书文件"/utf8>>}},
|
|
|
|
|
+ keyfile => #{order => 6,
|
|
|
|
|
+ type => file,
|
|
|
|
|
+ default => <<"">>,
|
|
|
|
|
+ title =>#{en => <<"SSL Key">>,
|
|
|
|
|
+ zh => <<"SSL Key"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Your ssl keyfile">>,
|
|
|
|
|
+ zh => <<"SSL 私钥"/utf8>>}},
|
|
|
|
|
+ certfile => #{order => 7,
|
|
|
|
|
+ type => file,
|
|
|
|
|
+ default => <<"">>,
|
|
|
|
|
+ title =>#{en => <<"SSL Cert">>,
|
|
|
|
|
+ zh => <<"SSL Cert"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Your ssl certfile">>,
|
|
|
|
|
+ zh => <<"SSL 证书"/utf8>>}},
|
|
|
|
|
+ verify => #{order => 8,
|
|
|
type => boolean,
|
|
type => boolean,
|
|
|
default => false,
|
|
default => false,
|
|
|
- title => #{en => <<"Verify">>,
|
|
|
|
|
- zh => <<"Verify"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Turn on peer certificate verification.">>,
|
|
|
|
|
- zh => <<"是否开启对端证书验证。"/utf8>>}
|
|
|
|
|
- },
|
|
|
|
|
- pool_size => #{
|
|
|
|
|
- order => 8,
|
|
|
|
|
- type => number,
|
|
|
|
|
- default => 32,
|
|
|
|
|
- title => #{en => <<"Pool Size">>,
|
|
|
|
|
- zh => <<"连接池大小"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Pool size for HTTP server.">>,
|
|
|
|
|
- zh => <<"HTTP server 连接池大小。"/utf8>>}
|
|
|
|
|
- }
|
|
|
|
|
- }).
|
|
|
|
|
|
|
+ title =>#{en => <<"Verify Server Certfile">>,
|
|
|
|
|
+ zh => <<"校验服务器证书"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>,
|
|
|
|
|
+ zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}}
|
|
|
|
|
+}).
|
|
|
|
|
|
|
|
-define(ACTION_PARAM_RESOURCE, #{
|
|
-define(ACTION_PARAM_RESOURCE, #{
|
|
|
- order => 0,
|
|
|
|
|
- type => string,
|
|
|
|
|
- required => true,
|
|
|
|
|
- title => #{en => <<"Resource ID">>,
|
|
|
|
|
- zh => <<"资源 ID"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Bind a resource to this action.">>,
|
|
|
|
|
- zh => <<"给动作绑定一个资源。"/utf8>>}
|
|
|
|
|
- }).
|
|
|
|
|
|
|
+ order => 0,
|
|
|
|
|
+ type => string,
|
|
|
|
|
+ required => true,
|
|
|
|
|
+ title => #{en => <<"Resource ID">>,
|
|
|
|
|
+ zh => <<"资源 ID"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Bind a resource to this action">>,
|
|
|
|
|
+ zh => <<"给动作绑定一个资源"/utf8>>}
|
|
|
|
|
+}).
|
|
|
|
|
|
|
|
-define(ACTION_DATA_SPEC, #{
|
|
-define(ACTION_DATA_SPEC, #{
|
|
|
'$resource' => ?ACTION_PARAM_RESOURCE,
|
|
'$resource' => ?ACTION_PARAM_RESOURCE,
|
|
@@ -140,7 +140,7 @@
|
|
|
description => #{en => <<"HTTP headers.">>,
|
|
description => #{en => <<"HTTP headers.">>,
|
|
|
zh => <<"HTTP headers。"/utf8>>}},
|
|
zh => <<"HTTP headers。"/utf8>>}},
|
|
|
body => #{
|
|
body => #{
|
|
|
- order => 5,
|
|
|
|
|
|
|
+ order => 4,
|
|
|
type => string,
|
|
type => string,
|
|
|
input => textarea,
|
|
input => textarea,
|
|
|
required => false,
|
|
required => false,
|
|
@@ -153,39 +153,29 @@
|
|
|
"默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}}
|
|
"默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}}
|
|
|
}).
|
|
}).
|
|
|
|
|
|
|
|
--resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK,
|
|
|
|
|
- create => on_resource_create,
|
|
|
|
|
- status => on_get_resource_status,
|
|
|
|
|
- destroy => on_resource_destroy,
|
|
|
|
|
- params => ?RESOURCE_CONFIG_SPEC,
|
|
|
|
|
- title => #{en => <<"WebHook">>,
|
|
|
|
|
- zh => <<"WebHook"/utf8>>},
|
|
|
|
|
- description => #{en => <<"WebHook">>,
|
|
|
|
|
- zh => <<"WebHook"/utf8>>}
|
|
|
|
|
- }).
|
|
|
|
|
|
|
+-resource_type(
|
|
|
|
|
+ #{name => ?RESOURCE_TYPE_WEBHOOK,
|
|
|
|
|
+ create => on_resource_create,
|
|
|
|
|
+ status => on_get_resource_status,
|
|
|
|
|
+ destroy => on_resource_destroy,
|
|
|
|
|
+ params => ?RESOURCE_CONFIG_SPEC,
|
|
|
|
|
+ title => #{en => <<"WebHook">>,
|
|
|
|
|
+ zh => <<"WebHook"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"WebHook">>,
|
|
|
|
|
+ zh => <<"WebHook"/utf8>>}
|
|
|
|
|
+}).
|
|
|
|
|
|
|
|
-rule_action(#{name => data_to_webserver,
|
|
-rule_action(#{name => data_to_webserver,
|
|
|
- category => data_forward,
|
|
|
|
|
- for => '$any',
|
|
|
|
|
- create => on_action_create_data_to_webserver,
|
|
|
|
|
- params => ?ACTION_DATA_SPEC,
|
|
|
|
|
- types => [?RESOURCE_TYPE_WEBHOOK],
|
|
|
|
|
- title => #{en => <<"Data to Web Server">>,
|
|
|
|
|
- zh => <<"发送数据到 Web 服务"/utf8>>},
|
|
|
|
|
- description => #{en => <<"Forward Messages to Web Server">>,
|
|
|
|
|
- zh => <<"将数据转发给 Web 服务"/utf8>>}
|
|
|
|
|
- }).
|
|
|
|
|
-
|
|
|
|
|
--type(url() :: binary()).
|
|
|
|
|
-
|
|
|
|
|
--export([ on_resource_create/2
|
|
|
|
|
- , on_get_resource_status/2
|
|
|
|
|
- , on_resource_destroy/2
|
|
|
|
|
- ]).
|
|
|
|
|
-
|
|
|
|
|
--export([ on_action_create_data_to_webserver/2
|
|
|
|
|
- , on_action_data_to_webserver/2
|
|
|
|
|
- ]).
|
|
|
|
|
|
|
+ category => data_forward,
|
|
|
|
|
+ for => '$any',
|
|
|
|
|
+ create => on_action_create_data_to_webserver,
|
|
|
|
|
+ params => ?ACTION_DATA_SPEC,
|
|
|
|
|
+ types => [?RESOURCE_TYPE_WEBHOOK],
|
|
|
|
|
+ title => #{en => <<"Data to Web Server">>,
|
|
|
|
|
+ zh => <<"发送数据到 Web 服务"/utf8>>},
|
|
|
|
|
+ description => #{en => <<"Forward Messages to Web Server">>,
|
|
|
|
|
+ zh => <<"将数据转发给 Web 服务"/utf8>>}
|
|
|
|
|
+}).
|
|
|
|
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%%------------------------------------------------------------------------------
|
|
|
%% Actions for web hook
|
|
%% Actions for web hook
|
|
@@ -194,7 +184,7 @@
|
|
|
-spec(on_resource_create(binary(), map()) -> map()).
|
|
-spec(on_resource_create(binary(), map()) -> map()).
|
|
|
on_resource_create(ResId, Conf) ->
|
|
on_resource_create(ResId, Conf) ->
|
|
|
{ok, _} = application:ensure_all_started(ehttpc),
|
|
{ok, _} = application:ensure_all_started(ehttpc),
|
|
|
- Options = pool_opts(Conf),
|
|
|
|
|
|
|
+ Options = pool_opts(Conf, ResId),
|
|
|
PoolName = pool_name(ResId),
|
|
PoolName = pool_name(ResId),
|
|
|
case test_http_connect(Conf) of
|
|
case test_http_connect(Conf) of
|
|
|
true -> ok;
|
|
true -> ok;
|
|
@@ -299,7 +289,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
|
|
|
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
|
|
path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
|
|
|
headers => NHeaders,
|
|
headers => NHeaders,
|
|
|
body => maps:get(<<"body">>, Params, <<>>),
|
|
body => maps:get(<<"body">>, Params, <<>>),
|
|
|
- request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)),
|
|
|
|
|
|
|
+ request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
|
|
pool => maps:get(<<"pool">>, Params)}
|
|
pool => maps:get(<<"pool">>, Params)}
|
|
|
catch _:_ ->
|
|
catch _:_ ->
|
|
|
throw({invalid_params, Params})
|
|
throw({invalid_params, Params})
|
|
@@ -328,50 +318,53 @@ str(Str) when is_list(Str) -> Str;
|
|
|
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
|
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
|
|
str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
|
|
str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
|
|
|
|
|
|
|
|
-pool_opts(Params = #{<<"url">> := URL}) ->
|
|
|
|
|
- #{host := Host0,
|
|
|
|
|
- scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(URL)),
|
|
|
|
|
- Port = maps:get(port, URIMap, case Scheme of
|
|
|
|
|
- "https" -> 443;
|
|
|
|
|
- _ -> 80
|
|
|
|
|
- end),
|
|
|
|
|
|
|
+add_default_scheme(<<"http://", _/binary>> = URL) ->
|
|
|
|
|
+ URL;
|
|
|
|
|
+add_default_scheme(<<"https://", _/binary>> = URL) ->
|
|
|
|
|
+ URL;
|
|
|
|
|
+add_default_scheme(URL) ->
|
|
|
|
|
+ <<"http://", URL/binary>>.
|
|
|
|
|
+
|
|
|
|
|
+pool_opts(Params = #{<<"url">> := URL}, ResId) ->
|
|
|
|
|
+ #{host := Host0, scheme := Scheme} = URIMap =
|
|
|
|
|
+ uri_string:parse(binary_to_list(add_default_scheme(URL))),
|
|
|
|
|
+ DefaultPort = case is_https(Scheme) of
|
|
|
|
|
+ true -> 443;
|
|
|
|
|
+ false -> 80
|
|
|
|
|
+ end,
|
|
|
|
|
+ Port = maps:get(port, URIMap, DefaultPort),
|
|
|
PoolSize = maps:get(<<"pool_size">>, Params, 32),
|
|
PoolSize = maps:get(<<"pool_size">>, Params, 32),
|
|
|
- ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)),
|
|
|
|
|
|
|
+ ConnectTimeout =
|
|
|
|
|
+ cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))),
|
|
|
{Inet, Host} = parse_host(Host0),
|
|
{Inet, Host} = parse_host(Host0),
|
|
|
- MoreOpts = case Scheme of
|
|
|
|
|
- "http" ->
|
|
|
|
|
- [{transport_opts, [Inet]}];
|
|
|
|
|
- "https" ->
|
|
|
|
|
- KeyFile = maps:get(<<"keyfile">>, Params),
|
|
|
|
|
- CertFile = maps:get(<<"certfile">>, Params),
|
|
|
|
|
- CACertFile = maps:get(<<"cacertfile">>, Params),
|
|
|
|
|
- VerifyType = case maps:get(<<"verify">>, Params) of
|
|
|
|
|
- true -> verify_peer;
|
|
|
|
|
- false -> verify_none
|
|
|
|
|
- end,
|
|
|
|
|
- TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> ->
|
|
|
|
|
- false;
|
|
|
|
|
- (_) ->
|
|
|
|
|
- true
|
|
|
|
|
- end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
|
|
|
|
|
- NTLSOpts = [ {verify, VerifyType}
|
|
|
|
|
- , {versions, emqx_tls_lib:default_versions()}
|
|
|
|
|
- , {ciphers, emqx_tls_lib:default_ciphers()}
|
|
|
|
|
- | TLSOpts
|
|
|
|
|
- ],
|
|
|
|
|
- [{transport, ssl}, {transport_opts, [Inet | NTLSOpts]}]
|
|
|
|
|
- end,
|
|
|
|
|
|
|
+ TransportOpts =
|
|
|
|
|
+ case is_https(Scheme) of
|
|
|
|
|
+ true -> [Inet | get_ssl_opts(Params, ResId)];
|
|
|
|
|
+ false -> [Inet]
|
|
|
|
|
+ end,
|
|
|
|
|
+ Opts = case is_https(Scheme) of
|
|
|
|
|
+ true -> [{transport_opts, TransportOpts}, {transport, ssl}];
|
|
|
|
|
+ false -> [{transport_opts, TransportOpts}]
|
|
|
|
|
+ end,
|
|
|
[{host, Host},
|
|
[{host, Host},
|
|
|
{port, Port},
|
|
{port, Port},
|
|
|
{pool_size, PoolSize},
|
|
{pool_size, PoolSize},
|
|
|
{pool_type, hash},
|
|
{pool_type, hash},
|
|
|
{connect_timeout, ConnectTimeout},
|
|
{connect_timeout, ConnectTimeout},
|
|
|
{retry, 5},
|
|
{retry, 5},
|
|
|
- {retry_timeout, 1000}] ++ MoreOpts.
|
|
|
|
|
|
|
+ {retry_timeout, 1000} | Opts].
|
|
|
|
|
|
|
|
pool_name(ResId) ->
|
|
pool_name(ResId) ->
|
|
|
list_to_atom("webhook:" ++ str(ResId)).
|
|
list_to_atom("webhook:" ++ str(ResId)).
|
|
|
|
|
|
|
|
|
|
+is_https(Scheme) when is_list(Scheme) -> is_https(list_to_binary(Scheme));
|
|
|
|
|
+is_https(<<"https", _/binary>>) -> true;
|
|
|
|
|
+is_https(_) -> false.
|
|
|
|
|
+
|
|
|
|
|
+get_ssl_opts(Opts, ResId) ->
|
|
|
|
|
+ Dir = filename:join([emqx:get_env(data_dir), "rule", ResId]),
|
|
|
|
|
+ [{ssl, true}, {ssl_opts, emqx_plugin_libs_ssl:save_files_return_opts(Opts, Dir)}].
|
|
|
|
|
+
|
|
|
parse_host(Host) ->
|
|
parse_host(Host) ->
|
|
|
case inet:parse_address(Host) of
|
|
case inet:parse_address(Host) of
|
|
|
{ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};
|
|
{ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};
|