Просмотр исходного кода

Update copyright, format record, add 'AUTH' packet type for MQTT 5.0

Feng Lee 9 лет назад
Родитель
Сommit
4df2a71c27
5 измененных файлов с 205 добавлено и 174 удалено
  1. 77 71
      include/emqttd.hrl
  2. 1 1
      include/emqttd_cli.hrl
  3. 1 1
      include/emqttd_internal.hrl
  4. 111 86
      include/emqttd_protocol.hrl
  5. 15 15
      include/emqttd_trie.hrl

+ 77 - 71
include/emqttd.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
 %% Banner
 %%--------------------------------------------------------------------
 
--define(COPYRIGHT, "Copyright (C) 2012-2017, Feng Lee <feng@emqtt.io>").
+-define(COPYRIGHT, "Copyright (c) 2013-2017 EMQ Enterprise, Inc.").
 
 -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
 
@@ -48,21 +48,21 @@
 %% MQTT Topic
 %%--------------------------------------------------------------------
 
--record(mqtt_topic, {
-    topic      :: binary(),
-    flags = [] :: [retained | static]
-}).
+-record(mqtt_topic,
+        { topic      :: binary(),
+          flags = [] :: [retained | static]
+        }).
 
 -type(mqtt_topic() :: #mqtt_topic{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Subscription
 %%--------------------------------------------------------------------
--record(mqtt_subscription, {
-    subid :: binary() | atom(),
-    topic :: binary(),
-    qos   :: 0 | 1 | 2
-}).
+-record(mqtt_subscription,
+        { subid :: binary() | atom(),
+          topic :: binary(),
+          qos   :: 0 | 1 | 2
+        }).
 
 -type(mqtt_subscription() :: #mqtt_subscription{}).
 
@@ -73,18 +73,18 @@
 -type(ws_header_key() :: atom() | binary() | string()).
 -type(ws_header_val() :: atom() | binary() | string() | integer()).
 
--record(mqtt_client, {
-    client_id     :: binary() | undefined,
-    client_pid    :: pid(),
-    username      :: binary() | undefined,
-    peername      :: {inet:ip_address(), integer()},
-    clean_sess    :: boolean(),
-    proto_ver     :: 3 | 4,
-    keepalive = 0,
-    will_topic    :: undefined | binary(),
-    ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
-    connected_at  :: erlang:timestamp()
-}).
+-record(mqtt_client,
+        { client_id     :: binary() | undefined,
+          client_pid    :: pid(),
+          username      :: binary() | undefined,
+          peername      :: {inet:ip_address(), inet:port_number()},
+          clean_sess    :: boolean(),
+          proto_ver     :: 3 | 4,
+          keepalive = 0,
+          will_topic    :: undefined | binary(),
+          ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
+          connected_at  :: erlang:timestamp()
+        }).
 
 -type(mqtt_client() :: #mqtt_client{}).
 
@@ -92,33 +92,46 @@
 %% MQTT Session
 %%--------------------------------------------------------------------
 
--record(mqtt_session, {
-    client_id  :: binary(),
-    sess_pid   :: pid(),
-    persistent :: boolean()
-}).
+-record(mqtt_session,
+        { client_id  :: binary(),
+          sess_pid   :: pid(),
+          clean_sess :: boolean()
+        }).
 
 -type(mqtt_session() :: #mqtt_session{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Message
 %%--------------------------------------------------------------------
+
 -type(mqtt_msgid() :: binary() | undefined).
+
 -type(mqtt_pktid() :: 1..16#ffff | undefined).
 
--record(mqtt_message, {
-    id              :: mqtt_msgid(),         %% Global unique message ID
-    pktid           :: mqtt_pktid(),         %% PacketId
-    from            :: {binary(), undefined | binary()}, %% ClientId and Username
-    topic           :: binary(),             %% Topic that the message is published to
-    qos     = 0     :: 0 | 1 | 2,            %% Message QoS
-    flags   = []    :: [retain | dup | sys], %% Message Flags
-    retain  = false :: boolean(),            %% Retain flag
-    dup     = false :: boolean(),            %% Dup flag
-    sys     = false :: boolean(),            %% $SYS flag
-    headers = []    :: list(),
-    payload         :: binary(),             %% Payload
-    timestamp       :: pos_integer()         %% os:timestamp to seconds
+-record(mqtt_message,
+        { %% Global unique message ID
+          id              :: mqtt_msgid(),
+          %% PacketId
+          pktid           :: mqtt_pktid(),
+          %% ClientId and Username
+          from            :: {binary(), undefined | binary()},
+          %% Topic that the message is published to
+          topic           :: binary(),
+          %% Message QoS
+          qos     = 0     :: 0 | 1 | 2,
+          %% Message Flags
+          flags   = []    :: [retain | dup | sys],
+          %% Retain flag
+          retain  = false :: boolean(),
+          %% Dup flag
+          dup     = false :: boolean(),
+          %% $SYS flag
+          sys     = false :: boolean(),
+          headers = []    :: list(),
+          %% Payload
+          payload         :: binary(),
+          %% Timestamp
+          timestamp       :: erlang:timestamp()
 }).
 
 -type(mqtt_message() :: #mqtt_message{}).
@@ -126,46 +139,45 @@
 %%--------------------------------------------------------------------
 %% MQTT Delivery
 %%--------------------------------------------------------------------
--record(mqtt_delivery, {
-    sender  :: pid(),          %% Pid of the sender/publisher
-    message :: mqtt_message(), %% Message
-    flows   :: list()
-}).
+
+-record(mqtt_delivery,
+        { sender  :: pid(),          %% Pid of the sender/publisher
+          message :: mqtt_message(), %% Message
+          flows   :: list()
+        }).
 
 -type(mqtt_delivery() :: #mqtt_delivery{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Route
 %%--------------------------------------------------------------------
--record(mqtt_route, {
-    topic   :: binary(),
-    node    :: node()
-}).
+
+-record(mqtt_route,
+        { topic   :: binary(),
+          node    :: node()
+        }).
 
 -type(mqtt_route() :: #mqtt_route{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Alarm
 %%--------------------------------------------------------------------
--record(mqtt_alarm, {
-    id          :: binary(),
-    severity    :: warning | error | critical,
-    title       :: iolist() | binary(),
-    summary     :: iolist() | binary(),
-    timestamp   :: erlang:timestamp() %% Timestamp
-}).
+
+-record(mqtt_alarm,
+        { id          :: binary(),
+          severity    :: warning | error | critical,
+          title       :: iolist() | binary(),
+          summary     :: iolist() | binary(),
+          timestamp   :: erlang:timestamp() %% Timestamp
+        }).
 
 -type(mqtt_alarm() :: #mqtt_alarm{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Plugin
 %%--------------------------------------------------------------------
--record(mqtt_plugin, {
-    name,
-    version,
-    descr,
-    active = false
-}).
+
+-record(mqtt_plugin, { name, version, descr, active = false }).
 
 -type(mqtt_plugin() :: #mqtt_plugin{}).
 
@@ -173,14 +185,8 @@
 %% MQTT CLI Command
 %% For example: 'broker metrics'
 %%--------------------------------------------------------------------
--record(mqtt_cli, {
-    name,
-    action,
-    args = [],
-    opts = [],
-    usage,
-    descr
-}).
+
+-record(mqtt_cli, { name, action, args = [], opts = [], usage, descr }).
 
 -type(mqtt_cli() :: #mqtt_cli{}).
 

+ 1 - 1
include/emqttd_cli.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 1 - 1
include/emqttd_internal.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.

+ 111 - 86
include/emqttd_protocol.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -14,23 +14,32 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT Protocol Header
+%%--------------------------------------------------------------------
+%% MQTT SockOpts
+%%--------------------------------------------------------------------
+
+-define(MQTT_SOCKOPTS, [binary, {packet, raw}, {reuseaddr, true},
+                        {backlog, 512}, {nodelay, true}]).
 
 %%--------------------------------------------------------------------
 %% MQTT Protocol Version and Levels
 %%--------------------------------------------------------------------
--define(MQTT_PROTO_V31,  3).
--define(MQTT_PROTO_V311, 4).
+
+-define(MQTT_PROTO_V3, 3).
+-define(MQTT_PROTO_V4, 4).
+-define(MQTT_PROTO_V5, 5).
 
 -define(PROTOCOL_NAMES, [
-    {?MQTT_PROTO_V31,  <<"MQIsdp">>},
-    {?MQTT_PROTO_V311, <<"MQTT">>}]).
+    {?MQTT_PROTO_V3, <<"MQIsdp">>},
+    {?MQTT_PROTO_V4, <<"MQTT">>},
+    {?MQTT_PROTO_V5, <<"MQTT">>}]).
 
--type(mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311).
+-type(mqtt_vsn() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5).
 
 %%--------------------------------------------------------------------
-%% MQTT QoS
+%% MQTT QoS Level
 %%--------------------------------------------------------------------
+
 -define(QOS_0, 0). %% At most once
 -define(QOS_1, 1). %% At least once
 -define(QOS_2, 2). %% Exactly once
@@ -63,28 +72,30 @@
     end).
 
 %%--------------------------------------------------------------------
-%% Max ClientId Length. Why 1024? NiDongDe...
+%% Max ClientId Length. Why 1024?
 %%--------------------------------------------------------------------
+
 -define(MAX_CLIENTID_LEN, 1024).
 
 %%--------------------------------------------------------------------
 %% MQTT Control Packet Types
 %%--------------------------------------------------------------------
--define(RESERVED,     0).   %% Reserved
--define(CONNECT,      1).   %% Client request to connect to Server
--define(CONNACK,      2).   %% Server to Client: Connect acknowledgment
--define(PUBLISH,      3).   %% Publish message
--define(PUBACK,       4).   %% Publish acknowledgment
--define(PUBREC,       5).   %% Publish received (assured delivery part 1)
--define(PUBREL,       6).   %% Publish release (assured delivery part 2)
--define(PUBCOMP,      7).   %% Publish complete (assured delivery part 3)
--define(SUBSCRIBE,    8).   %% Client subscribe request
--define(SUBACK,       9).   %% Server Subscribe acknowledgment
--define(UNSUBSCRIBE, 10).   %% Unsubscribe request
--define(UNSUBACK,    11).   %% Unsubscribe acknowledgment
--define(PINGREQ,     12).   %% PING request
--define(PINGRESP,    13).   %% PING response
--define(DISCONNECT,  14).   %% Client is disconnecting
+-define(RESERVED,     0). %% Reserved
+-define(CONNECT,      1). %% Client request to connect to Server
+-define(CONNACK,      2). %% Server to Client: Connect acknowledgment
+-define(PUBLISH,      3). %% Publish message
+-define(PUBACK,       4). %% Publish acknowledgment
+-define(PUBREC,       5). %% Publish received (assured delivery part 1)
+-define(PUBREL,       6). %% Publish release (assured delivery part 2)
+-define(PUBCOMP,      7). %% Publish complete (assured delivery part 3)
+-define(SUBSCRIBE,    8). %% Client subscribe request
+-define(SUBACK,       9). %% Server Subscribe acknowledgment
+-define(UNSUBSCRIBE, 10). %% Unsubscribe request
+-define(UNSUBACK,    11). %% Unsubscribe acknowledgment
+-define(PINGREQ,     12). %% PING request
+-define(PINGRESP,    13). %% PING response
+-define(DISCONNECT,  14). %% Client is disconnecting
+-define(AUTH,        15). %% Authentication exchange
 
 -define(TYPE_NAMES, [
     'CONNECT',
@@ -100,25 +111,28 @@
     'UNSUBACK',
     'PINGREQ',
     'PINGRESP',
-    'DISCONNECT']).
+    'DISCONNECT',
+    'AUTH']).
 
 -type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT).
 
 %%--------------------------------------------------------------------
 %% MQTT Connect Return Codes
 %%--------------------------------------------------------------------
--define(CONNACK_ACCEPT,      0).    %% Connection accepted
--define(CONNACK_PROTO_VER,   1).    %% Unacceptable protocol version
--define(CONNACK_INVALID_ID,  2).    %% Client Identifier is correct UTF-8 but not allowed by the Server
--define(CONNACK_SERVER,      3).    %% Server unavailable
--define(CONNACK_CREDENTIALS, 4).    %% Username or password is malformed
--define(CONNACK_AUTH,        5).    %% Client is not authorized to connect
+
+-define(CONNACK_ACCEPT,      0). %% Connection accepted
+-define(CONNACK_PROTO_VER,   1). %% Unacceptable protocol version
+-define(CONNACK_INVALID_ID,  2). %% Client Identifier is correct UTF-8 but not allowed by the Server
+-define(CONNACK_SERVER,      3). %% Server unavailable
+-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
+-define(CONNACK_AUTH,        5). %% Client is not authorized to connect
 
 -type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
 
 %%--------------------------------------------------------------------
 %% MQTT Parser and Serializer
 %%--------------------------------------------------------------------
+
 -define(MAX_LEN, 16#fffffff).
 -define(HIGHBIT, 2#10000000).
 -define(LOWBITS, 2#01111111).
@@ -126,76 +140,87 @@
 %%--------------------------------------------------------------------
 %% MQTT Packet Fixed Header
 %%--------------------------------------------------------------------
+
 -record(mqtt_packet_header, {
-    type   = ?RESERVED  :: mqtt_packet_type(),
-    dup    = false      :: boolean(),
-    qos    = ?QOS_0     :: mqtt_qos(),
-    retain = false      :: boolean()}).
+    type   = ?RESERVED :: mqtt_packet_type(),
+    dup    = false     :: boolean(),
+    qos    = ?QOS_0    :: mqtt_qos(),
+    retain = false     :: boolean()}).
 
 %%--------------------------------------------------------------------
 %% MQTT Packets
 %%--------------------------------------------------------------------
--type(mqtt_client_id()  :: binary()).
--type(mqtt_username()   :: binary() | undefined).
--type(mqtt_packet_id()  :: 1..16#ffff | undefined).
 
--record(mqtt_packet_connect,  {
-    client_id   = <<>>              :: mqtt_client_id(),
-    proto_ver   = ?MQTT_PROTO_V311  :: mqtt_vsn(),
-    proto_name  = <<"MQTT">>        :: binary(),
-    will_retain = false             :: boolean(),
-    will_qos    = ?QOS_0            :: mqtt_qos(),
-    will_flag   = false             :: boolean(),
-    clean_sess  = false             :: boolean(),
-    keep_alive  = 60                :: non_neg_integer(),
-    will_topic  = undefined         :: undefined | binary(),
-    will_msg    = undefined         :: undefined | binary(),
-    username    = undefined         :: undefined | binary(),
-    password    = undefined         :: undefined | binary()}).
-
--record(mqtt_packet_connack, {
-    ack_flags = ?RESERVED   :: 0 | 1,
-    return_code             :: mqtt_connack() }).
-
--record(mqtt_packet_publish, {
-    topic_name  :: binary(),
-    packet_id   :: mqtt_packet_id() }).
-
--record(mqtt_packet_puback, {
-    packet_id   :: mqtt_packet_id() }).
-
--record(mqtt_packet_subscribe, {
-    packet_id   :: mqtt_packet_id(),
-    topic_table :: list({binary(), mqtt_qos()}) }).
-
--record(mqtt_packet_unsubscribe, {
-    packet_id   :: mqtt_packet_id(),
-    topics      :: list(binary()) }).
-
--record(mqtt_packet_suback, {
-    packet_id   :: mqtt_packet_id(),
-    qos_table   :: list(mqtt_qos() | 128) }).
-
--record(mqtt_packet_unsuback, {
-    packet_id   :: mqtt_packet_id() }).
+-type(mqtt_client_id() :: binary()).
+-type(mqtt_username()  :: binary() | undefined).
+-type(mqtt_packet_id() :: 1..16#ffff | undefined).
+
+-record(mqtt_packet_connect,
+        { client_id   = <<>>           :: mqtt_client_id(),
+          proto_ver   = ?MQTT_PROTO_V4 :: mqtt_vsn(),
+          proto_name  = <<"MQTT">>     :: binary(),
+          will_retain = false          :: boolean(),
+          will_qos    = ?QOS_0         :: mqtt_qos(),
+          will_flag   = false          :: boolean(),
+          clean_sess  = false          :: boolean(),
+          keep_alive  = 60             :: non_neg_integer(),
+          will_topic  = undefined      :: undefined | binary(),
+          will_msg    = undefined      :: undefined | binary(),
+          username    = undefined      :: undefined | binary(),
+          password    = undefined      :: undefined | binary()
+        }).
+
+-record(mqtt_packet_connack,
+        { ack_flags = ?RESERVED :: 0 | 1,
+          return_code           :: mqtt_connack()
+        }).
+
+-record(mqtt_packet_publish,
+        { topic_name :: binary(),
+          packet_id  :: mqtt_packet_id()
+        }).
+
+-record(mqtt_packet_puback,
+        { packet_id :: mqtt_packet_id() }).
+
+-record(mqtt_packet_subscribe,
+        { packet_id   :: mqtt_packet_id(),
+          topic_table :: list({binary(), mqtt_qos()})
+        }).
+
+-record(mqtt_packet_unsubscribe,
+        { packet_id :: mqtt_packet_id(),
+          topics    :: list(binary())
+        }).
+
+-record(mqtt_packet_suback,
+        { packet_id   :: mqtt_packet_id(),
+          qos_table   :: list(mqtt_qos() | 128)
+        }).
+
+-record(mqtt_packet_unsuback,
+        { packet_id   :: mqtt_packet_id() }).
 
 %%--------------------------------------------------------------------
 %% MQTT Control Packet
 %%--------------------------------------------------------------------
--record(mqtt_packet, {
-    header    :: #mqtt_packet_header{},
-    variable  :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
-                | #mqtt_packet_publish{} | #mqtt_packet_puback{}
-                | #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
-                | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
-                | mqtt_packet_id() | undefined,
-    payload   :: binary() | undefined }).
 
--type mqtt_packet() :: #mqtt_packet{}.
+-record(mqtt_packet,
+        { header    :: #mqtt_packet_header{},
+          variable  :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
+                     | #mqtt_packet_publish{} | #mqtt_packet_puback{}
+                     | #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
+                     | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
+                     | mqtt_packet_id() | undefined,
+          payload   :: binary() | undefined
+        }).
+
+-type(mqtt_packet() :: #mqtt_packet{}).
 
 %%--------------------------------------------------------------------
 %% MQTT Packet Match
 %%--------------------------------------------------------------------
+
 -define(CONNECT_PACKET(Var),
     #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}).
 

+ 15 - 15
include/emqttd_trie.hrl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2016-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -16,20 +16,20 @@
 
 -type(trie_node_id() :: binary() | atom()).
 
--record(trie_node, {
-    node_id         :: trie_node_id(),
-    edge_count = 0  :: non_neg_integer(),
-    topic           :: binary() | undefined,
-    flags           :: [retained | static]
-}).
+-record(trie_node,
+        { node_id        :: trie_node_id(),
+          edge_count = 0 :: non_neg_integer(),
+          topic          :: binary() | undefined,
+          flags          :: [retained | static]
+        }).
 
--record(trie_edge, {
-    node_id :: trie_node_id(),
-    word    :: binary() | atom()
-}).
+-record(trie_edge,
+        { node_id :: trie_node_id(),
+          word    :: binary() | atom()
+        }).
 
--record(trie, {
-    edge    :: #trie_edge{},
-    node_id :: trie_node_id()
-}).
+-record(trie,
+        { edge    :: #trie_edge{},
+          node_id :: trie_node_id()
+        }).