Преглед изворни кода

Update protocol_name/1, type_name/1 functions for MQTT 5

Feng Lee пре 9 година
родитељ
комит
bbbfafb759
1 измењених фајлова са 52 додато и 32 уклоњено
  1. 52 32
      src/emqttd_protocol.erl

+ 52 - 32
src/emqttd_protocol.erl

@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>.
+%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
 
 -module(emqttd_protocol).
 
+-author("Feng Lee <feng@emqtt.io>").
+
 -include("emqttd.hrl").
 
 -include("emqttd_protocol.hrl").
@@ -25,9 +27,11 @@
 -import(proplists, [get_value/2, get_value/3]).
 
 %% API
--export([init/3, info/1, clientid/1, client/1, session/1]).
+-export([init/3, info/1, stats/1, clientid/1, client/1, session/1]).
+
+-export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]).
 
--export([received/2, handle/2, send/2, redeliver/2, shutdown/2]).
+-export([received/2, send/2]).
 
 -export([process/2]).
 
@@ -39,17 +43,20 @@
                       session, ws_initial_headers, %% Headers from first HTTP request for websocket client
                       connected_at}).
 
--type proto_state() :: #proto_state{}.
+-type(proto_state() :: #proto_state{}).
 
 -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name,
                     keepalive, will_msg, ws_initial_headers, connected_at]).
 
+-define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]).
+
 -define(LOG(Level, Format, Args, State),
             lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format,
                         [State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])).
 
 %% @doc Init protocol
 init(Peername, SendFun, Opts) ->
+    lists:foreach(fun(K) -> put(K, 0) end, ?STATS_KEYS),
     MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
     WsInitialHeaders = get_value(ws_initial_headers, Opts),
     #proto_state{peername           = Peername,
@@ -61,6 +68,9 @@ init(Peername, SendFun, Opts) ->
 info(ProtoState) ->
     ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS).
 
+stats(_ProtoState) ->
+    [{K, get(K)} || K <- ?STATS_KEYS].
+
 clientid(#proto_state{client_id = ClientId}) ->
     ClientId.
 
@@ -115,22 +125,22 @@ received(Packet = ?PACKET(_Type), State) ->
             {error, Reason, State}
     end.
 
-handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId,
-                                                             username  = Username,
-                                                             session   = Session}) ->
+subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId,
+                                                   username  = Username,
+                                                   session   = Session}) ->
     TopicTable = parse_topic_table(RawTopicTable),
-    case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of
+    case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
         {ok, TopicTable1} ->
             emqttd_session:subscribe(Session, TopicTable1);
         {stop, _} ->
             ok
     end,
-    {ok, ProtoState};
+    {ok, ProtoState}.
 
-handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId,
-                                                           username  = Username,
-                                                           session   = Session}) ->
-    case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
+unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId,
+                                                 username  = Username,
+                                                 session   = Session}) ->
+    case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
         {ok, TopicTable} ->
             emqttd_session:unsubscribe(Session, TopicTable);
         {stop, _} ->
@@ -138,6 +148,9 @@ handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId,
     end,
     {ok, ProtoState}.
 
+%% @doc Send PUBREL
+pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State).
+
 process(Packet = ?CONNECT_PACKET(Var), State0) ->
 
     #mqtt_packet_connect{proto_ver  = ProtoVer,
@@ -187,7 +200,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
             {ReturnCode, false, State1}
     end,
     %% Run hooks
-    emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)),
+    emqttd_hooks:run('client.connected', [ReturnCode1], client(State3)),
     %% Send connack
     send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
     %% stop if authentication failure
@@ -220,8 +233,11 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?SUBACK_PACKET(PacketId, []), State);
 
 %% TODO: refactor later...
-process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session,
-        client_id = ClientId, username = Username, is_superuser = IsSuperuser}) ->
+process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable),
+        State = #proto_state{session      = Session,
+                             client_id    = ClientId,
+                             username     = Username,
+                             is_superuser = IsSuperuser}) ->
     Client = client(State), TopicTable = parse_topic_table(RawTopicTable),
     AllowDenies = if
                     IsSuperuser -> [];
