Просмотр исходного кода

Merge pull request #210 from emqtt/dev

Dev
Feng Lee 10 лет назад
Родитель
Сommit
38d32cf529

+ 14 - 0
CHANGELOG.md

@@ -2,6 +2,20 @@
 emqttd ChangeLog
 ==================
 
+0.9.3-alpha (2015-07-25)
+-------------------------
+
+Wiki: [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge)
+
+Improve: emqttd_protocol.hrl to define 'QOS_I'
+
+Improve: emqttd_pubsub to add subscribe/2 API
+
+Improve: ./bin/emqttd_ctl to support new bridges command
+
+Bugfix: issue #206 - Cannot bridge two nodes
+
+
 0.9.2-alpha (2015-07-18)
 -------------------------
 

+ 20 - 0
include/emqttd_protocol.hrl

@@ -49,6 +49,26 @@
 
 -type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2.
 
+-type mqtt_qos_name() :: qos0 | at_most_once  |
+                         qos1 | at_least_once |
+                         qos2 | exactly_once.
+
+-define(QOS_I(Name),
+    begin
+        (case Name of
+            ?QOS_0        -> ?QOS_0;
+            qos0          -> ?QOS_0;
+            at_most_once  -> ?QOS_0;
+            ?QOS_1        -> ?QOS_1;
+            qos1          -> ?QOS_1;
+            at_least_once -> ?QOS_1;
+            ?QOS_2        -> ?QOS_2;
+            qos2          -> ?QOS_2;
+            exactly_once  -> ?QOS_2
+        end)
+    end).
+
+
 %%------------------------------------------------------------------------------
 %% Max ClientId Length. Why 1024? NiDongDe!
 %%------------------------------------------------------------------------------

+ 10 - 2
rel/files/emqttd_ctl

@@ -225,13 +225,19 @@ case "$1" in
         fi
         if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then
             $NODETOOL rpc emqttd_ctl bridges list
-        elif [ $# -eq 4 ]; then
+        elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then
+            $NODETOOL rpc emqttd_ctl bridges options
+        elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then
+            shift
+            $NODETOOL rpc emqttd_ctl bridges $@
+        elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then
             shift
             $NODETOOL rpc emqttd_ctl bridges $@
         else
             echo "Usage: "
             echo "$SCRIPT bridges list"
             echo "$SCRIPT bridges start <Node> <Topic>"
+            echo "$SCRIPT bridges start <Node> <Topic> <Options>"
             echo "$SCRIPT bridges stop  <Node> <Topic>"
             exit 1
         fi
@@ -308,8 +314,10 @@ case "$1" in
         echo "  plugins unload <Plugin>       #unload plugin"
         echo "  ----------------------------------------------------------------"
         echo "  bridges list                  #query bridges"
+        echo "  bridges options               #bridge options"
         echo "  bridges start <Node> <Topic>  #start bridge"
-        echo "  bridges stop <Node> <Topic>   #stop bridge"
+        echo "  bridges start <Node> <Topic> <Options> #start bridge with options"
+        echo "  bridges stop  <Node> <Topic>   #stop bridge"
         echo "  ----------------------------------------------------------------"
 		echo "  useradd <Username> <Password> #add user"
 		echo "  userdel <Username>            #delete user"

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.9.2"},
+  {vsn, "0.9.3"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 10 - 7
src/emqttd_bridge.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_bridge).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -44,18 +45,18 @@
 -define(PING_DOWN_INTERVAL, 1000).
 
 -record(state, {node, subtopic,
-                qos,
+                qos                = ?QOS_2,
                 topic_suffix       = <<>>,
                 topic_prefix       = <<>>,
-                mqueue             = emqttd_mqueue:mqueue(),
-                max_queue_len      = 0,
+                mqueue            :: emqttd_mqueue:mqueue(),
+                max_queue_len      = 10000,
                 ping_down_interval = ?PING_DOWN_INTERVAL,
                 status             = up}).
 
--type option()  :: {max_queue_len, pos_integer()} |
-                   {qos, mqtt_qos()} |
+-type option()  :: {qos, mqtt_qos()} |
                    {topic_suffix, binary()} |
                    {topic_prefix, binary()} |
+                   {max_queue_len, pos_integer()} |
                    {ping_down_interval, pos_integer()}.
 
 -export_type([option/0]).
@@ -85,7 +86,7 @@ init([Node, SubTopic, Options]) ->
             MQueue = emqttd_mqueue:new(qname(Node, SubTopic),
                                        [{max_len, State#state.max_queue_len}],
                                        emqttd_alarm:alarm_fun()),
-            emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
+            emqttd_pubsub:subscribe(SubTopic, State#state.qos),
             {ok, State#state{mqueue = MQueue}};
         false -> 
             {stop, {cannot_connect, Node}}
@@ -102,7 +103,9 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) ->
 parse_opts([{max_queue_len, Len} | Opts], State) ->
     parse_opts(Opts, State#state{max_queue_len = Len});
 parse_opts([{ping_down_interval, Interval} | Opts], State) ->
-    parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
+    parse_opts(Opts, State#state{ping_down_interval = Interval*1000});
+parse_opts([_Opt | Opts], State) ->
+    parse_opts(Opts, State).
 
 qname(Node, SubTopic) when is_atom(Node) ->
     qname(atom_to_list(Node), SubTopic);

+ 8 - 2
src/emqttd_bridge_sup.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_bridge_sup).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -63,8 +64,13 @@ start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) ->
 
 -spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}.
 start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) ->
-    Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options),
-    supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)).
+    case Node =:= node() of
+        true  ->
+            {error, bridge_to_self};
+        false ->
+            Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options),
+            supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1))
+    end.
 
 %%------------------------------------------------------------------------------
 %% @doc Stop a bridge

