Просмотр исходного кода

Merge branch 'emq20' of github.com:emqtt/emqttd into emq20

Feng Lee 9 лет назад
Родитель
Сommit
764e506f8d

+ 60 - 2
test/emqttd_SUITE.erl

@@ -47,7 +47,8 @@ all() ->
      {group, http},
      {group, cluster},
      {group, alarms},
-     {group, cli}].
+     {group, cli},
+     {group, cleanSession}].
 
 groups() ->
     [{protocol, [sequence],
@@ -103,7 +104,11 @@ groups() ->
        cli_bridges,
        cli_plugins,
        cli_listeners,
-       cli_vm]}].
+       cli_vm]},
+    {cleanSession, [sequence],
+      [cleanSession_validate,
+       cleanSession_validate1,
+       cleanSession_validate2]}].
 
 init_per_suite(Config) ->
     application:start(lager),
@@ -618,6 +623,59 @@ cli_vm(_) ->
     emqttd_cli:vm([]),
     emqttd_cli:vm(["ports"]).
 
+cleanSession_validate(_) ->
+    {ok, C1} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(10),
+    emqttc:subscribe(C1, <<"topic">>, qos0),
+    ok = emqttd_cli:sessions(["list", "persistent"]),
+    emqttc:disconnect(C1),
+    {ok, Pub} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"pub">>}]),
+
+    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 0}]),
+    timer:sleep(10),
+    {ok, C11} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(100),
+    Metrics = emqttd_metrics:all(),
+    ct:log("Metrics:~p~n", [Metrics]),
+    ?assertEqual(1, proplists:get_value('messages/qos0/sent', Metrics)),
+    ?assertEqual(1, proplists:get_value('messages/qos0/received', Metrics)),
+    emqttc:disconnect(Pub),
+    emqttc:disconnect(C11).
+
+cleanSession_validate1(_) ->
+    {ok, C1} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, true}]),
+    timer:sleep(10),
+    emqttc:subscribe(C1, <<"topic">>, qos1),
+    ok = emqttd_cli:sessions(["list", "transient"]),
+    emqttc:disconnect(C1),
+    {ok, Pub} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"pub">>}]),
+
+    emqttc:publish(Pub, <<"topic">>, <<"m1">>, [{qos, 1}]),
+    timer:sleep(10),
+    {ok, C11} = emqttc:start_link([{host, "localhost"},
+                                         {port, 1883},
+                                         {client_id, <<"c1">>},
+                                         {clean_sess, false}]),
+    timer:sleep(100),
+    Metrics = emqttd_metrics:all(),
+    ?assertEqual(0, proplists:get_value('messages/qos1/sent', Metrics)),
+    ?assertEqual(1, proplists:get_value('messages/qos1/received', Metrics)),
+    emqttc:disconnect(Pub),
+    emqttc:disconnect(C11).
+
 
 ensure_ok(ok) -> ok;
 ensure_ok({error, {already_started, _}}) -> ok.

+ 68 - 30
test/emqttd_SUITE_data/emqttd.conf

@@ -1,3 +1,8 @@
+
+##===================================================================
+## EMQ Configuration R2.1
+##===================================================================
+
 ##--------------------------------------------------------------------
 ## Node Args
 ##--------------------------------------------------------------------
@@ -11,6 +16,11 @@ node.cookie = emq_dist_cookie
 ## SMP support: enable, auto, disable
 node.smp = auto
 
+## vm.args: -heart
+## Heartbeat monitoring of an Erlang runtime system
+## Value should be 'on' or comment the line
+## node.heartbeat = on
+
 ## Enable kernel poll
 node.kernel_poll = on
 
@@ -40,21 +50,30 @@ node.crash_dump = log/crash.dump
 node.dist_net_ticktime = 60
 
 ## Distributed node port range
-## node.dist_listen_min = 6000
-## node.dist_listen_max = 6999
+## node.dist_listen_min = 6369
+## node.dist_listen_max = 6369
 
 ##--------------------------------------------------------------------
 ## Log
 ##--------------------------------------------------------------------
 
