在 EMQX v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:
emqx-extension-hook 提供了使用 Java, Python 向 Broker 挂载钩子的功能emqx-exproto 提供了使用 Java,Python 编写用户自定义协议接入插件的功能但在后续的支持中发现许多难以处理的问题:
erlport 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。erlport 会占用 stdin stdout。因此,我们计划重构这部分的实现,其中主要的内容是:
gRPC 替换 erlport。emqx-extension-hook 重命名为 emqx-exhook旧版本的设计:emqx-extension-hook design in v4.2.0
架构如下:
EMQX
+========================+ +========+==========+
| ExHook | | | |
| +----------------+ | gRPC | gRPC | User's |
| | gRPC Client | ------------------> | Server | Codes |
| +----------------+ | (HTTP/2) | | |
| | | | |
+========================+ +========+==========+
emqx-exhook 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
和 emqx 原生的钩子一致,emqx-exhook 也按照链式的方式执行:

用户需要实现的方法,和数据类型的定义在 priv/protos/exhook.proto 文件中:
syntax = "proto3";
package emqx.exhook.v2;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientAuthorize(ClientAuthorizeRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakenover(SessionTakenoverRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
exhook: {
## 配置 gRPC 服务地址 (HTTP)
##
## default 为服务器的名称
server.default: {
url: "http://127.0.0.1:9000"
}
}