|
|
@@ -118,7 +118,12 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
|
|
false ->
|
|
|
ok = emqx_router_helper:monitor(Dest),
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
- true -> trans(fun insert_trie_route/1, [Route]);
|
|
|
+ true ->
|
|
|
+ lock_router(),
|
|
|
+ try trans(fun insert_trie_route/1, [Route])
|
|
|
+ after
|
|
|
+ unlock_router()
|
|
|
+ end;
|
|
|
false -> insert_direct_route(Route)
|
|
|
end
|
|
|
end.
|
|
|
@@ -164,7 +169,12 @@ do_delete_route(Topic) when is_binary(Topic) ->
|
|
|
do_delete_route(Topic, Dest) ->
|
|
|
Route = #route{topic = Topic, dest = Dest},
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
- true -> trans(fun delete_trie_route/1, [Route]);
|
|
|
+ true ->
|
|
|
+ lock_router(),
|
|
|
+ try trans(fun delete_trie_route/1, [Route])
|
|
|
+ after
|
|
|
+ unlock_router()
|
|
|
+ end;
|
|
|
false -> delete_direct_route(Route)
|
|
|
end.
|
|
|
|
|
|
@@ -249,8 +259,19 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
|
|
%% @private
|
|
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
|
|
trans(Fun, Args) ->
|
|
|
- case mnesia:transaction(Fun, Args) of
|
|
|
- {atomic, Ok} -> Ok;
|
|
|
- {aborted, Reason} -> {error, Reason}
|
|
|
+ mnesia:sync_dirty(Fun, Args).
|
|
|
+
|
|
|
+lock_router() ->
|
|
|
+ %% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
|
|
+ %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
|
|
+ case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
|
|
|
+ false ->
|
|
|
+ %% Force to sleep 1ms instead.
|
|
|
+ timer:sleep(1),
|
|
|
+ lock_router();
|
|
|
+ true ->
|
|
|
+ ok
|
|
|
end.
|
|
|
|
|
|
+unlock_router() ->
|
|
|
+ global:del_lock({?MODULE, self()}).
|