Просмотр исходного кода

Merge branch 'dev' of github.com:emqtt/emqtt into dev

Ery Lee 11 лет назад
Родитель
Сommit
2f2e4ff3db

+ 28 - 0
CHANGELOG.md

@@ -1,6 +1,34 @@
 eMQTT ChangeLog
 ==================
 
+v0.3.0-alpha (2015-01-18)
+------------------------
+
+NOTICE: Full MQTT 3.1.1 support now!
+
+Feature: Passed org.eclipse.paho.mqtt.testing/interoperability tests
+
+Feature: Qos0, Qos1 and Qos2 publish and suscribe
+
+Feature: session(clean_sess=false) management and offline messages
+
+Feature: redeliver awaiting puback/pubrec messages(doc: Chapter 4.4)
+
+Feature: retain messages, add emqtt_server module
+
+Feature: MQTT 3.1.1 null client_id support
+
+Bugfix: keepalive timeout to send will message 
+
+Improve: overlapping subscription support
+
+Improve: add emqtt_packet:dump to dump packets
+
+Test: passed org.eclipse.paho.mqtt.testing/interoperability
+
+Test: simple cluster test
+
+
 v0.2.1-beta (2015-01-08)
 ------------------------
 

+ 11 - 0
apps/emqtt/src/emqtt_message.erl

@@ -32,6 +32,8 @@
 
 -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
 
+-export([dump/1]).
+
 %%----------------------------------------------------------------------------
 
 -ifdef(use_specs).
@@ -120,3 +122,12 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
     Msg#mqtt_message{retain = false};
 unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
 
+
+%%
+%% @doc dump message
+%% 
+dump(#mqtt_message{msgid= MsgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic}) ->
+    io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
+              [ MsgId, Qos, Retain, Dup, Topic ]).
+
+

+ 0 - 114
apps/emqtt/src/emqtt_retained.erl

@@ -1,114 +0,0 @@
-%%-----------------------------------------------------------------------------
-%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
-%% 
-%% Permission is hereby granted, free of charge, to any person obtaining a copy
-%% of this software and associated documentation files (the "Software"), to deal
-%% in the Software without restriction, including without limitation the rights
-%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-%% copies of the Software, and to permit persons to whom the Software is
-%% furnished to do so, subject to the following conditions:
-%% 
-%% The above copyright notice and this permission notice shall be included in all
-%% copies or substantial portions of the Software.
-%% 
-%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-%% SOFTWARE.
-%%------------------------------------------------------------------------------
-
--module(emqtt_retained).
-
--author('feng@emqtt.io').
-
-%%TODO: FIXME Later...
-
-%%
-%% <<MQTT_V3.1_Protocol_Specific>>
-
-%% RETAIN
-%% Position: byte 1, bit 0.
-
-%% This flag is only used on PUBLISH messages. When a client sends a PUBLISH to a server, if the Retain flag is set (1), the server should hold on to the message after it has been delivered to the current subscribers.
-
-%% When a new subscription is established on a topic, the last retained message on that topic should be sent to the subscriber with the Retain flag set. If there is no retained message, nothing is sent
-
-%% This is useful where publishers send messages on a "report by exception" basis, where it might be some time between messages. This allows new subscribers to instantly receive data with the retained, or Last Known Good, value.
-
-%% When a server sends a PUBLISH to a client as a result of a subscription that already existed when the original PUBLISH arrived, the Retain flag should not be set, regardless of the Retain flag of the original PUBLISH. This allows a client to distinguish messages that are being received because they were retained and those that are being received "live".
-
-%% Retained messages should be kept over restarts of the server.
-
-%% A server may delete a retained message if it receives a message with a zero-length payload and the Retain flag set on the same topic.
-
--include("emqtt.hrl").
-
--export([start_link/0,
-        retain/1,
-		lookup/1,
-		insert/2,
-		delete/1,
-		send/2]).
-
--behaviour(gen_server).
-
--export([init/1,
-		handle_call/3,
-		handle_cast/2,
-		handle_info/2,
-		terminate/2,
-		code_change/3]).
-
--record(state, {}).
-
-start_link() ->
-	gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-
-retain(Msg = #mqtt_message{retain = true}) ->
-	Msg;
-
-retain(Msg) -> Msg.
-
-lookup(Topic) ->
-	ets:lookup(retained_msg, Topic).
-
-insert(Topic, Msg) ->
-	gen_server:cast(?MODULE, {insert, Topic, Msg}).
-
-delete(Topic) ->
-	gen_server:cast(?MODULE, {delete, Topic}).
-
-send(Topic, Client) ->
-    [Client ! {dispatch, {self(), Msg}} ||{_, Msg} <- lookup(Topic)].
-
-init([]) ->
-	ets:new(retained_msg, [set, protected, named_table]),
-	{ok, #state{}}.
-
-handle_call(Req, _From, State) ->
-	{stop, {badreq,Req}, State}.
-
-handle_cast({insert, Topic, Msg}, State) ->
-	ets:insert(retained_msg, {Topic, Msg}),
-	{noreply, State};
-
-handle_cast({delete, Topic}, State) ->
-	ets:delete(retained_msg, Topic),
-	{noreply, State};
-
-handle_cast(Msg, State) ->
-	{stop, {badmsg, Msg}, State}.
-
-handle_info(Info, State) ->
-	{stop, {badinfo, Info}, State}.
-
-terminate(_Reason, _State) ->
-	ok.
-
-code_change(_OldVsn, State, _Extra) ->
-	{ok, State}.
-
-

+ 2 - 1
apps/emqtt/src/emqtt_router.erl

@@ -65,8 +65,9 @@ start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
 route(Msg) ->
+    lager:info("Route message: ~s", [emqtt_message:dump(Msg)]),
     % need to retain?
-    emqtt_retained:retain(Msg),
+    emqtt_server:retain(Msg),
     % unset flag and pubsub
 	emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ).
 

+ 9 - 6
apps/emqtt/src/emqtt_server.erl

@@ -25,6 +25,7 @@
 -author('feng@slimpp.io').
 
 -include("emqtt.hrl").
+
 -include("emqtt_topic.hrl").
 
 -behaviour(gen_server).
@@ -71,7 +72,9 @@ retain(Msg = #mqtt_message{retain = true}) ->
 
 %% 
 subscribe(Topics, CPid) when is_pid(CPid) ->
+    lager:info("Retained Topics: ~p", [match(Topics)]),
     RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]),
