Explorar o código

chore(webhook): merge enterprise to opensource

Zaiming Shi %!s(int64=5) %!d(string=hai) anos
pai
achega
fcfcbf139d

+ 2 - 2
apps/emqx_web_hook/etc/emqx_web_hook.conf

@@ -5,10 +5,10 @@
 ## Webhook URL
 ##
 ## Value: String
-web.hook.url = http://127.0.0.1:80
+web.hook.api.url = http://127.0.0.1:80
 
 ## HTTP Headers
-## 
+##
 ## Example:
 ## 1. web.hook.headers.content-type = application/json
 ## 2. web.hook.headers.accept = *

+ 1 - 1
apps/emqx_web_hook/include/emqx_web_hook.hrl

@@ -1 +1 @@
--define(APP, emqx_web_hook).
+-define(APP, emqx_web_hook).

+ 4 - 1
apps/emqx_web_hook/priv/emqx_web_hook.schema

@@ -1,7 +1,7 @@
 %%-*- mode: erlang -*-
 %% EMQ X R3.0 config mapping
 
-{mapping, "web.hook.url", "emqx_web_hook.url", [
+{mapping, "web.hook.api.url", "emqx_web_hook.url", [
   {datatype, string}
 ]}.
 
@@ -15,14 +15,17 @@
 ]}.
 
 {mapping, "web.hook.ssl.cacertfile", "emqx_web_hook.cacertfile", [
+  {default, ""},
   {datatype, string}
 ]}.
 
 {mapping, "web.hook.ssl.certfile", "emqx_web_hook.certfile", [
+  {default, ""},
   {datatype, string}
 ]}.
 
 {mapping, "web.hook.ssl.keyfile", "emqx_web_hook.keyfile", [
+  {default, ""},
   {datatype, string}
 ]}.
 

+ 0 - 11
apps/emqx_web_hook/rebar.config

@@ -18,14 +18,3 @@
 {cover_enabled, true}.
 {cover_opts, [verbose]}.
 {cover_export_enabled, true}.
-
-{profiles,
- [{test,
-   [{erl_opts, [export_all, nowarn_export_all]},
-    {deps,
-     [{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}},
-      {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}},
-      {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}
-     ]}
-   ]}
- ]}.

+ 10 - 0
apps/emqx_web_hook/src/emqx_web_hook.appup.src

@@ -0,0 +1,10 @@
+%% -*-: erlang -*-
+
+{VSN,
+  [
+    {<<".*">>, []}
+  ],
+  [
+    {<<".*">>, []}
+  ]
+}.

+ 172 - 139
apps/emqx_web_hook/src/emqx_web_hook_actions.erl

@@ -17,95 +17,111 @@
 %% Define the default actions.
 -module(emqx_web_hook_actions).
 
