|
|
@@ -758,6 +758,12 @@ subscribe(#{clientid := ClientID, topic := Topic} = Sub) ->
|
|
|
case do_subscribe(ClientID, Topic, Opts) of
|
|
|
{error, channel_not_found} ->
|
|
|
{404, ?CLIENTID_NOT_FOUND};
|
|
|
+ {error, invalid_subopts_nl} ->
|
|
|
+ {400, #{
|
|
|
+ code => <<"INVALID_PARAMETER">>,
|
|
|
+ message =>
|
|
|
+ <<"Invalid Subscribe options: `no_local` not allowed for shared-sub. See [MQTT-3.8.3-4]">>
|
|
|
+ }};
|
|
|
{error, Reason} ->
|
|
|
Message = list_to_binary(io_lib:format("~p", [Reason])),
|
|
|
{500, #{code => <<"UNKNOWN_ERROR">>, message => Message}};
|
|
|
@@ -804,18 +810,25 @@ unsubscribe_batch(#{clientid := ClientID, topics := Topics}) ->
|
|
|
%% internal function
|
|
|
|
|
|
do_subscribe(ClientID, Topic0, Options) ->
|
|
|
- {Topic, Opts} = emqx_topic:parse(Topic0, Options),
|
|
|
- TopicTable = [{Topic, Opts}],
|
|
|
- case emqx_mgmt:subscribe(ClientID, TopicTable) of
|
|
|
- {error, Reason} ->
|
|
|
- {error, Reason};
|
|
|
- {subscribe, Subscriptions, Node} ->
|
|
|
- case proplists:is_defined(Topic, Subscriptions) of
|
|
|
- true ->
|
|
|
- {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}};
|
|
|
- false ->
|
|
|
- {error, unknow_error}
|
|
|
+ try emqx_topic:parse(Topic0, Options) of
|
|
|
+ {Topic, Opts} ->
|
|
|
+ TopicTable = [{Topic, Opts}],
|
|
|
+ case emqx_mgmt:subscribe(ClientID, TopicTable) of
|
|
|
+ {error, Reason} ->
|
|
|
+ {error, Reason};
|
|
|
+ {subscribe, Subscriptions, Node} ->
|
|
|
+ case proplists:is_defined(Topic, Subscriptions) of
|
|
|
+ true ->
|
|
|
+ {ok, Options#{node => Node, clientid => ClientID, topic => Topic0}};
|
|
|
+ false ->
|
|
|
+ {error, unknow_error}
|
|
|
+ end
|
|
|
end
|
|
|
+ catch
|
|
|
+ error:{invalid_subopts_nl, _} ->
|
|
|
+ {error, invalid_subopts_nl};
|
|
|
+ _:Reason ->
|
|
|
+ {error, Reason}
|
|
|
end.
|
|
|
|
|
|
-spec do_unsubscribe(emqx_types:clientid(), emqx_types:topic()) ->
|