+ 35 - 3
src/emqttd_ctl.erl

@@ -147,13 +147,29 @@ listeners([]) ->
         end, esockd:listeners()).
 
 bridges(["list"]) ->
-    lists:foreach(fun({{Node, Topic}, _Pid}) -> 
-                ?PRINT("bridge: ~s ~s~n", [Node, Topic]) 
+    lists:foreach(fun({{Node, Topic}, _Pid}) ->
+                ?PRINT("bridge: ~s ~s~n", [Node, Topic])
         end, emqttd_bridge_sup:bridges());
 
+bridges(["options"]) ->
+    ?PRINT_MSG("Options:~n"),
+    ?PRINT_MSG("  qos     = 0 | 1 | 2~n"),
+    ?PRINT_MSG("  prefix  = string~n"),
+    ?PRINT_MSG("  suffix  = string~n"),
+    ?PRINT_MSG("  queue   = integer~n"),
+    ?PRINT_MSG("Example:~n"),
+    ?PRINT_MSG("  qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
+
 bridges(["start", SNode, Topic]) ->
     case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of
-        {ok, _} -> ?PRINT_MSG("bridge is started.~n"); 
+        {ok, _} -> ?PRINT_MSG("bridge is started.~n");
+        {error, Error} -> ?PRINT("error: ~p~n", [Error])
+    end;
+
+bridges(["start", SNode, Topic, OptStr]) ->
+    Opts = parse_opts(bridge, OptStr),
+    case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of
+        {ok, _} -> ?PRINT_MSG("bridge is started.~n");
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end;
 
@@ -229,3 +245,19 @@ node_name(SNode) ->
 bin(S) when is_list(S) -> list_to_binary(S);
 bin(B) when is_binary(B) -> B.
 
+parse_opts(Cmd, OptStr) ->
+    Tokens = string:tokens(OptStr, ","),
+    [parse_opt(Cmd, list_to_atom(Opt), Val)
+        || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
+
+parse_opt(bridge, qos, Qos) ->
+    {qos, list_to_integer(Qos)};
+parse_opt(bridge, suffix, Suffix) ->
+    {topic_suffix, list_to_binary(Suffix)};
+parse_opt(bridge, prefix, Prefix) ->
+    {topic_prefix, list_to_binary(Prefix)};
+parse_opt(bridge, queue, Len) ->
+    {max_queue_len, list_to_integer(Len)};
+parse_opt(_Cmd, Opt, _Val) ->
+    ?PRINT("Bad Option: ~s~n", [Opt]).
+

+ 3 - 3
src/emqttd_message.erl

@@ -55,14 +55,14 @@ make(From, Topic, Payload) ->
 
 -spec make(From, Qos, Topic, Payload) -> mqtt_message() when
     From    :: atom() | binary(),
-    Qos     :: mqtt_qos(),
+    Qos     :: mqtt_qos() | mqtt_qos_name(),
     Topic   :: binary(),
     Payload :: binary().
 make(From, Qos, Topic, Payload) ->
     #mqtt_message{msgid     = msgid(Qos),
                   topic     = Topic,
                   from      = From,
-                  qos       = Qos,
+                  qos       = ?QOS_I(Qos),
                   payload   = Payload,
                   timestamp = os:timestamp()}.
 
@@ -107,7 +107,7 @@ from_packet(ClientId, Packet) ->
 
 msgid(?QOS_0) ->
     undefined;
-msgid(_Qos)   ->
+msgid(Qos) when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 ->
     emqttd_guid:gen().
 
 %%------------------------------------------------------------------------------

+ 12 - 6
src/emqttd_pubsub.erl

@@ -43,7 +43,7 @@
 -export([start_link/2]).
 
 -export([create/1,
-         subscribe/1,
+         subscribe/1, subscribe/2,
          unsubscribe/1,
          publish/1]).
 
@@ -128,15 +128,21 @@ create(Topic) when is_binary(Topic) ->
 %% @doc Subscribe topic
 %% @end
 %%------------------------------------------------------------------------------
--spec subscribe({Topic, Qos} | list({Topic, Qos})) -> 
+-spec subscribe({Topic, Qos} | list({Topic, Qos})) ->
     {ok, Qos | list(Qos)} | {error, any()} when
     Topic   :: binary(),
-    Qos     :: mqtt_qos().
-subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
-    call({subscribe, self(), Topic, Qos});
+    Qos     :: mqtt_qos() | mqtt_qos_name().
+subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) ->
+    call({subscribe, self(), Topic, ?QOS_I(Qos)});
 
 subscribe(Topics = [{_Topic, _Qos} | _]) ->
-    call({subscribe, self(), Topics}).
+    call({subscribe, self(), [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- Topics]}).
+
+-spec subscribe(Topic, Qos) -> {ok, Qos} when
+     Topic :: binary(),
+     Qos   :: mqtt_qos() | mqtt_qos_name().
+subscribe(Topic, Qos) ->
+    subscribe({Topic, Qos}).
 
 %%------------------------------------------------------------------------------
 %% @doc Unsubscribe Topic or Topics

+ 47 - 0
src/emqttd_qos.erl

@@ -0,0 +1,47 @@
+%%%-----------------------------------------------------------------------------
+%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
+%%%
+%%% Permission is hereby granted, free of charge, to any person obtaining a copy
+%%% of this software and associated documentation files (the "Software"), to deal
+%%% in the Software without restriction, including without limitation the rights
+%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+%%% copies of the Software, and to permit persons to whom the Software is
+%%% furnished to do so, subject to the following conditions:
+%%%
+%%% The above copyright notice and this permission notice shall be included in all
+%%% copies or substantial portions of the Software.
+%%%
+%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+%%% SOFTWARE.
+%%%-----------------------------------------------------------------------------
+%%% @doc
+%%% emqttd Qos Functions.
+%%%
+%%% @end
+%%%-----------------------------------------------------------------------------
+-module(emqttd_qos).
+
+-include("emqttd_protocol.hrl").
+
+-export([a/1, i/1]).
+
+a(?QOS_0) -> qos0;
+a(?QOS_1) -> qos1;
+a(?QOS_2) -> qos2;
+a(qos0)   -> qos0;
+a(qos1)   -> qos1;
+a(qos2)   -> qos2.
+
+i(?QOS_0) -> ?QOS_0;
+i(?QOS_1) -> ?QOS_1;
+i(?QOS_2) -> ?QOS_2;
+i(qos0)   -> ?QOS_0;
+i(qos1)   -> ?QOS_1;
+i(qos2)   -> ?QOS_2.
+
+