Преглед изворни кода

Merge pull request #495 from emqtt/0.17

1.0 - update design guide and fix issue #486
Feng Lee пре 9 година
родитељ
комит
5a91f9d089

BIN
docs/source/_static/images/dashboard.png


BIN
docs/source/_static/images/dispatch.png


BIN
docs/source/_static/images/route.png


+ 24 - 4
docs/source/cluster.rst

@@ -236,12 +236,32 @@ If a persistent MQTT client connected to node1 first, then disconnected and conn
      client-->| connection |<--|
      client-->| connection |<--|
               --------------
               --------------
 
 
-----------------
-Notice: NetSplit
-----------------
+------------
+The Firewall
+------------
+
+If there is a firewall between clustered nodes, the cluster requires to open 4369 port used by epmd daemon, and a port segment for nodes' communication.
+
+Configure the port segment in etc/emqttd.config, for example::
+
+    [{kernel, [
+        ...
+        {inet_dist_listen_min, 20000},
+        {inet_dist_listen_max, 21000}
+     ]},
+     ...
+
+------------------
+Network Partitions
+------------------
+
+The emqttd 1.0 cluster requires reliable network to avoid network partitions. The cluster will not recover from a network partition automatically.
+
+If a network partition occures, there will be critical logs in log/emqttd_error.log::
 
 
-The emqttd cluster does not support deployment across IDC, and the cluster will not handle NetSplit automatically. If NetSplit occures, nodes have to be rebooted manually.
+    Mnesia inconsistent_database event: running_partitioned_network, emqttd@host
 
 
+To recover from a network partition, you have to stop the nodes in a partition, clean the 'data/mneisa' of these nodes and reboot to join the cluster again.
 
 
 -----------------------
 -----------------------
 Consistent Hash and DHT
 Consistent Hash and DHT

+ 372 - 231
docs/source/design.rst

@@ -1,330 +1,471 @@
-==============
-Design Guide
-==============
-
----------------
-Pubsub Sequence
----------------
 
 
-## PubSub Sequence
+.. _design:
 
 
-### Clean Session = 1
+============
+Design Guide
+============
 
 
-```
+.. _design_architecture:
 
 
-title PubSub Sequence(Clean Session = 1)
+------------
+Architecture
+------------
 
 
-ClientA-->PubSub: Publish Message
-PubSub-->ClientB: Dispatch Message
-```
+The emqttd broker 1.0 is more like a network Switch or Router, not a traditional enterprise message queue. Compared to a network router that routes packets based on IP or MPLS label, the emqttd broker routes MQTT messages based on topic trie.
 
 
-![PubSub_CleanSess_1](http://emqtt.io/static/img/design/PubSub_CleanSess_1.png)
+.. image:: _static/images/concept.png
 
 
-### Clean Session = 0
+Design Philosophy
+-----------------
 
 
-```
-title PubSub Sequence(Clean Session = 0)
+1. Focus on handling millions of MQTT connections and routing MQTT messages between clustered nodes.
 
 
-ClientA-->SessionA: Publish Message
-SessionA-->PubSub: Publish Message
-PubSub-->SessionB: Dispatch Message
-SessionB-->ClientB: Dispatch Message
+2. Embrace Erlang/OTP, The Soft-Realtime, Low-Latency, Concurrent and Fault-Tolerant Platform.
 
 
-```
-![PubSub_CleanSess_0](http://emqtt.io/static/img/design/PubSub_CleanSess_0.png)
+3. Layered Design: Connection, Session, PubSub and Router Layers.
 
 
+4. Separate the Message Flow Plane and the Control/Management Plane.
 
 
-## Qos
+5. Stream MQTT messages to various backends including MQ or databases.
 
 
-PubQos | SubQos | In Message | Out Message
--------|--------|------------|-------------
-0   |   0    |   0        | 0
-0   |   1    |   0        | 0
-0   |   2    |   0        | 0
-1   |   0    |   1        | 0
-1   |   1    |   1        | 1
-1   |   2    |   1        | 1
-2   |   0    |   2        | 0
-2   |   1    |   2        | 1
-2   |   2    |   2        | 2
+System Layers
+-------------
 
 
+1. Connection Layer
+   
+   Handle TCP and WebSocket connections, encode/decode MQTT packets.
+
+2. Session Layer
+   
+   Process MQTT PUBLISH/SUBSCRIBE Packets received from client, and deliver MQTT messages to client.
+   
+3. PubSub Layer
+   
+   Dispatch MQTT messages to subscribers in a node.
+
+4. Routing(Distributed) Layer
+   
+   Route MQTT messages among clustered nodes.
+
+----------------
+Connection Layer
+----------------
+
+This layer is built on the `eSockd`_ library which is a general Non-blocking TCP/SSL Socket Server:
+
+* Acceptor Pool and Asynchronous TCP Accept
+* Parameterized Connection Module
+* Max connections management
+* Allow/Deny by peer address or CIDR
+* Keepalive Support
+* Rate Limit based on The Leaky Bucket Algorithm
+* Fully Asynchronous TCP RECV/SEND
+
+This layer is also responsible for encoding/decoding MQTT frames:
+
+1. Parse MQTT frames received from client
+2. Serialize MQTT frames sent to client
+3. MQTT Connection Keepalive
+
+Main erlang modules of this layer:
+
++------------------+--------------------------+
+| Module           | Description              |
++==================+==========================+
+| emqttd_client    | TCP Client               |
++------------------+--------------------------+
+| emqttd_ws_client | WebSocket Client         |
++------------------+--------------------------+
+| emqttd_protocol  | MQTT Protocol Handler    |
++------------------+--------------------------+
+| emqttd_parser    | MQTT Frame Parser        |
++------------------+--------------------------+
+| emqttd_serializer| MQTT Frame Serializer    |
++------------------+--------------------------+
 
 
-## Topic Functions Benchmark
+-------------
+Session Layer
+-------------
 
 
-Mac Air(11):
+The session layer processes MQTT packets received from client and delivers PUBLISH packets to client.
 
 
-Function     | Time(microseconds)
--------------|--------------------
-match        | 6.25086
-triples      | 13.86881
-words        | 3.41177
-binary:split | 3.03776
+A MQTT session will store the subscriptions and inflight messages in memory:
 
 
-iMac:
+1. The Client’s subscriptions.
 
 
-Function     | Time(microseconds)
--------------|--------------------
-match        | 3.2348
-triples      | 6.93524
-words        | 1.89616
-binary:split | 1.65243
+2. Inflight qos1/2 messages sent to the client but unacked, QoS 2 messages which 
+   have been sent to the Client, but have not been completely acknowledged.
 
 
+3. Inflight qos2 messages received from client and waiting for PUBREL. QoS 2
+   messages which have been received from the Client, but have not been
+   completely acknowledged.
 
 
---------------
-Cluster Design
---------------
+4. All qos1, qos2 messages published to when client is disconnected.
 
 
-## Cluster Design
+MQueue and Inflight Window
+--------------------------
 
 
-1. One 'disc_copies' node and many 'ram_copies' nodes.
+Concept of Message Queue and Inflight Window::
 
 
-   2. Topic trie tree will be copied to every clusterd node.
+          |<----------------- Max Len ----------------->|
+          -----------------------------------------------
+    IN -> |     Messages Queue    |  Inflight Window    | -> Out
+          -----------------------------------------------
+                                  |<---   Win Size  --->|
 
 
-   3. Subscribers to topic will be stored in each node and will not be copied.
+1. Inflight Window to store the messages delivered and await for PUBACK.
 
 
-   ## Cluster Strategy
+2. Enqueue messages when the inflight window is full.
 
 
-   TODO:...
+3. If the queue is full, drop qos0 messages if store_qos0 is true, otherwise drop the oldest one.
 
 
-   1. A message only gets forwarded to other cluster nodes if a cluster node is interested in it. this reduces the network traffic tremendously, because it prevents nodes from forwarding unnecessary messages.
+The larger the inflight window size is, the higher the throughput is. The smaller the window size is, the more strict the message order is.
 
 
-   2. As soon as a client on a node subscribes to a topic it becomes known within the cluster. If one of the clients somewhere in the cluster is publishing to this topic, the message will be delivered to its subscriber no matter to which cluster node it is connected.
+PacketId and MessageId
+----------------------
 
 
-   ....
+The 16-bit PacketId is defined by MQTT Protocol Specification, used by client/server to PUBLISH/PUBACK packets. A GUID(128-bit globally unique Id) will be generated by the broker and assigned to a MQTT message.
 
 
-## Cluster Architecture
+Format of the globally unique message id::
 
 
-![Cluster Design](http://emqtt.io/static/img/Cluster.png)
-## Cluster Command
+    --------------------------------------------------------
+    |        Timestamp       |  NodeID + PID  |  Sequence  |
+    |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
+    --------------------------------------------------------
 
 
-```sh
-./bin/emqttd_ctl cluster DiscNode
-```
+1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
+2. NodeId:    encode node() to 2 bytes integer
+3. Pid:       encode pid to 4 bytes integer
+4. Sequence:  2 bytes sequence in one process
 
 
-## Mnesia Example
+The PacketId and MessageId in a End-to-End Message PubSub Sequence::
 
 
-```
-(emqttd3@127.0.0.1)3> mnesia:info().
----> Processes holding locks <---
----> Processes waiting for locks <---
----> Participant transactions <---
----> Coordinator transactions <---
----> Uncertain transactions <---
----> Active tables <---
-mqtt_retained : with 6 records occupying 221 words of mem
-topic_subscriber: with 0 records occupying 305 words of mem
-topic_trie_node: with 129 records occupying 3195 words of mem
-topic_trie : with 128 records occupying 3986 words of mem
-topic : with 93 records occupying 1797 words of mem
-schema : with 6 records occupying 1081 words of mem
-===> System info in version "4.12.4", debug level = none <===
-opt_disc. Directory "/Users/erylee/Projects/emqttd/rel/emqttd3/data/mnesia" is NOT used.
-use fallback at restart = false
-running db nodes = ['emqttd2@127.0.0.1','emqttd@127.0.0.1','emqttd3@127.0.0.1']
-stopped db nodes = []
-master node tables = []
-remote = []
-ram_copies = [mqtt_retained,schema,topic,topic_subscriber,topic_trie,
-topic_trie_node]
-disc_copies = []
-disc_only_copies = []
-[{'emqttd2@127.0.0.1',ram_copies},
-{'emqttd3@127.0.0.1',ram_copies},
-{'emqttd@127.0.0.1',disc_copies}] = [schema]
-[{'emqttd2@127.0.0.1',ram_copies},
-{'emqttd3@127.0.0.1',ram_copies},
-{'emqttd@127.0.0.1',ram_copies}] = [topic,topic_trie,topic_trie_node,
-mqtt_retained]
-[{'emqttd3@127.0.0.1',ram_copies}] = [topic_subscriber]
-44 transactions committed, 5 aborted, 0 restarted, 0 logged to disc
-   0 held locks, 0 in queue; 0 local transactions, 0 remote
-   0 transactions waits for other nodes: []
-   ```
+    PktId <-- Session --> MsgId <-- Router --> MsgId <-- Session --> PktId
 
 
-   ## Cluster vs Bridge
+------------
+PubSub Layer
+------------
 
 
-   Cluster will copy topic trie tree between nodes, Bridge will not.
+The PubSub layer maintains a subscription table and is responsible to dispatch MQTT messages to subscribers.
 
 
+.. image:: _static/images/dispatch.png
 
 
+MQTT messages will be dispatched to the subscriber's session, which finally delivers the messages to client.
 
 
 -------------
 -------------
-Hooks Design
+Routing Layer
 -------------
 -------------
 
 
-## Overview 
+The routing(distributed) layer maintains and replicates the global Topic Trie and Routing Table. The topic tire is composed of wildcard topics created by subscribers. The Routing Table maps a topic to nodes in the cluster.
 
 
-emqttd supported a simple hooks mechanism in 0.8.0 release to extend the broker. The designed is improved in 0.9.0 release.
+For example, if node1 subscribed 't/+/x' and 't/+/y', node2 subscribed 't/#' and node3 subscribed 't/a', there will be a topic trie and route table::
 
 
-## API
+    -------------------------
+    |            t          |
+    |           / \         |
+    |          +   #        |
+    |        /  \           |
+    |      x      y         |
+    -------------------------
+    | t/+/x -> node1, node3 |
+    | t/+/y -> node1        |
+    | t/#   -> node2        |
+    | t/a   -> node3        |
+    -------------------------
 
 
-emqttd_broker Hook API:
+The routing layer would route MQTT messages among clustered nodes by topic trie match and routing table lookup:
 
 
-```
--export([hook/3, unhook/2, foreach_hooks/2, foldl_hooks/3]).
-```
+.. image:: _static/images/route.png
 
 
-### Hook 
+The routing design follows two rules:
 
 
-``` 
--spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
-hook(Hook, Name, MFA) ->
-    ...
-    ```
+1. A message only gets forwarded to other cluster nodes if a cluster node is interested in it. This reduces the network traffic tremendously, because it prevents nodes from forwarding unnecessary messages.
 
 
- ### Unhook
+2. As soon as a client on a node subscribes to a topic it becomes known within the cluster. If one of the clients somewhere in the cluster is publishing to this topic, the message will be delivered to its subscriber no matter to which cluster node it is connected.
 
 
- ```
- -spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
- unhook(Hook, Name) ->
-     ...
-     ```
+.. _design_auth_acl:
 
 
-  ### Foreach Hooks
+----------------------
+Authentication and ACL
+----------------------
 
 
-  ```
-  -spec foreach_hooks(Hook :: atom(), Args :: list()) -> any().
-  foreach_hooks(Hook, Args) ->
-      ...
-      ```
+The emqttd broker supports an extensible authentication/ACL mechanism, which is implemented by emqttd_access_control, emqttd_auth_mod and emqttd_acl_mod modules.
 
 
-   ### Foldl Hooks
+emqttd_access_control module provides two APIs that help register/unregister auth or ACL module::
 
 
-   ```
-   -spec foldl_hooks(Hook :: atom(), Args :: list(), Acc0 :: any()) -> any().
-   foldl_hooks(Hook, Args, Acc0) ->
-       ...
-       ```
+    register_mod(auth | acl, atom(), list()) -> ok | {error, any()}.
 
 
-    ## Hooks 
+    register_mod(auth | acl, atom(), list(), non_neg_integer()) -> ok | {error, any()}.
 
 
-    Name             | Type      | Description
-    ---------------  | ----------| --------------
-    client.connected | foreach   | Run when client connected successfully
-    client.subscribe | foldl     | Run before client subscribe topics
-    client.subscribe.after | foreach | Run After client subscribe topics
-    client.unsubscribe | foldl   | Run when client unsubscribe topics
-    message.publish   | foldl     | Run when message is published
-    message.acked   | foreach     | Run when message is acked
-    client.disconnected | foreach | Run when client is disconnnected
+Authentication Bahaviour
+-------------------------
 
 
-    ## End-to-End Message Pub/Ack
+The emqttd_auth_mod defines an Erlang behaviour for authentication module::
 
 
-    Could use 'message.publish', 'message.acked' hooks to implement end-to-end message pub/ack:
+    -module(emqttd_auth_mod).
 
 
-    ```
-     PktId <-- --> MsgId <-- --> MsgId <-- --> PktId
-          |<--- Qos --->|<---PubSub--->|<-- Qos -->|
-          ```
-## Limit
+    -ifdef(use_specs).
 
 
-The design is experimental.
+    -callback init(AuthOpts :: list()) -> {ok, State :: any()}.
 
 
+    -callback check(Client, Password, State) -> ok | ignore | {error, string()} when
+        Client    :: mqtt_client(),
+        Password  :: binary(),
+        State     :: any().
 
 
---------------
-Plugin Design
---------------
+    -callback description() -> string().
+
+    -else.
+
+    -export([behaviour_info/1]).
+
+    behaviour_info(callbacks) ->
+        [{init, 1}, {check, 3}, {description, 0}];
+    behaviour_info(_Other) ->
+        undefined.
+
+    -endif.
+
+The authentication modules implemented by default:
 
 
-## Overview
++-----------------------+--------------------------------+
+| Module                | Authentication                 |
++-----------------------+--------------------------------+
+| emqttd_auth_username  | Username and Password          |
++-----------------------+--------------------------------+
+| emqttd_auth_clientid  | ClientID                       |
++-----------------------+--------------------------------+
+| emqttd_auth_ldap      | LDAP                           |
++-----------------------+--------------------------------+
+| emqttd_auth_anonymous | Anonymous                      |
++-----------------------+--------------------------------+
 
 
-**Notice that 0.11.0 release use rebar to manage plugin's deps.**
+Authorization(ACL)
+------------------
 
 
-A plugin is just an erlang application that extends emqttd broker.
+The emqttd_acl_mod defines an Erlang behavihour for ACL module::
 
 
-The plugin application should be put in "emqttd/plugins/" folder to build. 
+    -module(emqttd_acl_mod).
 
 
+    -include("emqttd.hrl").
 
 
-## Plugin Project
+    -ifdef(use_specs).
 
 
-You could create a standalone plugin project outside emqttd, and then add it to "emqttd/plugins/" folder by "git submodule". 
+    -callback init(AclOpts :: list()) -> {ok, State :: any()}.
 
 
-Git submodule to compile emqttd_dashboard plugin with the broker, For example:
+    -callback check_acl({Client, PubSub, Topic}, State :: any()) -> allow | deny | ignore when
+        Client   :: mqtt_client(),
+        PubSub   :: pubsub(),
+        Topic    :: binary().
+
+    -callback reload_acl(State :: any()) -> ok | {error, any()}.
+
+    -callback description() -> string().
+
+    -else.
+
+    -export([behaviour_info/1]).
+
+    behaviour_info(callbacks) ->
+        [{init, 1}, {check_acl, 2}, {reload_acl, 1}, {description, 0}];
+    behaviour_info(_Other) ->
+        undefined.
+
+    -endif.
+
+emqttd_acl_internal implements the default ACL based on etc/acl.config file::
+
+    %%%-----------------------------------------------------------------------------
+    %%%
+    %%% -type who() :: all | binary() |
+    %%%                {ipaddr, esockd_access:cidr()} |
+    %%%                {client, binary()} |
+    %%%                {user, binary()}.
+    %%%
+    %%% -type access() :: subscribe | publish | pubsub.
+    %%%
+    %%% -type topic() :: binary().
+    %%%
+    %%% -type rule() :: {allow, all} |
+    %%%                 {allow, who(), access(), list(topic())} |
+    %%%                 {deny, all} |
+    %%%                 {deny, who(), access(), list(topic())}.
+    %%%
+    %%%-----------------------------------------------------------------------------
+
+    {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
+
+    {allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
+
+    {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
+
+    {allow, all}.
+
+.. _design_hook:
+
+------------
+Hooks Design
+------------
 
 
-```
-git submodule add https://github.com/emqtt/emqttd_dashboard.git plugins/emqttd_dashboard
-make && make dist
-```
+The emqttd broker implements a simple but powerful hooks mechanism to help users develop plugin. The broker would run the hooks when a client is connected/disconnected, a topic is subscribed/unsubscribed or a MQTT message is published/delivered/acked.
 
 
-## plugin.config
+Hooks defined by the emqttd 1.0 broker:
 
 
-**Each plugin should have a 'etc/plugin.config' file**
++------------------------+------------------------------------------------------+
+| Hook                   | Description                                          |
++========================+======================================================+
+| client.connected       | Run when client connected to the broker successfully |
++------------------------+------------------------------------------------------+
+| client.subscribe       | Run before client subscribes topics                  |
++------------------------+------------------------------------------------------+
+| client.subscribe.after | Run After client subscribed topics                   |
++------------------------+------------------------------------------------------+
+| client.unsubscribe     | Run when client unsubscribes topics                  |
++------------------------+------------------------------------------------------+
+| message.publish        | Run when a MQTT message is published                 |
++------------------------+------------------------------------------------------+
+| message.delivered      | Run when a MQTT message is delivered                 |
++------------------------+------------------------------------------------------+
+| message.acked          | Run when a MQTT message is acked                     |
++------------------------+------------------------------------------------------+
+| client.disconnected    | Run when client disconnected from broker             |
++------------------------+------------------------------------------------------+
 
 
-For example, project structure of emqttd_dashboard plugin:
+The emqttd broker uses the `Chain-of-responsibility_pattern`_ to implement hook mechanism. The callback functions registered to hook will be executed one by one::
 
 
-```
-LICENSE
-README.md
-ebin
-etc
-priv
-rebar.config
-src
-```
+                     --------  ok | {ok, NewAcc}   --------  ok | {ok, NewAcc}   --------
+     (Args, Acc) --> | Fun1 | -------------------> | Fun2 | -------------------> | Fun3 | --> {ok, Acc} | {stop, Acc}
+                     --------                      --------                      --------
+                        |                             |                             |
+                   stop | {stop, NewAcc}         stop | {stop, NewAcc}         stop | {stop, NewAcc}
 
 
-etc/plugin.config for emqttd_dashboard plugin:
+The callback function for a hook should return:
 
 
-```
-[
-{emqttd_dashboard, [
-{listener,
-{emqttd_dashboard, 18083, [
-{acceptors, 4},
-{max_clients, 512}]}}
-]}
-].
-```
++-----------------+------------------------+
+| Return          | Description            |
++=================+========================+
+| ok              | Continue               |
++-----------------+------------------------+
+| {ok, NewAcc}    | Return Acc and Continue|
++-----------------+------------------------+
+| stop            | Break                  |
++-----------------+------------------------+
+| {stop, NewAcc}  | Return Acc and Break   |
++-----------------+------------------------+
 
 
-## rebar.config
+The input arguments for a callback function are depending on the types of hook. Clone the `emqttd_plugin_template`_ project to check the argument in detail.
 
 
-**Plugin should use 'rebar.config' to manage depencies**
+Hook Implementation
+-------------------
 
 
-emqttd_plugin_pgsql plugin's rebar.config, for example:
+The hook APIs defined in emqttd module:
 
 
-```
-%% -*- erlang -*-
+.. code:: erlang
 
 
-{deps, [
-{epgsql, ".*",{git, "https://github.com/epgsql/epgsql.git", {branch, "master"}}}
-]}.
-```
+    -module(emqttd).
 
 
-## Build emqttd with plugins
+    %% Hooks API
+    -export([hook/4, hook/3, unhook/2, run_hooks/3]).
+    hook(Hook :: atom(), Callback :: function(), InitArgs :: list(any())) -> ok | {error, any()}.
 
 
-Put all the plugins you required in 'plugins/' folder of emqttd project, and then:
+    hook(Hook :: atom(), Callback :: function(), InitArgs :: list(any()), Priority :: integer()) -> ok | {error, any()}.
 
 
-```
-make && make dist
-```
+    unhook(Hook :: atom(), Callback :: function()) -> ok | {error, any()}.
 
 
-## Load Plugin
+    run_hooks(Hook :: atom(), Args :: list(any()), Acc :: any()) -> {ok | stop, any()}.
 
 
-'./bin/emqttd_ctl' to load/unload plugin, when emqttd broker started.
+And implemented in emqttd_hook module:
 
 
-```
-./bin/emqttd_ctl plugins load emqttd_plugin_demo
+.. code:: erlang
 
 
-./bin/emqttd_ctl plugins unload emqttd_plugin_demo
-```
+    -module(emqttd_hook).
 
 
-## List Plugins
+    %% Hooks API
+    -export([add/3, add/4, delete/2, run/3, lookup/1]).
 
 
-```
-./bin/emqttd_ctl plugins list
-```
+    add(HookPoint :: atom(), Callback :: function(), InitArgs :: list(any())) -> ok.
 
 
-## API
+    add(HookPoint :: atom(), Callback :: function(), InitArgs :: list(any()), Priority :: integer()) -> ok.
 
 
-```
-%% Load all active plugins after broker started
-emqttd_plugins:load() 
+    delete(HookPoint :: atom(), Callback :: function()) -> ok.
 
 
-%% Load new plugin
-emqttd_plugins:load(Name)
+    run(HookPoint :: atom(), Args :: list(any()), Acc :: any()) -> any().
 
 
-%% Unload all active plugins before broker stopped
-emqttd_plugins:unload()
+    lookup(HookPoint :: atom()) -> [#callback{}].
+
+Hook Usage
+----------
+
+The `emqttd_plugin_template`_ project provides the examples for hook usage:
+
+.. code:: erlang
+
+    -module(emqttd_plugin_template).
+
+    -export([load/1, unload/0]).
+    
+    -export([on_message_publish/2, on_message_delivered/3, on_message_acked/3]).
+
+    load(Env) ->
+        emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
+        emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/3, [Env]),
+        emqttd:hook('message.acked', fun ?MODULE:on_message_acked/3, [Env]).
+
+    on_message_publish(Message, _Env) ->
+        io:format("publish ~s~n", [emqttd_message:format(Message)]),
+        {ok, Message}.
+
+    on_message_delivered(ClientId, Message, _Env) ->
+        io:format("delivered to client ~s: ~s~n", [ClientId, emqttd_message:format(Message)]),
+        {ok, Message}.
+
+    on_message_acked(ClientId, Message, _Env) ->
+        io:format("client ~s acked: ~s~n", [ClientId, emqttd_message:format(Message)]),
+        {ok, Message}.
+
+    unload() ->
+        emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
+        emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/3),
+        emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/3).
+
+.. _design_plugin:
+
+-------------
+Plugin Design
+-------------
+
+Plugin is a normal erlang application that can be started/stopped dynamically by a running emqttd broker.
+
+emqttd_plugins Module
+---------------------
+
+The plugin mechanism is implemented by emqttd_plugins module::
+
+    -module(emqttd_plugins).
+
+    -export([load/1, unload/1]).
+
+    %% @doc Load a Plugin
+    load(PluginName :: atom()) -> ok | {error, any()}.
+
+    %% @doc UnLoad a Plugin
+    unload(PluginName :: atom()) -> ok | {error, any()}.
+
+Load a Plugin
+-------------
+
+Use './bin/emqttd_ctl' CLI to load/unload a plugin::
+
+    ./bin/emqttd_ctl plugins load emqttd_plugin_redis
+
+    ./bin/emqttd_ctl plugins unload emqttd_plugin_redis
+
+Plugin Template
+---------------
 
 
-%% Unload a plugin
-emqttd_plugins:unload(Name)
-```
+http://github.com/emqtt/emqttd_plugin_template
 
 
+.. _eSockd: https://github.com/emqtt/esockd
+.. _Chain-of-responsibility_pattern: https://en.wikipedia.org/wiki/Chain-of-responsibility_pattern
+.. _emqttd_plugin_template: https://github.com/emqtt/emqttd_plugin_template/blob/master/src/emqttd_plugin_template.erl
 
 

+ 2 - 2
docs/source/getstarted.rst

@@ -55,11 +55,11 @@ Quick Start
 Download and Install
 Download and Install
 --------------------
 --------------------
 
 
-The emqttd broker is cross-platform, could be deployed on Linux, Mac, FreeBSD, Windows and Raspberry Pi.
+The emqttd broker is cross-platform, which could be deployed on Linux, FreeBSD, Mac, Windows and even Raspberry Pi.
 
 
 Download binary package from: http://emqtt.io/downloads.
 Download binary package from: http://emqtt.io/downloads.
 
 
-Installing on Mac, For example:
+Installing on Mac, for example:
 
 
 .. code:: console
 .. code:: console
 
 

+ 1 - 1
docs/source/guide.rst

@@ -450,7 +450,7 @@ MQTT(SSL) Listener, Default Port is 8883::
 HTTP Publish API
 HTTP Publish API
 ----------------
 ----------------
 
 
-The emqttd broker provides a HTTP API to help application servers to publish messages to MQTT clients.
+The emqttd broker provides a HTTP API to help application servers publish messages to MQTT clients.
 
 
 HTTP API: POST http://host:8083/mqtt/publish
 HTTP API: POST http://host:8083/mqtt/publish
 
 

+ 1 - 0
docs/source/index.rst

@@ -48,6 +48,7 @@ Contents:
    cluster
    cluster
    bridge
    bridge
    guide
    guide
+   design
    commands
    commands
    plugins
    plugins
    tune
    tune

+ 24 - 7
docs/source/install.rst

@@ -1,17 +1,21 @@
 
 
-=======================
+.. _install:
+
+============
 Installation
 Installation
-=======================
+============
 
 
-emqttd broker is cross-platform, could deploy on Linux, FreeBSD, Mac OS X and Windows.
+The emqttd broker is cross-platform, which could be deployed on Linux, FreeBSD, Mac, Windows and even Raspberry Pi.
 
 
 .. NOTE::
 .. NOTE::
 
 
     Linux, FreeBSD Recommended.
     Linux, FreeBSD Recommended.
 
 
------------------
-Download  Package
------------------
+.. _install_download:
+
+----------------
+Download Package
+----------------
 
 
 Download binary package from: http://emqtt.io/downloads
 Download binary package from: http://emqtt.io/downloads
 
 
@@ -31,6 +35,8 @@ The package name consists of platform, version and release time.
 
 
 For example: emqttd-centos64-0.16.0-beta-20160216.zip
 For example: emqttd-centos64-0.16.0-beta-20160216.zip
 
 
+.. _install_on_linux:
+
 --------------------
 --------------------
 Installing on Linux
 Installing on Linux
 --------------------
 --------------------
@@ -47,7 +53,7 @@ Start the broker in console mode::
 
 
     cd emqttd && ./bin/emqttd console
     cd emqttd && ./bin/emqttd console
 
 
-If the broker started successfully, console will print:
+If the broker is started successfully, console will print:
 
 
 .. code:: console
 .. code:: console
 
 
@@ -102,6 +108,7 @@ Stop the broker::
 
 
     ./bin/emqttd stop
     ./bin/emqttd stop
 
 
+.. _install_on_freebsd:
 
 
 ---------------------
 ---------------------
 Installing on FreeBSD
 Installing on FreeBSD
@@ -111,6 +118,7 @@ Download FreeBSD Package from: http://emqtt.io/downloads/freebsd
 
 
 The installing process is same to Linux.
 The installing process is same to Linux.
 
 
+.. _install_on_mac:
 
 
 ----------------------
 ----------------------
 Installing on Mac OS X
 Installing on Mac OS X
@@ -134,6 +142,8 @@ Configure 'lager' log level in 'etc/emqttd.config', all MQTT messages recevied/s
 
 
 The install and boot process on Mac are same to Linux.
 The install and boot process on Mac are same to Linux.
 
 
+.. _install_on_windows:
+
 ---------------------
 ---------------------
 Installing on Windows
 Installing on Windows
 ---------------------
 ---------------------
@@ -168,6 +178,8 @@ Uninstall emqttd service::
 
 
 .. WARNING:: './bin/emqttd_ctl' command line cannot work on Windows.
 .. WARNING:: './bin/emqttd_ctl' command line cannot work on Windows.
 
 
+.. _build_from_source:
+
 ----------------------
 ----------------------
 Installing From Source
 Installing From Source
 ----------------------
 ----------------------
@@ -194,6 +206,8 @@ The binary package output in folder::
 
 
     rel/emqttd
     rel/emqttd
 
 
+.. _tcp_ports:
+
 ------------------
 ------------------
 TCP Ports Occupied
 TCP Ports Occupied
 ------------------
 ------------------
@@ -228,6 +242,8 @@ The TCP ports could be configured in etc/emqttd.config:
 
 
 The 18083 port is used by Web Dashboard of the broker. Default login: admin, Password: public
 The 18083 port is used by Web Dashboard of the broker. Default login: admin, Password: public
 
 
+.. _quick_setup:
+
 -----------
 -----------
 Quick Setup
 Quick Setup
 -----------
 -----------
@@ -272,6 +288,7 @@ etc/emqttd.config文件listeners段落设置最大允许连接数:
 
 
 emqttd消息服务器详细设置,请参见文档: :ref:`config`
 emqttd消息服务器详细设置,请参见文档: :ref:`config`
 
 
+.. _init_d_emqttd:
 
 
 -------------------
 -------------------
 /etc/init.d/emqttd
 /etc/init.d/emqttd

+ 1 - 1
docs/source/plugins.rst

@@ -426,7 +426,7 @@ http://localhost:61616/index.html
 emqttd_recon - Recon Plugin
 emqttd_recon - Recon Plugin
 ---------------------------
 ---------------------------
 
 
-The plugin loads `recon`_ library on a running emqttd broker. Recon libray helps to debug and optimize an Erlang application.
+The plugin loads `recon`_ library on a running emqttd broker. Recon libray helps debug and optimize an Erlang application.
 
 
 Load emqttd_recon Plugin
 Load emqttd_recon Plugin
 ------------------------
 ------------------------

+ 17 - 17
src/emqttd_cm.erl

@@ -70,14 +70,14 @@ lookup_proc(ClientId) when is_binary(ClientId) ->
 %% @doc Register ClientId with Pid.
 %% @doc Register ClientId with Pid.
 -spec(register(Client :: mqtt_client()) -> ok).
 -spec(register(Client :: mqtt_client()) -> ok).
 register(Client = #mqtt_client{client_id = ClientId}) ->
 register(Client = #mqtt_client{client_id = ClientId}) ->
-    CmPid = gproc_pool:pick_worker(?POOL, ClientId),
-    gen_server2:cast(CmPid, {register, Client}).
+    gen_server2:call(pick(ClientId), {register, Client}, 120000).
 
 
 %% @doc Unregister clientId with pid.
 %% @doc Unregister clientId with pid.
 -spec(unregister(ClientId :: binary()) -> ok).
 -spec(unregister(ClientId :: binary()) -> ok).
 unregister(ClientId) when is_binary(ClientId) ->
 unregister(ClientId) when is_binary(ClientId) ->
-    CmPid = gproc_pool:pick_worker(?POOL, ClientId),
-    gen_server2:cast(CmPid, {unregister, ClientId, self()}).
+    gen_server2:cast(pick(ClientId), {unregister, ClientId, self()}).
+
+pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
 
 
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %% gen_server callbacks
@@ -85,16 +85,16 @@ unregister(ClientId) when is_binary(ClientId) ->
 
 
 init([Pool, Id, StatsFun]) ->
 init([Pool, Id, StatsFun]) ->
     ?GPROC_POOL(join, Pool, Id),
     ?GPROC_POOL(join, Pool, Id),
-    {ok, #state{pool = Pool, id = Id,
-                statsfun = StatsFun,
-                monitors = dict:new()}}.
+    {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}.
 
 
-prioritise_call(_Req, _From, _Len, _State) ->
-    1.
+prioritise_call(Req, _From, _Len, _State) ->
+    case Req of
+        {register,   _Client} -> 2;
+        _                     -> 1
+    end.
 
 
 prioritise_cast(Msg, _Len, _State) ->
 prioritise_cast(Msg, _Len, _State) ->
     case Msg of
     case Msg of
-        {register,   _Client}         -> 2;
         {unregister, _ClientId, _Pid} -> 9;
         {unregister, _ClientId, _Pid} -> 9;
         _                             -> 1
         _                             -> 1
     end.
     end.
@@ -102,19 +102,19 @@ prioritise_cast(Msg, _Len, _State) ->
 prioritise_info(_Msg, _Len, _State) ->
 prioritise_info(_Msg, _Len, _State) ->
     3.
     3.
 
 
-handle_call(Req, _From, State) ->
-    ?UNEXPECTED_REQ(Req, State).
-
-handle_cast({register, Client = #mqtt_client{client_id  = ClientId,
-                                             client_pid = Pid}}, State) ->
+handle_call({register, Client = #mqtt_client{client_id  = ClientId,
+                                             client_pid = Pid}}, _From, State) ->
     case lookup_proc(ClientId) of
     case lookup_proc(ClientId) of
         Pid ->
         Pid ->
-            {noreply, State};
+            {reply, ok, State};
         _ ->
         _ ->
             ets:insert(mqtt_client, Client),
             ets:insert(mqtt_client, Client),
-            {noreply, setstats(monitor_client(ClientId, Pid, State))}
+            {reply, ok, setstats(monitor_client(ClientId, Pid, State))}
     end;
     end;
 
 
+handle_call(Req, _From, State) ->
+    ?UNEXPECTED_REQ(Req, State).
+
 handle_cast({unregister, ClientId, Pid}, State) ->
 handle_cast({unregister, ClientId, Pid}, State) ->
     case lookup_proc(ClientId) of
     case lookup_proc(ClientId) of
         Pid ->
         Pid ->

+ 2 - 2
src/emqttd_plugins.erl

@@ -101,7 +101,7 @@ plugin(PluginsDir, AppFile0) ->
     Descr = proplists:get_value(description, Attrs, ""),
     Descr = proplists:get_value(description, Attrs, ""),
     #mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}.
     #mqtt_plugin{name = Name, version = Ver, config = AppsEnv1, descr = Descr}.
 
 
-%% @doc Load One Plugin
+%% @doc Load a Plugin
 -spec(load(atom()) -> ok | {error, any()}).
 -spec(load(atom()) -> ok | {error, any()}).
 load(PluginName) when is_atom(PluginName) ->
 load(PluginName) when is_atom(PluginName) ->
     case lists:member(PluginName, names(started_app)) of
     case lists:member(PluginName, names(started_app)) of
@@ -161,7 +161,7 @@ find_plugin(Name) ->
 find_plugin(Name, Plugins) ->
 find_plugin(Name, Plugins) ->
     lists:keyfind(Name, 2, Plugins). 
     lists:keyfind(Name, 2, Plugins). 
 
 
-%% @doc UnLoad One Plugin
+%% @doc UnLoad a Plugin
 -spec(unload(atom()) -> ok | {error, any()}).
 -spec(unload(atom()) -> ok | {error, any()}).
 unload(PluginName) when is_atom(PluginName) ->
 unload(PluginName) when is_atom(PluginName) ->
     case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of
     case {lists:member(PluginName, names(started_app)), lists:member(PluginName, names(plugin))} of