+-export([ on_resource_create/2
+        , on_get_resource_status/2
+        , on_resource_destroy/2
+        ]).
+
+-export([ on_action_create_data_to_webserver/2
+        , on_action_data_to_webserver/2
+        ]).
+
+-export_type([action_fun/0]).
+
 -include_lib("emqx/include/emqx.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("emqx_rule_engine/include/rule_actions.hrl").
--include("emqx_web_hook.hrl").
+
+-type(action_fun() :: fun((Data :: map(), Envs :: map()) -> Result :: any())).
+
+-type(url() :: binary()).
 
 -define(RESOURCE_TYPE_WEBHOOK, 'web_hook').
 -define(RESOURCE_CONFIG_SPEC, #{
-            url => #{
-                order => 1,
+    method => #{order => 1,
                 type => string,
-                format => url,
-                required => true,
-                title => #{en => <<"URL">>,
-                           zh => <<"URL"/utf8>>},
-                description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
-                                 zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}
-            },
-            connect_timeout => #{
-                order => 2,
-                type => number,
-                default => 5,
-                title => #{en => <<"Connect Timeout">>,
-                           zh => <<"连接超时时间"/utf8>>},
-                description => #{en => <<"Connect timeout in seconds">>,
-                                 zh => <<"连接超时时间,单位秒"/utf8>>}},
-            request_timeout => #{
-                order => 3,
-                type => number,
-                default => 5,
-                title => #{en => <<"Request Timeout">>,
-                           zh => <<"请求超时时间时间"/utf8>>},
-                description => #{en => <<"Request timeout in seconds">>,
-                                 zh => <<"请求超时时间,单位秒"/utf8>>}},
-            cacertfile => #{
-                order => 4,
-                type => file,
-                default => <<>>,
-                title => #{en => <<"CA Certificate File">>,
-                           zh => <<"CA 证书文件"/utf8>>},
-                description => #{en => <<"CA certificate file.">>,
-                                 zh => <<"CA 证书文件。"/utf8>>}
-            },
-            certfile => #{
-                order => 5,
-                type => file,
-                default => <<>>,
-                title => #{en => <<"Certificate File">>,
-                           zh => <<"证书文件"/utf8>>},
-                description => #{en => <<"Certificate file.">>,
-                                 zh => <<"证书文件。"/utf8>>}
-            },
-            keyfile => #{
-                order => 6,
-                type => file,
-                default => <<>>,
-                title => #{en => <<"Private Key File">>,
-                           zh => <<"私钥文件"/utf8>>},
-                description => #{en => <<"Private key file.">>,
-                                 zh => <<"私钥文件。"/utf8>>}
-            },
-            verify => #{
-                order => 7,
+                enum => [<<"PUT">>,<<"POST">>],
+                default => <<"POST">>,
+                title => #{en => <<"Request Method">>,
+                           zh => <<"请求方法"/utf8>>},
+                description => #{en => <<"Request Method">>,
+                                 zh => <<"请求方法"/utf8>>}},
+    url => #{order => 2,
+             type => string,
+             format => url,
+             required => true,
+             title => #{en => <<"Request URL">>,
+                        zh => <<"请求 URL"/utf8>>},
+             description => #{en => <<"The URL of the server that will receive the Webhook requests.">>,
+                              zh => <<"用于接收 Webhook 请求的服务器的 URL。"/utf8>>}},
+    headers => #{order => 3,
+                 type => object,
+                 schema => #{},
+                 default => #{},
+                 title => #{en => <<"Request Header">>,
+                            zh => <<"请求头"/utf8>>},
+                 description => #{en => <<"Request Header">>,
+                                  zh => <<"请求头"/utf8>>}},
+    connect_timeout => #{order => 4,
+                         type => string,
+                         default => <<"5s">>,
+                         title => #{en => <<"Connect Timeout">>,
+                                    zh => <<"连接超时时间"/utf8>>},
+                         description => #{en => <<"Connect Timeout In Seconds">>,
+                                          zh => <<"连接超时时间"/utf8>>}},
+    request_timeout => #{order => 5,
+                         type => string,
+                         default => <<"5s">>,
+                         title => #{en => <<"Request Timeout">>,
+                                    zh => <<"请求超时时间时间"/utf8>>},
+                         description => #{en => <<"Request Timeout In Seconds">>,
+                                          zh => <<"请求超时时间"/utf8>>}},
+    pool_size => #{order => 6,
+                   type => number,
+                   default => 8,
+                   title => #{en => <<"Pool Size">>, zh => <<"连接池大小"/utf8>>},
+                   description => #{en => <<"Connection Pool">>,
+                                    zh => <<"连接池大小"/utf8>>}
+                },
+    cacertfile => #{order => 7,
+                    type => file,
+                    default => <<"">>,
+                    title => #{en => <<"CA Certificate File">>,
+                               zh => <<"CA 证书文件"/utf8>>},
+                    description => #{en => <<"CA Certificate file">>,
+                                     zh => <<"CA 证书文件"/utf8>>}},
+    keyfile => #{order => 8,
+                 type => file,
+                 default => <<"">>,
+                 title =>#{en => <<"SSL Key">>,
+                           zh => <<"SSL Key"/utf8>>},
+                 description => #{en => <<"Your ssl keyfile">>,
+                                  zh => <<"SSL 私钥"/utf8>>}},
+    certfile => #{order => 9,
+                  type => file,
+                  default => <<"">>,
+                  title =>#{en => <<"SSL Cert">>,
+                            zh => <<"SSL Cert"/utf8>>},
+                  description => #{en => <<"Your ssl certfile">>,
+                                   zh => <<"SSL 证书"/utf8>>}},
+    verify => #{order => 10,
                 type => boolean,
                 default => false,
-                title => #{en => <<"Verify">>,
-                           zh => <<"Verify"/utf8>>},
-                description => #{en => <<"Turn on peer certificate verification.">>,
-                                 zh => <<"是否开启对端证书验证。"/utf8>>}
-            },
-            pool_size => #{
-                order => 8,
-                type => number,
-                default => 32,
-                title => #{en => <<"Pool Size">>,
-                           zh => <<"连接池大小"/utf8>>},
-                description => #{en => <<"Pool size for HTTP server.">>,
-                                 zh => <<"HTTP server 连接池大小。"/utf8>>}
-            }
-        }).
+                title =>#{en => <<"Verify Server Certfile">>,
+                          zh => <<"校验服务器证书"/utf8>>},
+                description => #{en => <<"Whether to verify the server certificate. By default, the client will not verify the server's certificate. If verification is required, please set it to true.">>,
+                                 zh => <<"是否校验服务器证书。 默认客户端不会去校验服务器的证书,如果需要校验,请设置成true。"/utf8>>}}
+}).
 
 -define(ACTION_PARAM_RESOURCE, #{
-            order => 0,
-            type => string,
-            required => true,
-            title => #{en => <<"Resource ID">>,
-                       zh => <<"资源 ID"/utf8>>},
-            description => #{en => <<"Bind a resource to this action.">>,
-                             zh => <<"给动作绑定一个资源"/utf8>>}
-        }).
+    order => 0,
+    type => string,
+    required => true,
+    title => #{en => <<"Resource ID">>,
+               zh => <<"资源 ID"/utf8>>},
+    description => #{en => <<"Bind a resource to this action">>,
+                     zh => <<"给动作绑定一个资源"/utf8>>}
+}).
 
 -define(ACTION_DATA_SPEC, #{
             '$resource' => ?ACTION_PARAM_RESOURCE,
@@ -153,39 +169,29 @@
                                          "默认 HTTP 请求体的内容为规则输出的所有字段的键和值构成的 JSON 字符串。"/utf8>>}}
             }).
 
