Просмотр исходного кода

HTTP Publish: support 'topics' parameter

Feng 10 лет назад
Родитель
Сommit
978413298d
1 измененных файлов с 46 добавлено и 25 удалено
  1. 46 25
      src/emqttd_http.erl

+ 46 - 25
src/emqttd_http.erl

@@ -44,27 +44,9 @@ handle_request('GET', "/status", Req) ->
 %%--------------------------------------------------------------------
 
 handle_request('POST', "/mqtt/publish", Req) ->
-    Params = mochiweb_request:parse_post(Req),
-    lager:info("HTTP Publish: ~p", [Params]),
     case authorized(Req) of
-    true ->
-        ClientId = get_value("client", Params, http),
-        Qos      = int(get_value("qos", Params, "0")),
-        Retain   = bool(get_value("retain", Params,  "0")),
-        Topic    = list_to_binary(get_value("topic", Params)),
-        Payload  = list_to_binary(get_value("message", Params)),
-        case {validate(qos, Qos), validate(topic, Topic)} of
-            {true, true} ->
-                Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
-                emqttd:publish(Msg#mqtt_message{retain  = Retain}),
-                Req:ok({"text/plain", <<"ok">>});
-           {false, _} ->
-                Req:respond({400, [], <<"Bad QoS">>});
-            {_, false} ->
-                Req:respond({400, [], <<"Bad Topic">>})
-        end;
-    false ->
-        Req:respond({401, [], <<"Fobbiden">>})
+        true  -> http_publish(Req);
+        false -> Req:respond({401, [], <<"Fobbiden">>})
     end;
 
 %%--------------------------------------------------------------------
@@ -97,9 +79,53 @@ handle_request(Method, Path, Req) ->
     lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
     Req:not_found().
 
+%%--------------------------------------------------------------------
+%% HTTP Publish
+%%--------------------------------------------------------------------
+
+http_publish(Req) ->
+    Params = mochiweb_request:parse_post(Req),
+    lager:info("HTTP Publish: ~p", [Params]),
+    Topics   = topics(Params),
+    ClientId = get_value("client", Params, http),
+    Qos      = int(get_value("qos", Params, "0")),
+    Retain   = bool(get_value("retain", Params, "0")),
+    Payload  = list_to_binary(get_value("message", Params)),
+    case {validate(qos, Qos), validate(topics, Topics)} of
+        {true, true} ->
+            lists:foreach(fun(Topic) ->
+                Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
+                emqttd:publish(Msg#mqtt_message{retain  = Retain})
+            end, Topics),
+            Req:ok({"text/plain", <<"ok">>});
+       {false, _} ->
+            Req:respond({400, [], <<"Bad QoS">>});
+        {_, false} ->
+            Req:respond({400, [], <<"Bad Topics">>})
+    end.
+
+topics(Params) ->
+    Tokens = [get_value("topic", Params) | string:tokens(get_value("topics", Params, ""), ",")],
+    [list_to_binary(Token) || Token <- Tokens, Token =/= undefined].
+
+validate(qos, Qos) ->
+    (Qos >= ?QOS_0) and (Qos =< ?QOS_2);
+
+validate(topics, [Topic|Left]) ->
+    case validate(topic, Topic) of
+        true  -> validate(topics, Left);
+        false -> false
+    end;
+validate(topics, []) ->
+    true;
+
+validate(topic, Topic) ->
+    emqttd_topic:validate({name, Topic}).
+
 %%--------------------------------------------------------------------
 %% basic authorization
 %%--------------------------------------------------------------------
+
 authorized(Req) ->
     case Req:get_header_value("Authorization") of
     undefined ->
@@ -118,11 +144,6 @@ authorized(Req) ->
 user_passwd(BasicAuth) ->
     list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). 
 
-validate(qos, Qos) ->
-    (Qos >= ?QOS_0) and (Qos =< ?QOS_2); 
-
-validate(topic, Topic) ->
-    emqttd_topic:validate({name, Topic}).
 
 int(S) -> list_to_integer(S).