+## Set the log dir
+log.dir = {{ platform_log_dir }}
+
 ## Console log. Enum: off, file, console, both
 log.console = console
 
+## Syslog. Enum: on, off
+log.syslog = on
+
+##  syslog level. Enum: debug, info, notice, warning, error, critical, alert, emergency
+log.syslog.level = error
+
 ## Console log level. Enum: debug, info, notice, warning, error, critical, alert, emergency
 log.console.level = error
 
 ## Console log file
-## log.console.file = log/console.log
+## log.console.file = {{ platform_log_dir }}/console.log
 
 ## Error log file
 log.error.file = log/error.log
@@ -64,6 +83,19 @@ log.crash = on
 
 log.crash.file = log/crash.log
 
+##--------------------------------------------------------------------
+## Allow Anonymous and Default ACL
+##--------------------------------------------------------------------
+
+## Allow Anonymous authentication
+mqtt.allow_anonymous = true
+
+## Default ACL File
+mqtt.acl_file = etc/acl.conf
+
+## Cache ACL for PUBLISH
+mqtt.cache_acl = true
+
 ##--------------------------------------------------------------------
 ## MQTT Protocol
 ##--------------------------------------------------------------------
@@ -74,34 +106,38 @@ mqtt.max_clientid_len = 1024
 ## Max Packet Size Allowed, 64K by default.
 mqtt.max_packet_size = 64KB
 
+##--------------------------------------------------------------------
+## MQTT Client
+##--------------------------------------------------------------------
+
 ## Client Idle Timeout (Second)
-mqtt.client_idle_timeout = 30
+mqtt.client.idle_timeout = 30s
 
-## Allow Anonymous authentication
-mqtt.allow_anonymous = true
-
-## Default ACL File
-mqtt.acl_file = etc/acl.conf
+## Enable client Stats: seconds or off
+mqtt.client.enable_stats = off
 
 ##--------------------------------------------------------------------
 ## MQTT Session
 ##--------------------------------------------------------------------
 
+## Upgrade QoS?
+mqtt.session.upgrade_qos = off
+
 ## Max number of QoS 1 and 2 messages that can be “inflight” at one time.
 ## 0 means no limit
-mqtt.session.max_inflight = 100
+mqtt.session.max_inflight = 32
 
-## Retry interval for redelivering QoS1/2 messages.
-mqtt.session.retry_interval = 60
-
-## Awaiting PUBREL Timeout
-mqtt.session.await_rel_timeout = 20
+## Retry Interval for redelivering QoS1/2 messages.
+mqtt.session.retry_interval = 20s
 
 ## Max Packets that Awaiting PUBREL, 0 means no limit
-mqtt.session.max_awaiting_rel = 0
+mqtt.session.max_awaiting_rel = 100
 
-## Statistics Collection Interval(seconds)
-mqtt.session.collect_interval = 0
+## Awaiting PUBREL Timeout
+mqtt.session.await_rel_timeout = 20s
+
+## Enable Statistics at the Interval(seconds)
+mqtt.session.enable_stats = off
 
 ## Expired after 1 day:
 ## w - week
@@ -109,7 +145,7 @@ mqtt.session.collect_interval = 0
 ## h - hour
 ## m - minute
 ## s - second
-mqtt.session.expired_after = 1d
+mqtt.session.expiry_interval = 2h
 
 ##--------------------------------------------------------------------
 ## MQTT Queue
@@ -204,12 +240,13 @@ mqtt.listener.ssl.max_clients = 512
 ## Rate Limit. Format is 'burst,rate', Unit is KB/Sec
 ## mqtt.listener.ssl.rate_limit = 100,10
 
