|
|
@@ -513,7 +513,10 @@ fields(keepalive) ->
|
|
|
fields(subscribe) ->
|
|
|
[
|
|
|
{topic, hoconsc:mk(binary(), #{desc => <<"Topic">>})},
|
|
|
- {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})}
|
|
|
+ {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>})},
|
|
|
+ {nl, hoconsc:mk(integer(), #{default => 0, desc => <<"No Local">>})},
|
|
|
+ {rap, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain as Published">>})},
|
|
|
+ {rh, hoconsc:mk(integer(), #{default => 0, desc => <<"Retain Handling">>})}
|
|
|
];
|
|
|
fields(unsubscribe) ->
|
|
|
[
|
|
|
@@ -536,9 +539,8 @@ authz_cache(delete, #{bindings := Bindings}) ->
|
|
|
clean_authz_cache(Bindings).
|
|
|
|
|
|
subscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
|
|
|
- Topic = maps:get(<<"topic">>, TopicInfo),
|
|
|
- Qos = maps:get(<<"qos">>, TopicInfo, 0),
|
|
|
- subscribe(#{clientid => ClientID, topic => Topic, qos => Qos}).
|
|
|
+ Opts = emqx_map_lib:unsafe_atom_key_map(TopicInfo),
|
|
|
+ subscribe(Opts#{clientid => ClientID}).
|
|
|
|
|
|
unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
|
|
|
Topic = maps:get(<<"topic">>, TopicInfo),
|
|
|
@@ -548,11 +550,7 @@ unsubscribe(post, #{bindings := #{clientid := ClientID}, body := TopicInfo}) ->
|
|
|
subscribe_batch(post, #{bindings := #{clientid := ClientID}, body := TopicInfos}) ->
|
|
|
Topics =
|
|
|
[
|
|
|
- begin
|
|
|
- Topic = maps:get(<<"topic">>, TopicInfo),
|
|
|
- Qos = maps:get(<<"qos">>, TopicInfo, 0),
|
|
|
- #{topic => Topic, qos => Qos}
|
|
|
- end
|
|
|
+ emqx_map_lib:unsafe_atom_key_map(TopicInfo)
|
|
|
|| TopicInfo <- TopicInfos
|
|
|
],
|
|
|
subscribe_batch(#{clientid => ClientID, topics => Topics}).
|
|
|
@@ -661,21 +659,16 @@ clean_authz_cache(#{clientid := ClientID}) ->
|
|
|
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}}
|
|
|
end.
|
|
|
|
|
|
-subscribe(#{clientid := ClientID, topic := Topic, qos := Qos}) ->
|
|
|
- case do_subscribe(ClientID, Topic, Qos) of
|
|
|
+subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
|
|
|
+ Opts = maps:with([qos, nl, rap, rh], Sub),
|
|
|
+ case do_subscribe(ClientID, Topic, Opts) of
|
|
|
{error, channel_not_found} ->
|
|
|
{404, ?CLIENT_ID_NOT_FOUND};
|
|
|
{error, Reason} ->
|
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
|
{500, #{code => <<"UNKNOW_ERROR">>, message => Message}};
|
|
|
{ok, Node} ->
|
|
|
- Response =
|
|
|
- #{
|
|
|
- clientid => ClientID,
|
|
|
- topic => Topic,
|
|
|
- qos => Qos,
|
|
|
- node => Node
|
|
|
- },
|
|
|
+ Response = Sub#{node => Node},
|
|
|
{200, Response}
|
|
|
end.
|
|
|
|
|
|
@@ -688,15 +681,18 @@ unsubscribe(#{clientid := ClientID, topic := Topic}) ->
|
|
|
end.
|
|
|
|
|
|
subscribe_batch(#{clientid := ClientID, topics := Topics}) ->
|
|
|
- ArgList = [[ClientID, Topic, Qos] || #{topic := Topic, qos := Qos} <- Topics],
|
|
|
+ ArgList = [
|
|
|
+ [ClientID, Topic, maps:with([qos, nl, rap, rh], Sub)]
|
|
|
+ || #{topic := Topic} = Sub <- Topics
|
|
|
+ ],
|
|
|
emqx_mgmt_util:batch_operation(?MODULE, do_subscribe, ArgList).
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% internal function
|
|
|
|
|
|
-do_subscribe(ClientID, Topic0, Qos) ->
|
|
|
- {Topic, Opts} = emqx_topic:parse(Topic0),
|
|
|
- TopicTable = [{Topic, Opts#{qos => Qos}}],
|
|
|
+do_subscribe(ClientID, Topic0, Options) ->
|
|
|
+ {Topic, Opts} = emqx_topic:parse(Topic0, Options),
|
|
|
+ TopicTable = [{Topic, Opts}],
|
|
|
case emqx_mgmt:subscribe(ClientID, TopicTable) of
|
|
|
{error, Reason} ->
|
|
|
{error, Reason};
|