|
|
@@ -9,6 +9,7 @@
|
|
|
%% implementation details from this module.
|
|
|
-module(emqx_ds_replication_layer_meta).
|
|
|
|
|
|
+-feature(maybe_expr, enable).
|
|
|
-compile(inline).
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
@@ -71,6 +72,14 @@
|
|
|
n_shards/1
|
|
|
]).
|
|
|
|
|
|
+%% Migrations:
|
|
|
+-export([
|
|
|
+ migrate_node_table/0,
|
|
|
+ migrate_shard_table/0,
|
|
|
+ migrate_node_table_trans/1,
|
|
|
+ migrate_shard_table_trans/1
|
|
|
+]).
|
|
|
+
|
|
|
-export_type([
|
|
|
site/0,
|
|
|
transition/0,
|
|
|
@@ -87,13 +96,15 @@
|
|
|
|
|
|
-define(SERVER, ?MODULE).
|
|
|
|
|
|
--define(SHARD, emqx_ds_builtin_metadata_shard).
|
|
|
+-define(RLOG_SHARD, emqx_ds_builtin_metadata_shard).
|
|
|
%% DS database metadata:
|
|
|
-define(META_TAB, emqx_ds_builtin_metadata_tab).
|
|
|
%% Mapping from Site to the actual Erlang node:
|
|
|
--define(NODE_TAB, emqx_ds_builtin_node_tab).
|
|
|
+-define(NODE_TAB, emqx_ds_builtin_node_tab2).
|
|
|
+-define(NODE_TAB_LEGACY, emqx_ds_builtin_node_tab).
|
|
|
%% Shard metadata:
|
|
|
--define(SHARD_TAB, emqx_ds_builtin_shard_tab).
|
|
|
+-define(SHARD_TAB, emqx_ds_builtin_shard_tab2).
|
|
|
+-define(SHARD_TAB_LEGACY, emqx_ds_builtin_shard_tab).
|
|
|
%% Membership transitions:
|
|
|
-define(TRANSITION_TAB, emqx_ds_builtin_trans_tab).
|
|
|
|
|
|
@@ -304,7 +315,7 @@ my_shards(DB) ->
|
|
|
[Shard || #?SHARD_TAB{shard = {_, Shard}, replica_set = RS} <- Recs, lists:member(Site, RS)].
|
|
|
|
|
|
allocate_shards(DB) ->
|
|
|
- case mria:transaction(?SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of
|
|
|
+ case mria:transaction(?RLOG_SHARD, fun ?MODULE:allocate_shards_trans/1, [DB]) of
|
|
|
{atomic, Shards} ->
|
|
|
{ok, Shards};
|
|
|
{aborted, {shards_already_allocated, Shards}} ->
|
|
|
@@ -459,6 +470,7 @@ init([]) ->
|
|
|
logger:set_process_metadata(#{domain => [ds, meta]}),
|
|
|
ok = ekka:monitor(membership),
|
|
|
ensure_tables(),
|
|
|
+ run_migrations(),
|
|
|
ensure_site(),
|
|
|
S = #s{},
|
|
|
{ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}),
|
|
|
@@ -684,28 +696,28 @@ node_sites(Node) ->
|
|
|
|
|
|
ensure_tables() ->
|
|
|
ok = mria:create_table(?META_TAB, [
|
|
|
- {rlog_shard, ?SHARD},
|
|
|
+ {rlog_shard, ?RLOG_SHARD},
|
|
|
{type, ordered_set},
|
|
|
{storage, disc_copies},
|
|
|
{record_name, ?META_TAB},
|
|
|
{attributes, record_info(fields, ?META_TAB)}
|
|
|
]),
|
|
|
ok = mria:create_table(?NODE_TAB, [
|
|
|
- {rlog_shard, ?SHARD},
|
|
|
+ {rlog_shard, ?RLOG_SHARD},
|
|
|
{type, ordered_set},
|
|
|
{storage, disc_copies},
|
|
|
{record_name, ?NODE_TAB},
|
|
|
{attributes, record_info(fields, ?NODE_TAB)}
|
|
|
]),
|
|
|
ok = mria:create_table(?SHARD_TAB, [
|
|
|
- {rlog_shard, ?SHARD},
|
|
|
+ {rlog_shard, ?RLOG_SHARD},
|
|
|
{type, ordered_set},
|
|
|
{storage, disc_copies},
|
|
|
{record_name, ?SHARD_TAB},
|
|
|
{attributes, record_info(fields, ?SHARD_TAB)}
|
|
|
]),
|
|
|
ok = mria:create_table(?TRANSITION_TAB, [
|
|
|
- {rlog_shard, ?SHARD},
|
|
|
+ {rlog_shard, ?RLOG_SHARD},
|
|
|
{type, bag},
|
|
|
{storage, disc_copies},
|
|
|
{record_name, ?TRANSITION_TAB},
|
|
|
@@ -713,12 +725,18 @@ ensure_tables() ->
|
|
|
]),
|
|
|
ok = mria:wait_for_tables([?META_TAB, ?NODE_TAB, ?SHARD_TAB]).
|
|
|
|
|
|
+%% Run the data migrations that apply to the release this node is running.
+%% Called from `init/1` right after the tables have been ensured.
+run_migrations() ->
+    run_migrations(emqx_release:version()).
+
+%% Dispatch on the release version (matched as a string prefix).
+%% Only 5.8.x releases have migrations defined in this module.
+run_migrations(_Version = "5.8." ++ _) ->
+    run_migrations_e58();
+run_migrations(_Version) ->
+    %% NOTE
+    %% No migrations are defined for any other release. Without this clause the
+    %% metadata server would fail to start with `function_clause` on every
+    %% release whose version does not start with "5.8.".
+    %% NOTE(review): this also means legacy tables are left untouched on such
+    %% releases -- confirm that is the intended policy for releases past 5.8.
+    ok.
|
|
|
+
|
|
|
ensure_site() ->
|
|
|
Filename = filename:join(emqx_ds_storage_layer:base_dir(), "emqx_ds_builtin_site.eterm"),
|
|
|
- case file:consult(Filename) of
|
|
|
- {ok, [Site]} ->
|
|
|
- ok;
|
|
|
- _ ->
|
|
|
+ case file_read_term(Filename) of
|
|
|
+ {ok, Entry} ->
|
|
|
+ Site = migrate_site_id(Entry);
|
|
|
+ {error, Error} when Error =:= enoent; Error =:= empty ->
|
|
|
Site = binary:encode_hex(crypto:strong_rand_bytes(8)),
|
|
|
logger:notice("Creating a new site with ID=~s", [Site]),
|
|
|
ok = filelib:ensure_dir(Filename),
|
|
|
@@ -733,6 +751,32 @@ ensure_site() ->
|
|
|
logger:error("Attempt to claim site with ID=~s failed: ~p", [Site, Reason])
|
|
|
end.
|
|
|
|
|
|
+%% Read the first Erlang term stored in file `Filename`.
+%%
+%% Returns:
+%%   `{ok, Term}`      -- a term was read successfully;
+%%   `{error, empty}`  -- the file holds no term (end of file reached first);
+%%   `{error, Reason}` -- the file could not be opened or its content not parsed.
+file_read_term(Filename) ->
+    %% NOTE
+    %% This mess is needed because `file:consult/1` trips over binaries encoded as
+    %% latin1, which 5.4.0 code could have produced with `io:format(FD, "~p.", [Site])`.
+    maybe
+        {ok, FD} ?= file:open(Filename, [read, {encoding, latin1}]),
+        ReadResult = io:read(FD, '', _Line = 1),
+        %% Close the descriptor before looking at the result, so that it is
+        %% released on `eof` and on parse errors as well, not only on success.
+        ok = file:close(FD),
+        {ok, Term, _EndLine} ?= ReadResult,
+        {ok, Term}
+    else
+        {error, Reason} ->
+            {error, Reason};
+        {error, Reason, _} ->
+            {error, Reason};
+        {eof, _} ->
+            {error, empty}
+    end.
|
|
|
+
|
|
|
+%% Convert a site ID into its "printable" form.
+%%
+%% An ID made up solely of the characters `0-9` and `A-F` is returned as is.
+%% Anything else is treated as a legacy raw ID and hex-encoded with
+%% `binary:encode_hex/1`.
+migrate_site_id(Site) ->
+    %% NOTE
+    %% `dollar_endonly` is required: by default `$` also matches right before a
+    %% trailing newline, so a raw ID such as `<<"AB\n">>` would otherwise be
+    %% mistaken for an already printable one.
+    case re:run(Site, "^[0-9A-F]+$", [dollar_endonly, {capture, none}]) of
+        match ->
+            Site;
+        nomatch ->
+            binary:encode_hex(Site)
+    end.
|
|
|
+
|
|
|
forget_node(Node) ->
|
|
|
Sites = node_sites(Node),
|
|
|
Result = transaction(fun lists:map/2, [fun ?MODULE:forget_site_trans/1, Sites]),
|
|
|
@@ -811,12 +855,12 @@ eval_qlc(Q) ->
|
|
|
true ->
|
|
|
qlc:eval(Q);
|
|
|
false ->
|
|
|
- {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end),
|
|
|
+ {atomic, Result} = mria:ro_transaction(?RLOG_SHARD, fun() -> qlc:eval(Q) end),
|
|
|
Result
|
|
|
end.
|
|
|
|
|
|
transaction(Fun, Args) ->
|
|
|
- case mria:transaction(?SHARD, Fun, Args) of
|
|
|
+ case mria:transaction(?RLOG_SHARD, Fun, Args) of
|
|
|
{atomic, Result} ->
|
|
|
Result;
|
|
|
{aborted, Reason} ->
|
|
|
@@ -853,6 +897,137 @@ notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
|
|
|
Subs
|
|
|
).
|
|
|
|
|
|
+%%====================================================================
|
|
|
+%% Migrations / 5.8 Release
|
|
|
+%%====================================================================
|
|
|
+
|
|
|
+%% Run every migration step of the 5.8 release, node table first and shard
+%% table second. Results of the individual steps are discarded.
+run_migrations_e58() ->
+    Steps = [fun migrate_node_table/0, fun migrate_shard_table/0],
+    lists:foreach(fun(Step) -> Step() end, Steps).
|
|
|
+
|
|
|
+%% Migrate entries of the legacy node table (`?NODE_TAB_LEGACY`), if it exists.
+migrate_node_table() ->
+    Legacy = ?NODE_TAB_LEGACY,
+    Info = table_info_safe(Legacy),
+    migrate_node_table(Legacy, Info).
+
+%% The second argument is whatever `table_info_safe/1` returned for `Tab`.
+migrate_node_table(_Tab, undefined) ->
+    %% The legacy table was never created: nothing to migrate.
+    ok;
+migrate_node_table(Tab, #{attributes := [_Site, _Node, _Misc]}) ->
+    %% The legacy table exists and has the expected three attributes.
+    ok = mria:wait_for_tables([Tab]),
+    Outcome = transaction(fun ?MODULE:migrate_node_table_trans/1, [Tab]),
+    case Outcome of
+        {migrated, [], []} ->
+            %% The table was empty.
+            ok;
+        {migrated, Migrated, Dups} ->
+            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
+            case Dups of
+                [] ->
+                    ok;
+                _ ->
+                    logger:warning("Table '~p' duplicated entries, skipped: ~p", [Tab, Dups])
+            end,
+            %% Everything worth keeping has been copied, drop the legacy entries.
+            {atomic, ok} = mria:clear_table(Tab);
+        {error, Reason} ->
+            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
+    end.
|
|
|
+
|
|
|
+%% Transaction body: copy the records of legacy node table `Tab` into `?NODE_TAB`.
+%% Returns `{migrated, Written, Dups}`, where `Written` are the records that were
+%% written and `Dups` are the records skipped by `unique_node_recs/1`.
+migrate_node_table_trans(Tab) ->
+    %% NOTE
+    %% The legacy table may hold entries written by the 5.4.0 release, which used
+    %% a different representation of site IDs. Each ID goes through
+    %% `migrate_site_id/1` (see `migrate_node_rec/1`), because the rest of the code
+    %% expects site IDs to be "printable".
+    %% For entries written by releases > 5.4.0 the conversion should be a no-op.
+    Stamp = mk_migstamp(),
+    LegacyRecs = mnesia:match_object(Tab, {Tab, '_', '_', '_'}, read),
+    Converted = lists:map(fun migrate_node_rec/1, LegacyRecs),
+    {Unique, Dups} = unique_node_recs(Converted),
+    _ = [mnesia:write(?NODE_TAB, attach_migstamp(Stamp, Rec), write) || Rec <- Unique],
+    {migrated, Unique, Dups}.
|
|
|
+
|
|
|
+%% Turn one legacy node table tuple into a `?NODE_TAB` record, converting the
+%% site ID with `migrate_site_id/1` on the way.
+migrate_node_rec({?NODE_TAB_LEGACY, LegacySite, Node, Misc}) ->
+    Site = migrate_site_id(LegacySite),
+    #?NODE_TAB{
+        site = Site,
+        node = Node,
+        misc = Misc
+    }.
|
|
|
+
|
|
|
+%% Split `Records` into `{Unique, Dups}`: `Unique` holds the records whose node
+%% occurs exactly once, `Dups` holds every record of a node that occurs more
+%% than once.
+unique_node_recs(Records) ->
+    %% NOTE
+    %% It is unlikely but possible that a 5.4.0 node assigned itself more than one
+    %% Site ID, because it occasionally failed to read its Site ID back with
+    %% `file:consult/1`. There is no fully reliable way to tell which of those IDs
+    %% is the most recent, so all records of such a node are dropped. The node
+    %% will insert the correct record by itself anyway, once upgraded to the
+    %% recent release and restarted.
+    OnePerNode = lists:ukeysort(#?NODE_TAB.node, Records),
+    Extras = Records -- OnePerNode,
+    AmbiguousNodes = [Node || #?NODE_TAB{node = Node} <- Extras],
+    IsUnambiguous = fun(Rec) -> not lists:member(Rec#?NODE_TAB.node, AmbiguousNodes) end,
+    lists:partition(IsUnambiguous, Records).
|
|
|
+
|
|
|
+%% Migrate entries of the legacy shard table (`?SHARD_TAB_LEGACY`), if it exists.
+migrate_shard_table() ->
+    Tab = ?SHARD_TAB_LEGACY,
+    migrate_shard_table(Tab, table_info_safe(Tab)).
+
+%% The second argument is whatever `table_info_safe/1` returned for `Tab`:
+%% a map of table properties, or `undefined` when the table does not exist.
+migrate_shard_table(Tab, #{attributes := [_Shard, _ReplicaSet, _TargetSet, _Misc]}) ->
+    %% Table is present and looks migratable.
+    ok = mria:wait_for_tables([Tab]),
+    case transaction(fun ?MODULE:migrate_shard_table_trans/1, [Tab]) of
+        {migrated, []} ->
+            %% The table was empty.
+            ok;
+        {migrated, Migrated} ->
+            logger:notice("Table '~p' migrated ~p entries", [Tab, length(Migrated)]),
+            %% All entries have been copied, drop them from the legacy table.
+            {atomic, ok} = mria:clear_table(Tab);
+        {error, Reason} ->
+            %% NOTE(review): presumably what `transaction/2` returns when the
+            %% transaction aborts -- confirm against its definition.
+            logger:warning("Table '~p' unusable, migration skipped: ~p", [Tab, Reason])
+    end;
+migrate_shard_table(Tab, #{attributes := _Incompatible}) ->
+    %% Table is present and is incompatible.
+    ok = mria:wait_for_tables([Tab]),
+    case mnesia:table_info(Tab, size) of
+        0 ->
+            ok;
+        Size ->
+            %% Arguments follow the order of the format string: table, then size.
+            logger:warning("Table '~p' has ~p legacy entries to be abandoned", [Tab, Size]),
+            {atomic, ok} = mria:clear_table(Tab)
+    end;
+migrate_shard_table(_Tab, undefined) ->
+    %% No legacy table exists.
+    ok.
|
|
|
+
|
|
|
+%% Transaction body: copy the records of legacy shard table `Tab` into
+%% `?SHARD_TAB`. Returns `{migrated, Written}` with the records that were written.
+migrate_shard_table_trans(Tab) ->
+    %% NOTE
+    %% The 5.4.0 release could have created this table with a different schema,
+    %% but most likely never populated it, so it should be fine to abandon it.
+    %% The 5.7.0 release could have created and populated this table with the
+    %% same schema, so all the records just have to be migrated verbatim.
+    Stamp = mk_migstamp(),
+    LegacyRecs = mnesia:match_object(Tab, {Tab, '_', '_', '_', '_'}, read),
+    Converted = lists:map(fun migrate_shard_rec/1, LegacyRecs),
+    _ = [mnesia:write(?SHARD_TAB, attach_migstamp(Stamp, Rec), write) || Rec <- Converted],
+    {migrated, Converted}.
|
|
|
+
|
|
|
+%% Turn one legacy shard table tuple into a `?SHARD_TAB` record, field by field.
+migrate_shard_rec({?SHARD_TAB_LEGACY, Shard, ReplicaSet, TargetSet, Misc}) ->
+    #?SHARD_TAB{
+        shard = Shard,
+        replica_set = ReplicaSet,
+        target_set = TargetSet,
+        misc = Misc
+    }.
|
|
|
+
|
|
|
+%% Build a "migration stamp": a map recording when (`at`, system time in
+%% milliseconds) and on which release (`on`) records were migrated.
+mk_migstamp() ->
+    Timestamp = erlang:system_time(millisecond),
+    Release = emqx_release:version(),
+    #{at => Timestamp, on => Release}.
|
|
|
+
|
|
|
+%% Store `Migstamp` under the `migrated` key of the `misc` map of a node or
+%% shard record, and return the updated record.
+attach_migstamp(Migstamp, #?NODE_TAB{misc = Misc} = NodeRec) ->
+    NodeRec#?NODE_TAB{misc = maps:put(migrated, Migstamp, Misc)};
+attach_migstamp(Migstamp, #?SHARD_TAB{misc = Misc} = ShardRec) ->
+    ShardRec#?SHARD_TAB{misc = maps:put(migrated, Migstamp, Misc)}.
|
|
|
+
|
|
|
+%% Fetch all properties of mnesia table `Tab` as a map.
+%% Returns `undefined` when the lookup exits with `no_exists` for `Tab`.
+table_info_safe(Tab) ->
+    Lookup =
+        try
+            {ok, mnesia:table_info(Tab, all)}
+        catch
+            exit:{aborted, {no_exists, Tab, _}} ->
+                undefined
+        end,
+    case Lookup of
+        {ok, Props} ->
+            maps:from_list(Props);
+        undefined ->
+            undefined
+    end.
|
|
|
+
|
|
|
%%====================================================================
|
|
|
|
|
|
%% @doc Intersperse elements of two lists.
|