@@ -232,7 +248,7 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session
             ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State),
             send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
         false ->
-            case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of
+            case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of
                 {ok, TopicTable1} ->
                     emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State};
                 {stop, _} ->
@@ -244,9 +260,11 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session
 process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?UNSUBACK_PACKET(PacketId), State);
 
-process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), State = #proto_state{
-        client_id = ClientId, username = Username, session = Session}) ->
-    case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
+process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics),
+        State = #proto_state{client_id = ClientId,
+                             username  = Username,
+                             session   = Session}) ->
+    case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of
         {ok, TopicTable} ->
             emqttd_session:unsubscribe(Session, TopicTable);
         {stop, _} ->
@@ -262,7 +280,9 @@ process(?PACKET(?DISCONNECT), State) ->
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
-        #proto_state{client_id = ClientId, username = Username, session = Session}) ->
+        #proto_state{client_id = ClientId,
+                     username  = Username,
+                     session   = Session}) ->
     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
     emqttd_session:publish(Session, Msg);
 
@@ -287,7 +307,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
 -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
 send(Msg, State = #proto_state{client_id = ClientId, username = Username})
         when is_record(Msg, mqtt_message) ->
-    emqttd:run_hooks('message.delivered', [ClientId, Username], Msg),
+    emqttd_hooks:run('message.delivered', [ClientId, Username], Msg),
     send(emqttd_message:to_packet(Msg), State);
 
 send(Packet, State = #proto_state{sendfun = SendFun})
@@ -297,15 +317,15 @@ send(Packet, State = #proto_state{sendfun = SendFun})
     SendFun(Packet),
     {ok, State}.
 
-trace(recv, Packet, ProtoState) ->
+trace(recv, Packet = ?PACKET(Type), ProtoState) ->
+    inc(recv_pkt), ?IF(Type =:= ?PUBLISH, inc(recv_msg), ok),
     ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState);
 
-trace(send, Packet, ProtoState) ->
+trace(send, Packet = ?PACKET(Type), ProtoState) ->
+    inc(send_pkt), ?IF(Type =:= ?PUBLISH, inc(send_msg), ok),
     ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState).
 
-%% @doc redeliver PUBREL PacketId
-redeliver({?PUBREL, PacketId}, State) ->
-    send(?PUBREL_PACKET(PacketId), State).
+inc(Key) -> put(Key, get(Key) + 1).
 
 stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH ->
     {stop, {shutdown, auth_failure}, State};
@@ -325,7 +345,7 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
     ?LOG(info, "Shutdown for ~p", [Error], State),
     Client = client(State),
     send_willmsg(Client, WillMsg),
-    emqttd:run_hooks('client.disconnected', [Error], Client),
+    emqttd_hooks:run('client.disconnected', [Error], Client),
     %% let it down
     %% emqttd_cm:unreg(ClientId).
     ok.
@@ -375,19 +395,19 @@ validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
 
 validate_clientid(#mqtt_packet_connect{client_id = ClientId},
                   #proto_state{max_clientid_len = MaxLen})
-    when (size(ClientId) >= 1) andalso (size(ClientId) =< MaxLen) ->
+    when (byte_size(ClientId) >= 1) andalso (byte_size(ClientId) =< MaxLen) ->
     true;
 
 %% Issue#599: Null clientId and clean_sess = false
 validate_clientid(#mqtt_packet_connect{client_id  = ClientId,
                                        clean_sess = CleanSess}, _ProtoState)
-    when size(ClientId) == 0 andalso (not CleanSess) ->
+    when byte_size(ClientId) == 0 andalso (not CleanSess) ->
     false;
 
 %% MQTT3.1.1 allow null clientId.
-validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311,
+validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V4,
                                        client_id = ClientId}, _ProtoState)
-    when size(ClientId) =:= 0 ->
+    when byte_size(ClientId) =:= 0 ->
     true;
 
 validate_clientid(#mqtt_packet_connect{proto_ver  = ProtoVer,