Преглед изворни кода

fix issue#3 'PUBLISH' RETAIN

erylee пре 13 година
родитељ
комит
8fa63244bb
5 измењених фајлова са 112 додато и 4 уклоњено
  1. 1 0
      docs/keepalive.md
  2. 10 0
      src/emqtt_client.erl
  3. 97 0
      src/emqtt_retained.erl
  4. 3 2
      src/emqtt_router.erl
  5. 1 2
      src/emqtt_sup.erl

+ 1 - 0
docs/keepalive.md

@@ -0,0 +1 @@
+

+ 10 - 0
src/emqtt_client.erl

@@ -238,7 +238,11 @@ process_request(?PUBLISH,
 					 dup        = Dup,
 					 message_id = MessageId,
 					 payload    = Payload },
+	
 	emqtt_router:publish(Topic, Msg),
+
+	%Retained?
+	retained(Retain, Topic, Msg),
 	
 	send_frame(Sock,
 	  #mqtt_frame{ fixed    = #mqtt_frame_fixed{ type = ?PUBACK },
@@ -357,4 +361,10 @@ valid_client_id(ClientId) ->
     ClientIdLen = size(ClientId),
     1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN.
 
+retained(false, _Topic, _Msg) ->
+	ignore;
+retained(true, Topic, #mqtt_msg{payload = <<>>}) ->
+	emqtt_retained:delete(Topic);
+retained(true, Topic, Msg) ->
+	emqtt_retained:insert(Topic, Msg).
 

+ 97 - 0
src/emqtt_retained.erl

@@ -0,0 +1,97 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% Developer of the eMQTT Code is <ery.lee@gmail.com>
+%% Copyright (c) 2012 Ery Lee.  All rights reserved.
+%%
+
+-module(emqtt_retained).
+
+%%
+%% <<MQTT_V3.1_Protocol_Specific>>
+
+%% RETAIN
+%% Position: byte 1, bit 0.
+
+%% This flag is only used on PUBLISH messages. When a client sends a PUBLISH to a server, if the Retain flag is set (1), the server should hold on to the message after it has been delivered to the current subscribers.
+
+%% When a new subscription is established on a topic, the last retained message on that topic should be sent to the subscriber with the Retain flag set. If there is no retained message, nothing is sent
+
+%% This is useful where publishers send messages on a "report by exception" basis, where it might be some time between messages. This allows new subscribers to instantly receive data with the retained, or Last Known Good, value.
+
+%% When a server sends a PUBLISH to a client as a result of a subscription that already existed when the original PUBLISH arrived, the Retain flag should not be set, regardless of the Retain flag of the original PUBLISH. This allows a client to distinguish messages that are being received because they were retained and those that are being received "live".
+
+%% Retained messages should be kept over restarts of the server.
+
+%% A server may delete a retained message if it receives a message with a zero-length payload and the Retain flag set on the same topic.
+
+-include("emqtt.hrl").
+
+-export([start_link/0,
+		lookup/1,
+		insert/2,
+		delete/1,
+		send/2]).
+
+-behaviour(gen_server).
+
+-export([init/1,
+		handle_call/3,
+		handle_cast/2,
+		handle_info/2,
+		terminate/2,
+		code_change/3]).
+
+-record(state, {}).
+
+start_link() ->
+	gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+lookup(Topic) ->
+	ets:lookup(retained_msg, Topic).
+
+insert(Topic, Msg) ->
+	gen_server2:cast(?MODULE, {insert, Topic, Msg}).
+
+delete(Topic) ->
+	gen_server2:cast(?MODULE, {delete, Topic}).
+
+send(Topic, Client) ->
+	[Client ! {route, Msg} ||{_, Msg} <- lookup(Topic)].
+
+init([]) ->
+	ets:new(retained_msg, [set, protected, named_table]),
+	?INFO("~p is started.", [?MODULE]),
+	{ok, #state{}}.
+
+handle_call(Req, _From, State) ->
+	{stop, {badreq,Req}, State}.
+
+handle_cast({insert, Topic, Msg}, State) ->
+	ets:insert(retained_msg, {Topic, Msg}),
+	{noreply, State};
+
+handle_cast({delete, Topic}, State) ->
+	ets:delete(retained_msg, Topic),
+	{noreply, State};
+
+handle_cast(Msg, State) ->
+	{stop, {badmsg, Msg}, State}.
+
+handle_info(Info, State) ->
+	{stop, {badinfo, Info}, State}.
+
+terminate(_Reason, _State) ->
+	ok.
+
+code_change(_OldVsn, State, _Extra) ->
+	{ok, State}.
+
+

+ 3 - 2
src/emqtt_router.erl

@@ -73,6 +73,7 @@ handle_call({subscribe, Topic, Client}, _From, State) ->
 	end,
 	Ref = erlang:monitor(process, Client),
 	ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}),
+	emqtt_retained:send(Topic, Client),
 	{reply, ok, State};
 
 handle_call(Req, _From, State) ->
@@ -104,8 +105,8 @@ handle_info(Info, State) ->
 terminate(_Reason, _State) ->
 	ok.
 
-code_change(_OldVsn, _State, _Extra) ->
-	ok.
+code_change(_OldVsn, State, _Extra) ->
+	{ok, State}.
 
 %--------------------------------------
 % internal functions

+ 1 - 2
src/emqtt_sup.erl

@@ -27,6 +27,7 @@ start_link(Listeners) ->
 init([Listeners]) ->
     {ok, { {one_for_all, 5, 10}, [
 		?CHILD(emqtt_auth, worker),
+		?CHILD(emqtt_retained, worker),
 		?CHILD(emqtt_router, worker),
 		?CHILD(emqtt_client_sup, supervisor)
 		| listener_children(Listeners) ]}
@@ -37,5 +38,3 @@ listener_children(Listeners) ->
 		{emqtt_client_sup, start_client, []}) || Listener <- Listeners]).
 
 
-
-