design.rst 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. .. _design:
  2. ======
  3. Design
  4. ======
  5. .. _design_architecture:
  6. ------------
  7. Architecture
  8. ------------
  9. The emqttd broker 1.0 is more like a network Switch or Router, not a traditional enterprise message queue. Compared to a network router that routes packets based on IP or MPLS label, the emqttd broker routes MQTT messages based on topic trie.
  10. .. image:: _static/images/concept.png
  11. The EMQ 2.0 seperated the Message Flow Plane and Monitor/Control Plane, the Architecture is something like::
  12. Control Plane
  13. --------------------
  14. | |
  15. FrontEnd -> | Flow Plane | -> BackEnd
  16. | |
  17. Session Router
  18. ---------------------
  19. Monitor Plane
  20. Design Philosophy
  21. -----------------
  22. 1. Focus on handling millions of MQTT connections and routing MQTT messages between clustered nodes.
  23. 2. Embrace Erlang/OTP, The Soft-Realtime, Low-Latency, Concurrent and Fault-Tolerant Platform.
  24. 3. Layered Design: Connection, Session, PubSub and Router Layers.
  25. 4. Separate the Message Flow Plane and the Control/Management Plane.
  26. 5. Stream MQTT messages to various backends including MQ or databases.
  27. System Layers
  28. -------------
  29. 1. Connection Layer
  30. Handle TCP and WebSocket connections, encode/decode MQTT packets.
  31. 2. Session Layer
  32. Process MQTT PUBLISH/SUBSCRIBE Packets received from client, and deliver MQTT messages to client.
  33. 3. PubSub Layer
  34. Dispatch MQTT messages to subscribers in a node.
  35. 4. Routing(Distributed) Layer
  36. Route MQTT messages among clustered nodes.
  37. ----------------
  38. Connection Layer
  39. ----------------
  40. This layer is built on the `eSockd`_ library which is a general Non-blocking TCP/SSL Socket Server:
  41. * Acceptor Pool and Asynchronous TCP Accept
  42. * Parameterized Connection Module
  43. * Max connections management
  44. * Allow/Deny by peer address or CIDR
  45. * Keepalive Support
  46. * Rate Limit based on The Leaky Bucket Algorithm
  47. * Fully Asynchronous TCP RECV/SEND
  48. This layer is also responsible for encoding/decoding MQTT frames:
  49. 1. Parse MQTT frames received from client
  50. 2. Serialize MQTT frames sent to client
  51. 3. MQTT Connection Keepalive
  52. Main erlang modules of this layer:
  53. +------------------+--------------------------+
  54. | Module | Description |
  55. +==================+==========================+
  56. | emqttd_client | TCP Client |
  57. +------------------+--------------------------+
  58. | emqttd_ws_client | WebSocket Client |
  59. +------------------+--------------------------+
  60. | emqttd_protocol | MQTT Protocol Handler |
  61. +------------------+--------------------------+
  62. | emqttd_parser | MQTT Frame Parser |
  63. +------------------+--------------------------+
  64. | emqttd_serializer| MQTT Frame Serializer |
  65. +------------------+--------------------------+
  66. -------------
  67. Session Layer
  68. -------------
  69. The session layer processes MQTT packets received from client and delivers PUBLISH packets to client.
  70. A MQTT session will store the subscriptions and inflight messages in memory:
  71. 1. The Client’s subscriptions.
  72. 2. Inflight qos1/2 messages sent to the client but unacked, QoS 2 messages which
  73. have been sent to the Client, but have not been completely acknowledged.
  74. 3. Inflight qos2 messages received from client and waiting for PUBREL. QoS 2
  75. messages which have been received from the Client, but have not been
  76. completely acknowledged.
  77. 4. All qos1, qos2 messages published to when client is disconnected.
  78. MQueue and Inflight Window
  79. --------------------------
  80. Concept of Message Queue and Inflight Window::
  81. |<----------------- Max Len ----------------->|
  82. -----------------------------------------------
  83. IN -> | Messages Queue | Inflight Window | -> Out
  84. -----------------------------------------------
  85. |<--- Win Size --->|
  86. 1. Inflight Window to store the messages delivered and await for PUBACK.
  87. 2. Enqueue messages when the inflight window is full.
  88. 3. If the queue is full, drop qos0 messages if store_qos0 is true, otherwise drop the oldest one.
  89. The larger the inflight window size is, the higher the throughput is. The smaller the window size is, the more strict the message order is.
  90. PacketId and MessageId
  91. ----------------------
  92. The 16-bit PacketId is defined by MQTT Protocol Specification, used by client/server to PUBLISH/PUBACK packets. A GUID(128-bit globally unique Id) will be generated by the broker and assigned to a MQTT message.
  93. Format of the globally unique message id::
  94. --------------------------------------------------------
  95. | Timestamp | NodeID + PID | Sequence |
  96. |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
  97. --------------------------------------------------------
  98. 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
  99. 2. NodeId: encode node() to 2 bytes integer
  100. 3. Pid: encode pid to 4 bytes integer
  101. 4. Sequence: 2 bytes sequence in one process
  102. The PacketId and MessageId in a End-to-End Message PubSub Sequence::
  103. PktId <-- Session --> MsgId <-- Router --> MsgId <-- Session --> PktId
  104. ------------
  105. PubSub Layer
  106. ------------
  107. The PubSub layer maintains a subscription table and is responsible to dispatch MQTT messages to subscribers.
  108. .. image:: _static/images/dispatch.png
  109. MQTT messages will be dispatched to the subscriber's session, which finally delivers the messages to client.
  110. -------------
  111. Routing Layer
  112. -------------
  113. The routing(distributed) layer maintains and replicates the global Topic Trie and Routing Table. The topic tire is composed of wildcard topics created by subscribers. The Routing Table maps a topic to nodes in the cluster.
  114. For example, if node1 subscribed 't/+/x' and 't/+/y', node2 subscribed 't/#' and node3 subscribed 't/a', there will be a topic trie and route table::
  115. -------------------------
  116. | t |
  117. | / \ |
  118. | + # |
  119. | / \ |
  120. | x y |
  121. -------------------------
  122. | t/+/x -> node1, node3 |
  123. | t/+/y -> node1 |
  124. | t/# -> node2 |
  125. | t/a -> node3 |
  126. -------------------------
  127. The routing layer would route MQTT messages among clustered nodes by topic trie match and routing table lookup:
  128. .. image:: _static/images/route.png
  129. The routing design follows two rules:
  130. 1. A message only gets forwarded to other cluster nodes if a cluster node is interested in it. This reduces the network traffic tremendously, because it prevents nodes from forwarding unnecessary messages.
  131. 2. As soon as a client on a node subscribes to a topic it becomes known within the cluster. If one of the clients somewhere in the cluster is publishing to this topic, the message will be delivered to its subscriber no matter to which cluster node it is connected.
  132. .. _design_auth_acl:
  133. ----------------------
  134. Authentication and ACL
  135. ----------------------
  136. The emqttd broker supports an extensible authentication/ACL mechanism, which is implemented by emqttd_access_control, emqttd_auth_mod and emqttd_acl_mod modules.
  137. emqttd_access_control module provides two APIs that help register/unregister auth or ACL module:
  138. .. code-block:: erlang
  139. register_mod(auth | acl, atom(), list()) -> ok | {error, any()}.
  140. register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}.
  141. Authentication Bahaviour
  142. -------------------------
  143. The emqttd_auth_mod defines an Erlang behaviour for authentication module:
  144. .. code-block:: erlang
  145. -module(emqttd_auth_mod).
  146. -ifdef(use_specs).
  147. -callback init(AuthOpts :: list()) -> {ok, State :: any()}.
  148. -callback check(Client, Password, State) -> ok | ignore | {error, string()} when
  149. Client :: mqtt_client(),
  150. Password :: binary(),
  151. State :: any().
  152. -callback description() -> string().
  153. -else.
  154. -export([behaviour_info/1]).
  155. behaviour_info(callbacks) ->
  156. [{init, 1}, {check, 3}, {description, 0}];
  157. behaviour_info(_Other) ->
  158. undefined.
  159. -endif.
  160. The authentication modules implemented by default:
  161. +-----------------------+--------------------------------+
  162. | Module | Authentication |
  163. +-----------------------+--------------------------------+
  164. | emqttd_auth_username | Username and Password |
  165. +-----------------------+--------------------------------+
  166. | emqttd_auth_clientid | ClientID |
  167. +-----------------------+--------------------------------+
  168. | emqttd_auth_ldap | LDAP |
  169. +-----------------------+--------------------------------+
  170. | emqttd_auth_anonymous | Anonymous |
  171. +-----------------------+--------------------------------+
  172. Authorization(ACL)
  173. ------------------
  174. The emqttd_acl_mod defines an Erlang behavihour for ACL module:
  175. .. code-block:: erlang
  176. -module(emqttd_acl_mod).
  177. -include("emqttd.hrl").
  178. -ifdef(use_specs).
  179. -callback init(AclOpts :: list()) -> {ok, State :: any()}.
  180. -callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
  181. Client :: mqtt_client(),
  182. PubSub :: pubsub(),
  183. Topic :: binary().
  184. -callback reload_acl(State :: any()) -> ok | {error, any()}.
  185. -callback description() -> string().
  186. -else.
  187. -export([behaviour_info/1]).
  188. behaviour_info(callbacks) ->
  189. [{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
  190. behaviour_info(_Other) ->
  191. undefined.
  192. -endif.
  193. emqttd_acl_internal implements the default ACL based on etc/acl.config file:
  194. .. code-block:: erlang
  195. %%%-----------------------------------------------------------------------------
  196. %%%
  197. %%% -type who() :: all | binary() |
  198. %%% {ipaddr, esockd_access:cidr()} |
  199. %%% {client, binary()} |
  200. %%% {user, binary()}.
  201. %%%
  202. %%% -type access() :: subscribe | publish | pubsub.
  203. %%%
  204. %%% -type topic() :: binary().
  205. %%%
  206. %%% -type rule() :: {allow, all} |
  207. %%% {allow, who(), access(), list(topic())} |
  208. %%% {deny, all} |
  209. %%% {deny, who(), access(), list(topic())}.
  210. %%%
  211. %%%-----------------------------------------------------------------------------
  212. {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
  213. {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
  214. {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
  215. {allow, all}.
  216. .. _design_hook:
  217. ------------
  218. Hooks Design
  219. ------------
  220. The emqttd broker implements a simple but powerful hooks mechanism to help users develop plugin. The broker would run the hooks when a client is connected/disconnected, a topic is subscribed/unsubscribed or a MQTT message is published/delivered/acked.
  221. Hooks defined by the emqttd 1.0 broker:
  222. +------------------------+------------------------------------------------------+
  223. | Hook | Description |
  224. +========================+======================================================+
  225. | client.connected | Run when client connected to the broker successfully |
  226. +------------------------+------------------------------------------------------+
  227. | client.subscribe | Run before client subscribes topics |
  228. +------------------------+------------------------------------------------------+
  229. | client.subscribe.after | Run After client subscribed topics |
  230. +------------------------+------------------------------------------------------+
  231. | client.unsubscribe | Run when client unsubscribes topics |
  232. +------------------------+------------------------------------------------------+
  233. | message.publish | Run when a MQTT message is published |
  234. +------------------------+------------------------------------------------------+
  235. | message.delivered | Run when a MQTT message is delivered |
  236. +------------------------+------------------------------------------------------+
  237. | message.acked | Run when a MQTT message is acked |
  238. +------------------------+------------------------------------------------------+
  239. | client.disconnected | Run when client disconnected from broker |
  240. +------------------------+------------------------------------------------------+
  241. The emqttd broker uses the `Chain-of-responsibility_pattern`_ to implement hook mechanism. The callback functions registered to hook will be executed one by one::
  242. -------- ok | {ok, NewAcc} -------- ok | {ok, NewAcc} --------
  243. (Args, Acc) --> | Fun1 | -------------------> | Fun2 | -------------------> | Fun3 | --> {ok, Acc} | {stop, Acc}
  244. -------- -------- --------
  245. | | |
  246. stop | {stop, NewAcc} stop | {stop, NewAcc} stop | {stop, NewAcc}
  247. The callback function for a hook should return:
  248. +-----------------+------------------------+
  249. | Return | Description |
  250. +=================+========================+
  251. | ok | Continue |
  252. +-----------------+------------------------+
  253. | {ok, NewAcc} | Return Acc and Continue|
  254. +-----------------+------------------------+
  255. | stop | Break |
  256. +-----------------+------------------------+
  257. | {stop, NewAcc} | Return Acc and Break |
  258. +-----------------+------------------------+
  259. The input arguments for a callback function are depending on the types of hook. Clone the `emqttd_plugin_template`_ project to check the argument in detail.
  260. Hook Implementation
  261. -------------------
  262. The hook APIs defined in emqttd module:
  263. .. code-block:: erlang
  264. -module(emqttd).
  265. %% Hooks API
  266. -export([hook/4, hook/3, unhook/2, run_hooks/3]).
  267. hook(Hook :: atom(), Callback :: function(), InitArgs :: list(any())) -> ok | {error, any()}.
  268. hook(Hook :: atom(), Callback :: function(), InitArgs :: list(any()), Priority :: integer()) -> ok | {error, any()}.
  269. unhook(Hook :: atom(), Callback :: function()) -> ok | {error, any()}.
  270. run_hooks(Hook :: atom(), Args :: list(any()), Acc :: any()) -> {ok | stop, any()}.
  271. And implemented in emqttd_hook module:
  272. .. code-block:: erlang
  273. -module(emqttd_hook).
  274. %% Hooks API
  275. -export([add/3, add/4, delete/2, run/3, lookup/1]).
  276. add(HookPoint :: atom(), Callback :: function(), InitArgs :: list(any())) -> ok.
  277. add(HookPoint :: atom(), Callback :: function(), InitArgs :: list(any()), Priority :: integer()) -> ok.
  278. delete(HookPoint :: atom(), Callback :: function()) -> ok.
  279. run(HookPoint :: atom(), Args :: list(any()), Acc :: any()) -> any().
  280. lookup(HookPoint :: atom()) -> [#callback{}].
  281. Hook Usage
  282. ----------
  283. The `emqttd_plugin_template`_ project provides the examples for hook usage:
  284. .. code-block:: erlang
  285. -module(emqttd_plugin_template).
  286. -export([load/1, unload/0]).
  287. -export([on_message_publish/2, on_message_delivered/3, on_message_acked/3]).
  288. load(Env) ->
  289. emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
  290. emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
  291. emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).
  292. on_message_publish(Message, _Env) ->
  293. io:format("publish ~s~n", [emqttd_message:format(Message)]),
  294. {ok, Message}.
  295. on_message_delivered(ClientId, Message, _Env) ->
  296. io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),
  297. {ok, Message}.
  298. on_message_acked(ClientId, Message, _Env) ->
  299. io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),
  300. {ok, Message}.
  301. unload() ->
  302. emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
  303. emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/3),
  304. emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/3).
  305. .. _design_plugin:
  306. -------------
  307. Plugin Design
  308. -------------
  309. Plugin is a normal erlang application that can be started/stopped dynamically by a running emqttd broker.
  310. emqttd_plugins Module
  311. ---------------------
  312. The plugin mechanism is implemented by emqttd_plugins module:
  313. .. code-block:: erlang
  314. -module(emqttd_plugins).
  315. -export([load/1, unload/1]).
  316. %% @doc Load a Plugin
  317. load(PluginName :: atom()) -> ok | {error, any()}.
  318. %% @doc UnLoad a Plugin
  319. unload(PluginName :: atom()) -> ok | {error, any()}.
  320. Load a Plugin
  321. -------------
  322. Use './bin/emqttd_ctl' CLI to load/unload a plugin::
  323. ./bin/emqttd_ctl plugins load emqttd_plugin_redis
  324. ./bin/emqttd_ctl plugins unload emqttd_plugin_redis
  325. Plugin Template
  326. ---------------
  327. http://github.com/emqtt/emqttd_plugin_template
  328. .. _eSockd: https://github.com/emqtt/esockd
  329. .. _Chain-of-responsibility_pattern: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
  330. .. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template/blob/master/src/emqttd_plugin_template.erl
  331. -----------------
  332. Mnesia/ETS Tables
  333. -----------------
  334. +--------------------+--------+----------------------------------------+
  335. | Table | Type | Description |
  336. +====================+========+========================================+
  337. | mqtt_trie | mnesia | Trie Table |
  338. +--------------------+--------+----------------------------------------+
  339. | mqtt_trie_node | mnesia | Trie Node Table |
  340. +--------------------+--------+----------------------------------------+
  341. | mqtt_route | mnesia | Global Route Table |
  342. +--------------------+--------+----------------------------------------+
  343. | mqtt_local_route | mnesia | Local Route Table |
  344. +--------------------+--------+----------------------------------------+
  345. | mqtt_pubsub | ets | PubSub Tab |
  346. +--------------------+--------+----------------------------------------+
  347. | mqtt_subscriber | ets | Subscriber Tab |
  348. +--------------------+--------+----------------------------------------+
  349. | mqtt_subscription | ets | Subscription Tab |
  350. +--------------------+--------+----------------------------------------+
  351. | mqtt_session | mnesia | Global Session Table |
  352. +--------------------+--------+----------------------------------------+
  353. | mqtt_local_session | ets | Local Session Table |
  354. +--------------------+--------+----------------------------------------+
  355. | mqtt_client | ets | Client Table |
  356. +--------------------+--------+----------------------------------------+