|
@@ -17,65 +17,103 @@
|
|
|
%% @doc The MQTT-SN Topic Registry
|
|
%% @doc The MQTT-SN Topic Registry
|
|
|
-module(emqx_mqttsn_registry).
|
|
-module(emqx_mqttsn_registry).
|
|
|
|
|
|
|
|
--behaviour(gen_server).
|
|
|
|
|
-
|
|
|
|
|
-include("emqx_mqttsn.hrl").
|
|
-include("emqx_mqttsn.hrl").
|
|
|
--include_lib("emqx/include/logger.hrl").
|
|
|
|
|
-
|
|
|
|
|
--export([start_link/2]).
|
|
|
|
|
-
|
|
|
|
|
--export([
|
|
|
|
|
- register_topic/3,
|
|
|
|
|
- unregister_topic/2,
|
|
|
|
|
- clean_predefined_topics/2
|
|
|
|
|
-]).
|
|
|
|
|
|
|
|
|
|
-export([
|
|
-export([
|
|
|
- lookup_topic/3,
|
|
|
|
|
- lookup_topic_id/3
|
|
|
|
|
|
|
+ persist_predefined_topics/1,
|
|
|
|
|
+ clear_predefined_topics/1
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
-%% gen_server callbacks
|
|
|
|
|
-export([
|
|
-export([
|
|
|
- init/1,
|
|
|
|
|
- handle_call/3,
|
|
|
|
|
- handle_cast/2,
|
|
|
|
|
- handle_info/2,
|
|
|
|
|
- terminate/2,
|
|
|
|
|
- code_change/3
|
|
|
|
|
|
|
+ init/0,
|
|
|
|
|
+ reg/2,
|
|
|
|
|
+ unreg/2,
|
|
|
|
|
+ lookup_topic/2,
|
|
|
|
|
+ lookup_topic_id/2
|
|
|
]).
|
|
]).
|
|
|
|
|
|
|
|
-%% Internal exports (RPC)
|
|
|
|
|
--export([
|
|
|
|
|
- do_register/4
|
|
|
|
|
-]).
|
|
|
|
|
-
|
|
|
|
|
--export([lookup_name/1]).
|
|
|
|
|
-
|
|
|
|
|
--define(SN_SHARD, emqx_mqttsn_shard).
|
|
|
|
|
|
|
+-define(PKEY(Id), {mqttsn, predef_topics, Id}).
|
|
|
|
|
+-define(PKEY_MAX_PREDEF_ID, {mqttsn, max_predef_topic_id}).
|
|
|
|
|
|
|
|
--record(state, {tabname, max_predef_topic_id = 0}).
|
|
|
|
|
|
|
+-type registry() :: #{
|
|
|
|
|
+ %% The next topic id to be assigned to new registration
|
|
|
|
|
+ next_topic_id := pos_integer(),
|
|
|
|
|
+ %% The mapping from topic id to topic name
|
|
|
|
|
+ id_to_name := map(),
|
|
|
|
|
+ %% The mapping from topic name to topic id
|
|
|
|
|
+ name_to_id := map()
|
|
|
|
|
+}.
|
|
|
|
|
|
|
|
--record(emqx_mqttsn_registry, {key, value}).
|
|
|
|
|
|
|
+%%-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
--type registry() :: {Tab :: atom(), RegistryPid :: pid()}.
|
|
|
|
|
|
|
+-spec persist_predefined_topics(list()) -> ok.
|
|
|
|
|
+persist_predefined_topics(PredefTopics) when is_list(PredefTopics) ->
|
|
|
|
|
+ MaxPredefId = lists:foldl(
|
|
|
|
|
+ fun(#{id := TopicId, topic := TopicName0}, AccId) ->
|
|
|
|
|
+ TopicName = iolist_to_binary(TopicName0),
|
|
|
|
|
+ persistent_term:put(?PKEY(TopicId), TopicName),
|
|
|
|
|
+ persistent_term:put(?PKEY(TopicName), TopicId),
|
|
|
|
|
+ case TopicId > AccId of
|
|
|
|
|
+ true -> TopicId;
|
|
|
|
|
+ false -> AccId
|
|
|
|
|
+ end
|
|
|
|
|
+ end,
|
|
|
|
|
+ 0,
|
|
|
|
|
+ PredefTopics
|
|
|
|
|
+ ),
|
|
|
|
|
+ persistent_term:put(?PKEY_MAX_PREDEF_ID, MaxPredefId),
|
|
|
|
|
+ ok.
|
|
|
|
|
|
|
|
-%%-----------------------------------------------------------------------------
|
|
|
|
|
|
|
+-spec clear_predefined_topics(list()) -> ok.
|
|
|
|
|
+clear_predefined_topics(PredefTopics) ->
|
|
|
|
|
+ lists:foreach(
|
|
|
|
|
+ fun(#{id := TopicId, topic := TopicName0}) ->
|
|
|
|
|
+ TopicName = iolist_to_binary(TopicName0),
|
|
|
|
|
+ persistent_term:erase(?PKEY(TopicId)),
|
|
|
|
|
+ persistent_term:erase(?PKEY(TopicName))
|
|
|
|
|
+ end,
|
|
|
|
|
+ PredefTopics
|
|
|
|
|
+ ),
|
|
|
|
|
+ persistent_term:erase(?PKEY_MAX_PREDEF_ID),
|
|
|
|
|
+ ok.
|
|
|
|
|
|
|
|
--spec start_link(atom(), list()) ->
|
|
|
|
|
- ignore
|
|
|
|
|
- | {ok, pid()}
|
|
|
|
|
- | {error, Reason :: term()}.
|
|
|
|
|
-start_link(InstaId, PredefTopics) ->
|
|
|
|
|
- gen_server:start_link(?MODULE, [InstaId, PredefTopics], []).
|
|
|
|
|
|
|
+-spec init() -> registry().
|
|
|
|
|
+init() ->
|
|
|
|
|
+ #{
|
|
|
|
|
+ next_topic_id => persistent_term:get(?PKEY_MAX_PREDEF_ID, 0),
|
|
|
|
|
+ id_to_name => #{},
|
|
|
|
|
+ name_to_id => #{}
|
|
|
|
|
+ }.
|
|
|
|
|
|
|
|
--spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) ->
|
|
|
|
|
- integer()
|
|
|
|
|
|
|
+-spec reg(emqx_types:topic(), registry()) ->
|
|
|
|
|
+ {ok, integer(), registry()}
|
|
|
| {error, term()}.
|
|
| {error, term()}.
|
|
|
-register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
|
|
|
|
|
|
|
+reg(
|
|
|
|
|
+ TopicName,
|
|
|
|
|
+ Registry = #{
|
|
|
|
|
+ next_topic_id := TopicId0,
|
|
|
|
|
+ id_to_name := IdMap,
|
|
|
|
|
+ name_to_id := NameMap
|
|
|
|
|
+ }
|
|
|
|
|
+) when is_binary(TopicName) ->
|
|
|
case emqx_topic:wildcard(TopicName) of
|
|
case emqx_topic:wildcard(TopicName) of
|
|
|
false ->
|
|
false ->
|
|
|
- gen_server:call(Pid, {register, ClientId, TopicName});
|
|
|
|
|
|
|
+ case maps:find(TopicName, NameMap) of
|
|
|
|
|
+ {ok, TopicId} ->
|
|
|
|
|
+ {ok, TopicId, Registry};
|
|
|
|
|
+ error ->
|
|
|
|
|
+ case next_topic_id(TopicId0) of
|
|
|
|
|
+ {error, too_large} ->
|
|
|
|
|
+ {error, too_large};
|
|
|
|
|
+ NextTopicId ->
|
|
|
|
|
+ NRegistry = Registry#{
|
|
|
|
|
+ next_topic_id := NextTopicId,
|
|
|
|
|
+ id_to_name := maps:put(NextTopicId, TopicName, IdMap),
|
|
|
|
|
+ name_to_id := maps:put(TopicName, NextTopicId, NameMap)
|
|
|
|
|
+ },
|
|
|
|
|
+ {ok, NextTopicId, NRegistry}
|
|
|
|
|
+ end
|
|
|
|
|
+ end;
|
|
|
%% TopicId: in case of “accepted” the value that will be used as topic
|
|
%% TopicId: in case of “accepted” the value that will be used as topic
|
|
|
%% id by the gateway when sending PUBLISH messages to the client (not
|
|
%% id by the gateway when sending PUBLISH messages to the client (not
|
|
|
%% relevant in case of subscriptions to a short topic name or to a topic
|
|
%% relevant in case of subscriptions to a short topic name or to a topic
|
|
@@ -84,192 +122,44 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
|
|
|
{error, wildcard_topic}
|
|
{error, wildcard_topic}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) ->
|
|
|
|
|
|
|
+next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) ->
|
|
|
|
|
+ Id + 1;
|
|
|
|
|
+next_topic_id(Id) when is_integer(Id) ->
|
|
|
|
|
+ {error, too_large}.
|
|
|
|
|
+
|
|
|
|
|
+-spec lookup_topic(pos_integer(), registry()) ->
|
|
|
undefined
|
|
undefined
|
|
|
| binary().
|
|
| binary().
|
|
|
-lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) ->
|
|
|
|
|
- case lookup_element(Tab, {predef, TopicId}, 3) of
|
|
|
|
|
|
|
+lookup_topic(TopicId, _Registry = #{id_to_name := IdMap}) when is_integer(TopicId) ->
|
|
|
|
|
+ case persistent_term:get(?PKEY(TopicId), undefined) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
- lookup_element(Tab, {ClientId, TopicId}, 3);
|
|
|
|
|
|
|
+ maps:get(TopicId, IdMap, undefined);
|
|
|
Topic ->
|
|
Topic ->
|
|
|
Topic
|
|
Topic
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
--spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) ->
|
|
|
|
|
|
|
+-spec lookup_topic_id(emqx_types:topic(), registry()) ->
|
|
|
undefined
|
|
undefined
|
|
|
| pos_integer()
|
|
| pos_integer()
|
|
|
| {predef, integer()}.
|
|
| {predef, integer()}.
|
|
|
-lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) ->
|
|
|
|
|
- case lookup_element(Tab, {predef, TopicName}, 3) of
|
|
|
|
|
|
|
+lookup_topic_id(TopicName, _Registry = #{name_to_id := NameMap}) when is_binary(TopicName) ->
|
|
|
|
|
+ case persistent_term:get(?PKEY(TopicName), undefined) of
|
|
|
undefined ->
|
|
undefined ->
|
|
|
- lookup_element(Tab, {ClientId, TopicName}, 3);
|
|
|
|
|
|
|
+ maps:get(TopicName, NameMap, undefined);
|
|
|
TopicId ->
|
|
TopicId ->
|
|
|
{predef, TopicId}
|
|
{predef, TopicId}
|
|
|
end.
|
|
end.
|
|
|
|
|
|
|
|
-%% @private
|
|
|
|
|
-lookup_element(Tab, Key, Pos) ->
|
|
|
|
|
- try
|
|
|
|
|
- ets:lookup_element(Tab, Key, Pos)
|
|
|
|
|
- catch
|
|
|
|
|
- error:badarg -> undefined
|
|
|
|
|
- end.
|
|
|
|
|
-
|
|
|
|
|
--spec unregister_topic(registry(), emqx_types:clientid()) -> ok.
|
|
|
|
|
-unregister_topic({_, Pid}, ClientId) ->
|
|
|
|
|
- gen_server:call(Pid, {unregister, ClientId}).
|
|
|
|
|
-
|
|
|
|
|
--spec clean_predefined_topics(atom(), list()) -> ok.
|
|
|
|
|
-clean_predefined_topics(InstaId, PredefTopics) when is_list(PredefTopics) ->
|
|
|
|
|
- Tab = name(InstaId),
|
|
|
|
|
- F = fun(#{id := TopicId, topic := TopicName0}) ->
|
|
|
|
|
- TopicName = iolist_to_binary(TopicName0),
|
|
|
|
|
- mria:dirty_delete(Tab, {predef, TopicId}),
|
|
|
|
|
- mria:dirty_delete(Tab, {predef, TopicName})
|
|
|
|
|
- end,
|
|
|
|
|
- lists:foreach(F, PredefTopics).
|
|
|
|
|
-
|
|
|
|
|
-lookup_name(Pid) ->
|
|
|
|
|
- gen_server:call(Pid, name).
|
|
|
|
|
-
|
|
|
|
|
-%%-----------------------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
-name(InstaId) ->
|
|
|
|
|
- list_to_atom(lists:concat([emqx_mqttsn_, InstaId, '_registry'])).
|
|
|
|
|
-
|
|
|
|
|
-init([InstaId, PredefTopics]) ->
|
|
|
|
|
- %% {predef, TopicId} -> TopicName
|
|
|
|
|
- %% {predef, TopicName} -> TopicId
|
|
|
|
|
- %% {ClientId, TopicId} -> TopicName
|
|
|
|
|
- %% {ClientId, TopicName} -> TopicId
|
|
|
|
|
- Tab = name(InstaId),
|
|
|
|
|
- ok = mria:create_table(Tab, [
|
|
|
|
|
- {storage, ram_copies},
|
|
|
|
|
- {record_name, emqx_mqttsn_registry},
|
|
|
|
|
- {attributes, record_info(fields, emqx_mqttsn_registry)},
|
|
|
|
|
- {storage_properties, [{ets, [{read_concurrency, true}]}]},
|
|
|
|
|
- {rlog_shard, ?SN_SHARD}
|
|
|
|
|
- ]),
|
|
|
|
|
- ok = mria:wait_for_tables([Tab]),
|
|
|
|
|
- MaxPredefId = lists:foldl(
|
|
|
|
|
- fun(#{id := TopicId, topic := TopicName0}, AccId) ->
|
|
|
|
|
- TopicName = iolist_to_binary(TopicName0),
|
|
|
|
|
- mria:dirty_write(Tab, #emqx_mqttsn_registry{
|
|
|
|
|
- key = {predef, TopicId},
|
|
|
|
|
- value = TopicName
|
|
|
|
|
- }),
|
|
|
|
|
- mria:dirty_write(Tab, #emqx_mqttsn_registry{
|
|
|
|
|
- key = {predef, TopicName},
|
|
|
|
|
- value = TopicId
|
|
|
|
|
- }),
|
|
|
|
|
- case TopicId > AccId of
|
|
|
|
|
- true -> TopicId;
|
|
|
|
|
- false -> AccId
|
|
|
|
|
- end
|
|
|
|
|
- end,
|
|
|
|
|
- 0,
|
|
|
|
|
- PredefTopics
|
|
|
|
|
- ),
|
|
|
|
|
- {ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
|
|
|
|
|
-
|
|
|
|
|
-handle_call(
|
|
|
|
|
- {register, ClientId, TopicName},
|
|
|
|
|
- _From,
|
|
|
|
|
- State = #state{tabname = Tab, max_predef_topic_id = PredefId}
|
|
|
|
|
-) ->
|
|
|
|
|
- case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
|
|
|
|
|
- {predef, PredefTopicId} when is_integer(PredefTopicId) ->
|
|
|
|
|
- {reply, PredefTopicId, State};
|
|
|
|
|
- TopicId when is_integer(TopicId) ->
|
|
|
|
|
- {reply, TopicId, State};
|
|
|
|
|
- undefined ->
|
|
|
|
|
- case next_topic_id(Tab, PredefId, ClientId) of
|
|
|
|
|
- TopicId when TopicId >= 16#FFFF ->
|
|
|
|
|
- {reply, {error, too_large}, State};
|
|
|
|
|
- TopicId ->
|
|
|
|
|
- case
|
|
|
|
|
- mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [
|
|
|
|
|
- Tab, ClientId, TopicId, TopicName
|
|
|
|
|
- ])
|
|
|
|
|
- of
|
|
|
|
|
- {atomic, ok} ->
|
|
|
|
|
- {reply, TopicId, State};
|
|
|
|
|
- {aborted, Error} ->
|
|
|
|
|
- {reply, {error, Error}, State}
|
|
|
|
|
- end
|
|
|
|
|
- end
|
|
|
|
|
- end;
|
|
|
|
|
-handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
|
|
|
|
|
- Registry = mnesia:dirty_match_object(
|
|
|
|
|
- Tab,
|
|
|
|
|
- {emqx_mqttsn_registry, {ClientId, '_'}, '_'}
|
|
|
|
|
- ),
|
|
|
|
|
- lists:foreach(
|
|
|
|
|
- fun(R) ->
|
|
|
|
|
- mria:dirty_delete_object(Tab, R)
|
|
|
|
|
- end,
|
|
|
|
|
- Registry
|
|
|
|
|
- ),
|
|
|
|
|
- {reply, ok, State};
|
|
|
|
|
-handle_call(name, _From, State = #state{tabname = Tab}) ->
|
|
|
|
|
- {reply, {Tab, self()}, State};
|
|
|
|
|
-handle_call(Req, _From, State) ->
|
|
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
- msg => "unexpected_call",
|
|
|
|
|
- call => Req
|
|
|
|
|
- }),
|
|
|
|
|
- {reply, ignored, State}.
|
|
|
|
|
-
|
|
|
|
|
-handle_cast(Msg, State) ->
|
|
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
- msg => "unexpected_cast",
|
|
|
|
|
- cast => Msg
|
|
|
|
|
- }),
|
|
|
|
|
- {noreply, State}.
|
|
|
|
|
-
|
|
|
|
|
-handle_info(Info, State) ->
|
|
|
|
|
- ?SLOG(error, #{
|
|
|
|
|
- msg => "unexpected_info",
|
|
|
|
|
- info => Info
|
|
|
|
|
- }),
|
|
|
|
|
- {noreply, State}.
|
|
|
|
|
-
|
|
|
|
|
-terminate(_Reason, _State) ->
|
|
|
|
|
- ok.
|
|
|
|
|
-
|
|
|
|
|
-code_change(_OldVsn, State, _Extra) ->
|
|
|
|
|
- {ok, State}.
|
|
|
|
|
-
|
|
|
|
|
-do_register(Tab, ClientId, TopicId, TopicName) ->
|
|
|
|
|
- mnesia:write(
|
|
|
|
|
- Tab,
|
|
|
|
|
- #emqx_mqttsn_registry{
|
|
|
|
|
- key = {ClientId, next_topic_id},
|
|
|
|
|
- value = TopicId + 1
|
|
|
|
|
- },
|
|
|
|
|
- write
|
|
|
|
|
- ),
|
|
|
|
|
- mnesia:write(
|
|
|
|
|
- Tab,
|
|
|
|
|
- #emqx_mqttsn_registry{
|
|
|
|
|
- key = {ClientId, TopicName},
|
|
|
|
|
- value = TopicId
|
|
|
|
|
- },
|
|
|
|
|
- write
|
|
|
|
|
- ),
|
|
|
|
|
- mnesia:write(
|
|
|
|
|
- Tab,
|
|
|
|
|
- #emqx_mqttsn_registry{
|
|
|
|
|
- key = {ClientId, TopicId},
|
|
|
|
|
- value = TopicName
|
|
|
|
|
- },
|
|
|
|
|
- write
|
|
|
|
|
- ).
|
|
|
|
|
-
|
|
|
|
|
-%%-----------------------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
-next_topic_id(Tab, PredefId, ClientId) ->
|
|
|
|
|
- case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
|
|
|
|
|
- [#emqx_mqttsn_registry{value = Id}] -> Id;
|
|
|
|
|
- [] -> PredefId + 1
|
|
|
|
|
|
|
+-spec unreg(emqx_types:topic(), registry()) -> registry().
|
|
|
|
|
+unreg(TopicName, Registry = #{name_to_id := NameMap, id_to_name := IdMap}) when
|
|
|
|
|
+ is_binary(TopicName)
|
|
|
|
|
+->
|
|
|
|
|
+ case maps:find(TopicName, NameMap) of
|
|
|
|
|
+ {ok, TopicId} ->
|
|
|
|
|
+ Registry#{
|
|
|
|
|
+ name_to_id := maps:remove(TopicName, NameMap),
|
|
|
|
|
+ id_to_name := maps:remove(TopicId, IdMap)
|
|
|
|
|
+ };
|
|
|
|
|
+ error ->
|
|
|
|
|
+ Registry
|
|
|
end.
|
|
end.
|