|
|
@@ -92,9 +92,11 @@
|
|
|
]).
|
|
|
|
|
|
-export_type([dest/0]).
|
|
|
+-export_type([schemavsn/0]).
|
|
|
|
|
|
-type group() :: binary().
|
|
|
-type dest() :: node() | {group(), node()}.
|
|
|
+-type schemavsn() :: v1 | v2.
|
|
|
|
|
|
%% Operation :: {add, ...} | {delete, ...}.
|
|
|
-type batch() :: #{batch_route() => _Operation :: tuple()}.
|
|
|
@@ -643,8 +645,8 @@ match_to_route(M) ->
|
|
|
|
|
|
-define(PT_SCHEMA_VSN, {?MODULE, schemavsn}).
|
|
|
|
|
|
--type schemavsn() :: v1 | v2.
|
|
|
-
|
|
|
+%% @doc Get the schema version in use.
|
|
|
+%% BPAPI RPC Target @ emqx_router_proto
|
|
|
-spec get_schema_vsn() -> schemavsn().
|
|
|
get_schema_vsn() ->
|
|
|
persistent_term:get(?PT_SCHEMA_VSN).
|
|
|
@@ -654,23 +656,23 @@ init_schema() ->
|
|
|
ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
|
|
|
ok = emqx_trie:wait_for_tables(),
|
|
|
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
|
|
|
- Schema = choose_schema_vsn(ConfSchema),
|
|
|
+ {ClusterSchema, ClusterState} = discover_cluster_schema_vsn(),
|
|
|
+ Schema = choose_schema_vsn(ConfSchema, ClusterSchema, ClusterState),
|
|
|
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
|
|
|
- case Schema of
|
|
|
- ConfSchema ->
|
|
|
+ case Schema =:= ConfSchema of
|
|
|
+ true ->
|
|
|
?SLOG(info, #{
|
|
|
msg => "routing_schema_used",
|
|
|
schema => Schema
|
|
|
});
|
|
|
- _ ->
|
|
|
+ false ->
|
|
|
?SLOG(notice, #{
|
|
|
msg => "configured_routing_schema_ignored",
|
|
|
schema_in_use => Schema,
|
|
|
configured => ConfSchema,
|
|
|
reason =>
|
|
|
"Could not use configured routing storage schema because "
|
|
|
- "there are already non-empty routing tables pertaining to "
|
|
|
- "another schema."
|
|
|
+ "cluster is already running with a different schema."
|
|
|
})
|
|
|
end.
|
|
|
|
|
|
@@ -679,34 +681,147 @@ deinit_schema() ->
|
|
|
_ = persistent_term:erase(?PT_SCHEMA_VSN),
|
|
|
ok.
|
|
|
|
|
|
--spec choose_schema_vsn(schemavsn()) -> schemavsn().
|
|
|
-choose_schema_vsn(ConfType) ->
|
|
|
- IsEmptyIndex = emqx_trie:empty(),
|
|
|
- IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
|
|
|
- case {IsEmptyIndex, IsEmptyFilters} of
|
|
|
- {true, true} ->
|
|
|
- ConfType;
|
|
|
- {false, true} ->
|
|
|
- v1;
|
|
|
- {true, false} ->
|
|
|
- v2;
|
|
|
- {false, false} ->
|
|
|
- ?SLOG(critical, #{
|
|
|
- msg => "conflicting_routing_schemas_detected_in_cluster",
|
|
|
- configured => ConfType,
|
|
|
+-spec discover_cluster_schema_vsn() ->
|
|
|
+ {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
|
|
|
+discover_cluster_schema_vsn() ->
|
|
|
+ discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
|
|
|
+
|
|
|
+-spec discover_cluster_schema_vsn([node()]) ->
|
|
|
+ {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
|
|
|
+discover_cluster_schema_vsn([]) ->
|
|
|
+ %% single node
|
|
|
+ {undefined, []};
|
|
|
+discover_cluster_schema_vsn(Nodes) ->
|
|
|
+ Responses = lists:zipwith(
|
|
|
+ fun
|
|
|
+ (Node, {ok, Schema}) ->
|
|
|
+ {Node, Schema, configured};
|
|
|
+ (Node, {error, {exception, undef, _Stacktrace}}) ->
|
|
|
+ %% No such function on the remote node, assuming it doesn't know about v2 routing.
|
|
|
+ {Node, v1, legacy};
|
|
|
+ (Node, {error, {exception, badarg, _Stacktrace}}) ->
|
|
|
+ %% Likely, persistent term is not defined yet.
|
|
|
+ {Node, unknown, starting};
|
|
|
+ (Node, Error) ->
|
|
|
+ {Node, unknown, Error}
|
|
|
+ end,
|
|
|
+ Nodes,
|
|
|
+ emqx_router_proto_v1:get_routing_schema_vsn(Nodes)
|
|
|
+ ),
|
|
|
+ case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of
|
|
|
+ [Vsn] when Vsn =:= v1; Vsn =:= v2 ->
|
|
|
+ {Vsn, Responses};
|
|
|
+ [] ->
|
|
|
+ ?SLOG(warning, #{
|
|
|
+ msg => "cluster_routing_schema_discovery_failed",
|
|
|
+ responses => Responses,
|
|
|
reason =>
|
|
|
- "There are records in the routing tables related to both v1 "
|
|
|
- "and v2 storage schemas. This probably means that some nodes "
|
|
|
- "in the cluster use v1 schema and some use v2, independently "
|
|
|
- "of each other. The routing is likely broken. Manual intervention "
|
|
|
- "and full cluster restart is required. This node will shut down."
|
|
|
+ "Could not determine configured routing storage schema in peer nodes."
|
|
|
+ }),
|
|
|
+ {undefined, Responses};
|
|
|
+ [_ | _] ->
|
|
|
+ Desc = schema_conflict_reason(config, Responses),
|
|
|
+ io:format(standard_error, "Error: ~ts~n", [Desc]),
|
|
|
+ ?SLOG(critical, #{
|
|
|
+ msg => "conflicting_routing_schemas_in_cluster",
|
|
|
+ responses => Responses,
|
|
|
+ description => Desc
|
|
|
+ }),
|
|
|
+ error(conflicting_routing_schemas_configured_in_cluster)
|
|
|
+ end.
|
|
|
+
|
|
|
+-spec choose_schema_vsn(
|
|
|
+ schemavsn(),
|
|
|
+ _ClusterSchema :: schemavsn() | undefined,
|
|
|
+ _ClusterState :: [{node(), schemavsn() | undefined, _Details}]
|
|
|
+) -> schemavsn().
|
|
|
+choose_schema_vsn(ConfSchema, ClusterSchema, State) ->
|
|
|
+ case detect_table_schema_vsn() of
|
|
|
+ [] ->
|
|
|
+ %% No records in the tables: use the schema the cluster runs with, if any,
|
|
|
+ %% otherwise use the locally configured one.
|
|
|
+ emqx_maybe:define(ClusterSchema, ConfSchema);
|
|
|
+ [Schema] when Schema =:= ClusterSchema ->
|
|
|
+ %% Table contents match configured schema in the cluster.
|
|
|
+ Schema;
|
|
|
+ [Schema] when ClusterSchema =:= undefined ->
|
|
|
+ %% There are existing records following some schema, we have to use it.
|
|
|
+ Schema;
|
|
|
+ _Conflicting when ClusterSchema =/= undefined ->
|
|
|
+ %% There are existing records in both v1 and v2 schema,
|
|
|
+ %% we have to use what the peer nodes agreed on,
|
|
|
+ %% because it could be THIS node which caused the conflict.
|
|
|
+ %%
|
|
|
+ %% The stale records will be left over, but they are harmless.
|
|
|
+ Desc =
|
|
|
+ "Conflicting schema version detected for routing records, but "
|
|
|
+ "all the peer nodes are running the same version, so this node "
|
|
|
+ "will use the same schema but discard the harmless stale records. "
|
|
|
+ "This warning will go away after the next full cluster (non-rolling) restart.",
|
|
|
+ ?SLOG(warning, #{
|
|
|
+ msg => "conflicting_routing_storage_detected",
|
|
|
+ resolved => ClusterSchema,
|
|
|
+ description => Desc
|
|
|
+ }),
|
|
|
+ ClusterSchema;
|
|
|
+ _Conflicting ->
|
|
|
+ Desc = schema_conflict_reason(records, State),
|
|
|
+ io:format(standard_error, "Error: ~ts~n", [Desc]),
|
|
|
+ ?SLOG(critical, #{
|
|
|
+ msg => "conflicting_routing_storage_in_cluster",
|
|
|
+ description => Desc
|
|
|
}),
|
|
|
error(conflicting_routing_schemas_detected_in_cluster)
|
|
|
end.
|
|
|
|
|
|
+schema_conflict_reason(Type, State) ->
|
|
|
+ Observe =
|
|
|
+ case Type of
|
|
|
+ config ->
|
|
|
+ "Peer nodes have route storage schema resolved into conflicting versions.\n";
|
|
|
+ records ->
|
|
|
+ "There are conflicting routing records found.\n"
|
|
|
+ end,
|
|
|
+ Cause =
|
|
|
+ "\nThis was caused by a race-condition when the cluster was rolling upgraded "
|
|
|
+ "from an older version to 5.4.0, 5.4.1, 5.5.0 or 5.5.1."
|
|
|
+ "\nThis node cannot boot before the conflicts are resolved.\n",
|
|
|
+ Observe ++ Cause ++ mk_conflict_resolution_action(State).
|
|
|
+
|
|
|
+detect_table_schema_vsn() ->
|
|
|
+ lists:flatten([
|
|
|
+ [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()],
|
|
|
+ [v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)]
|
|
|
+ ]).
|
|
|
+
|
|
|
is_empty(Tab) ->
|
|
|
ets:first(Tab) =:= '$end_of_table'.
|
|
|
|
|
|
+mk_conflict_resolution_action(State) ->
|
|
|
+ NodesV1 = [Node || {Node, v1, _} <- State],
|
|
|
+ NodesUnknown = [Node || {Node, unknown, _} <- State],
|
|
|
+ Format =
|
|
|
+ "There are two ways to resolve the conflict:"
|
|
|
+ "\n"
|
|
|
+ "\nA: Full cluster restart: stop ALL running nodes one by one "
|
|
|
+ "and restart them in the reversed order."
|
|
|
+ "\n"
|
|
|
+ "\nB: Force v1 nodes to clean up their routes."
|
|
|
+ "\n Following EMQX nodes are running with v1 schema: ~0p."
|
|
|
+ "\n 1. Stop listeners with command \"emqx eval 'emqx_listener:stop()'\" in all v1 nodes"
|
|
|
+ "\n 2. Wait until they are safe to restart."
|
|
|
+ "\n This could take some time, depending on the number of clients and their subscriptions."
|
|
|
+ "\n Below conditions should be true for each of the nodes in order to proceed:"
|
|
|
+ "\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`."
|
|
|
+ "\n b) Command 'emqx ctl topics list' prints No topics.`"
|
|
|
+ "\n 3. Upgrade the nodes to 5.6.0 or newer.",
|
|
|
+ FormatUnkown =
|
|
|
+ "Additionally, the following nodes were unreachable during startup: ~0p."
|
|
|
+ "It is strongly advised to include them in the manual resolution procedure as well.",
|
|
|
+ Message = io_lib:format(Format, [NodesV1]),
|
|
|
+ MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []],
|
|
|
+ unicode:characters_to_list([Message, "\n", MessageUnknown]).
|
|
|
+
|
|
|
%%--------------------------------------------------------------------
|
|
|
%% gen_server callbacks
|
|
|
%%--------------------------------------------------------------------
|