Sfoglia il codice sorgente

refactor(rlog): Fix initialization of emqx_cm_registry table

k32 4 anni fa
parent
commit
dc1deff3f3
1 ha cambiato i file con 5 aggiunte e 2 eliminazioni
  1. 5 2
      apps/emqx/src/emqx_cm_registry.erl

+ 5 - 2
apps/emqx/src/emqx_cm_registry.erl

@@ -48,7 +48,9 @@
 -define(TAB, emqx_channel_registry).
 -define(TAB, emqx_channel_registry).
 -define(LOCK, {?MODULE, cleanup_down}).
 -define(LOCK, {?MODULE, cleanup_down}).
 
 
--rlog_shard({?ROUTE_SHARD, ?TAB}).
+-define(CM_SHARD, emqx_cm_shard).
+
+-rlog_shard({?CM_SHARD, ?TAB}).
 
 
 -record(channel, {chid, pid}).
 -record(channel, {chid, pid}).
 
 
@@ -111,6 +113,7 @@ init([]) ->
                 {storage_properties, [{ets, [{read_concurrency, true},
                 {storage_properties, [{ets, [{read_concurrency, true},
                                              {write_concurrency, true}]}]}]),
                                              {write_concurrency, true}]}]}]),
     ok = ekka_mnesia:copy_table(?TAB, ram_copies),
     ok = ekka_mnesia:copy_table(?TAB, ram_copies),
+    ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
     ok = ekka:monitor(membership),
     ok = ekka:monitor(membership),
     {ok, #{}}.
     {ok, #{}}.
 
 
@@ -125,7 +128,7 @@ handle_cast(Msg, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
 handle_info({membership, {mnesia, down, Node}}, State) ->
     global:trans({?LOCK, self()},
     global:trans({?LOCK, self()},
                  fun() ->
                  fun() ->
-                     ekka_mnesia:transaction(?ROUTE_SHARD, fun cleanup_channels/1, [Node])
+                     ekka_mnesia:transaction(?CM_SHARD, fun cleanup_channels/1, [Node])
                  end),
                  end),
     {noreply, State};
     {noreply, State};