-## Configuring SSL Options
-## See http://erlang.org/doc/man/ssl.html
-mqtt.listener.ssl.handshake_timeout = 15
+## Configuring SSL Options. See http://erlang.org/doc/man/ssl.html
+### TLS only for POODLE attack
+mqtt.listener.ssl.tls_versions = tlsv1.2,tlsv1.1,tlsv1
+mqtt.listener.ssl.handshake_timeout = 15s
 mqtt.listener.ssl.keyfile = certs/key.pem
 mqtt.listener.ssl.certfile = certs/cert.pem
-## mqtt.listener.ssl.cacertfile = etc/certs/cacert.pem
+## mqtt.listener.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
 ## mqtt.listener.ssl.verify = verify_peer
 ## mqtt.listener.ssl.fail_if_no_peer_cert = true
 
@@ -219,13 +256,14 @@ mqtt.listener.http.acceptors = 4
 mqtt.listener.http.max_clients = 64
 
 ## HTTP(SSL) Listener
-## mqtt.listener.https = 8084
-## mqtt.listener.https.acceptors = 4
-## mqtt.listener.https.max_clients = 64
-## mqtt.listener.https.handshake_timeout = 15
-## mqtt.listener.https.certfile = etc/certs/cert.pem
-## mqtt.listener.https.keyfile = etc/certs/key.pem
-## mqtt.listener.https.cacertfile = etc/certs/cacert.pem
+mqtt.listener.https = 8084
+mqtt.listener.https.acceptors = 4
+mqtt.listener.https.max_clients = 64
+mqtt.listener.https.handshake_timeout = 15
+mqtt.listener.https.keyfile = certs/key.pem
+mqtt.listener.https.certfile = certs/cert.pem
+## mqtt.listener.https.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
+
 ## mqtt.listener.https.verify = verify_peer
 ## mqtt.listener.https.fail_if_no_peer_cert = true
 

+ 144 - 62
test/emqttd_SUITE_data/emqttd.schema

@@ -22,6 +22,19 @@
   hidden
 ]}.
 
+%% @doc http://erlang.org/doc/man/heart.html
+{mapping, "node.heartbeat", "vm_args.-heart", [
+  {datatype, flag},
+  hidden
+]}.
+
+{translation, "vm_args.-heart", fun(Conf) ->
+    case cuttlefish:conf_get("node.heartbeat", Conf) of
+        true  -> "";
+        false -> cuttlefish:invalid("should be 'on' or comment the line!")
+    end
+end}.
+
 %% @doc Enable Kernel Poll
 {mapping, "node.kernel_poll", "vm_args.+K", [
   {default, on},
@@ -135,8 +148,13 @@
 %% Log
 %%--------------------------------------------------------------------
 
+{mapping, "log.dir", "lager.log_dir", [
+  {default, "log"},
+  {datatype, string}
+]}.
+
 {mapping, "log.console", "lager.handlers", [
-  {default, file },
+  {default, file},
   {datatype, {enum, [off, file, console, both]}}
 ]}.
 
@@ -155,6 +173,26 @@
   {datatype, file}
 ]}.
 
+{mapping, "log.syslog", "lager.handlers", [
+  {default,  off},
+  {datatype, flag}
+]}.
+
+{mapping, "log.syslog.identity", "lager.handlers", [
+  {default, "emq"},
+  {datatype, string}
+]}.
+
+{mapping, "log.syslog.facility", "lager.handlers", [
+  {default, local0},
+  {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}}
+]}.
+
+{mapping, "log.syslog.level", "lager.handlers", [
+  {default, err},
+  {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
+]}.
+
 {mapping, "log.error.redirect", "lager.error_logger_redirect", [
   {default, on},
   {datatype, flag},
@@ -196,7 +234,16 @@
       both -> [ConsoleHandler, ConsoleFileHandler];
       _ -> []
     end,
-    ConsoleHandlers ++ ErrorHandler
+
+    SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
+      false -> [];
+      true  -> [{lager_syslog_backend,
+                  [cuttlefish:conf_get("log.syslog.identity", Conf),
+                   cuttlefish:conf_get("log.syslog.facility", Conf),
+                   cuttlefish:conf_get("log.syslog.level", Conf)]}]
+    end,
+
+    ConsoleHandlers ++ ErrorHandler ++ SyslogHandler
   end
 }.
 
@@ -226,6 +273,28 @@
   hidden
 ]}.
 
