|
|
@@ -95,6 +95,18 @@
|
|
|
unused = [] :: nil()
|
|
|
}).
|
|
|
|
|
|
+-define(node_patterns(Node), [Node, {'_', Node}]).
|
|
|
+
|
|
|
+-define(UNSUPPORTED, unsupported).
|
|
|
+
|
|
|
+-define(with_fallback(Expr, FallbackExpr),
|
|
|
+ try
|
|
|
+ Expr
|
|
|
+ catch
|
|
|
+ throw:?UNSUPPORTED -> FallbackExpr
|
|
|
+ end
|
|
|
+).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% Mnesia bootstrap
|
|
|
%%--------------------------------------------------------------------
|
|
|
@@ -293,8 +305,6 @@ pick(Topic) ->
|
|
|
%% Schema v1
|
|
|
%% --------------------------------------------------------------------
|
|
|
|
|
|
--dialyzer({nowarn_function, [cleanup_routes_v1/1]}).
|
|
|
-
|
|
|
mria_insert_route_v1(Topic, Dest) ->
|
|
|
Route = #route{topic = Topic, dest = Dest},
|
|
|
case emqx_topic:wildcard(Topic) of
|
|
|
@@ -356,10 +366,18 @@ has_route_tab_entry(Topic, Dest) ->
|
|
|
[] =/= ets:match(?ROUTE_TAB, #route{topic = Topic, dest = Dest}).
|
|
|
|
|
|
cleanup_routes_v1(Node) ->
|
|
|
- Patterns = [
|
|
|
- #route{_ = '_', dest = Node},
|
|
|
- #route{_ = '_', dest = {'_', Node}}
|
|
|
- ],
|
|
|
+ ?with_fallback(
|
|
|
+ lists:foreach(
|
|
|
+ fun(Pattern) ->
|
|
|
+ throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
|
|
|
+ end,
|
|
|
+ ?node_patterns(Node)
|
|
|
+ ),
|
|
|
+ cleanup_routes_v1_fallback(Node)
|
|
|
+ ).
|
|
|
+
|
|
|
+cleanup_routes_v1_fallback(Node) ->
|
|
|
+ Patterns = [make_route_rec_pat(P) || P <- ?node_patterns(Node)],
|
|
|
mria:transaction(?ROUTE_SHARD, fun() ->
|
|
|
[
|
|
|
mnesia:delete_object(?ROUTE_TAB, Route, write)
|
|
|
@@ -435,8 +453,25 @@ has_route_v2(Topic, Dest) ->
|
|
|
end.
|
|
|
|
|
|
cleanup_routes_v2(Node) ->
|
|
|
- % NOTE
|
|
|
- % No point in transaction here because all the operations on filters table are dirty.
|
|
|
+ ?with_fallback(
|
|
|
+ lists:foreach(
|
|
|
+ fun(Pattern) ->
|
|
|
+ _ = throw_unsupported(
|
|
|
+ mria:match_delete(
|
|
|
+ ?ROUTE_TAB_FILTERS,
|
|
|
+ #routeidx{entry = emqx_trie_search:make_pat('_', Pattern)}
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ throw_unsupported(mria:match_delete(?ROUTE_TAB, make_route_rec_pat(Pattern)))
|
|
|
+ end,
|
|
|
+ ?node_patterns(Node)
|
|
|
+ ),
|
|
|
+ cleanup_routes_v2_fallback(Node)
|
|
|
+ ).
|
|
|
+
|
|
|
+cleanup_routes_v2_fallback(Node) ->
|
|
|
+ %% NOTE
|
|
|
+ %% No point in transaction here because all the operations on filters table are dirty.
|
|
|
ok = ets:foldl(
|
|
|
fun(#routeidx{entry = K}, ok) ->
|
|
|
case get_dest_node(emqx_topic_index:get_id(K)) of
|
|
|
@@ -467,6 +502,19 @@ get_dest_node({_, Node}) ->
|
|
|
get_dest_node(Node) ->
|
|
|
Node.
|
|
|
|
|
|
+throw_unsupported({error, unsupported_otp_version}) ->
|
|
|
+ throw(?UNSUPPORTED);
|
|
|
+throw_unsupported(Other) ->
|
|
|
+ Other.
|
|
|
+
|
|
|
+%% Make Dialyzer happy
|
|
|
+make_route_rec_pat(DestPattern) ->
|
|
|
+ erlang:make_tuple(
|
|
|
+ record_info(size, route),
|
|
|
+ '_',
|
|
|
+ [{1, route}, {#route.dest, DestPattern}]
|
|
|
+ ).
|
|
|
+
|
|
|
select_v2(Spec, Limit, undefined) ->
|
|
|
Stream = mk_route_stream(Spec),
|
|
|
select_next(Limit, Stream);
|