The logic layer does not deal with replication, because the storage could be an external DB with its own replication logic.
On the other hand, we introduce the notion of a shard right here in the logic layer, because the shared subscription logic needs to be aware of it to some extent: it has to split work between subscribers somehow.
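To illustrate why shared subscriptions need to know about shards, here is a minimal sketch that splits the shards of one shared subscription among its subscribers in round-robin order. The module `shard_assign_sketch` and the round-robin policy are hypothetical choices for illustration, not the EMQX implementation.

```erlang
%% Hypothetical sketch: distribute the shards of a shared subscription
%% among its subscribers in round-robin order. Not the EMQX API.
-module(shard_assign_sketch).
-export([assign/2]).

%% Returns a map from subscriber to the list of shards it replays.
%% Every shard is assigned to exactly one subscriber.
-spec assign([term()], [term()]) -> #{term() => [term()]}.
assign(_Shards, []) ->
    #{};
assign(Shards, Subscribers) ->
    N = length(Subscribers),
    Indexed = lists:zip(lists:seq(0, length(Shards) - 1), Shards),
    lists:foldl(
      fun({I, Shard}, Acc) ->
              %% Shard number I goes to subscriber (I rem N).
              Sub = lists:nth((I rem N) + 1, Subscribers),
              maps:update_with(Sub, fun(L) -> L ++ [Shard] end, [Shard], Acc)
      end,
      maps:from_list([{S, []} || S <- Subscribers]),
      Indexed).
```

For example, `assign([s1, s2, s3], [a, b])` gives subscriber `a` the shards `s1` and `s3` and subscriber `b` the shard `s2`. A real implementation would also have to rebalance when subscribers join or leave.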
Data is written every time a message matching a certain pattern is published. This pattern is not part of the logic layer spec.
Write throughput: very high
Data size: very high
Write pattern: append only
Read pattern: pseudoserial
Number of records: O(total write throughput * retention time)
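The access pattern above (append-only writes, reads that scan forward in roughly time order) can be sketched with an `ordered_set` ETS table per shard. The module name `msg_log_sketch`, the `{Timestamp, Seq}` key layout and the `'$start'` iterator marker are assumptions made for this example; the actual storage backend is not specified here.

```erlang
%% Hypothetical sketch of an append-only message log for one shard,
%% backed by an ordered_set ETS table. Keys are {Timestamp, Seq}, so
%% readers scan forward in key (roughly time) order.
-module(msg_log_sketch).
-export([new/0, append/3, next/3]).

new() ->
    ets:new(msg_log, [ordered_set, public]).

%% Append-only: a fresh unique key is created, nothing is overwritten.
append(Tab, Timestamp, Payload) ->
    Key = {Timestamp, erlang:unique_integer([monotonic, positive])},
    true = ets:insert_new(Tab, {Key, Payload}),
    Key.

%% Read up to BatchSize messages strictly after iterator position It
%% ('$start' means from the beginning). Returns {Payloads, NewIt}.
next(Tab, It, BatchSize) ->
    First = case It of
                '$start' -> ets:first(Tab);
                _ -> ets:next(Tab, It)
            end,
    collect(Tab, First, BatchSize, It, []).

collect(_Tab, '$end_of_table', _N, LastKey, Acc) ->
    {lists:reverse(Acc), LastKey};
collect(_Tab, _Key, 0, LastKey, Acc) ->
    {lists:reverse(Acc), LastKey};
collect(Tab, Key, N, _LastKey, Acc) ->
    [{Key, Payload}] = ets:lookup(Tab, Key),
    collect(Tab, ets:next(Tab, Key), N - 1, Key, [Payload | Acc]).
```

The iterator is just the last key that was read, which is why it can be persisted as a small blob and why the number of stored records grows with write throughput times retention time.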
Data there is updated when:
Write throughput: low
Data is read when a client connects and replay agents are started
Read throughput: low
Data format:
#session{clientId = "foobar", iterators = [ItKey1, ItKey2, ItKey3, ...]}
Number of records: O(N clients)
Size of record: O(N subscriptions per client)
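The session record format above can be written as an Erlang record. The helper functions below are hypothetical and assume the record changes when subscriptions are added or removed, which is why its size is proportional to the number of subscriptions of the client.

```erlang
%% Sketch of the session record from the data format above, with
%% hypothetical helpers that keep one iterator key per subscription.
-module(session_sketch).
-export([new/1, add_iterator/2, remove_iterator/2, iterators/1]).

-record(session, {
    clientId :: binary() | string(),
    iterators = [] :: [term()]   %% one iterator key per subscription
}).

new(ClientId) ->
    #session{clientId = ClientId}.

%% Adding a subscription appends its iterator key (no duplicates),
%% so the record grows as O(N subscriptions per client).
add_iterator(ItKey, S = #session{iterators = Its}) ->
    case lists:member(ItKey, Its) of
        true -> S;
        false -> S#session{iterators = Its ++ [ItKey]}
    end.

%% Removing a subscription drops its iterator key.
remove_iterator(ItKey, S = #session{iterators = Its}) ->
    S#session{iterators = lists:delete(ItKey, Its)}.

iterators(#session{iterators = Its}) ->
    Its.
```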
Data is written every time a client acks a message.
Data is read when a client reconnects and we restart replay agents.
#iterator{key = IterKey, data = Blob}
Number of records: O(N clients * N subscriptions per client)
Size of record: O(1)
Write throughput: high, lots of small updates
Write pattern: mostly key overwrite
Read throughput: low
Read pattern: random
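The iterator store is dominated by small overwrites of existing keys. A minimal sketch using an ETS `set` keyed on the `#iterator.key` field shows that pattern; the module `iter_store_sketch` and the `ack/3` and `restore/2` names are hypothetical.

```erlang
%% Hypothetical sketch of the iterator store: one small record per
%% (client, subscription), overwritten in place every time the client acks.
-module(iter_store_sketch).
-export([new/0, ack/3, restore/2]).

-record(iterator, {key, data}).

new() ->
    ets:new(iterators, [set, public, {keypos, #iterator.key}]).

%% Called on every ack: a key overwrite, so the table size stays
%% O(N clients * N subscriptions per client) however many acks arrive.
ack(Tab, IterKey, Blob) ->
    true = ets:insert(Tab, #iterator{key = IterKey, data = Blob}),
    ok.

%% Called on reconnect to restart replay from the last acked position.
restore(Tab, IterKey) ->
    case ets:lookup(Tab, IterKey) of
        [#iterator{data = Blob}] -> {ok, Blob};
        [] -> undefined
    end.
```

Writes are frequent but each one replaces a single small value, while reads only happen on reconnect and hit arbitrary keys, matching the "high write, low random read" profile listed above.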
In the push model, replay agents iterate over the dataset in the shards.
In the pull model, client processes work with iterators directly and fetch data from the remote message storage instances via remote procedure calls.
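The difference between the two models is who owns the iterator loop. In this sketch, `Fetch` is a hypothetical function `Fetch(It) -> {Msgs, NewIt}`; in the pull model it would stand in for a remote procedure call (for example via `rpc:call/4`) to the node that owns the shard. The module `replay_models_sketch` and the message shapes are assumptions.

```erlang
%% Hypothetical sketch contrasting the push and pull replay models.
%% Fetch(It) -> {Msgs, NewIt}; an empty Msgs list means "no more data".
-module(replay_models_sketch).
-export([push_agent/3, pull/2]).

%% Push model: a replay agent walks the shard data and sends every
%% message to the client process, then signals completion.
push_agent(ClientPid, Fetch, It) ->
    case Fetch(It) of
        {[], _} ->
            ClientPid ! {replay_done, self()},
            ok;
        {Msgs, NewIt} ->
            lists:foreach(fun(M) -> ClientPid ! {replay, M} end, Msgs),
            push_agent(ClientPid, Fetch, NewIt)
    end.

%% Pull model: the client process owns the iterator and calls Fetch
%% itself (in practice a remote call to the storage node).
pull(Fetch, It) ->
    pull(Fetch, It, []).

pull(Fetch, It, Acc) ->
    case Fetch(It) of
        {[], _} -> lists:append(lists:reverse(Acc));
        {Msgs, NewIt} -> pull(Fetch, NewIt, [Msgs | Acc])
    end.
```

In the push model the agent would normally run in its own process, e.g. `spawn(replay_models_sketch, push_agent, [ClientPid, Fetch, It0])`; in the pull model no extra process is needed, at the cost of one round trip per batch.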
This includes the HOCON schema and an interface module (emqx_ds_conf.erl) that is used by the rest of the code.
At an early stage, we need at least a feature flag that developers can use.
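A feature flag behind a small interface module could look like the sketch below. The application name `ds_sketch`, the key `enable` and the module `ds_conf_sketch` are placeholders, not the real emqx_ds_conf API or HOCON schema; the sketch only relies on the standard `application:get_env/3` and `application:set_env/3`.

```erlang
%% Hypothetical sketch of a feature flag exposed through a small
%% configuration interface module. The application name and key are
%% placeholders, not the real emqx_ds_conf API or HOCON schema.
-module(ds_conf_sketch).
-export([enabled/0, set_enabled/1]).

-define(APP, ds_sketch).
-define(KEY, enable).

%% Defaults to false, so durable storage stays off unless a
%% developer explicitly turns it on.
-spec enabled() -> boolean().
enabled() ->
    application:get_env(?APP, ?KEY, false) =:= true.

-spec set_enabled(boolean()) -> ok.
set_enabled(Bool) when is_boolean(Bool) ->
    application:set_env(?APP, ?KEY, Bool).
```

The rest of the code would call only `enabled/0` rather than reading the configuration directly, so the underlying schema can change without touching the callers.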
We should have safety measures that prevent a client from bringing down the broker by connecting with clean session = false and subscribing to #.
We don't want to persist ALL messages, only those that can be replayed by some client.
Persistent sessions should signal to emqx_broker which topic filters should be persisted. Scenario:
a/b/# must be persisted.
Replay feature (separate, optional, under the BSL license): the configuration file contains a list of topic filters specifying which topics can be replayed. (Lower priority)
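Deciding whether a published message must be persisted amounts to checking its topic against the topic filters registered by persistent sessions. The sketch below implements standard MQTT wildcard matching (`+` matches one level, `#` matches the remaining levels, including none); the module `persist_filter_sketch` and the flat list of filters are assumptions for illustration, and a real broker would use an index such as a trie.

```erlang
%% Hypothetical sketch: persist a message only if its topic matches at
%% least one topic filter registered by a persistent session.
%% Simplified: ignores the MQTT rule that wildcards at the first level
%% do not match topics starting with '$'.
-module(persist_filter_sketch).
-export([should_persist/2, match/2]).

-spec should_persist(binary(), [binary()]) -> boolean().
should_persist(Topic, Filters) ->
    lists:any(fun(F) -> match(Topic, F) end, Filters).

%% '+' matches exactly one level; '#' matches the remaining levels.
-spec match(binary(), binary()) -> boolean().
match(Topic, Filter) ->
    match_words(split(Topic), split(Filter)).

split(Bin) ->
    binary:split(Bin, <<"/">>, [global]).

match_words(_, [<<"#">>]) -> true;
match_words([_ | T], [<<"+">> | F]) -> match_words(T, F);
match_words([W | T], [W | F]) -> match_words(T, F);
match_words([], []) -> true;
match_words(_, _) -> false.
```

With the filter a/b/# registered, a message on a/b/x/y is persisted while one on c/d is not.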
At the first stage, we can simply use emqttb:
https://github.com/emqx/emqttb/blob/master/src/scenarios/emqttb_scenario_persistent_session.erl
At the early stages, consistency verification can simply use this test suite:
apps/emqx/test/emqx_persistent_session_SUITE.erl
Problem with the current bitmask-based schema:
Good: a/b/c/#
Bad: +/a/b/c/d
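The notes do not spell out why the second filter is bad. One plausible reading, stated here as an assumption: if the storage key layout lets literal leading topic levels narrow the range of keys to scan, then a/b/c/# can be served by scanning only keys under a/b/c, whereas a filter starting with a wildcard gives nothing to narrow the scan and degrades to reading the whole shard. The hypothetical module below computes that literal prefix.

```erlang
%% Hypothetical sketch: the literal (wildcard-free) prefix of a topic
%% filter, and whether it could narrow a key-range scan.
-module(filter_prefix_sketch).
-export([static_prefix/1, scan_kind/1]).

%% Topic levels that come before the first '+' or '#'.
static_prefix(Filter) ->
    Words = binary:split(Filter, <<"/">>, [global]),
    lists:takewhile(fun(W) -> W =/= <<"+">> andalso W =/= <<"#">> end, Words).

%% An empty prefix means nothing constrains the scan.
scan_kind(Filter) ->
    case static_prefix(Filter) of
        [] -> full_scan;
        Prefix -> {prefix_scan, Prefix}
    end.
```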