|
|
@@ -46,6 +46,11 @@
|
|
|
code_change/3
|
|
|
]).
|
|
|
|
|
|
+%% Internal exports (RPC)
|
|
|
+-export([
|
|
|
+ do_register/4
|
|
|
+]).
|
|
|
+
|
|
|
-export([lookup_name/1]).
|
|
|
|
|
|
-define(SN_SHARD, emqx_sn_shard).
|
|
|
@@ -173,33 +178,11 @@ handle_call(
|
|
|
TopicId when TopicId >= 16#FFFF ->
|
|
|
{reply, {error, too_large}, State};
|
|
|
TopicId ->
|
|
|
- Fun = fun() ->
|
|
|
- mnesia:write(
|
|
|
- Tab,
|
|
|
- #emqx_sn_registry{
|
|
|
- key = {ClientId, next_topic_id},
|
|
|
- value = TopicId + 1
|
|
|
- },
|
|
|
- write
|
|
|
- ),
|
|
|
- mnesia:write(
|
|
|
- Tab,
|
|
|
- #emqx_sn_registry{
|
|
|
- key = {ClientId, TopicName},
|
|
|
- value = TopicId
|
|
|
- },
|
|
|
- write
|
|
|
- ),
|
|
|
- mnesia:write(
|
|
|
- Tab,
|
|
|
- #emqx_sn_registry{
|
|
|
- key = {ClientId, TopicId},
|
|
|
- value = TopicName
|
|
|
- },
|
|
|
- write
|
|
|
- )
|
|
|
- end,
|
|
|
- case mria:transaction(?SN_SHARD, Fun) of
|
|
|
+ case
|
|
|
+ mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [
|
|
|
+ Tab, ClientId, TopicId, TopicName
|
|
|
+ ])
|
|
|
+ of
|
|
|
{atomic, ok} ->
|
|
|
{reply, TopicId, State};
|
|
|
{aborted, Error} ->
|
|
|
@@ -248,6 +231,32 @@ terminate(_Reason, _State) ->
|
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
|
{ok, State}.
|
|
|
|
|
|
+do_register(Tab, ClientId, TopicId, TopicName) ->
|
|
|
+ mnesia:write(
|
|
|
+ Tab,
|
|
|
+ #emqx_sn_registry{
|
|
|
+ key = {ClientId, next_topic_id},
|
|
|
+ value = TopicId + 1
|
|
|
+ },
|
|
|
+ write
|
|
|
+ ),
|
|
|
+ mnesia:write(
|
|
|
+ Tab,
|
|
|
+ #emqx_sn_registry{
|
|
|
+ key = {ClientId, TopicName},
|
|
|
+ value = TopicId
|
|
|
+ },
|
|
|
+ write
|
|
|
+ ),
|
|
|
+ mnesia:write(
|
|
|
+ Tab,
|
|
|
+ #emqx_sn_registry{
|
|
|
+ key = {ClientId, TopicId},
|
|
|
+ value = TopicName
|
|
|
+ },
|
|
|
+ write
|
|
|
+ ).
|
|
|
+
|
|
|
%%-----------------------------------------------------------------------------
|
|
|
|
|
|
next_topic_id(Tab, PredefId, ClientId) ->
|