+%%--------------------------------------------------------------------
+%% Allow Anonymous and Default ACL
+%%--------------------------------------------------------------------
+
+%% @doc Allow Anonymous
+{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
+  {default, false},
+  {datatype, {enum, [true, false]}}
+]}.
+
+%% @doc Default ACL File
+{mapping, "mqtt.acl_file", "emqttd.acl_file", [
+  {datatype, string},
+  hidden
+]}.
+
+%% @doc Cache ACL for PUBLISH
+{mapping, "mqtt.cache_acl", "emqttd.cache_acl", [
+  {default, true},
+  {datatype, {enum, [true, false]}}
+]}.
+
 %%--------------------------------------------------------------------
 %% MQTT Protocol
 %%--------------------------------------------------------------------
@@ -242,35 +311,42 @@
   {datatype, bytesize}
 ]}.
 
-%% @doc Client Idle Timeout.
-{mapping, "mqtt.client_idle_timeout", "emqttd.protocol", [
-  {default, 30},
-  {datatype, integer}
-]}.
-
 {translation, "emqttd.protocol", fun(Conf) ->
-  [{max_clientid_len,    cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
-   {max_packet_size,     cuttlefish:conf_get("mqtt.max_packet_size", Conf)},
-   {client_idle_timeout, cuttlefish:conf_get("mqtt.client_idle_timeout", Conf)}]
+  [{max_clientid_len, cuttlefish:conf_get("mqtt.max_clientid_len", Conf)},
+   {max_packet_size,  cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
 end}.
 
-%% @doc Allow Anonymous
-{mapping, "mqtt.allow_anonymous", "emqttd.allow_anonymous", [
-  {default, false},
-  {datatype, {enum, [true, false]}},
-  hidden
+%%--------------------------------------------------------------------
+%% MQTT Client
+%%--------------------------------------------------------------------
+
+%% @doc Client Idle Timeout.
+{mapping, "mqtt.client.idle_timeout", "emqttd.client", [
+  {default, "30s"},
+  {datatype, {duration, ms}}
 ]}.
 
-%% @doc Default ACL File
-{mapping, "mqtt.acl_file", "emqttd.acl_file", [
-  {datatype, string},
-  hidden
+%% @doc Enable Stats of Client.
+{mapping, "mqtt.client.enable_stats", "emqttd.client", [
+  {default, off},
+  {datatype, [{duration, ms}, flag]}
 ]}.
 
+%% @doc Client
+{translation, "emqttd.client", fun(Conf) ->
+  [{client_idle_timeout, cuttlefish:conf_get("mqtt.client.idle_timeout", Conf)},
+   {client_enable_stats, cuttlefish:conf_get("mqtt.client.enable_stats", Conf)}]
+end}.
+
 %%--------------------------------------------------------------------
 %% MQTT Session
 %%--------------------------------------------------------------------
 
+%% @doc Upgrade QoS?
+{mapping, "mqtt.session.upgrade_qos", "emqttd.session", [
+  {default, off},
+  {datatype, flag}
+]}.
 %% @doc Max number of QoS 1 and 2 messages that can be “inflight” at one time.
 %% 0 means no limit
 {mapping, "mqtt.session.max_inflight", "emqttd.session", [
@@ -278,17 +354,10 @@ end}.
   {datatype, integer}
 ]}.
 
-
 %% @doc Retry interval for redelivering QoS1/2 messages.
 {mapping, "mqtt.session.retry_interval", "emqttd.session", [
-  {default, 60},
-  {datatype, integer}
-]}.
-
-%% @doc Awaiting PUBREL Timeout
-{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
-  {default, 30},
-  {datatype, integer}
+  {default, "20s"},
+  {datatype, {duration, ms}}
 ]}.
 
 %% @doc Max Packets that Awaiting PUBREL, 0 means no limit
@@ -297,25 +366,32 @@ end}.
   {datatype, integer}
 ]}.
 
-%% @doc Statistics Collection Interval(seconds)
-{mapping, "mqtt.session.collect_interval", "emqttd.session", [
-  {default, 0},
-  {datatype, integer}
+%% @doc Awaiting PUBREL Timeout
+{mapping, "mqtt.session.await_rel_timeout", "emqttd.session", [
+  {default, "20s"},
+  {datatype, {duration, ms}}
+]}.
+
+%% @doc Enable Stats
+{mapping, "mqtt.session.enable_stats", "emqttd.session", [
+  {default, off},
+  {datatype, [{duration, ms}, flag]}
 ]}.
 
-%% @doc Session expired after...
-{mapping, "mqtt.session.expired_after", "emqttd.session", [
-  {default, "2d"},
-  {datatype, {duration, s}}
+%% @doc Session Expiry Interval
+{mapping, "mqtt.session.expiry_interval", "emqttd.session", [
+  {default, "2h"},
+  {datatype, {duration, ms}}
 ]}.
 
 {translation, "emqttd.session", fun(Conf) ->
-  [{max_inflight, cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
-   {retry_interval, cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+  [{upgrade_qos,       cuttlefish:conf_get("mqtt.session.upgrade_qos", Conf)},
+   {max_inflight,      cuttlefish:conf_get("mqtt.session.max_inflight", Conf)},
+   {retry_interval,    cuttlefish:conf_get("mqtt.session.retry_interval", Conf)},
+   {max_awaiting_rel,  cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
    {await_rel_timeout, cuttlefish:conf_get("mqtt.session.await_rel_timeout", Conf)},
-   {max_awaiting_rel, cuttlefish:conf_get("mqtt.session.max_awaiting_rel", Conf)},
-   {collect_interval, cuttlefish:conf_get("mqtt.session.collect_interval", Conf)},
-   {expired_after, cuttlefish:conf_get("mqtt.session.expired_after", Conf)}]
+   {enable_stats,      cuttlefish:conf_get("mqtt.session.enable_stats", Conf)},
+   {expiry_interval,   cuttlefish:conf_get("mqtt.session.expiry_interval", Conf)}]
 end}.
 
 %%--------------------------------------------------------------------
@@ -331,28 +407,25 @@ end}.
 %% @doc Topic Priority: 0~255, Default is 0
 {mapping, "mqtt.queue.priority", "emqttd.queue", [
   {default, ""},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc Max queue length. Enqueued messages when persistent client disconnected, or inflight window is full.
 {mapping, "mqtt.queue.max_length", "emqttd.queue", [
   {default, infinity},
-  {datatype, [atom, integer]}
+  {datatype, [integer, {atom, infinity}]}
 ]}.
 
 %% @doc Low-water mark of queued messages
 {mapping, "mqtt.queue.low_watermark", "emqttd.queue", [
   {default, "20%"},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc High-water mark of queued messages
 {mapping, "mqtt.queue.high_watermark", "emqttd.queue", [
   {default, "60%"},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 %% @doc Queue Qos0 messages?
@@ -405,8 +478,7 @@ end}.
 
 {mapping, "mqtt.pubsub.async", "emqttd.pubsub", [
   {default, true},
-  {datatype, {enum, [true, false]}},
-  hidden
+  {datatype, {enum, [true, false]}}
 ]}.
 
 {translation, "emqttd.pubsub", fun(Conf) ->
@@ -451,7 +523,7 @@ end}.
 %%--------------------------------------------------------------------
 
 {mapping, "mqtt.listener.tcp", "emqttd.listeners", [
-  {default, 1883},
+  %% {default, 1883},
   {datatype, [integer, ip]}
 ]}.
 
@@ -467,8 +539,7 @@ end}.
 
 {mapping, "mqtt.listener.tcp.rate_limit", "emqttd.listeners", [
   {default, undefined},
-  {datatype, string},
-  hidden
+  {datatype, string}
 ]}.
 
 {mapping, "mqtt.listener.tcp.backlog", "emqttd.listeners", [
@@ -497,7 +568,7 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.ssl", "emqttd.listeners", [
-  {default, 8883},
+  %% {default, 8883},
   {datatype, [integer, ip]}
 ]}.
 
@@ -515,9 +586,13 @@ end}.
   {datatype, string}
 ]}.
 
+{mapping, "mqtt.listener.ssl.tls_versions", "emqttd.listeners", [
+  {datatype, string}
+]}.
+
 {mapping, "mqtt.listener.ssl.handshake_timeout", "emqttd.listeners", [
-  {default, 15},
-  {datatype, integer}
+  {default, "15s"},
+  {datatype, {duration, ms}}
 ]}.
 
 {mapping, "mqtt.listener.ssl.keyfile", "emqttd.listeners", [
@@ -541,7 +616,7 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.http", "emqttd.listeners", [
-  {default, 8883},
+  %% {default, 8083},
   {datatype, [integer, ip]}
 ]}.
 
@@ -556,9 +631,8 @@ end}.
 ]}.
 
 {mapping, "mqtt.listener.https", "emqttd.listeners", [
-  {default, undefined},
-  {datatype, [integer, ip]},
-  hidden
+  %%{default, 8084},
+  {datatype, [integer, ip]}
 ]}.
 
 {mapping, "mqtt.listener.https.acceptors", "emqttd.listeners", [
@@ -610,8 +684,16 @@ end}.
                            {buffer,  cuttlefish:conf_get(Prefix ++ ".buffer", Conf, undefined)},
                            {nodelay, cuttlefish:conf_get(Prefix ++ ".nodelay", Conf, true)}])
               end,
+
+    SplitFun = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
+
     SslOpts = fun(Prefix) ->
-                  Filter([{handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf) * 1000},
+                 Versions = case SplitFun(cuttlefish:conf_get(Prefix ++ ".tls_versions", Conf, undefined)) of
+                                undefined -> undefined;
+                                L -> [list_to_atom(V) || V <- L]
+                            end,
+                  Filter([{versions, Versions},
+                          {handshake_timeout, cuttlefish:conf_get(Prefix ++ ".handshake_timeout", Conf), undefined},
                           {keyfile,    cuttlefish:conf_get(Prefix ++ ".keyfile", Conf, undefined)},
                           {certfile,   cuttlefish:conf_get(Prefix ++ ".certfile", Conf, undefined)},
                           {cacertfile, cuttlefish:conf_get(Prefix ++ ".cacertfile", Conf, undefined)},

+ 1 - 1
test/emqttd_net_SUITE.erl

@@ -28,7 +28,7 @@ groups() -> [{keepalive, [], [t_keepalive]}].
 %%--------------------------------------------------------------------
 
 t_keepalive(_) ->
-    KA = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
+    {ok, KA} = emqttd_keepalive:start(fun() -> {ok, 1} end, 1, {keepalive, timeout}),
     [resumed, timeout] = lists:reverse(keepalive_recv(KA, [])).
 
 keepalive_recv(KA, Acc) ->

+ 3 - 1
test/emqttd_protocol_SUITE.erl

@@ -22,6 +22,8 @@
 
 -include("emqttd.hrl").
 
+-include_lib("eunit/include/eunit.hrl").
+
 -include("emqttd_protocol.hrl").
 
 all() ->
@@ -344,7 +346,7 @@ message_make(_) ->
 message_from_packet(_) ->
     Msg = emqttd_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
     ?assertEqual(1, Msg#mqtt_message.qos),
-    ?assertEqual(10, Msg#mqtt_message.packet_id),
+    ?assertEqual(10, Msg#mqtt_message.pktid),
     ?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
     WillMsg = emqttd_message:from_packet(#mqtt_packet_connect{will_flag  = true,
                                                               will_topic = <<"WillTopic">>,