+    lager:info("Retained Messages: ~p", [RetainedMsgs]),
     lists:foreach(fun(Msg) -> 
                 CPid ! {dispatch, {self(), retained_msg(Msg)}}
         end, RetainedMsgs).
@@ -89,8 +92,8 @@ init([RetainOpts]) ->
     Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT),
     {ok, #state{store_limit = Limit}}.
 
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
+handle_call(Req, _From, State) ->
+    {stop, {badreq, Req}, State}.
 
 handle_cast({retain, Msg = #mqtt_message{ qos = Qos, 
                                           topic = Topic, 
@@ -106,11 +109,11 @@ handle_cast({retain, Msg = #mqtt_message{ qos = Qos,
     end,
     {noreply, State};
 
-handle_cast(_Msg, State) ->
-    {noreply, State}.
+handle_cast(Msg, State) ->
+    {stop, {badmsg, Msg}, State}.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
+handle_info(Info, State) ->
+    {stop, {badinfo, Info}, State}.
 
 terminate(_Reason, _State) ->
     ok.

+ 0 - 50
apps/emqtt/src/x.erl

@@ -1,50 +0,0 @@
--module(x).
--behaviour(gen_server).
--define(SERVER, ?MODULE).
-
-%% ------------------------------------------------------------------
-%% API Function Exports
-%% ------------------------------------------------------------------
-
--export([start_link/0]).
-
-%% ------------------------------------------------------------------
-%% gen_server Function Exports
-%% ------------------------------------------------------------------
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
-         terminate/2, code_change/3]).
-
-%% ------------------------------------------------------------------
-%% API Function Definitions
-%% ------------------------------------------------------------------
-
-start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-%% ------------------------------------------------------------------
-%% gen_server Function Definitions
-%% ------------------------------------------------------------------
-
-init(Args) ->
-    {ok, Args}.
-
-handle_call(_Request, _From, State) ->
-    {reply, ok, State}.
-
-handle_cast(_Msg, State) ->
-    {noreply, State}.
-
-handle_info(_Info, State) ->
-    {noreply, State}.
-
-terminate(_Reason, _State) ->
-    ok.
-
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-%% ------------------------------------------------------------------
-%% Internal Function Definitions
-%% ------------------------------------------------------------------
-

+ 4 - 3
rel/files/vm.args

@@ -1,8 +1,8 @@
 ## Name of the node
--sname emqtt
+-name emqtt@127.0.0.1
 
 ## Cookie for distributed erlang
--setcookie emqtt
+-setcookie emqttsecretcookie
 
 ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
 ## (Disabled by default..use with caution!)
@@ -14,7 +14,7 @@
 +A 32
 
 ## max process numbers
-+P 100000
++P 1000000
 
 ## Increase number of concurrent ports/sockets
 -env ERL_MAX_PORTS 4096
@@ -25,3 +25,4 @@
 
 ## Tweak GC to run more often
 ##-env ERL_FULLSWEEP_AFTER 10
+#

+ 19 - 0
scripts/mosquitto_test

@@ -0,0 +1,19 @@
+#!/bin/sh
+# -*- tab-width:4;indent-tabs-mode:nil -*-
+# ex: ts=4 sw=4 et
+
+# slimple publish
+mosquitto_pub -t xxx/yyy -m hello 
+if [ "$?" == 0 ]; then
+    echo "[Success]: slimple publish"
+else
+    echo "[Failure]: slimple publish"
+fi
+
+# publish will willmsg 
+mosquitto_pub -q 1 -t a/b/c -m hahah -u test -P public --will-topic /will --will-payload willmsg --will-qos 1
+if [ "$?" == 0 ]; then
+    echo "[Success]: publish with willmsg"
+else
+    echo "[Failure]: publish with willmsg"
+fi