Kaynağa Gözat

feat(routing): add schema conflict resolution procedure

In the log message printed when a schema conflict in cluster routing
is detected.
Andrew Mayorov 2 yıl önce
ebeveyn
işleme
849fe0c2c8
1 değiştirilmiş dosya ile 36 ekleme ve 7 silme
  1. 36 7
      apps/emqx/src/emqx_router.erl

+ 36 - 7
apps/emqx/src/emqx_router.erl

@@ -656,8 +656,8 @@ init_schema() ->
     ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
     ok = emqx_trie:wait_for_tables(),
     ConfSchema = emqx_config:get([broker, routing, storage_schema]),
-    ClusterSchema = discover_cluster_schema_vsn(),
-    Schema = choose_schema_vsn(ConfSchema, ClusterSchema),
+    ClusterState = discover_cluster_schema_vsn(),
+    Schema = choose_schema_vsn(ConfSchema, ClusterState),
     ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
     case Schema of
         ConfSchema ->
@@ -681,7 +681,8 @@ deinit_schema() ->
     _ = persistent_term:erase(?PT_SCHEMA_VSN),
     ok.
 
--spec discover_cluster_schema_vsn() -> schemavsn() | undefined.
+-spec discover_cluster_schema_vsn() ->
+    {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}.
 discover_cluster_schema_vsn() ->
     discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
 
@@ -723,13 +724,17 @@ discover_cluster_schema_vsn(Nodes) ->
                     "There are nodes in the cluster with different configured routing "
                     "storage schemas. This probably means that some nodes use v1 schema "
                     "and some use v2, independently of each other. The routing is likely "
-                    "broken. Manual intervention required."
+                    "broken. Manual intervention required.",
+                action => mk_conflict_resolution_action(Responses)
             }),
             error(conflicting_routing_schemas_configured_in_cluster)
     end.
 
--spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn().
-choose_schema_vsn(ConfSchema, ClusterSchema) ->
+-spec choose_schema_vsn(
+    schemavsn(),
+    _ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]}
+) -> schemavsn().
+choose_schema_vsn(ConfSchema, {ClusterSchema, State}) ->
     case detect_table_schema_vsn() of
         [ClusterSchema] ->
             %% Table contents match configured schema in the cluster.
@@ -753,7 +758,8 @@ choose_schema_vsn(ConfSchema, ClusterSchema) ->
                     "by the cluster. This probably means that some nodes in the cluster "
                     "use v1 schema and some use v2, independently of each other. The "
                     "routing is likely broken. Manual intervention and full cluster "
-                    "restart is required. This node will shut down."
+                    "restart is required. This node will shut down.",
+                action => mk_conflict_resolution_action(State)
             }),
             error(conflicting_routing_schemas_detected_in_cluster)
     end.
@@ -767,6 +773,29 @@ detect_table_schema_vsn() ->
 is_empty(Tab) ->
     ets:first(Tab) =:= '$end_of_table'.
 
+mk_conflict_resolution_action(State) ->
+    NodesV1 = [Node || {Node, v1, _} <- State],
+    NodesUnknown = [Node || {Node, unknown, _} <- State],
+    Format =
+        "Following EMQX nodes are running with conflicting schema:"
+        "\n ~p"
+        "Please take the following steps to resolve the conflict:"
+        "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`"
+        "\n 2. Wait until they are safe to restart."
+        "\n This could take some time, depending on the number of clients and their subscriptions."
+        "\n Those conditions should be true for each of the nodes in order to proceed:"
+        "\n   * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`."
+        "\n   * `$ emqx ctl topics list` prints `No topics.`"
+        "\n 3. Upgrade the nodes to the latest version."
+        "\n 4. Restart the nodes.",
+    FormatUnkown =
+        "Additionally, following nodes were unreachable during startup:"
+        "\n ~p"
+        "It's strongly advised to include them in the manual resolution procedure as well.",
+    Message = io_lib:format(Format, [NodesV1]),
+    MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []],
+    unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown).
+
 %%--------------------------------------------------------------------
 %% gen_server callbacks
 %%--------------------------------------------------------------------