Ver código fonte

refactor(exhook): improve the exhook implementation

JianBo He 5 anos atrás
pai
commit
485bffadd6

+ 1 - 1
.gitignore

@@ -41,4 +41,4 @@ erlang.mk
 *.coverdata
 etc/emqx.conf.rendered
 Mnesia.*/
-*.DS_Store
+*.DS_Store

+ 3 - 0
apps/emqx_exhook/.gitignore

@@ -0,0 +1,3 @@
+src/emqx_exhook_pb.erl
+src/emqx_exhook_v_1_hook_provider_bhvr.erl
+src/emqx_exhook_v_1_hook_provider_client.erl

+ 39 - 0
apps/emqx_exhook/README.md

@@ -0,0 +1,39 @@
+# emqx_exhook
+
+The `emqx_exhook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang.
+
+## Feature
+
+- [x] Based on gRPC, it brings a very wide range of applicability
+- [x] Allows you to use the return value to extend emqx behavior.
+
+## Architecture
+
+```
+EMQ X                                      Third-party Runtime
++========================+                 +========+==========+
+|    ExHook              |                 |        |          |
+|   +----------------+   |      gRPC       | gRPC   |  User's  |
+|   |   gPRC Client  | ------------------> | Server |  Codes   |
+|   +----------------+   |    (HTTP/2)     |        |          |
+|                        |                 |        |          |
++========================+                 +========+==========+
+```
+
+## Usage
+
+### gRPC service
+
+See: `priv/protos/exhook.proto`
+
+### CLI
+
+## Example
+
+## Recommended gRPC Framework
+
+See: https://github.com/grpc-ecosystem/awesome-grpc
+
+## Thanks
+
+- [grpcbox](https://github.com/tsloughter/grpcbox)

+ 116 - 0
apps/emqx_exhook/docs/design.md

@@ -0,0 +1,116 @@
+# 设计
+
+## 动机
+
+在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:
+
+1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能
+2. `emqx-exproto` 提供了使用 Java,Python 编写用户自定义协议接入插件的功能
+
+但在后续的支持中发现许多难以处理的问题:
+
+1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
+2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。
+3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
+4. `erlport` 会占用 `stdin` `stdout`。
+
+因此,我们计划重构这部分的实现,其中主要的内容是:
+1. 使用 `gRPC` 替换 `erlport`。
+2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook`
+
+
+旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md)
+
+## 设计
+
+架构如下:
+
+```
+  EMQ X                                    
++========================+                 +========+==========+
+|    ExHook              |                 |        |          |
+|   +----------------+   |      gRPC       | gRPC   |  User's  |
+|   |   gRPC Client  | ------------------> | Server |  Codes   |
+|   +----------------+   |    (HTTP/2)     |        |          |
+|                        |                 |        |          |
++========================+                 +========+==========+
+```
+
+`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
+
+
+和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回:
+
+<img src="https://docs.emqx.net/broker/latest/cn/advanced/assets/chain_of_responsiblity.png" style="zoom:50%;" />
+
+### gRPC 服务示例
+
+用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有:
+
+```protobuff
+syntax = "proto3";
+
+package emqx.exhook.v1;
+
+service HookProvider {
+
+  rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+  rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+  rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
+
+  rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+  rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+```
+
+### 配置文件示例
+
+```
+## 配置 gRPC 服务地址 (HTTP)
+##
+## s1 为服务器的名称
+exhook.server.s1.url = http://127.0.0.1:9001
+
+## 配置 gRPC 服务地址 (HTTPS)
+##
+## s2 为服务器名称
+exhook.server.s2.url = https://127.0.0.1:9002
+exhook.server.s2.cacertfile = ca.pem
+exhook.server.s2.certfile = cert.pem
+exhook.server.s2.keyfile = key.pem
+```

+ 22 - 0
apps/emqx_exhook/include/emqx_exhook.hrl

@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_EXHOOK_HRL).
+-define(EMQX_EXHOOK_HRL, true).
+
+-define(APP, emqx_exhook).
+
+-endif.

+ 38 - 0
apps/emqx_exhook/priv/emqx_exhook.schema

@@ -0,0 +1,38 @@
+%%-*- mode: erlang -*-
+
+{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
+  {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.cacertfile", "emqx_exhook.servers", [
+  {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.certfile", "emqx_exhook.servers", [
+  {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.keyfile", "emqx_exhook.servers", [
+  {datatype, string}
+]}.
+
+{translation, "emqx_exhook.servers", fun(Conf) ->
+  Filter  = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+  ServerOptions = fun(Prefix) ->
+                      case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of
+                          {ok, {http, _, Host, Port, _, _}} ->
+                              [{scheme, http}, {host, Host}, {port, Port}];
+                          {ok, {https, _, Host, Port, _, _}} ->
+                              [{scheme, https}, {host, Host}, {port, Port},
+                               {ssl_options,
+                                 Filter([{ssl, true},
+                                         {certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf)},
+                                         {keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf)},
+                                         {cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf)}
+                                        ])}];
+                          _ -> error(invalid_server_options)
+                      end
+                  end,
+  [{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)}
+   || {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)]
+end}.

+ 395 - 0
apps/emqx_exhook/priv/protos/exhook.proto

@@ -0,0 +1,395 @@
+//------------------------------------------------------------------------------
+// Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//------------------------------------------------------------------------------
+
+syntax = "proto3";
+
+package emqx.exhook.v1;
+
+service HookProvider {
+
+  rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+  rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+  rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+  rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+  rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
+
+  rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
+
+  rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+  rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+  rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+
+//------------------------------------------------------------------------------
+// Request & Response
+//------------------------------------------------------------------------------
+
+message ProviderLoadedRequest {
+
+  BrokerInfo broker = 1;
+}
+
+message LoadedResponse {
+
+  repeated HookSpec hooks = 1;
+}
+
+message ProviderUnloadedRequest { }
+
+message ClientConnectRequest {
+
+  ConnInfo conninfo = 1;
+
+  // MQTT CONNECT packet's properties (MQTT v5.0)
+  //
+  // It should be empty on MQTT v3.1.1/v3.1 or others protocol
+  repeated Property props = 2;
+}
+
+message ClientConnackRequest {
+
+  ConnInfo conninfo = 1;
+
+  string result_code = 2;
+
+  repeated Property props = 3;
+}
+
+message ClientConnectedRequest {
+
+  ClientInfo clientinfo = 1;
+}
+
+message ClientDisconnectedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string reason = 2;
+}
+
+message ClientAuthenticateRequest {
+
+  ClientInfo clientinfo = 1;
+
+  bool result = 2;
+}
+
+message ClientCheckAclRequest {
+
+  ClientInfo clientinfo = 1;
+
+  enum AclReqType {
+
+    PUBLISH = 0;
+
+    SUBSCRIBE = 1;
+  }
+
+  AclReqType type = 2;
+
+  string topic = 3;
+
+  bool result = 4;
+}
+
+message ClientSubscribeRequest {
+
+  ClientInfo clientinfo = 1;
+
+  repeated Property props = 2;
+
+  repeated TopicFilter topic_filters = 3;
+}
+
+message ClientUnsubscribeRequest {
+
+  ClientInfo clientinfo = 1;
+
+  repeated Property props = 2;
+
+  repeated TopicFilter topic_filters = 3;
+}
+
+message SessionCreatedRequest {
+
+  ClientInfo clientinfo = 1;
+}
+
+message SessionSubscribedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string topic = 2;
+
+  SubOpts subopts = 3;
+}
+
+message SessionUnsubscribedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string topic = 2;
+}
+
+message SessionResumedRequest {
+
+  ClientInfo clientinfo = 1;
+}
+
+message SessionDiscardedRequest {
+
+  ClientInfo clientinfo = 1;
+}
+
+message SessionTakeoveredRequest {
+
+  ClientInfo clientinfo = 1;
+}
+
+message SessionTerminatedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  string reason = 2;
+}
+
+message MessagePublishRequest {
+
+  Message message = 1;
+}
+
+message MessageDeliveredRequest {
+
+  ClientInfo clientinfo = 1;
+
+  Message message = 2;
+}
+
+message MessageDroppedRequest {
+
+  Message message = 1;
+
+  string reason = 2;
+}
+
+message MessageAckedRequest {
+
+  ClientInfo clientinfo = 1;
+
+  Message message = 2;
+}
+
+//------------------------------------------------------------------------------
+// Basic data types
+//------------------------------------------------------------------------------
+
+message EmptySuccess { }
+
+message ValuedResponse {
+
+  // The responsed value type
+  //  - ignore: Ignore the responsed value
+  //  - contiune: Use the responsed value and execute the next hook
+  //  - stop_and_return: Use the responsed value and stop the chain executing
+  enum ResponsedType {
+
+    IGNORE = 0;
+
+    CONTINUE = 1;
+
+    STOP_AND_RETURN = 2;
+  }
+
+  ResponsedType type = 1;
+
+  oneof value {
+
+    // Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks
+    bool bool_result = 3;
+
+    // Message result, used on the 'message.*' hooks
+    Message message = 4;
+  }
+}
+
+message BrokerInfo {
+
+  string version = 1;
+
+  string sysdescr = 2;
+
+  string uptime = 3;
+
+  string datetime = 4;
+}
+
+message HookSpec {
+
+  // The registered hooks name
+  //
+  // Available value:
+  //   "client.connect",      "client.connack"
+  //   "client.connected",    "client.disconnected"
+  //   "client.authenticate", "client.check_acl"
+  //   "client.subscribe",    "client.unsubscribe"
+  //
+  //   "session.created",      "session.subscribed"
+  //   "session.unsubscribed", "session.resumed"
+  //   "session.discarded",    "session.takeovered"
+  //   "session.terminated"
+  //
+  //   "message.publish", "message.delivered"
+  //   "message.acked",   "message.dropped"
+  string name = 1;
+
+  // The topic filters for message hooks
+  repeated string topics = 2;
+}
+
+message ConnInfo {
+
+  string node = 1;
+
+  string clientid = 2;
+
+  string username = 3;
+
+  string peerhost = 4;
+
+  uint32 sockport = 5;
+
+  string proto_name = 6;
+
+  string proto_ver = 7;
+
+  uint32 keepalive = 8;
+}
+
+message ClientInfo {
+
+  string node = 1;
+
+  string clientid = 2;
+
+  string username = 3;
+
+  string password = 4;
+
+  string peerhost = 5;
+
+  uint32 sockport = 6;
+
+  string protocol = 7;
+
+  string mountpoint = 8;
+
+  bool  is_superuser = 9;
+
+  bool  anonymous = 10;
+}
+
+message Message {
+
+  string node = 1;
+
+  string id = 2;
+
+  uint32 qos = 3;
+
+  string from = 4;
+
+  string topic = 5;
+
+  bytes  payload = 6;
+
+  uint64 timestamp = 7;
+}
+
+message Property {
+
+  string name = 1;
+
+  string value = 2;
+}
+
+message TopicFilter {
+
+  string name = 1;
+
+  uint32 qos = 2;
+}
+
+message SubOpts {
+
+  // The QoS level
+  uint32 qos = 1;
+
+  // The group name for shared subscription
+  string share = 2;
+
+  // The Retain Handling option (MQTT v5.0)
+  //
+  //  0 = Send retained messages at the time of the subscribe
+  //  1 = Send retained messages at subscribe only if the subscription does
+  //       not currently exist
+  //  2 = Do not send retained messages at the time of the subscribe
+  uint32 rh = 3;
+
+  // The Retain as Published option (MQTT v5.0)
+  //
+  //  If 1, Application Messages forwarded using this subscription keep the
+  //        RETAIN flag they were published with.
+  //  If 0, Application Messages forwarded using this subscription have the
+  //        RETAIN flag set to 0.
+  // Retained messages sent when the subscription is established have the RETAIN flag set to 1.
+  uint32 rap = 4;
+
+  // The No Local option (MQTT v5.0)
+  //
+  // If the value is 1, Application Messages MUST NOT be forwarded to a
+  // connection with a ClientID equal to the ClientID of the publishing
+  uint32 nl = 5;
+}

+ 47 - 0
apps/emqx_exhook/rebar.config

@@ -0,0 +1,47 @@
+%%-*- mode: erlang -*-
+{plugins,
+ [rebar3_proper,
+  {grpcbox_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {branch, "master"}}}
+]}.
+
+{deps,
+ [{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}}
+]}.
+
+{grpc,
+ [{protos, ["priv/protos"]},
+  {gpb_opts, [{module_name_prefix, "emqx_"},
+              {module_name_suffix, "_pb"}]}
+]}.
+
+{provider_hooks,
+ [{pre, [{compile, {grpc, gen}}]}]}.
+
+{edoc_opts, [{preprocess, true}]}.
+
+{erl_opts, [warn_unused_vars,
+            warn_shadow_vars,
+            warn_unused_import,
+            warn_obsolete_guard,
+            debug_info,
+            {parse_transform}]}.
+
+{xref_checks, [undefined_function_calls, undefined_functions,
+               locals_not_used, deprecated_function_calls,
+               warnings_as_errors, deprecated_functions]}.
+{xref_ignores, [emqx_exhook_pb]}.
+
+{cover_enabled, true}.
+{cover_opts, [verbose]}.
+{cover_export_enabled, true}.
+{cover_excl_mods, [emqx_exhook_pb,
+                   emqx_exhook_v_1_hook_provider_bhvr,
+                   emqx_exhook_v_1_connection_client]}.
+
+{profiles,
+    [{test, [
+        {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}}
+               , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
+               ]}
+    ]}
+]}.

+ 12 - 0
apps/emqx_exhook/src/emqx_exhook.app.src

@@ -0,0 +1,12 @@
+{application, emqx_exhook,
+ [{description, "EMQ X Extension for Hook"},
+  {vsn, "git"},
+  {modules, []},
+  {registered, []},
+  {mod, {emqx_exhook_app, []}},
+  {applications, [kernel,stdlib,emqx_libs,grpcbox]},
+  {env,[]},
+  {licenses, ["Apache-2.0"]},
+  {maintainers, ["EMQ X Team <contact@emqx.io>"]},
+  {links, [{"Homepage", "https://emqx.io/"}]}
+ ]}.

+ 134 - 0
apps/emqx_exhook/src/emqx_exhook.erl

@@ -0,0 +1,134 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook).
+
+-include("emqx_exhook.hrl").
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook]").
+
+%% Mgmt APIs
+-export([ enable/2
+        , disable/1
+        , disable_all/0
+        , list/0
+        ]).
+
+-export([ cast/2
+        , call_fold/3
+        ]).
+
+%%--------------------------------------------------------------------
+%% Mgmt APIs
+%%--------------------------------------------------------------------
+
+-spec list() -> [emqx_exhook_server:server()].
+list() ->
+    [server(Name) || Name <- running()].
+
+-spec enable(atom()|string(), list()) -> ok | {error, term()}.
+enable(Name, Opts) ->
+    case lists:member(Name, running()) of
+        true ->
+            {error, already_started};
+        _ ->
+            case emqx_exhook_server:load(Name, Opts) of
+                {ok, ServiceState} ->
+                    save(Name, ServiceState);
+                {error, Reason} ->
+                    ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]),
+                    {error, Reason}
+            end
+    end.
+
+-spec disable(atom()|string()) -> ok | {error, term()}.
+disable(Name) ->
+    case server(Name) of
+        undefined -> {error, not_running};
+        Service ->
+            ok = emqx_exhook_server:unload(Service),
+            unsave(Name)
+    end.
+
+-spec disable_all() -> [term()].
+disable_all() ->
+    [begin disable(Name), Name end || Name <- running()].
+
+%%----------------------------------------------------------
+%% Dispatch APIs
+%%----------------------------------------------------------
+
+-spec cast(atom(), map()) -> ok.
+cast(Hookpoint, Req) ->
+    cast(Hookpoint, Req, running()).
+
+cast(_, _, []) ->
+    ok;
+cast(Hookpoint, Req, [ServiceName|More]) ->
+    %% XXX: Need a real asynchronous running
+    _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)),
+    cast(Hookpoint, Req, More).
+
+-spec call_fold(atom(), term(), function())
+  -> {ok, term()}
+   | {stop, term()}.
+call_fold(Hookpoint, Req, AccFun) ->
+    call_fold(Hookpoint, Req, AccFun, running()).
+
+call_fold(_, Req, _, []) ->
+    {ok, Req};
+call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
+    case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of
+        {ok, Resp} ->
+            case AccFun(Req, Resp) of
+                {stop, NReq} -> {stop, NReq};
+                {ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More)
+            end;
+        _ ->
+            call_fold(Hookpoint, Req, AccFun, More)
+    end.
+
+%%----------------------------------------------------------
+%% Storage
+
+-compile({inline, [save/2]}).
+save(Name, ServiceState) ->
+    Saved = persistent_term:get(?APP, []),
+    persistent_term:put(?APP, lists:reverse([Name | Saved])),
+    persistent_term:put({?APP, Name}, ServiceState).
+
+-compile({inline, [unsave/1]}).
+unsave(Name) ->
+    case persistent_term:get(?APP, []) of
+        [] ->
+            persistent_term:erase(?APP);
+        Saved ->
+            persistent_term:put(?APP, lists:delete(Name, Saved))
+    end,
+    persistent_term:erase({?APP, Name}),
+    ok.
+
+-compile({inline, [running/0]}).
+running() ->
+    persistent_term:get(?APP, []).
+
+-compile({inline, [server/1]}).
+server(Name) ->
+    case catch persistent_term:get({?APP, Name}) of
+        {'EXIT', {badarg,_}} -> undefined;
+        Service -> Service
+    end.

+ 117 - 0
apps/emqx_exhook/src/emqx_exhook_app.erl

@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_app).
+
+-behaviour(application).
+
+-include("emqx_exhook.hrl").
+
+-emqx_plugin(extension).
+
+-export([ start/2
+        , stop/1
+        , prep_stop/1
+        ]).
+
+%% Internal export
+-export([ load_server/2
+        , unload_server/1
+        , load_exhooks/0
+        , unload_exhooks/0
+        ]).
+
+%%--------------------------------------------------------------------
+%% Application callbacks
+%%--------------------------------------------------------------------
+
+start(_StartType, _StartArgs) ->
+    {ok, Sup} = emqx_exhook_sup:start_link(),
+
+    %% Load all dirvers
+    load_all_servers(),
+
+    %% Register all hooks
+    load_exhooks(),
+
+    %% Register CLI
+    emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
+    {ok, Sup}.
+
+prep_stop(State) ->
+    emqx_ctl:unregister_command(exhook),
+    unload_exhooks(),
+    unload_all_servers(),
+    State.
+
+stop(_State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+load_all_servers() ->
+    lists:foreach(fun({Name, Options}) ->
+        load_server(Name, Options)
+    end, application:get_env(?APP, servers, [])).
+
+unload_all_servers() ->
+    emqx_exhook:disable_all().
+
+load_server(Name, Options) ->
+    emqx_exhook:enable(Name, Options).
+
+unload_server(Name) ->
+    emqx_exhook:disable(Name).
+
+%%--------------------------------------------------------------------
+%% Exhooks
+
+load_exhooks() ->
+    [emqx:hook(Name, {M, F, A}) || {Name, {M, F, A}} <- search_exhooks()].
+
+unload_exhooks() ->
+    [emqx:unhook(Name, {M, F}) || {Name, {M, F, _A}} <- search_exhooks()].
+
+search_exhooks() ->
+    search_exhooks(ignore_lib_apps(application:loaded_applications())).
+search_exhooks(Apps) ->
+    lists:flatten([ExHooks || App <- Apps, {_App, _Mod, ExHooks} <- find_attrs(App, exhooks)]).
+
+ignore_lib_apps(Apps) ->
+    LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
+               syntax_tools, ssl, crypto, mnesia, os_mon,
+               inets, goldrush, gproc, runtime_tools,
+               snmp, otp_mibs, public_key, asn1, ssh, hipe,
+               common_test, observer, webtool, xmerl, tools,
+               test_server, compiler, debugger, eunit, et,
+               wx],
+    [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)].
+
+find_attrs(App, Def) ->
+    [{App, Mod, Attr} || {ok, Modules} <- [application:get_key(App, modules)],
+                         Mod <- Modules,
+                         {Name, Attrs} <- module_attributes(Mod), Name =:= Def,
+                         Attr <- Attrs].
+
+module_attributes(Module) ->
+    try Module:module_info(attributes)
+    catch
+        error:undef -> [];
+        error:Reason -> error(Reason)
+    end.
+

+ 80 - 0
apps/emqx_exhook/src/emqx_exhook_cli.erl

@@ -0,0 +1,80 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_cli).
+
+-include("emqx_exhook.hrl").
+
+-export([cli/1]).
+
+cli(["server", "list"]) ->
+    if_enabled(fun() ->
+        Services = emqx_exhook:list(),
+        [emqx_ctl:print("HookServer(~s)~n", [emqx_exhook_server:format(Service)]) || Service <- Services]
+    end);
+
+cli(["server", "enable", Name0]) ->
+    if_enabled(fun() ->
+        Name = list_to_atom(Name0),
+        case proplists:get_value(Name, application:get_env(?APP, servers, [])) of
+            undefined ->
+                emqx_ctl:print("not_found~n");
+            Opts ->
+                print(emqx_exhook:enable(Name, Opts))
+        end
+    end);
+
+cli(["server", "disable", Name]) ->
+    if_enabled(fun() ->
+        print(emqx_exhook:disable(list_to_atom(Name)))
+    end);
+
+cli(["server", "stats"]) ->
+    if_enabled(fun() ->
+        [emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()]
+    end);
+
+cli(_) ->
+    emqx_ctl:usage([{"exhook server list", "List all running exhook server"},
+                    {"exhook server enable <Name>", "Enable a exhook server in the configuration"},
+                    {"exhook server disable <Name>", "Disable a exhook server"},
+                    {"exhook server stats", "Print exhook server statistic"}]).
+
+print(ok) ->
+    emqx_ctl:print("ok~n");
+print({error, Reason}) ->
+    emqx_ctl:print("~p~n", [Reason]).
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+if_enabled(Fun) ->
+    case lists:keymember(?APP, 1, application:which_applications()) of
+        true -> Fun();
+        _ -> hint()
+    end.
+
+hint() ->
+    emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n").
+
+stats() ->
+    lists:usort(lists:foldr(fun({K, N}, Acc) ->
+        case atom_to_list(K) of
+            "exhook." ++ Key -> [{Key, N}|Acc];
+            _ -> Acc
+        end
+    end, [], emqx_metrics:all())).

+ 288 - 0
apps/emqx_exhook/src/emqx_exhook_handler.erl

@@ -0,0 +1,288 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_handler).
+
+-include("emqx_exhook.hrl").
+-include_lib("emqx_libs/include/emqx.hrl").
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook]").
+
+-export([ on_client_connect/2
+        , on_client_connack/3
+        , on_client_connected/2
+        , on_client_disconnected/3
+        , on_client_authenticate/2
+        , on_client_check_acl/4
+        , on_client_subscribe/3
+        , on_client_unsubscribe/3
+        ]).
+
+%% Session Lifecircle Hooks
+-export([ on_session_created/2
+        , on_session_subscribed/3
+        , on_session_unsubscribed/3
+        , on_session_resumed/2
+        , on_session_discarded/2
+        , on_session_takeovered/2
+        , on_session_terminated/3
+        ]).
+
+%% Utils
+-export([ message/1
+        , stringfy/1
+        , merge_responsed_bool/2
+        , merge_responsed_message/2
+        , assign_to_message/2
+        , clientinfo/1
+        ]).
+
+-import(emqx_exhook,
+        [ cast/2
+        , call_fold/3
+        ]).
+
+-exhooks([ {'client.connect',      {?MODULE, on_client_connect,       []}}
+         , {'client.connack',      {?MODULE, on_client_connack,       []}}
+         , {'client.connected',    {?MODULE, on_client_connected,     []}}
+         , {'client.disconnected', {?MODULE, on_client_disconnected,  []}}
+         , {'client.authenticate', {?MODULE, on_client_authenticate,  []}}
+         , {'client.check_acl',    {?MODULE, on_client_check_acl,     []}}
+         , {'client.subscribe',    {?MODULE, on_client_subscribe,     []}}
+         , {'client.unsubscribe',  {?MODULE, on_client_unsubscribe,   []}}
+         , {'session.created',     {?MODULE, on_session_created,      []}}
+         , {'session.subscribed',  {?MODULE, on_session_subscribed,   []}}
+         , {'session.unsubscribed',{?MODULE, on_session_unsubscribed, []}}
+         , {'session.resumed',     {?MODULE, on_session_resumed,      []}}
+         , {'session.discarded',   {?MODULE, on_session_discarded,    []}}
+         , {'session.takeovered',  {?MODULE, on_session_takeovered,   []}}
+         , {'session.terminated',  {?MODULE, on_session_terminated,   []}}
+         ]).
+
+%%--------------------------------------------------------------------
+%% Clients
+%%--------------------------------------------------------------------
+
+on_client_connect(ConnInfo, Props) ->
+    Req = #{conninfo => conninfo(ConnInfo),
+            props => properties(Props)
+           },
+    cast('client.connect', Req).
+
+on_client_connack(ConnInfo, Rc, Props) ->
+    Req = #{conninfo => conninfo(ConnInfo),
+            result_code => stringfy(Rc),
+            props => properties(Props)},
+    cast('client.connack', Req).
+
+on_client_connected(ClientInfo, _ConnInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo)},
+    cast('client.connected', Req).
+
+on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            reason => stringfy(Reason)
+           },
+    cast('client.disconnected', Req).
+
+on_client_authenticate(ClientInfo, AuthResult) ->
+    Bool = maps:get(auth_result, AuthResult, undefined) == success,
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            result => Bool
+           },
+
+    case call_fold('client.authenticate', Req,
+                   fun merge_responsed_bool/2) of
+        {StopOrOk, #{result := Bool}} when is_boolean(Bool) ->
+            Result = case Bool of true -> success; _ -> not_authorized end,
+            {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
+        _ ->
+            {ok, AuthResult}
+    end.
+
+on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
+    Bool = Result == allow,
+    Type = case PubSub of
+               publish -> 'PUBLISH';
+               subscribe -> 'SUBSCRIBE'
+           end,
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            type => Type,
+            topic => Topic,
+            result => Bool
+           },
+    case call_fold('client.check_acl', Req,
+                   fun merge_responsed_bool/2) of
+        {StopOrOk, #{result := Bool}} when is_boolean(Bool) ->
+            NResult = case Bool of true -> allow; _ -> deny end,
+            {StopOrOk, NResult};
+        _ -> {ok, Result}
+    end.
+
+on_client_subscribe(ClientInfo, Props, TopicFilters) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            props => properties(Props),
+            topic_filters => topicfilters(TopicFilters)
+           },
+    cast('client.subscribe', Req).
+
+on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            props => properties(Props),
+            topic_filters => topicfilters(TopicFilters)
+           },
+    cast('client.unsubscribe', Req).
+
+%%--------------------------------------------------------------------
+%% Session
+%%--------------------------------------------------------------------
+
+on_session_created(ClientInfo, _SessInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo)},
+    cast('session.created', Req).
+
+on_session_subscribed(ClientInfo, Topic, SubOpts) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            topic => Topic,
+            subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
+           },
+    cast('session.subscribed', Req).
+
+on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            topic => Topic
+           },
+    cast('session.unsubscribed', Req).
+
+on_session_resumed(ClientInfo, _SessInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo)},
+    cast('session.resumed', Req).
+
+on_session_discarded(ClientInfo, _SessInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo)},
+    cast('session.discarded', Req).
+
+on_session_takeovered(ClientInfo, _SessInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo)},
+    cast('session.takeovered', Req).
+
+on_session_terminated(ClientInfo, Reason, _SessInfo) ->
+    Req = #{clientinfo => clientinfo(ClientInfo),
+            reason => stringfy(Reason)},
+    cast('session.terminated', Req).
+
+%%--------------------------------------------------------------------
+%% Types
+
+properties(undefined) -> [];
+properties(M) when is_map(M) ->
+    maps:fold(fun(K, V, Acc) ->
+        [#{name => stringfy(K),
+           value => stringfy(V)} | Acc]
+    end, [], M).
+
+conninfo(_ConnInfo =
+           #{clientid := ClientId, username := Username, peername := {Peerhost, _},
+             sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer,
+             keepalive := Keepalive}) ->
+    #{node => stringfy(node()),
+      clientid => ClientId,
+      username => maybe(Username),
+      peerhost => ntoa(Peerhost),
+      sockport => SockPort,
+      proto_name => ProtoName,
+      proto_ver => stringfy(ProtoVer),
+      keepalive => Keepalive}.
+
+clientinfo(ClientInfo =
+            #{clientid := ClientId, username := Username, peerhost := PeerHost,
+              sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) ->
+    #{node => stringfy(node()),
+      clientid => ClientId,
+      username => maybe(Username),
+      password => maybe(maps:get(password, ClientInfo, undefined)),
+      peerhost => ntoa(PeerHost),
+      sockport => SockPort,
+      protocol => stringfy(Protocol),
+      mountpoint => maybe(Mountpoiont),
+      is_superuser => maps:get(is_superuser, ClientInfo, false),
+      anonymous => maps:get(anonymous, ClientInfo, true)}.
+
+message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
+    #{node => stringfy(node()),
+      id => hexstr(Id),
+      qos => Qos,
+      from => stringfy(From),
+      topic => Topic,
+      payload => Payload,
+      timestamp => Ts}.
+
+assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
+    Message#message{qos = Qos, topic = Topic, payload = Payload}.
+
+topicfilters(Tfs) when is_list(Tfs) ->
+    [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
+ntoa(IP) ->
+    list_to_binary(inet_parse:ntoa(IP)).
+
+maybe(undefined) -> <<>>;
+maybe(B) -> B.
+
+%% @private
+stringfy(Term) when is_binary(Term) ->
+    Term;
+stringfy(Term) when is_integer(Term) ->
+    integer_to_binary(Term);
+stringfy(Term) when is_atom(Term) ->
+    atom_to_binary(Term, utf8);
+stringfy(Term) ->
+    unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
+
+hexstr(B) ->
+    iolist_to_binary([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(B)]).
+
+%%--------------------------------------------------------------------
+%% Acc funcs
+
+%% see exhook.proto
+merge_responsed_bool(Req, #{type := 'IGNORE'}) ->
+    {ok, Req};
+merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
+  when is_boolean(NewBool) ->
+    NReq = Req#{result => NewBool},
+    case Type of
+        'CONTINUE' -> {ok, NReq};
+        'STOP_AND_RETURN' -> {stop, NReq}
+    end;
+merge_responsed_bool(Req, Resp) ->
+    ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
+    {ok, Req}.
+
+merge_responsed_message(Req, #{type := 'IGNORE'}) ->
+    {ok, Req};
+merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
+    NReq = Req#{message => NMessage},
+    case Type of
+        'CONTINUE' -> {ok, NReq};
+        'STOP_AND_RETURN' -> {stop, NReq}
+    end;
+merge_responsed_message(Req, Resp) ->
+    ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
+    {ok, Req}.

+ 274 - 0
apps/emqx_exhook/src/emqx_exhook_server.erl

@@ -0,0 +1,274 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_server).
+
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook Svr]").
+
+-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
+
+%% Load/Unload
+-export([ load/2
+        , unload/1
+        ]).
+
+%% APIs
+-export([call/3]).
+
+%% Infos
+-export([ name/1
+        , format/1
+        ]).
+
+-record(server, {
+          %% Server name (equal to grpcbox client channel name)
+          name :: server_name(),
+          %% The server started options
+          options :: list(),
+          %% gRPC channel pid
+          channel :: pid(),
+          %% Registered hook names and options
+          hookspec :: #{hookpoint() => map()},
+          %% Metric fun
+          incfun :: function()
+       }).
+
+-type server_name() :: atom().
+-type server() :: #server{}.
+
+-type hookpoint() :: 'client.connect'
+                   | 'client.connack'
+                   | 'client.connected'
+                   | 'client.disconnected'
+                   | 'client.authenticate'
+                   | 'client.check_acl'
+                   | 'client.subscribe'
+                   | 'client.unsubscribe'
+                   | 'session.created'
+                   | 'session.subscribed'
+                   | 'session.unsubscribed'
+                   | 'session.resumed'
+                   | 'session.discarded'
+                   | 'session.takeovered'
+                   | 'session.terminated'
+                   | 'message.publish'
+                   | 'message.delivered'
+                   | 'message.acked'
+                   | 'message.dropped'.
+
+-export_type([server/0]).
+
+%%--------------------------------------------------------------------
+%% Load/Unload APIs
+%%--------------------------------------------------------------------
+
+-spec load(atom(), list()) -> {ok, server()} | {error, term()} .
+load(Name, Opts0) ->
+    {Endpoints, Options} = channel_opts(Opts0),
+    StartFun = case proplists:get_bool(inplace, Opts0) of
+                   true -> start_grpc_client_channel_inplace;
+                   _ -> start_grpc_client_channel
+               end,
+    case emqx_exhook_sup:StartFun(Name, Endpoints, Options) of
+        {ok, ChannPid} ->
+            case do_init(Name) of
+                {ok, HookSpecs} ->
+                    %% Reigster metrics
+                    Prefix = lists:flatten(io_lib:format("exhook.~s.", [Name])),
+                    ensure_metrics(Prefix, HookSpecs),
+                    {ok, #server{name = Name,
+                                  options = Opts0,
+                                  channel = ChannPid,
+                                  hookspec = HookSpecs,
+                                  incfun = incfun(Prefix) }};
+                {error, _} = E -> E
+            end;
+        {error, _} = E -> E
+    end.
+
+%% @private
+channel_opts(Opts) ->
+    Scheme = proplists:get_value(scheme, Opts),
+    Host = proplists:get_value(host, Opts),
+    Port = proplists:get_value(port, Opts),
+    Options = proplists:get_value(options, Opts, []),
+    SslOpts = case Scheme of
+                  https -> proplists:get_value(ssl_options, Opts, []);
+                  _ -> []
+              end,
+    {[{Scheme, Host, Port, SslOpts}], maps:from_list(Options)}.
+
+-spec unload(server()) -> ok.
+unload(#server{name = Name, channel = ChannPid, options = Options}) ->
+    _ = do_deinit(Name),
+    {StopFun, Args} = case proplists:get_bool(inplace, Options) of
+                          true -> {stop_grpc_client_channel_inplace, [ChannPid]};
+                          _ -> {stop_grpc_client_channel, [Name]}
+                      end,
+    apply(emqx_exhook_sup, StopFun, Args).
+
+do_deinit(Name) ->
+    _ = do_call(Name, 'on_provider_unloaded', #{}),
+    ok.
+
+do_init(ChannName) ->
+    Req = #{broker => maps:from_list(emqx_sys:info())},
+    case do_call(ChannName, 'on_provider_loaded', Req) of
+        {ok, InitialResp} ->
+            try
+                {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))}
+            catch _:Reason:Stk ->
+                ?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p",
+                             [ChannName, Reason, Stk]),
+                {error, Reason}
+            end;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% @private
+resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
+    MessageHooks = message_hooks(),
+    AvailableHooks = available_hooks(),
+    lists:foldr(fun(HookSpec, Acc) ->
+        case maps:get(name, HookSpec, undefined) of
+            undefined -> Acc;
+            Name0 ->
+                Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
+                case lists:member(Name, AvailableHooks) of
+                    true ->
+                        case lists:member(Name, MessageHooks) of
+                            true ->
+                                Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
+                            _ ->
+                                Acc#{Name => #{}}
+                        end;
+                    _ -> error({unknown_hookpoint, Name})
+                end
+        end
+    end, #{}, HookSpecs).
+
+ensure_metrics(Prefix, HookSpecs) ->
+    Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint))
+            || Hookpoint <- maps:keys(HookSpecs)],
+    lists:foreach(fun emqx_metrics:ensure/1, Keys).
+
+incfun(Prefix) ->
+    fun(Name) ->
+        emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name)))
+    end.
+
+format(#server{name = Name, hookspec = Hooks}) ->
+    io_lib:format("name=~p, hooks=~0p", [Name, Hooks]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+name(#server{name = Name}) ->
+    Name.
+
+-spec call(hookpoint(), map(), server())
+  -> ignore
+   | {ok, Resp :: term()}
+   | {error, term()}.
+call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, incfun = IncFun}) ->
+    GrpcFunc = hk2func(Hookpoint),
+    case maps:get(Hookpoint, Hooks, undefined) of
+        undefined -> ignore;
+        Opts ->
+            NeedCall  = case lists:member(Hookpoint, message_hooks()) of
+                            false -> true;
+                            _ ->
+                                #{message := #{topic := Topic}} = Req,
+                                match_topic_filter(Topic, maps:get(topics, Opts, []))
+                        end,
+            case NeedCall of
+                false -> ignore;
+                _ ->
+                    IncFun(Hookpoint),
+                    do_call(ChannName, GrpcFunc, Req)
+            end
+    end.
+
+-compile({inline, [match_topic_filter/2]}).
+match_topic_filter(_, []) ->
+    true;
+match_topic_filter(TopicName, TopicFilter) ->
+    lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
+
+-spec do_call(atom(), atom(), map()) -> {ok, map()} | {error, term()}.
+do_call(ChannName, Fun, Req) ->
+    Options = #{channel => ChannName},
+    ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
+    case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
+        {ok, Resp, _Metadata} ->
+            ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
+            {ok, Resp};
+        {error, {Code, Msg}, _Metadata} ->
+            ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
+                        [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
+            {error, {Code, Msg}};
+        {error, Reason} ->
+            ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
+                        [?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
+            {error, Reason};
+        {'EXIT', Reason, Stk} ->
+            ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~p",
+                        [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
+            {error, Reason}
+    end.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+-compile({inline, [hk2func/1]}).
+hk2func('client.connect') -> 'on_client_connect';
+hk2func('client.connack') -> 'on_client_connack';
+hk2func('client.connected') -> 'on_client_connected';
+hk2func('client.disconnected') -> 'on_client_disconnected';
+hk2func('client.authenticate') -> 'on_client_authenticate';
+hk2func('client.check_acl') -> 'on_client_check_acl';
+hk2func('client.subscribe') -> 'on_client_subscribe';
+hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
+hk2func('session.created') -> 'on_session_created';
+hk2func('session.subscribed') -> 'on_session_subscribed';
+hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
+hk2func('session.resumed') -> 'on_session_resumed';
+hk2func('session.discarded') -> 'on_session_discarded';
+hk2func('session.takeovered') -> 'on_session_takeovered';
+hk2func('session.terminated') -> 'on_session_terminated';
+hk2func('message.publish') -> 'on_message_publish';
+hk2func('message.delivered') ->'on_message_delivered';
+hk2func('message.acked') -> 'on_message_acked';
+hk2func('message.dropped') ->'on_message_dropped'.
+
+-compile({inline, [message_hooks/0]}).
+message_hooks() ->
+    ['message.publish', 'message.delivered',
+     'message.acked', 'message.dropped'].
+
+-compile({inline, [available_hooks/0]}).
+available_hooks() ->
+    ['client.connect', 'client.connack', 'client.connected',
+     'client.disconnected', 'client.authenticate', 'client.check_acl',
+     'client.subscribe', 'client.unsubscribe',
+     'session.created', 'session.subscribed', 'session.unsubscribed',
+     'session.resumed', 'session.discarded', 'session.takeovered',
+     'session.terminated' | message_hooks()].

+ 74 - 0
apps/emqx_exhook/src/emqx_exhook_sup.erl

@@ -0,0 +1,74 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_sup).
+
+-behaviour(supervisor).
+
+-export([ start_link/0
+        , init/1
+        ]).
+
+-export([ start_grpc_client_channel/3
+        , stop_grpc_client_channel/1
+        ]).
+
+-export([ start_grpc_client_channel_inplace/3
+        , stop_grpc_client_channel_inplace/1
+        ]).
+
+%%--------------------------------------------------------------------
+%%  Supervisor APIs & Callbacks
+%%--------------------------------------------------------------------
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+    {ok, {{one_for_one, 10, 100}, []}}.
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+-spec start_grpc_client_channel(
+        atom() | string(),
+        [grpcbox_channel:endpoint()],
+        grpcbox_channel:options()) -> {ok, pid()} | {error, term()}.
+start_grpc_client_channel(Name, Endpoints, Options0) ->
+    Options = Options0#{sync_start => true},
+    Spec = #{id => Name,
+             start => {grpcbox_channel, start_link, [Name, Endpoints, Options]},
+             type => worker},
+
+    supervisor:start_child(?MODULE, Spec).
+
+-spec stop_grpc_client_channel(atom()) -> ok.
+stop_grpc_client_channel(Name) ->
+    ok = supervisor:terminate_child(?MODULE, Name),
+    ok = supervisor:delete_child(?MODULE, Name).
+
+-spec start_grpc_client_channel_inplace(
+        atom() | string(),
+        [grpcbox_channel:endpoint()],
+        grpcbox_channel:options()) -> {ok, pid()} | {error, term()}.
+start_grpc_client_channel_inplace(Name, Endpoints, Options0) ->
+    Options = Options0#{sync_start => true},
+    grpcbox_channel_sup:start_child(Name, Endpoints, Options).
+
+-spec stop_grpc_client_channel_inplace(pid()) -> ok.
+stop_grpc_client_channel_inplace(Pid) ->
+    ok = supervisor:terminate_child(grpcbox_channel_sup, Pid).

+ 53 - 0
apps/emqx_exhook/test/emqx_exhook_SUITE.erl

@@ -0,0 +1,53 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%%--------------------------------------------------------------------
+%% Setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Cfg) ->
+    _ = emqx_exhook_demo_svr:start(),
+    emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
+    Cfg.
+
+end_per_suite(Cfg) ->
+    emqx_exhook_demo_svr:stop(),
+    emqx_ct_helpers:stop_apps([emqx_exhook]).
+
+set_special_cfgs(emqx) ->
+    application:set_env(emqx, allow_anonymous, false),
+    application:set_env(emqx, enable_acl_cache, false),
+    application:set_env(emqx, plugins_loaded_file,
+                        emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
+set_special_cfgs(emqx_exhook) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_hooks(Cfg) ->
+    ok.

+ 299 - 0
apps/emqx_exhook/test/emqx_exhook_demo_svr.erl

@@ -0,0 +1,299 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_demo_svr).
+
+-behavior(emqx_exhook_v_1_hook_provider_bhvr).
+
+%%
+-export([ start/0
+        , stop/0
+        , take/0
+        , in/1
+        ]).
+
+%% gRPC server HookProvider callbacks
+-export([ on_provider_loaded/2
+        , on_provider_unloaded/2
+        , on_client_connect/2
+        , on_client_connack/2
+        , on_client_connected/2
+        , on_client_disconnected/2
+        , on_client_authenticate/2
+        , on_client_check_acl/2
+        , on_client_subscribe/2
+        , on_client_unsubscribe/2
+        , on_session_created/2
+        , on_session_subscribed/2
+        , on_session_unsubscribed/2
+        , on_session_resumed/2
+        , on_session_discarded/2
+        , on_session_takeovered/2
+        , on_session_terminated/2
+        , on_message_publish/2
+        , on_message_delivered/2
+        , on_message_dropped/2
+        , on_message_acked/2
+        ]).
+
+-define(PORT, 9000).
+
+-define(HTTP, #{grpc_opts => #{service_protos => [emqx_exhook_pb],
+                               services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}},
+                listen_opts => #{port => ?PORT,
+                                 socket_options => [{reuseaddr, true}]},
+                pool_opts => #{size => 8},
+                transport_opts => #{ssl => false}}).
+
+%%--------------------------------------------------------------------
+%% Server APIs
+%%--------------------------------------------------------------------
+
+start() ->
+    Pid = spawn(fun mngr_main/0),
+    register(?MODULE, Pid),
+    {ok, Pid}.
+
+stop() ->
+    ?MODULE ! stop.
+
+take() ->
+    ?MODULE ! {take, self()},
+    receive {value, V} -> V
+    after 5000 -> error(timeout) end.
+
+in({FunName, Req}) ->
+    ?MODULE ! {in, FunName, Req}.
+
+mngr_main() ->
+    application:ensure_all_started(grpcbox),
+    Svr = grpcbox:start_server(?HTTP),
+    mngr_loop([Svr, queue:new(), queue:new()]).
+
+mngr_loop([Svr, Q, Takes]) ->
+    receive
+        {in, FunName, Req} ->
+            {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes),
+            mngr_loop([Svr, NQ1, NQ2]);
+        {take, From} ->
+            {NQ1, NQ2} = reply(Q, queue:in(From, Takes)),
+            mngr_loop([Svr, NQ1, NQ2]);
+        stop ->
+            supervisor:terminate_child(grpcbox_services_simple_sup, Svr),
+            exit(normal)
+    end.
+
+reply(Q1, Q2) ->
+    case queue:len(Q1) =:= 0 orelse
+         queue:len(Q2) =:= 0 of
+        true -> {Q1, Q2};
+        _ ->
+            {{value, {Name, V}}, NQ1} = queue:out(Q1),
+            {{value, From}, NQ2} = queue:out(Q2),
+            From ! {value, {Name, V}},
+            {NQ1, NQ2}
+    end.
+
+%%--------------------------------------------------------------------
+%% callbacks
+%%--------------------------------------------------------------------
+
+-spec on_provider_loaded(ctx:ctx(), emqx_exhook_pb:on_provider_loadedial_request())
+  -> {ok, emqx_exhook_pb:on_provider_loaded_response(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_provider_loaded(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{hooks => [
+                     #{name => <<"client.connect">>},
+                     #{name => <<"client.connack">>},
+                     #{name => <<"client.connected">>},
+                     #{name => <<"client.disconnected">>},
+                     #{name => <<"client.authenticate">>},
+                     #{name => <<"client.check_acl">>},
+                     #{name => <<"client.subscribe">>},
+                     #{name => <<"client.unsubscribe">>},
+                     #{name => <<"session.created">>},
+                     #{name => <<"session.subscribed">>},
+                     #{name => <<"session.unsubscribed">>},
+                     #{name => <<"session.resumed">>},
+                     #{name => <<"session.discarded">>},
+                     #{name => <<"session.takeovered">>},
+                     #{name => <<"session.terminated">>},
+                     #{name => <<"message.publish">>},
+                     #{name => <<"message.delivered">>},
+                     #{name => <<"message.acked">>},
+                     #{name => <<"message.dropped">>}]}, Ctx}.
+
+-spec on_provider_unloaded(ctx:ctx(), emqx_exhook_pb:on_provider_unloadedial_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_provider_unloaded(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_connect(ctx:ctx(), emqx_exhook_pb:client_connect_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_connect(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_connack(ctx:ctx(), emqx_exhook_pb:client_connack_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_connack(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_connected(ctx:ctx(), emqx_exhook_pb:client_connected_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_connected(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_disconnected(ctx:ctx(), emqx_exhook_pb:client_disconnected_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_disconnected(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_authenticate(ctx:ctx(), emqx_exhook_pb:client_authenticate_request())
+  -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_authenticate(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{type => 'IGNORE'}, Ctx}.
+
+-spec on_client_check_acl(ctx:ctx(), emqx_exhook_pb:client_check_acl_request())
+  -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_check_acl(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{type => 'STOP_AND_RETURN', value => {bool_result, true}}, Ctx}.
+
+-spec on_client_subscribe(ctx:ctx(), emqx_exhook_pb:client_subscribe_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_subscribe(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_client_unsubscribe(ctx:ctx(), emqx_exhook_pb:client_unsubscribe_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_client_unsubscribe(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_created(ctx:ctx(), emqx_exhook_pb:session_created_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_created(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_subscribed(ctx:ctx(), emqx_exhook_pb:session_subscribed_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_subscribed(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_unsubscribed(ctx:ctx(), emqx_exhook_pb:session_unsubscribed_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_unsubscribed(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_resumed(ctx:ctx(), emqx_exhook_pb:session_resumed_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_resumed(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_discarded(ctx:ctx(), emqx_exhook_pb:session_discarded_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_discarded(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_takeovered(ctx:ctx(), emqx_exhook_pb:session_takeovered_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_takeovered(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_session_terminated(ctx:ctx(), emqx_exhook_pb:session_terminated_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_session_terminated(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_message_publish(ctx:ctx(), emqx_exhook_pb:message_publish_request())
+  -> {ok, emqx_exhook_pb:valued_response(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_message_publish(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_message_delivered(ctx:ctx(), emqx_exhook_pb:message_delivered_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_message_delivered(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_message_dropped(ctx:ctx(), emqx_exhook_pb:message_dropped_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_message_dropped(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.
+
+-spec on_message_acked(ctx:ctx(), emqx_exhook_pb:message_acked_request())
+  -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+   | grpcbox_stream:grpc_error_response().
+on_message_acked(Ctx, Req) ->
+    ?MODULE:in({?FUNCTION_NAME, Req}),
+    %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+    {ok, #{}, Ctx}.

+ 535 - 0
apps/emqx_exhook/test/props/prop_exhook_hooks.erl

@@ -0,0 +1,535 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_exhook_hooks).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-import(emqx_ct_proper_types,
+        [ conninfo/0
+        , clientinfo/0
+        , sessioninfo/0
+        , message/0
+        , connack_return_code/0
+        , topictab/0
+        , topic/0
+        , subopts/0
+        ]).
+
+-define(ALL(Vars, Types, Exprs),
+        ?SETUP(fun() ->
+            State = do_setup(),
+            fun() -> do_teardown(State) end
+         end, ?FORALL(Vars, Types, Exprs))).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_client_connect() ->
+    ?ALL({ConnInfo, ConnProps},
+         {conninfo(), conn_properties()},
+       begin
+           _OutConnProps = emqx_hooks:run_fold('client.connect', [ConnInfo], ConnProps),
+           {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
+           Expected =
+               #{props => properties(ConnProps),
+                 conninfo =>
+                   #{node => nodestr(),
+                     clientid => maps:get(clientid, ConnInfo),
+                     username => maybe(maps:get(username, ConnInfo, <<>>)),
+                     peerhost => peerhost(ConnInfo),
+                     sockport => sockport(ConnInfo),
+                     proto_name => maps:get(proto_name, ConnInfo),
+                     proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
+                     keepalive => maps:get(keepalive, ConnInfo)
+                    }
+                },
+           ?assertEqual(Expected, Resp),
+           true
+       end).
+
+prop_client_connack() ->
+    ?ALL({ConnInfo, Rc, AckProps},
+         {conninfo(), connack_return_code(), ack_properties()},
+        begin
+            _OutAckProps = emqx_hooks:run_fold('client.connack', [ConnInfo, Rc], AckProps),
+            {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{props => properties(AckProps),
+                  result_code => atom_to_binary(Rc, utf8),
+                  conninfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ConnInfo),
+                      username => maybe(maps:get(username, ConnInfo, <<>>)),
+                      peerhost => peerhost(ConnInfo),
+                      sockport => sockport(ConnInfo),
+                      proto_name => maps:get(proto_name, ConnInfo),
+                      proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
+                      keepalive => maps:get(keepalive, ConnInfo)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_client_authenticate() ->
+    ?ALL({ClientInfo, AuthResult}, {clientinfo(), authresult()},
+        begin
+            _OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
+            {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{result => authresult_to_bool(AuthResult),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_client_check_acl() ->
+    ?ALL({ClientInfo, PubSub, Topic, Result},
+         {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])},
+        begin
+            _OutResult = emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Result),
+            {'on_client_check_acl', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{result => aclresult_to_bool(Result),
+                  type => pubsub_to_enum(PubSub),
+                  topic => Topic,
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+
+prop_client_connected() ->
+    ?ALL({ClientInfo, ConnInfo},
+         {clientinfo(), conninfo()},
+        begin
+            ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
+            {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_client_disconnected() ->
+    ?ALL({ClientInfo, Reason, ConnInfo},
+         {clientinfo(), shutdown_reason(), conninfo()},
+        begin
+            ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
+            {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{reason => stringfy(Reason),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_client_subscribe() ->
+    ?ALL({ClientInfo, SubProps, TopicTab},
+         {clientinfo(), sub_properties(), topictab()},
+        begin
+            _OutTopicTab = emqx_hooks:run_fold('client.subscribe', [ClientInfo, SubProps], TopicTab),
+            {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{props => properties(SubProps),
+                  topic_filters => topicfilters(TopicTab),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_client_unsubscribe() ->
+    ?ALL({ClientInfo, UnSubProps, TopicTab},
+         {clientinfo(), unsub_properties(), topictab()},
+        begin
+            _OutTopicTab = emqx_hooks:run_fold('client.unsubscribe', [ClientInfo, UnSubProps], TopicTab),
+            {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{props => properties(UnSubProps),
+                  topic_filters => topicfilters(TopicTab),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_created() ->
+    ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+        begin
+            ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
+            {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+             ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_subscribed() ->
+    ?ALL({ClientInfo, Topic, SubOpts},
+         {clientinfo(), topic(), subopts()},
+        begin
+            ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
+            {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{topic => Topic,
+                  subopts => subopts(SubOpts),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_unsubscribed() ->
+    ?ALL({ClientInfo, Topic, SubOpts},
+         {clientinfo(), topic(), subopts()},
+        begin
+            ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
+            {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{topic => Topic,
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_resumed() ->
+    ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+        begin
+            ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
+            {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_discared() ->
+    ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+        begin
+            ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
+            {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_takeovered() ->
+    ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+        begin
+            ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
+            {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+            true
+        end).
+
+prop_session_terminated() ->
+    ?ALL({ClientInfo, Reason, SessInfo},
+         {clientinfo(), shutdown_reason(), sessioninfo()},
+        begin
+            ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
+            {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
+            Expected =
+                #{reason => stringfy(Reason),
+                  clientinfo =>
+                    #{node => nodestr(),
+                      clientid => maps:get(clientid, ClientInfo),
+                      username => maybe(maps:get(username, ClientInfo, <<>>)),
+                      password => maybe(maps:get(password, ClientInfo, <<>>)),
+                      peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+                      sockport => maps:get(sockport, ClientInfo),
+                      protocol => stringfy(maps:get(protocol, ClientInfo)),
+                      mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+                      is_superuser => maps:get(is_superuser, ClientInfo, false),
+                      anonymous => maps:get(anonymous, ClientInfo, true)
+                     }
+                 },
+            ?assertEqual(Expected, Resp),
+
+            true
+        end).
+
+nodestr() ->
+    stringfy(node()).
+
+peerhost(#{peername := {Host, _}}) ->
+    ntoa(Host).
+
+sockport(#{sockname := {_, Port}}) ->
+    Port.
+
+%% copied from emqx_exhook
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+    list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
+ntoa(IP) ->
+    list_to_binary(inet_parse:ntoa(IP)).
+
+maybe(undefined) -> <<>>;
+maybe(B) -> B.
+
+properties(undefined) -> [];
+properties(M) when is_map(M) ->
+    maps:fold(fun(K, V, Acc) ->
+        [#{name => stringfy(K),
+           value => stringfy(V)} | Acc]
+    end, [], M).
+
+topicfilters(Tfs) when is_list(Tfs) ->
+    [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+
+%% @private
+stringfy(Term) when is_binary(Term) ->
+    Term;
+stringfy(Term) when is_integer(Term) ->
+    integer_to_binary(Term);
+stringfy(Term) when is_atom(Term) ->
+    atom_to_binary(Term, utf8);
+stringfy(Term) ->
+    unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
+
+subopts(SubOpts) ->
+    #{qos => maps:get(qos, SubOpts, 0),
+      rh => maps:get(rh, SubOpts, 0),
+      rap => maps:get(rap, SubOpts, 0),
+      nl => maps:get(nl, SubOpts, 0),
+      share => maps:get(share, SubOpts, <<>>)
+     }.
+
+authresult_to_bool(AuthResult) ->
+    maps:get(auth_result, AuthResult, undefined) == success.
+
+aclresult_to_bool(Result) ->
+    Result == allow.
+
+pubsub_to_enum(publish) -> 'PUBLISH';
+pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
+
+%prop_message_publish() ->
+%    ?ALL({Msg, Env, Encode}, {message(), topic_filter_env()},
+%        begin
+%            true
+%        end).
+%
+%prop_message_delivered() ->
+%    ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env()},
+%        begin
+%            true
+%        end).
+%
+%prop_message_acked() ->
+%    ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message()},
+%        begin
+%            true
+%        end).
+
+%%--------------------------------------------------------------------
+%% Helper
+%%--------------------------------------------------------------------
+
+do_setup() ->
+    _ = emqx_exhook_demo_svr:start(),
+    emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
+    %% waiting first loaded event
+    {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
+    ok.
+
+do_teardown(_) ->
+    emqx_ct_helpers:stop_apps([emqx_exhook]),
+    %% waiting last unloaded event
+    {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
+    _ = emqx_exhook_demo_svr:stop(),
+    ok.
+
+set_special_cfgs(emqx) ->
+    application:set_env(emqx, allow_anonymous, false),
+    application:set_env(emqx, enable_acl_cache, false),
+    application:set_env(emqx, plugins_loaded_file,
+                        emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
+set_special_cfgs(emqx_exhook) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% Generators
+%%--------------------------------------------------------------------
+
+conn_properties() ->
+    #{}.
+
+ack_properties() ->
+    #{}.
+
+sub_properties() ->
+    #{}.
+
+unsub_properties() ->
+    #{}.
+
+shutdown_reason() ->
+    oneof([utf8(), {shutdown, atom()}]).
+
+authresult() ->
+    #{auth_result => connack_return_code()}.
+
+%topic_filter_env() ->
+%    oneof([{<<"#">>}, {undefined}, {topic()}]).

+ 15 - 0
etc/emqx_cloud.d/emqx_exhook.conf

@@ -0,0 +1,15 @@
+##====================================================================
+## EMQ X Hooks
+##====================================================================
+
+##--------------------------------------------------------------------
+## Server Address
+
+## The gRPC server url
+##
+## exhook.server.$name.url = url()
+exhook.server.default.url = http://127.0.0.1:9000
+
+#exhook.server.default.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
+#exhook.server.default.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem
+#exhook.server.default.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem

+ 199 - 40
rebar.config

@@ -1,49 +1,208 @@
 {minimum_otp_vsn, "21.3"}.
-
-{plugins, [rebar3_proper]}.
-
-{deps,
- [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
-  {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
-  {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
-  {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
-  {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}},
-  {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
-  {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
- ]}.
-
-{erl_opts, [warn_unused_vars,
-            warn_shadow_vars,
-            warn_unused_import,
-            warn_obsolete_guard,
-            debug_info,
-            compressed %% for edge
+{plugins,[{relup_helper,{git,"https://github.com/emqx/relup_helper",
+                             {branch,"master"}}}]}.
+{edge_relx_overlay,[]}.
+{edoc_opts,[{preprocess,true}]}.
+{erl_opts,[warn_unused_vars,warn_shadow_vars,warn_unused_import,
+           warn_obsolete_guard,no_debug_info,compressed]}.
+{overrides,[{add,[{erl_opts,[no_debug_info,compressed,
+                             {parse_transform,mod_vsn}]}]}]}.
+{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
+              deprecated_function_calls,warnings_as_errors,
+              deprecated_functions]}.
+{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]}
            ]}.
+{cover_enabled,true}.
+{cover_opts,[verbose]}.
+{cover_export_enabled,true}.
+{provider_hooks,[{pre,[{release,{relup_helper,gen_appups}}]},
+                 {post,[{release,{relup_helper,otp_vsn}},
+                        {release,{relup_helper,untar}}]}]}.
+{post_hooks,[]}.
+{erl_first_files, ["apps/emqx/src/emqx_logger.erl"]}.
 
-{overrides, [{add, [{erl_opts, [compressed]}]}]}.
+"${COVERALL_CONFIGS}".
 
-{edoc_opts, [{preprocess, true}]}.
+{deps, ["${BASIC_DEPS}"]}.
 
-{xref_checks, [undefined_function_calls, undefined_functions,
-               locals_not_used, deprecated_function_calls,
-               warnings_as_errors, deprecated_functions
-              ]}.
+{profiles,
+    [{'emqx',
+        [{relx,
+             ["${BASIC_RELX_PARAMS}",
+              {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]},
+              {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]},
+              {overlay_vars,["vars/vars-cloud.config","vars/vars-bin.config"]}]}]},
+     {'emqx-pkg',
+         [{relx,
+              ["${BASIC_RELX_PARAMS}",
+               {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]},
+               {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]},
+               {overlay_vars,["vars/vars-cloud.config","vars/vars-pkg.config"]}]}]},
+     {'emqx-edge',
+         [{relx,
+              ["${BASIC_RELX_PARAMS}",
+               {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]},
+               {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]},
+               {overlay_vars,["vars/vars-edge.config","vars/vars-bin.config"]}]}]},
+     {'emqx-edge-pkg',
+         [{relx,
+              ["${BASIC_RELX_PARAMS}",
+               {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]},
+               {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]},
+               {overlay_vars,["vars/vars-edge.config","vars/vars-pkg.config"]}]}]},
+     {test,
+        [
+          {plugins, [
+            rebar3_proper,
+            {coveralls,
+              {git, "https://github.com/emqx/coveralls-erl",
+                {branch, "github"}}}]},
+          {deps,
+          [ {bbmustache, "1.7.0"},
+            {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
+            {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}},
+            meck
+          ]},
+          {erl_opts, [debug_info]}
+        ]}
+    ]}.
 
-{cover_enabled, true}.
-{cover_opts, [verbose]}.
-{cover_export_enabled, true}.
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Placeholders
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+{placeholders, [
 
-{erl_first_files, ["src/emqx_logger.erl"]}.
+  {"BASIC_DEPS", elems,
+    [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
+    , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
+    , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}
+    , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}}
+    , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}}
+    , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}
+    , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
+    , {emqx_passwd, {git, "https://github.com/emqx/emqx-passwd", {tag, "v1.1.1"}}}
+    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.0"}}}
+    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "v0.4.2"}}}
+    , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
+    , {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}}
+    , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}
+    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
+    , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.1"}}}
+    , {getopt, "1.0.1"}
+    ]
+  },
 
-{profiles,
- [{test,
-   [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]},
-    {deps,
-     [{bbmustache, "1.7.0"},
-      {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
-      {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}
-     ]},
-    {erl_opts, [debug_info]}
-   ]}
- ]}.
+  {"BASIC_RELX_PARAMS", elems,
+    [ {include_src,false}
+    , {extended_start_script,false}
+    , {generate_start_script,false}
+    , {sys_config,false}
+    , {vm_args,false}
+    ]
+  },
+
+  {"BASIC_RELX_APPS", elems,
+    [ kernel
+    , sasl
+    , crypto
+    , public_key
+    , asn1
+    , syntax_tools
+    , ssl
+    , os_mon
+    , inets
+    , compiler
+    , runtime_tools
+    , cuttlefish
+    , emqx
+    , {mnesia, load}
+    , {ekka, load}
+    , {emqx_retainer, load}
+    , {emqx_management, load}
+    , {emqx_dashboard, load}
+    , {emqx_bridge_mqtt, load}
+    , {emqx_sn, load}
+    , {emqx_coap, load}
+    , {emqx_stomp, load}
+    , {emqx_auth_clientid, load}
+    , {emqx_auth_username, load}
+    , {emqx_auth_http, load}
+    , {emqx_auth_mysql, load}
+    , {emqx_auth_jwt, load}
+    , {emqx_auth_mnesia, load}
+    , {emqx_web_hook, load}
+    , {emqx_recon, load}
+    , {emqx_rule_engine, load}
+    , {emqx_sasl, load}
+    , {emqx_telemetry, load}
+    ]
+  },
+
+  {"CLOUD_RELX_APPS", elems,
+    [ {emqx_lwm2m, load}
+    , {emqx_auth_ldap, load}
+    , {emqx_auth_pgsql, load}
+    , {emqx_auth_redis, load}
+    , {emqx_auth_mongo, load}
+    , {emqx_lua_hook, load}
+    , {emqx_exhook, load}
+    , {emqx_exproto, load}
+    , {emqx_prometheus, load}
+    , {emqx_psk_file, load}
+    , {emqx_plugin_template, load}
+    , {observer, load}
+    , luerl
+    , xmerl
+    ]
+  },
+
+  {"BASIC_OVERLAYS", elems,
+    [ {mkdir,"etc/"}
+    , {mkdir,"etc/emqx.d/"}
+    , {mkdir,"log/"}
+    , {mkdir,"data/"}
+    , {mkdir,"data/mnesia"}
+    , {mkdir,"data/configs"}
+    , {mkdir,"data/scripts"}
+    , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"}
+    , {template, "data/loaded_modules.tmpl", "data/loaded_modules"}
+    , {template,"data/emqx_vars","releases/emqx_vars"}
+    , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/"}
+    , {copy,"bin/*","bin/"}
+    , {template,"etc/*.conf","etc/"}
+    , {template,"etc/emqx.d/*.conf","etc/emqx.d/"}
+    , {copy,"{{lib_dirs}}/emqx/priv/emqx.schema","releases/{{rel_vsn}}/"}
+    , {copy, "etc/certs","etc/"}
+    , "${RELUP_OVERLAYS}"
+    ]
+  },
+
+  {"RELUP_OVERLAYS", elems,
+    [ {copy,"bin/emqx.cmd","bin/emqx.cmd-{{rel_vsn}}"}
+    , {copy,"bin/emqx_ctl.cmd","bin/emqx_ctl.cmd-{{rel_vsn}}"}
+    , {copy,"bin/emqx","bin/emqx-{{rel_vsn}}"}
+    , {copy,"bin/emqx_ctl","bin/emqx_ctl-{{rel_vsn}}"}
+    , {copy,"bin/install_upgrade.escript", "bin/install_upgrade.escript-{{rel_vsn}}"}
+    , {copy,"bin/nodetool","bin/nodetool-{{rel_vsn}}"}
+    , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/cuttlefish-{{rel_vsn}}"}
+    ]
+  },
+
+  {"CLOUD_OVERLAYS", elems,
+    [ {template,"etc/emqx_cloud.d/*.conf","etc/emqx.d/"}
+    , {template,"etc/emqx_cloud.d/vm.args","etc/vm.args"}
+    ]
+  },
+
+  {"EDGE_OVERLAYS", elems,
+    [ {template,"etc/emqx_edge.d/*.conf","etc/emqx.d/"}
+    , {template,"etc/emqx_edge.d/vm.args.edge","etc/vm.args"}
+    ]
+  },
+
+  {"GIT_DESC", var, fun mod_project:get_vsn/1},
+
+  {"COVERALL_CONFIGS", elems, fun mod_project:coveralls_configs/1}
+
+]}.