Преглед на файлове

Merge branch 'dev' of github.com:emqtt/emqtt into dev

Feng преди 10 години
родител
ревизия
12b5722369
променени са 8 файла, в които са добавени 91 реда и са изтрити 22 реда
  1. 14 0
      CHANGELOG.md
  2. 1 1
      README.md
  3. 10 2
      rel/files/emqttd_ctl
  4. 1 1
      src/emqttd.app.src
  5. 10 7
      src/emqttd_bridge.erl
  6. 8 2
      src/emqttd_bridge_sup.erl
  7. 35 3
      src/emqttd_ctl.erl
  8. 12 6
      src/emqttd_pubsub.erl

+ 14 - 0
CHANGELOG.md

@@ -2,6 +2,20 @@
 emqttd ChangeLog
 ==================
 
+0.9.3-alpha (2015-07-25)
+-------------------------
+
+Wiki: [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge)
+
+Improve: emqttd_protocol.hrl to define 'QOS_I'
+
+Improve: emqttd_pubsub to add subscribe/2 API
+
+Improve: ./bin/emqttd_ctl to support new bridges command
+
+Bugfix: issue #206 - Cannot bridge two nodes
+
+
 0.9.2-alpha (2015-07-18)
 -------------------------
 

+ 1 - 1
README.md

@@ -31,7 +31,7 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT
 * Client Authentication with username, password.
 * Client ACL control with ipaddress, clientid, username.
 * Cluster brokers on several servers.
-* Bridge brokers locally or remotelly
+* [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge) brokers locally or remotelly
 * 500K+ concurrent clients connections per server
 * Extensible architecture with Hooks, Modules and Plugins
 * Passed eclipse paho interoperability tests

+ 10 - 2
rel/files/emqttd_ctl

@@ -225,13 +225,19 @@ case "$1" in
         fi
         if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then
             $NODETOOL rpc emqttd_ctl bridges list
-        elif [ $# -eq 4 ]; then
+        elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then
+            $NODETOOL rpc emqttd_ctl bridges options
+        elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then
+            shift
+            $NODETOOL rpc emqttd_ctl bridges $@
+        elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then
             shift
             $NODETOOL rpc emqttd_ctl bridges $@
         else
             echo "Usage: "
             echo "$SCRIPT bridges list"
             echo "$SCRIPT bridges start <Node> <Topic>"
+            echo "$SCRIPT bridges start <Node> <Topic> <Options>"
             echo "$SCRIPT bridges stop  <Node> <Topic>"
             exit 1
         fi
@@ -308,8 +314,10 @@ case "$1" in
         echo "  plugins unload <Plugin>       #unload plugin"
         echo "  ----------------------------------------------------------------"
         echo "  bridges list                  #query bridges"
+        echo "  bridges options               #bridge options"
         echo "  bridges start <Node> <Topic>  #start bridge"
-        echo "  bridges stop <Node> <Topic>   #stop bridge"
+        echo "  bridges start <Node> <Topic> <Options> #start bridge with options"
+        echo "  bridges stop  <Node> <Topic>   #stop bridge"
         echo "  ----------------------------------------------------------------"
 		echo "  useradd <Username> <Password> #add user"
 		echo "  userdel <Username>            #delete user"

+ 1 - 1
src/emqttd.app.src

@@ -1,7 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
-  {vsn, "0.9.2"},
+  {vsn, "0.9.3"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 10 - 7
src/emqttd_bridge.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_bridge).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -44,18 +45,18 @@
 -define(PING_DOWN_INTERVAL, 1000).
 
 -record(state, {node, subtopic,
-                qos,
+                qos                = ?QOS_2,
                 topic_suffix       = <<>>,
                 topic_prefix       = <<>>,
-                mqueue             = emqttd_mqueue:mqueue(),
-                max_queue_len      = 0,
+                mqueue            :: emqttd_mqueue:mqueue(),
+                max_queue_len      = 10000,
                 ping_down_interval = ?PING_DOWN_INTERVAL,
                 status             = up}).
 
--type option()  :: {max_queue_len, pos_integer()} |
-                   {qos, mqtt_qos()} |
+-type option()  :: {qos, mqtt_qos()} |
                    {topic_suffix, binary()} |
                    {topic_prefix, binary()} |
+                   {max_queue_len, pos_integer()} |
                    {ping_down_interval, pos_integer()}.
 
 -export_type([option/0]).
@@ -85,7 +86,7 @@ init([Node, SubTopic, Options]) ->
             MQueue = emqttd_mqueue:new(qname(Node, SubTopic),
                                        [{max_len, State#state.max_queue_len}],
                                        emqttd_alarm:alarm_fun()),
-            emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
+            emqttd_pubsub:subscribe(SubTopic, State#state.qos),
             {ok, State#state{mqueue = MQueue}};
         false -> 
             {stop, {cannot_connect, Node}}
@@ -102,7 +103,9 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) ->
 parse_opts([{max_queue_len, Len} | Opts], State) ->
     parse_opts(Opts, State#state{max_queue_len = Len});
 parse_opts([{ping_down_interval, Interval} | Opts], State) ->
-    parse_opts(Opts, State#state{ping_down_interval = Interval*1000}).
+    parse_opts(Opts, State#state{ping_down_interval = Interval*1000});
+parse_opts([_Opt | Opts], State) ->
+    parse_opts(Opts, State).
 
 qname(Node, SubTopic) when is_atom(Node) ->
     qname(atom_to_list(Node), SubTopic);

+ 8 - 2
src/emqttd_bridge_sup.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_bridge_sup).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -63,8 +64,13 @@ start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) ->
 
 -spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}.
 start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) ->
-    Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options),
-    supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)).
+    case Node =:= node() of
+        true  ->
+            {error, bridge_to_self};
+        false ->
+            Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options),
+            supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1))
+    end.
 
 %%------------------------------------------------------------------------------
 %% @doc Stop a bridge