--resource_type(#{name => ?RESOURCE_TYPE_WEBHOOK,
-                 create => on_resource_create,
-                 status => on_get_resource_status,
-                 destroy => on_resource_destroy,
-                 params => ?RESOURCE_CONFIG_SPEC,
-                 title => #{en => <<"WebHook">>,
-                            zh => <<"WebHook"/utf8>>},
-                 description => #{en => <<"WebHook">>,
-                                  zh => <<"WebHook"/utf8>>}
-                }).
+-resource_type(
+    #{name => ?RESOURCE_TYPE_WEBHOOK,
+      create => on_resource_create,
+      status => on_get_resource_status,
+      destroy => on_resource_destroy,
+      params => ?RESOURCE_CONFIG_SPEC,
+      title => #{en => <<"WebHook">>,
+                 zh => <<"WebHook"/utf8>>},
+      description => #{en => <<"WebHook">>,
+                       zh => <<"WebHook"/utf8>>}
+}).
 
 -rule_action(#{name => data_to_webserver,
-               category => data_forward,
-               for => '$any',
-               create => on_action_create_data_to_webserver,
-               params => ?ACTION_DATA_SPEC,
-               types => [?RESOURCE_TYPE_WEBHOOK],
-               title => #{en => <<"Data to Web Server">>,
-                          zh => <<"发送数据到 Web 服务"/utf8>>},
-               description => #{en => <<"Forward Messages to Web Server">>,
-                                zh => <<"将数据转发给 Web 服务"/utf8>>}
-              }).
-
--type(url() :: binary()).
-
--export([ on_resource_create/2
-        , on_get_resource_status/2
-        , on_resource_destroy/2
-        ]).
-
--export([ on_action_create_data_to_webserver/2
-        , on_action_data_to_webserver/2
-        ]).
+    category => data_forward,
+    for => '$any',
+    create => on_action_create_data_to_webserver,
+    params => ?ACTION_DATA_SPEC,
+    types => [?RESOURCE_TYPE_WEBHOOK],
+    title => #{en => <<"Data to Web Server">>,
+               zh => <<"发送数据到 Web 服务"/utf8>>},
+    description => #{en => <<"Forward Messages to Web Server">>,
+                     zh => <<"将数据转发给 Web 服务"/utf8>>}
+}).
 
 %%------------------------------------------------------------------------------
 %% Actions for web hook
@@ -194,7 +200,7 @@
 -spec(on_resource_create(binary(), map()) -> map()).
 on_resource_create(ResId, Conf) ->
     {ok, _} = application:ensure_all_started(ehttpc),
-    Options = pool_opts(Conf),
+    Options = pool_opts(Conf, ResId),
     PoolName = pool_name(ResId),
     case test_http_connect(Conf) of
         true -> ok;
@@ -299,7 +305,7 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
           path => path(filename:join(CommonPath, maps:get(<<"path">>, Params, <<>>))),
           headers => NHeaders,
           body => maps:get(<<"body">>, Params, <<>>),
-          request_timeout => timer:seconds(maps:get(<<"request_timeout">>, Params, 5)),
+          request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
           pool => maps:get(<<"pool">>, Params)}
     catch _:_ ->
         throw({invalid_params, Params})
@@ -328,50 +334,77 @@ str(Str) when is_list(Str) -> Str;
 str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
 str(Bin) when is_binary(Bin) -> binary_to_list(Bin).
 
