Explorar o código

Merge branch 'emq20' of github.com:emqtt/emqttd into emq20

Feng %!s(int64=9) %!d(string=hai) anos
pai
achega
321077179c
Modificáronse 3 ficheiros con 50 adicións e 40 borrados
  1. 17 38
      include/emqttd.hrl
  2. 16 2
      src/emqttd_app.erl
  3. 17 0
      src/emqttd_ctl.erl

+ 17 - 38
include/emqttd.hrl

@@ -14,8 +14,6 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT Broker Header
-
 %%--------------------------------------------------------------------
 %% Banner
 %%--------------------------------------------------------------------
@@ -77,30 +75,10 @@
 
 -type(mqtt_subscription() :: #mqtt_subscription{}).
 
-%% {<<"a/b/c">>, '$queue', <<"client1">>}
-%% {<<"a/b/c">>, undefined, <0.31.0>}
-%% {<<"a/b/c">>, <<"group1">>, <<"client2">>}
-%% -record(mqtt_subscription, {topic, share, destination :: pid() | binary()}).
-
-%%--------------------------------------------------------------------
-%% MQTT Credential
-%%--------------------------------------------------------------------
--record(mqtt_credential, {
-    clientid :: binary() | undefined, %% ClientId
-    username :: binary() | undefined, %% Username
-    token    :: binary() | undefined,
-    cookie   :: binary() | undefined
-}).
-
--type(mqtt_credential() :: #mqtt_credential{}).
-
 %%--------------------------------------------------------------------
 %% MQTT Client
 %%--------------------------------------------------------------------
 
--type ws_header_key() :: atom() | binary() | string().
--type ws_header_val() :: atom() | binary() | string() | integer().
-
 -record(mqtt_client, {
     client_id     :: binary() | undefined,
     client_pid    :: pid(),
@@ -110,7 +88,9 @@
     proto_ver     :: 3 | 4,
     keepalive = 0,
     will_topic    :: undefined | binary(),
-    ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
+    token         :: binary() | undefined, %% auth token
+    cookie        :: binary() | undefined, %% auth cookie
+    %%ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
     connected_at  :: erlang:timestamp()
 }).
 
@@ -135,20 +115,17 @@
 -type(mqtt_pktid() :: 1..16#ffff | undefined).
 
 -record(mqtt_message, {
-    msgid          :: mqtt_msgid(),         %% Global unique message ID
-    pktid          :: mqtt_pktid(),         %% PacketId
-    topic          :: binary(),             %% Topic that the message is published to
-    sender         :: pid(),                %% Pid of the sender/publisher
-    from,
-    credential     :: mqtt_credential(),    %% Credential of the sender/publisher
-    qos    = 0     :: 0 | 1 | 2,            %% Message QoS
-    flags  = []    :: [retain | dup | sys], %% Message Flags
-    retain = false :: boolean(),            %% Retain flag
-    dup    = false :: boolean(),            %% Dup flag
-    sys    = false :: boolean(),            %% $SYS flag
-    payload        :: binary(),             %% Payload
-    timestamp      :: erlang:timestamp(),   %% os:timestamp
-    extra = []     :: list()
+    msgid           :: mqtt_msgid(),         %% Global unique message ID
+    pktid           :: mqtt_pktid(),         %% PacketId
+    topic           :: binary(),             %% Topic that the message is published to
+    qos     = 0     :: 0 | 1 | 2,            %% Message QoS
+    flags   = []    :: [retain | dup | sys], %% Message Flags
+    retain  = false :: boolean(),            %% Retain flag
+    dup     = false :: boolean(),            %% Dup flag
+    sys     = false :: boolean(),            %% $SYS flag
+    headers = []    :: list(),
+    payload         :: binary(),             %% Payload
+    timestamp       :: erlang:timestamp()    %% os:timestamp
 }).
 
 -type(mqtt_message() :: #mqtt_message{}).
@@ -157,7 +134,9 @@
 %% MQTT Delivery
 %%--------------------------------------------------------------------
 -record(mqtt_delivery, {
-    message :: mqtt_message(), %% Message
+    sender  :: pid(),          %% Pid of the sender/publisher
+    from    :: binary(),
+    message :: mqtt_message(),  %% Message
     flows   :: list()
 }).
 

+ 16 - 2
src/emqttd_app.erl

@@ -159,8 +159,7 @@ load_mod({module, Name, Opts}) ->
 
 %% @doc Is module enabled?
 -spec(is_mod_enabled(Name :: atom()) -> boolean()).
-is_mod_enabled(Name) ->
-    lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
+is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)).
 
 %%--------------------------------------------------------------------
 %% Start Listeners
@@ -206,3 +205,18 @@ stop_listeners() -> lists:foreach(fun stop_listener/1, gen_conf:list(listener)).
 %% @private
 stop_listener({listener, Protocol, ListenOn, _Opts}) -> esockd:close(Protocol, ListenOn).
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+merge_sockopts_test_() ->
+    Opts =  [{acceptors, 16}, {max_clients, 512}],
+    ?_assert(merge_sockopts(Opts) == [{sockopts, [binary, {packet, raw}, {reuseaddr, true},
+                    {backlog, 512}, {nodelay, true}]}, {acceptors, 16}, {max_clients, 512}]).
+
+load_all_mods_test_() ->
+    ?_assert(load_all_mods() == ok).
+
+is_mod_enabled_test_() ->
+    ?_assert(is_mod_enabled(presence) == {module, presence, [{qos, 0}]}),
+    ?_assert(is_mod_enabled(test) == false).
+
+-endif.

+ 17 - 0
src/emqttd_ctl.erl

@@ -133,3 +133,20 @@ noreply(State) ->
 next_seq(State = #state{seq = Seq}) ->
     State#state{seq = Seq + 1}.
 
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+register_cmd_test_() ->
+    {setup, 
+        fun() ->
+            {ok, InitState} = emqttd_ctl:init([]),
+            InitState
+        end,
+        fun(State) ->
+            ok = emqttd_ctl:terminate(shutdown, State)
+        end,
+        fun(State = #state{seq = Seq}) -> 
+                emqttd_ctl:handle_cast({register_cmd, test0, {?MODULE, test0}, []}, State),
+                [?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?CMD_TAB, {Seq,test0}))]
+        end
+    }.
+-endif.