+ 35 - 3
src/emqttd_ctl.erl

@@ -147,13 +147,29 @@ listeners([]) ->
         end, esockd:listeners()).
 
 bridges(["list"]) ->
-    lists:foreach(fun({{Node, Topic}, _Pid}) -> 
-                ?PRINT("bridge: ~s ~s~n", [Node, Topic]) 
+    lists:foreach(fun({{Node, Topic}, _Pid}) ->
+                ?PRINT("bridge: ~s ~s~n", [Node, Topic])
         end, emqttd_bridge_sup:bridges());
 
+bridges(["options"]) ->
+    ?PRINT_MSG("Options:~n"),
+    ?PRINT_MSG("  qos     = 0 | 1 | 2~n"),
+    ?PRINT_MSG("  prefix  = string~n"),
+    ?PRINT_MSG("  suffix  = string~n"),
+    ?PRINT_MSG("  queue   = integer~n"),
+    ?PRINT_MSG("Example:~n"),
+    ?PRINT_MSG("  qos=2,prefix=abc/,suffix=/yxz,queue=1000~n");
+
 bridges(["start", SNode, Topic]) ->
     case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of
-        {ok, _} -> ?PRINT_MSG("bridge is started.~n"); 
+        {ok, _} -> ?PRINT_MSG("bridge is started.~n");
+        {error, Error} -> ?PRINT("error: ~p~n", [Error])
+    end;
+
+bridges(["start", SNode, Topic, OptStr]) ->
+    Opts = parse_opts(bridge, OptStr),
+    case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of
+        {ok, _} -> ?PRINT_MSG("bridge is started.~n");
         {error, Error} -> ?PRINT("error: ~p~n", [Error])
     end;
 
@@ -229,3 +245,19 @@ node_name(SNode) ->
 bin(S) when is_list(S) -> list_to_binary(S);
 bin(B) when is_binary(B) -> B.
 
+parse_opts(Cmd, OptStr) ->
+    Tokens = string:tokens(OptStr, ","),
+    [parse_opt(Cmd, list_to_atom(Opt), Val)
+        || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]].
+
+parse_opt(bridge, qos, Qos) ->
+    {qos, list_to_integer(Qos)};
+parse_opt(bridge, suffix, Suffix) ->
+    {topic_suffix, list_to_binary(Suffix)};
+parse_opt(bridge, prefix, Prefix) ->
+    {topic_prefix, list_to_binary(Prefix)};
+parse_opt(bridge, queue, Len) ->
+    {max_queue_len, list_to_integer(Len)};
+parse_opt(_Cmd, Opt, _Val) ->
+    ?PRINT("Bad Option: ~s~n", [Opt]).
+

+ 12 - 6
src/emqttd_pubsub.erl

@@ -43,7 +43,7 @@
 -export([start_link/2]).
 
 -export([create/1,
-         subscribe/1,
+         subscribe/1, subscribe/2,
          unsubscribe/1,
          publish/1]).
 
@@ -128,15 +128,21 @@ create(Topic) when is_binary(Topic) ->
 %% @doc Subscribe topic
 %% @end
 %%------------------------------------------------------------------------------
--spec subscribe({Topic, Qos} | list({Topic, Qos})) -> 
+-spec subscribe({Topic, Qos} | list({Topic, Qos})) ->
     {ok, Qos | list(Qos)} | {error, any()} when
     Topic   :: binary(),
-    Qos     :: mqtt_qos().
-subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
-    call({subscribe, self(), Topic, Qos});
+    Qos     :: mqtt_qos() | mqtt_qos_name().
+subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) ->
+    call({subscribe, self(), Topic, ?QOS_I(Qos)});
 
 subscribe(Topics = [{_Topic, _Qos} | _]) ->
-    call({subscribe, self(), Topics}).
+    call({subscribe, self(), [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- Topics]}).
+
+-spec subscribe(Topic, Qos) -> {ok, Qos} when
+     Topic :: binary(),
+     Qos   :: mqtt_qos() | mqtt_qos_name().
+subscribe(Topic, Qos) ->
+    subscribe({Topic, Qos}).
 
 %%------------------------------------------------------------------------------
 %% @doc Unsubscribe Topic or Topics