-pool_opts(Params = #{<<"url">> := URL}) ->
-    #{host := Host0,
-      scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(URL)),
+add_default_scheme(<<"http://", _/binary>> = URL) ->
+    URL;
+add_default_scheme(<<"https://", _/binary>> = URL) ->
+    URL;
+add_default_scheme(URL) ->
+    <<"http://", URL/binary>>.
+
+pool_opts(Params = #{<<"url">> := URL}, ResId) ->
+    #{host := Host0, scheme := Scheme} = URIMap = uri_string:parse(binary_to_list(add_default_scheme(URL))),
     Port = maps:get(port, URIMap, case Scheme of
                                       "https" -> 443;
                                       _ -> 80
                                   end),
     PoolSize = maps:get(<<"pool_size">>, Params, 32),
-    ConnectTimeout = timer:seconds(maps:get(<<"connect_timeout">>, Params, 5)),
+    ConnectTimeout = cuttlefish_duration:parse(str(maps:get(<<"connect_timeout">>, Params, <<"5s">>))),
     {Inet, Host} = parse_host(Host0),
-    MoreOpts = case Scheme of
-                   "http" ->
-                       [{transport_opts, [Inet]}];
-                   "https" ->
-                       KeyFile = maps:get(<<"keyfile">>, Params),
-                       CertFile = maps:get(<<"certfile">>, Params),
-                       CACertFile = maps:get(<<"cacertfile">>, Params),
-                       VerifyType = case maps:get(<<"verify">>, Params) of
-                                        true -> verify_peer;
-                                        false -> verify_none
-                                    end,
-                       TLSOpts = lists:filter(fun({_K, V}) when V =:= <<>> ->
-                                                  false;
-                                                 (_) ->
-                                                  true
-                                              end, [{keyfile, KeyFile}, {certfile, CertFile}, {cacertfile, CACertFile}]),
-                       NTLSOpts = [ {verify, VerifyType}
-                                  , {versions, emqx_tls_lib:default_versions()}
-                                  , {ciphers, emqx_tls_lib:default_ciphers()}
-                                  | TLSOpts
-                                  ],
-                       [{transport, ssl}, {transport_opts, [Inet | NTLSOpts]}]
-              end,
+    SslOpts = get_ssl_options(Params, ResId, add_default_scheme(URL)),
     [{host, Host},
      {port, Port},
      {pool_size, PoolSize},
      {pool_type, hash},
      {connect_timeout, ConnectTimeout},
      {retry, 5},
-     {retry_timeout, 1000}] ++ MoreOpts.
+     {retry_timeout, 1000},
+     {transport_opts, [Inet] ++ SslOpts}].
 
 pool_name(ResId) ->
     list_to_atom("webhook:" ++ str(ResId)).
 
