瀏覽代碼

Merge pull request #1013 from emqtt/emq20

Merge emq20
Feng Lee 8 年之前
父節點
當前提交
fc1329d09a
共有 5 個文件被更改,包括 23 次插入13 次删除
  1. 1 1
      Makefile
  2. 2 2
      src/emqttd_bridge.erl
  3. 3 3
      src/emqttd_cli.erl
  4. 1 1
      src/emqttd_http.erl
  5. 16 6
      src/emqttd_session.erl

+ 1 - 1
Makefile

@@ -1,6 +1,6 @@
 PROJECT = emqttd
 PROJECT = emqttd
 PROJECT_DESCRIPTION = Erlang MQTT Broker
 PROJECT_DESCRIPTION = Erlang MQTT Broker
-PROJECT_VERSION = 2.2
+PROJECT_VERSION = 2.1.2
 
 
 DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt
 DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt
 
 

+ 2 - 2
src/emqttd_bridge.erl

@@ -37,7 +37,7 @@
 
 
 -record(state, {pool, id,
 -record(state, {pool, id,
                 node, subtopic,
                 node, subtopic,
-                qos                = ?QOS_2,
+                qos                = ?QOS_0,
                 topic_suffix       = <<>>,
                 topic_suffix       = <<>>,
                 topic_prefix       = <<>>,
                 topic_prefix       = <<>>,
                 mqueue             :: emqttd_mqueue:mqueue(),
                 mqueue             :: emqttd_mqueue:mqueue(),
@@ -74,7 +74,7 @@ init([Pool, Id, Node, Topic, Options]) ->
         true -> 
         true -> 
             true = erlang:monitor_node(Node, true),
             true = erlang:monitor_node(Node, true),
             Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
             Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
-            emqttd:subscribe(Topic, self(), [local, {share, Share}]),
+            emqttd:subscribe(Topic, self(), [local, {share, Share}, {qos, ?QOS_0}]),
             State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
             State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
             MQueue = emqttd_mqueue:new(qname(Node, Topic),
             MQueue = emqttd_mqueue:new(qname(Node, Topic),
                                        [{max_len, State#state.max_queue_len}],
                                        [{max_len, State#state.max_queue_len}],

+ 3 - 3
src/emqttd_cli.erl

@@ -548,15 +548,15 @@ print({ClientId, _ClientPid, _Persistent, SessInfo}) ->
                 deliver_msg,
                 deliver_msg,
                 enqueue_msg,
                 enqueue_msg,
                 created_at],
                 created_at],
-    ?PRINT("Session(~s, clean_sess=~s, max_inflight=~w, inflight=~w, "
+    ?PRINT("Session(~s, clean_sess=~s, subscriptions=~w, max_inflight=~w, inflight=~w, "
            "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, "
            "mqueue_len=~w, mqueue_dropped=~w, awaiting_rel=~w, "
            "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
            "deliver_msg=~w, enqueue_msg=~w, created_at=~w)~n",
             [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]).
             [ClientId | [format(Key, get_value(Key, Data)) || Key <- InfoKeys]]).
 
 
-print(subscription, {Sub, Topic}) when is_pid(Sub) ->
-    ?PRINT("~p -> ~s~n", [Sub, Topic]);
 print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) ->
 print(subscription, {Sub, {_Share, Topic}}) when is_pid(Sub) ->
     ?PRINT("~p -> ~s~n", [Sub, Topic]);
     ?PRINT("~p -> ~s~n", [Sub, Topic]);
+print(subscription, {Sub, Topic}) when is_pid(Sub) ->
+    ?PRINT("~p -> ~s~n", [Sub, Topic]);
 print(subscription, {Sub, {_Share, Topic}}) ->
 print(subscription, {Sub, {_Share, Topic}}) ->
     ?PRINT("~s -> ~s~n", [Sub, Topic]);
     ?PRINT("~s -> ~s~n", [Sub, Topic]);
 print(subscription, {Sub, Topic}) ->
 print(subscription, {Sub, Topic}) ->

+ 1 - 1
src/emqttd_http.erl

@@ -31,7 +31,7 @@
 handle_request(Req) ->
 handle_request(Req) ->
     handle_request(Req:get(method), Req:get(path), Req).
     handle_request(Req:get(method), Req:get(path), Req).
 
 
-handle_request('GET', "/status", Req) ->
+handle_request(Method, "/status", Req) when Method =:= 'HEAD'; Method =:= 'GET' ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
     {InternalStatus, _ProvidedStatus} = init:get_status(),
     AppStatus =
     AppStatus =
     case lists:keysearch(emqttd, 1, application:which_applications()) of
     case lists:keysearch(emqttd, 1, application:which_applications()) of

+ 16 - 6
src/emqttd_session.erl

@@ -737,16 +737,26 @@ await(Msg = #mqtt_message{pktid = PacketId},
 acked(puback, PacketId, State = #state{client_id = ClientId,
 acked(puback, PacketId, State = #state{client_id = ClientId,
                                        username  = Username,
                                        username  = Username,
                                        inflight  = Inflight}) ->
                                        inflight  = Inflight}) ->
-    {publish, Msg, _Ts} = Inflight:lookup(PacketId),
-    emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
-    State#state{inflight = Inflight:delete(PacketId)};
+    case Inflight:lookup(PacketId) of
+        {publish, Msg, _Ts} ->
+            emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
+            State#state{inflight = Inflight:delete(PacketId)};
+        _ ->
+            ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
+            State
+    end;
 
 
 acked(pubrec, PacketId, State = #state{client_id = ClientId,
 acked(pubrec, PacketId, State = #state{client_id = ClientId,
                                        username  = Username,
                                        username  = Username,
                                        inflight  = Inflight}) ->
                                        inflight  = Inflight}) ->
-    {publish, Msg, _Ts} = Inflight:lookup(PacketId),
-    emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
-    State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})};
+    case Inflight:lookup(PacketId) of
+        {publish, Msg, _Ts} ->
+            emqttd_hooks:run('message.acked', [ClientId, Username], Msg),
+            State#state{inflight = Inflight:update(PacketId, {pubrel, PacketId, os:timestamp()})}; 
+        {pubrel, PacketId, _Ts} ->
+            ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
+            State
+    end;
 
 
 acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
 acked(pubcomp, PacketId, State = #state{inflight = Inflight}) ->
     State#state{inflight = Inflight:delete(PacketId)}.
     State#state{inflight = Inflight:delete(PacketId)}.