|
|
@@ -119,11 +119,7 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
|
|
ok = emqx_router_helper:monitor(Dest),
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
true ->
|
|
|
- lock_router(),
|
|
|
- try trans(fun insert_trie_route/1, [Route])
|
|
|
- after
|
|
|
- unlock_router()
|
|
|
- end;
|
|
|
+ maybe_trans(fun insert_trie_route/1, [Route]);
|
|
|
false -> insert_direct_route(Route)
|
|
|
end
|
|
|
end.
|
|
|
@@ -170,11 +166,7 @@ do_delete_route(Topic, Dest) ->
|
|
|
Route = #route{topic = Topic, dest = Dest},
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
true ->
|
|
|
- lock_router(),
|
|
|
- try trans(fun delete_trie_route/1, [Route])
|
|
|
- after
|
|
|
- unlock_router()
|
|
|
- end;
|
|
|
+ maybe_trans(fun delete_trie_route/1, [Route]);
|
|
|
false -> delete_direct_route(Route)
|
|
|
end.
|
|
|
|
|
|
@@ -257,9 +249,30 @@ delete_trie_route(Route = #route{topic = Topic}) ->
|
|
|
end.
|
|
|
|
|
|
%% @private
|
|
|
+-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
|
|
+maybe_trans(Fun, Args) ->
|
|
|
+ case persistent_term:get(emqx_route_lock_type) of
|
|
|
+ key ->
|
|
|
+ trans(Fun, Args);
|
|
|
+ global ->
|
|
|
+ lock_router(),
|
|
|
+ try mnesia:sync_dirty(Fun, Args)
|
|
|
+ after
|
|
|
+ unlock_router()
|
|
|
+ end;
|
|
|
+ tab ->
|
|
|
+ trans(fun() ->
|
|
|
+ emqx_trie:lock_tables(),
|
|
|
+ apply(Fun, Args)
|
|
|
+ end, [])
|
|
|
+ end.
|
|
|
+
|
|
|
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
|
|
trans(Fun, Args) ->
|
|
|
- mnesia:async_dirty(Fun, Args).
|
|
|
+ case mnesia:transaction(Fun, Args) of
|
|
|
+ {atomic, Ok} -> Ok;
|
|
|
+ {aborted, Reason} -> {error, Reason}
|
|
|
+ end.
|
|
|
|
|
|
lock_router() ->
|
|
|
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|