+get_ssl_options(Config, ResId, <<"https://", _URL/binary>>) ->
+    [{transport, ssl}, {transport_opts, get_ssl_opts(Config, ResId)}];
+get_ssl_options(_Config, _ResId, _URL) ->
+    [].
+
+get_ssl_opts(Opts, ResId) ->
+    KeyFile = maps:get(<<"keyfile">>, Opts, undefined),
+    CertFile = maps:get(<<"certfile">>, Opts, undefined),
+    CAFile = maps:get(<<"cacertfile">>, Opts, undefined),
+    Filter = fun(Opts1) ->
+                     [{K, V} || {K, V} <- Opts1,
+                                    V =/= undefined,
+                                    V =/= <<>>,
+                                    V =/= "" ]
+             end,
+    Key = save_upload_file(KeyFile, ResId),
+    Cert = save_upload_file(CertFile, ResId),
+    CA = save_upload_file(CAFile, ResId),
+    Verify = case maps:get(<<"verify">>, Opts, false) of
+        false -> verify_none;
+        true -> verify_peer
+    end,
+    case Filter([{keyfile, Key}, {certfile, Cert}, {cacertfile, CA}]) of
+        [] -> [{verify, Verify}];
+        SslOpts ->
+            [{verify, Verify} | SslOpts]
+    end.
+
+save_upload_file(#{<<"file">> := <<>>, <<"filename">> := <<>>}, _ResId) -> "";
+save_upload_file(FilePath, _) when is_binary(FilePath) -> binary_to_list(FilePath);
+save_upload_file(#{<<"file">> := File, <<"filename">> := FileName}, ResId) ->
+     FullFilename = filename:join([emqx:get_env(data_dir), rules, ResId, FileName]),
+     ok = filelib:ensure_dir(FullFilename),
+     case file:write_file(FullFilename, File) of
+          ok ->
+               binary_to_list(FullFilename);
+          {error, Reason} ->
+               logger:error("Store file failed, ResId: ~p, ~0p", [ResId, Reason]),
+               error({ResId, store_file_fail})
+     end;
+save_upload_file(_, _) -> "".
+
 parse_host(Host) ->
     case inet:parse_address(Host) of
         {ok, Addr} when size(Addr) =:= 4 -> {inet, Addr};

+ 142 - 0
apps/emqx_web_hook/test/prop_webhook_confs.erl

@@ -0,0 +1,142 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_webhook_confs).
+-include_lib("proper/include/proper.hrl").
+
+-import(emqx_ct_proper_types,
+        [ url/0
+        , nof/1
+        ]).
+
+-define(ALL(Vars, Types, Exprs),
+        ?SETUP(fun() ->
+            State = do_setup(),
+            fun() -> do_teardown(State) end
+         end, ?FORALL(Vars, Types, Exprs))).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_confs() ->
+    Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")),
+    ?ALL({Url, Confs0}, {url(), confs()},
+        begin
+            Confs = [{"web.hook.api.url", Url}|Confs0],
+            Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)),
+
+            assert_confs(Confs, Envs),
+
+            set_application_envs(Envs),
+            {ok, _} = application:ensure_all_started(emqx_web_hook),
+            application:stop(emqx_web_hook),
+            unset_application_envs(Envs),
+            true
+        end).
+
+%%--------------------------------------------------------------------
+%% Helpers
+%%--------------------------------------------------------------------
+
+do_setup() ->
+    application:set_env(kernel, logger_level, error),
+    emqx_ct_helpers:start_apps([], fun set_special_cfgs/1),
+    ok.
+
+do_teardown(_) ->
+    emqx_ct_helpers:stop_apps([]),
+    ok.
+
+set_special_cfgs(_) ->
+    application:set_env(emqx, plugins_loaded_file, undefined),
+    application:set_env(emqx, modules_loaded_file, undefined),
+    ok.
+
+assert_confs([{"web.hook.api.url", Url}|More], Envs) ->
+    %% Assert!
+    Url = deep_get_env("emqx_web_hook.url", Envs),
+    assert_confs(More, Envs);
+
+assert_confs([{"web.hook.rule." ++ HookName0, Spec}|More], Envs) ->
+    HookName = re:replace(HookName0, "\\.[0-9]", "", [{return, list}]),
+    Rules = deep_get_env("emqx_web_hook.rules", Envs),
+
+    %% Assert!
+    Spec = proplists:get_value(HookName, Rules),
+
+    assert_confs(More, Envs);
+
+assert_confs([_|More], Envs) ->
+    assert_confs(More, Envs);
+
+assert_confs([], _) ->
+    true.
+
+deep_get_env(Path, Envs) ->
+    lists:foldl(
+      fun(_K, undefiend) -> undefiend;
+         (K, Acc) -> proplists:get_value(binary_to_atom(K, utf8), Acc)
+    end, Envs, re:split(Path, "\\.")).
+
+set_application_envs(Envs) ->
+    application:set_env(Envs).
+
+unset_application_envs(Envs) ->
+    lists:foreach(fun({App, Es}) ->
+        lists:foreach(fun({K, _}) ->
+            application:unset_env(App, K)
+        end, Es) end, Envs).
+
+cuttlefish_conf_file(Ls) when is_list(Ls) ->
+    [cuttlefish_conf_option(K,V) || {K, V} <- Ls].
+
+cuttlefish_conf_option(K, V)
+    when is_list(K) ->
+    {re:split(K, "[.]", [{return, list}]), V}.
+
+%%--------------------------------------------------------------------
+%% Generators
+%%--------------------------------------------------------------------
+
+confs() ->
+    nof([{"web.hook.encode_payload", oneof(["base64", "base62"])},
+         {"web.hook.rule.client.connect.1", rule_spec()},
+         {"web.hook.rule.client.connack.1", rule_spec()},
+         {"web.hook.rule.client.connected.1", rule_spec()},
+         {"web.hook.rule.client.disconnected.1", rule_spec()},
+         {"web.hook.rule.client.subscribe.1", rule_spec()},
+         {"web.hook.rule.client.unsubscribe.1", rule_spec()},
+         {"web.hook.rule.session.subscribed.1", rule_spec()},
+         {"web.hook.rule.session.unsubscribed.1", rule_spec()},
+         {"web.hook.rule.session.terminated.1", rule_spec()},
+         {"web.hook.rule.message.publish.1", rule_spec()},
+         {"web.hook.rule.message.delivered.1", rule_spec()},
+         {"web.hook.rule.message.acked.1", rule_spec()}
+        ]).
+
+rule_spec() ->
+    ?LET(Action, action_names(),
+         begin
+            binary_to_list(emqx_json:encode(#{action => Action}))
+         end).
+
+action_names() ->
+    oneof([on_client_connect, on_client_connack, on_client_connected,
+           on_client_connected, on_client_disconnected, on_client_subscribe, on_client_unsubscribe,
+           on_session_subscribed, on_session_unsubscribed, on_session_terminated,
+           on_message_publish, on_message_delivered, on_message_acked]).
+

+ 409 - 0
apps/emqx_web_hook/test/prop_webhook_hooks.erl

@@ -0,0 +1,409 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_webhook_hooks).
+
+-include_lib("proper/include/proper.hrl").
+
+-import(emqx_ct_proper_types,
+        [ conninfo/0
+        , clientinfo/0
+        , sessioninfo/0
+        , message/0
+        , connack_return_code/0
+        , topictab/0
+        , topic/0
+        , subopts/0
+        ]).
+
+-define(ALL(Vars, Types, Exprs),
+        ?SETUP(fun() ->
+            State = do_setup(),
+            fun() -> do_teardown(State) end
+         end, ?FORALL(Vars, Types, Exprs))).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_client_connect() ->
+    ?ALL({ConnInfo, ConnProps, Env},
+         {conninfo(), conn_properties(), empty_env()},
+       begin
+           ok = emqx_web_hook:on_client_connect(ConnInfo, ConnProps, Env),
+           Body = receive_http_request_body(),
+           Body = emqx_json:encode(
+                    #{action => client_connect,
+                      node => stringfy(node()),
+                      clientid => maps:get(clientid, ConnInfo),
+                      username => maybe(maps:get(username, ConnInfo)),
+                      ipaddress => peer2addr(maps:get(peername, ConnInfo)),
+                      keepalive => maps:get(keepalive, ConnInfo),
+                      proto_ver => maps:get(proto_ver, ConnInfo)
+                     }),
+           true
+       end).
+
+prop_client_connack() ->
+    ?ALL({ConnInfo, Rc, AckProps, Env},
+         {conninfo(), connack_return_code(), ack_properties(), empty_env()},
+        begin
+            ok = emqx_web_hook:on_client_connack(ConnInfo, Rc, AckProps, Env),
+            Body = receive_http_request_body(),
+            Body = emqx_json:encode(
+                     #{action => client_connack,
+                       node => stringfy(node()),
+                       clientid => maps:get(clientid, ConnInfo),
+                       username => maybe(maps:get(username, ConnInfo)),
+                       ipaddress => peer2addr(maps:get(peername, ConnInfo)),
+                       keepalive => maps:get(keepalive, ConnInfo),
+                       proto_ver => maps:get(proto_ver, ConnInfo),
+                       conn_ack => Rc
+                       }),
+            true
+        end).
+
+prop_client_connected() ->
+    ?ALL({ClientInfo, ConnInfo, Env},
+         {clientinfo(), conninfo(), empty_env()},
+        begin
+            ok = emqx_web_hook:on_client_connected(ClientInfo, ConnInfo, Env),
+            Body = receive_http_request_body(),
+            Body = emqx_json:encode(
+                     #{action => client_connected,
+                       node => stringfy(node()),
+                       clientid => maps:get(clientid, ClientInfo),
+                       username => maybe(maps:get(username, ClientInfo)),
+                       ipaddress => peer2addr(maps:get(peerhost, ClientInfo)),
+                       keepalive => maps:get(keepalive, ConnInfo),
+                       proto_ver => maps:get(proto_ver, ConnInfo),
+                       connected_at => maps:get(connected_at, ConnInfo)
+                      }),
+            true
+        end).
+
+prop_client_disconnected() ->
+    ?ALL({ClientInfo, Reason, ConnInfo, Env},
+         {clientinfo(), shutdown_reason(), disconnected_conninfo(), empty_env()},
+        begin
+            ok = emqx_web_hook:on_client_disconnected(ClientInfo, Reason, ConnInfo, Env),
+            Body = receive_http_request_body(),
+            Body = emqx_json:encode(
+                     #{action => client_disconnected,
+                       node => stringfy(node()),
+                       clientid => maps:get(clientid, ClientInfo),
+                       username => maybe(maps:get(username, ClientInfo)),
+                       disconnected_at => maps:get(disconnected_at, ConnInfo),
+                       reason => stringfy(Reason)
+                      }),
+            true
+        end).
+
+prop_client_subscribe() ->
+    ?ALL({ClientInfo, SubProps, TopicTab, Env},
+         {clientinfo(), sub_properties(), topictab(), topic_filter_env()},
+        begin
+            ok = emqx_web_hook:on_client_subscribe(ClientInfo, SubProps, TopicTab, Env),
+
+            Matched = filter_topictab(TopicTab, Env),
+
+            lists:foreach(fun({Topic, Opts}) ->
+                Body = receive_http_request_body(),
+                Body = emqx_json:encode(
+                         #{action => client_subscribe,
+                           node => stringfy(node()),
+                           clientid => maps:get(clientid, ClientInfo),
+                           username => maybe(maps:get(username, ClientInfo)),
+                           topic => Topic,
+                           opts => Opts})
+            end, Matched),
+            true
+        end).
+
+prop_client_unsubscribe() ->
+    ?ALL({ClientInfo, SubProps, TopicTab, Env},
+         {clientinfo(), unsub_properties(), topictab(), topic_filter_env()},
+        begin
+            ok = emqx_web_hook:on_client_unsubscribe(ClientInfo, SubProps, TopicTab, Env),
+
+            Matched = filter_topictab(TopicTab, Env),
+
+            lists:foreach(fun({Topic, Opts}) ->
+                Body = receive_http_request_body(),
+                Body = emqx_json:encode(
+                         #{action => client_unsubscribe,
+                           node => stringfy(node()),
+                           clientid => maps:get(clientid, ClientInfo),
+                           username => maybe(maps:get(username, ClientInfo)),
+                           topic => Topic,
+                           opts => Opts})
+            end, Matched),
+            true
+        end).
+
+prop_session_subscribed() ->
+    ?ALL({ClientInfo, Topic, SubOpts, Env},
+         {clientinfo(), topic(), subopts(), topic_filter_env()},
+        begin
+            ok = emqx_web_hook:on_session_subscribed(ClientInfo, Topic, SubOpts, Env),
+            filter_topic_match(Topic, Env) andalso begin
+                Body = receive_http_request_body(),
+                Body1 = emqx_json:encode(
+                         #{action => session_subscribed,
+                           node => stringfy(node()),
+                           clientid => maps:get(clientid, ClientInfo),
+                           username => maybe(maps:get(username, ClientInfo)),
+                           topic => Topic,
+                           opts => SubOpts
+                          }),
+                Body = Body1
+            end,
+            true
+        end).
+
+prop_session_unsubscribed() ->
+    ?ALL({ClientInfo, Topic, SubOpts, Env},
+         {clientinfo(), topic(), subopts(), empty_env()},
+        begin
+            ok = emqx_web_hook:on_session_unsubscribed(ClientInfo, Topic, SubOpts, Env),
+            filter_topic_match(Topic, Env) andalso begin
+                Body = receive_http_request_body(),
+                Body = emqx_json:encode(
+                         #{action => session_unsubscribed,
+                           node => stringfy(node()),
+                           clientid => maps:get(clientid, ClientInfo),
+                           username => maybe(maps:get(username, ClientInfo)),
+                           topic => Topic
+                          })
+            end,
+            true
+        end).
+
+prop_session_terminated() ->
+    ?ALL({ClientInfo, Reason, SessInfo, Env},
+         {clientinfo(), shutdown_reason(), sessioninfo(), empty_env()},
+        begin
+            ok = emqx_web_hook:on_session_terminated(ClientInfo, Reason, SessInfo, Env),
+            Body = receive_http_request_body(),
+            Body = emqx_json:encode(
+                     #{action => session_terminated,
+                       node => stringfy(node()),
+                       clientid => maps:get(clientid, ClientInfo),
+                       username => maybe(maps:get(username, ClientInfo)),
+                       reason => stringfy(Reason)
+                      }),
+            true
+        end).
+
+prop_message_publish() ->
+    ?ALL({Msg, Env, Encode}, {message(), topic_filter_env(), payload_encode()},
+        begin
+            application:set_env(emqx_web_hook, encode_payload, Encode),
+            {ok, Msg} = emqx_web_hook:on_message_publish(Msg, Env),
+            application:unset_env(emqx_web_hook, encode_payload),
+
+            (not emqx_message:is_sys(Msg))
+                andalso filter_topic_match(emqx_message:topic(Msg), Env)
+                andalso begin
+                    Body = receive_http_request_body(),
+                    Body = emqx_json:encode(
+                             #{action => message_publish,
+                               node => stringfy(node()),
+                               from_client_id => emqx_message:from(Msg),
+                               from_username => maybe(emqx_message:get_header(username, Msg)),
+                               topic => emqx_message:topic(Msg),
+                               qos => emqx_message:qos(Msg),
+                               retain => emqx_message:get_flag(retain, Msg),
+                               payload => encode(emqx_message:payload(Msg), Encode),
+                               ts => emqx_message:timestamp(Msg)
+                              })
+                end,
+            true
+        end).
+
+prop_message_delivered() ->
+    ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env(), payload_encode()},
+        begin
+            application:set_env(emqx_web_hook, encode_payload, Encode),
+            ok = emqx_web_hook:on_message_delivered(ClientInfo, Msg, Env),
+            application:unset_env(emqx_web_hook, encode_payload),
+
+            (not emqx_message:is_sys(Msg))
+                andalso filter_topic_match(emqx_message:topic(Msg), Env)
+                andalso begin
+                    Body = receive_http_request_body(),
+                    Body = emqx_json:encode(
+                             #{action => message_delivered,
+                               node => stringfy(node()),
+                               clientid => maps:get(clientid, ClientInfo),
+                               username => maybe(maps:get(username, ClientInfo)),
+                               from_client_id => emqx_message:from(Msg),
+                               from_username => maybe(emqx_message:get_header(username, Msg)),
+                               topic => emqx_message:topic(Msg),
+                               qos => emqx_message:qos(Msg),
+                               retain => emqx_message:get_flag(retain, Msg),
+                               payload => encode(emqx_message:payload(Msg), Encode),
+                               ts => emqx_message:timestamp(Msg)
+                              })
+                end,
+            true
+        end).
+
+prop_message_acked() ->
+    ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), empty_env(), payload_encode()},
+        begin
+            application:set_env(emqx_web_hook, encode_payload, Encode),
+            ok = emqx_web_hook:on_message_acked(ClientInfo, Msg, Env),
+            application:unset_env(emqx_web_hook, encode_payload),
+
+            (not emqx_message:is_sys(Msg))
+                andalso filter_topic_match(emqx_message:topic(Msg), Env)
+                andalso begin
+                    Body = receive_http_request_body(),
+                    Body = emqx_json:encode(
+                             #{action => message_acked,
+                               node => stringfy(node()),
+                               clientid => maps:get(clientid, ClientInfo),
+                               username => maybe(maps:get(username, ClientInfo)),
+                               from_client_id => emqx_message:from(Msg),
+                               from_username => maybe(emqx_message:get_header(username, Msg)),
+                               topic => emqx_message:topic(Msg),
+                               qos => emqx_message:qos(Msg),
+                               retain => emqx_message:get_flag(retain, Msg),
+                               payload => encode(emqx_message:payload(Msg), Encode),
+                               ts => emqx_message:timestamp(Msg)
+                              })
+                end,
+            true
+        end).
+
+%%--------------------------------------------------------------------
+%% Helper
+%%--------------------------------------------------------------------
+do_setup() ->
+    %% Pre-defined envs
+    application:set_env(emqx_web_hook, path, "path"),
+    application:set_env(emqx_web_hook, headers, []),
+
+    meck:new(ehttpc_pool, [passthrough, no_history]),
+    meck:expect(ehttpc_pool, pick_worker, fun(_, _) -> ok end),
+
+    Self = self(),
+    meck:new(ehttpc, [passthrough, no_history]),
+    meck:expect(ehttpc, request,
+                fun(_ClientId, Method, {Path, Headers, Body}) ->
+                    Self ! {Method, Path, Headers, Body}, {ok, ok, ok}
+                end),
+
+    meck:new(emqx_metrics, [passthrough, no_history]),
+    meck:expect(emqx_metrics, inc, fun(_) -> ok end),
+    ok.
+
+do_teardown(_) ->
+    meck:unload(ehttpc_pool),
+    meck:unload(ehttpc),
+    meck:unload(emqx_metrics).
+
+maybe(undefined) -> null;
+maybe(T) -> T.
+
+peer2addr({Host, _}) ->
+    list_to_binary(inet:ntoa(Host));
+peer2addr(Host) ->
+    list_to_binary(inet:ntoa(Host)).
+
+ensure_to_binary(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
+ensure_to_binary(Bin) when is_binary(Bin) -> Bin.
+
+stringfy({shutdown, Reason}) ->
+    stringfy(Reason);
+stringfy(Term) when is_atom(Term); is_binary(Term) ->
+    Term;
+stringfy(Term) ->
+    unicode:characters_to_binary(io_lib:format("~0p", [Term])).
+
+receive_http_request_body() ->
+    receive
+        {post, _, _, Body} ->
+            Body
+    after 100 ->
+        exit(waiting_message_timeout)
+    end.
+
+receive_http_request_bodys() ->
+    receive_http_request_bodys_([]).
+
+receive_http_request_bodys_(Acc) ->
+    receive
+        {post, _, _, Body} ->
+           receive_http_request_bodys_([Body|Acc])
+    after 1000 ->
+          lists:reverse(Acc)
+    end.
+
+filter_topictab(TopicTab, {undefined}) ->
+    TopicTab;
+filter_topictab(TopicTab, {TopicFilter}) ->
+    lists:filter(fun({Topic, _}) -> emqx_topic:match(Topic, TopicFilter) end, TopicTab).
+
+filter_topic_match(_Topic, {undefined}) ->
+    true;
+filter_topic_match(Topic, {TopicFilter}) ->
+    emqx_topic:match(Topic, TopicFilter).
+
+encode(Bin, base64) ->
+    base64:encode(Bin);
+encode(Bin, base62) ->
+    emqx_base62:encode(Bin);
+encode(Bin, _) ->
+    Bin.
+
+%%--------------------------------------------------------------------
+%% Generators
+%%--------------------------------------------------------------------
+
+conn_properties() ->
+    #{}.
+
+ack_properties() ->
+    #{}.
+
+sub_properties() ->
+    #{}.
+
+unsub_properties() ->
+    #{}.
+
+shutdown_reason() ->
+    oneof([any(), {shutdown, atom()}]).
+
+empty_env() ->
+    {undefined}.
+
+topic_filter_env() ->
+    oneof([{<<"#">>}, {undefined}, {topic()}]).
+
+payload_encode() ->
+    oneof([base62, base64, undefined]).
+
+http_code() ->
+    oneof([socket_closed_remotely, others]).
+
+disconnected_conninfo() ->
+    ?LET(Info, conninfo(),
+         begin
+           Info#{disconnected_at => erlang:system_time(millisecond)}
+         end).

+ 3 - 3
apps/emqx_web_hook/test/props/prop_webhook_confs.erl

@@ -34,8 +34,9 @@
 
 prop_confs() ->
     Schema = cuttlefish_schema:files(filelib:wildcard(code:priv_dir(emqx_web_hook) ++ "/*.schema")),
-    ?ALL(Confs, confs(),
+    ?ALL({Url, Confs0}, {url(), confs()},
         begin
+            Confs = [{"web.hook.api.url", Url}|Confs0],
             Envs = cuttlefish_generator:map(Schema, cuttlefish_conf_file(Confs)),
 
             assert_confs(Confs, Envs),
@@ -112,8 +113,7 @@ cuttlefish_conf_option(K, V)
 %%--------------------------------------------------------------------
 
 confs() ->
-    nof([{"web.hook.api.url", url()},
-         {"web.hook.encode_payload", oneof(["base64", "base62"])},
+    nof([{"web.hook.encode_payload", oneof(["base64", "base62"])},
          {"web.hook.rule.client.connect.1", rule_spec()},
          {"web.hook.rule.client.connack.1", rule_spec()},
          {"web.hook.rule.client.connected.1", rule_spec()},