|
|
@@ -61,7 +61,7 @@ schema("/publish/bulk") ->
|
|
|
}
|
|
|
}.
|
|
|
|
|
|
-fields(publish_message) ->
|
|
|
+fields(message) ->
|
|
|
[
|
|
|
{topic,
|
|
|
hoconsc:mk(binary(), #{
|
|
|
@@ -75,7 +75,7 @@ fields(publish_message) ->
|
|
|
required => false,
|
|
|
default => 0
|
|
|
})},
|
|
|
- {from,
|
|
|
+ {clientid,
|
|
|
hoconsc:mk(binary(), #{
|
|
|
desc => <<"From client ID">>,
|
|
|
required => false,
|
|
|
@@ -94,34 +94,74 @@ fields(publish_message) ->
|
|
|
default => false
|
|
|
})}
|
|
|
];
|
|
|
+fields(publish_message) ->
|
|
|
+ [
|
|
|
+ {payload_encoding,
|
|
|
+ hoconsc:mk(hoconsc:enum([plain, base64]), #{
|
|
|
+ desc => <<"MQTT Payload Encoding, base64 or plain">>,
|
|
|
+ required => false,
|
|
|
+ default => plain
|
|
|
+ })}
|
|
|
+ ] ++ fields(message);
|
|
|
fields(publish_message_info) ->
|
|
|
[
|
|
|
{id,
|
|
|
hoconsc:mk(binary(), #{
|
|
|
desc => <<"Internal Message ID">>
|
|
|
})}
|
|
|
- ] ++ fields(publish_message).
|
|
|
+ ] ++ fields(message).
|
|
|
|
|
|
publish(post, #{body := Body}) ->
|
|
|
- Message = message(Body),
|
|
|
- _ = emqx_mgmt:publish(Message),
|
|
|
- {200, format_message(Message)}.
|
|
|
+ case message(Body) of
|
|
|
+ {ok, Message} ->
|
|
|
+ _ = emqx_mgmt:publish(Message),
|
|
|
+ {200, format_message(Message)};
|
|
|
+ {error, R} ->
|
|
|
+ {400, 'BAD_REQUEST', to_binary(R)}
|
|
|
+ end.
|
|
|
|
|
|
publish_batch(post, #{body := Body}) ->
|
|
|
- Messages = messages(Body),
|
|
|
- _ = [emqx_mgmt:publish(Message) || Message <- Messages],
|
|
|
- {200, format_message(Messages)}.
|
|
|
+ case messages(Body) of
|
|
|
+ {ok, Messages} ->
|
|
|
+ _ = [emqx_mgmt:publish(Message) || Message <- Messages],
|
|
|
+ {200, format_message(Messages)};
|
|
|
+ {error, R} ->
|
|
|
+ {400, 'BAD_REQUEST', to_binary(R)}
|
|
|
+ end.
|
|
|
|
|
|
message(Map) ->
|
|
|
- From = maps:get(<<"from">>, Map, http_api),
|
|
|
- QoS = maps:get(<<"qos">>, Map, 0),
|
|
|
- Topic = maps:get(<<"topic">>, Map),
|
|
|
- Payload = maps:get(<<"payload">>, Map),
|
|
|
- Retain = maps:get(<<"retain">>, Map, false),
|
|
|
- emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{}).
|
|
|
+ Encoding = maps:get(<<"payload_encoding">>, Map, plain),
|
|
|
+ case encode_payload(Encoding, maps:get(<<"payload">>, Map)) of
|
|
|
+ {ok, Payload} ->
|
|
|
+ From = maps:get(<<"clientid">>, Map, http_api),
|
|
|
+ QoS = maps:get(<<"qos">>, Map, 0),
|
|
|
+ Topic = maps:get(<<"topic">>, Map),
|
|
|
+ Retain = maps:get(<<"retain">>, Map, false),
|
|
|
+ {ok, emqx_message:make(From, QoS, Topic, Payload, #{retain => Retain}, #{})};
|
|
|
+ {error, R} ->
|
|
|
+ {error, R}
|
|
|
+ end.
|
|
|
+
|
|
|
+encode_payload(plain, Payload) -> {ok, Payload};
|
|
|
+encode_payload(base64, Payload) ->
|
|
|
+ try
|
|
|
+ {ok, base64:decode(Payload)}
|
|
|
+ catch _:_ ->
|
|
|
+ {error, {decode_base64_payload_failed, Payload}}
|
|
|
+ end.
|
|
|
|
|
|
messages(List) ->
|
|
|
- [message(MessageMap) || MessageMap <- List].
|
|
|
+ messages(List, []).
|
|
|
+
|
|
|
+messages([], Res) ->
|
|
|
+ {ok, lists:reverse(Res)};
|
|
|
+messages([MessageMap | List], Res) ->
|
|
|
+ case message(MessageMap) of
|
|
|
+ {ok, Message} ->
|
|
|
+ messages(List, [Message | Res]);
|
|
|
+ {error, R} ->
|
|
|
+ {error, R}
|
|
|
+ end.
|
|
|
|
|
|
format_message(Messages) when is_list(Messages) ->
|
|
|
[format_message(Message) || Message <- Messages];
|
|
|
@@ -134,7 +174,7 @@ format_message(#message{
|
|
|
topic => Topic,
|
|
|
payload => Payload,
|
|
|
retain => maps:get(retain, Flags, false),
|
|
|
- from => to_binary(From)
|
|
|
+ clientid => to_binary(From)
|
|
|
}.
|
|
|
|
|
|
to_binary(Data) when is_binary(Data) ->
|