Lucene has a unique write-once segmented architecture: recently indexed documents are written to a new self-contained segment, in append-only, write-once fashion: once written, those segment files will never again change. This happens either when too much RAM is being used to hold recently indexed documents, or when you ask Lucene to refresh your searcher so you can search all recently indexed documents.
Over time, smaller segments are merged away into bigger segments, and the index has a logarithmic "staircase" structure of active segment files at any time. This is an unusual design, when compared with databases which continuously update their files in-place, and it bubbles up to all sorts of nice high-level features in Lucene. For example:
- Efficient ACID transactions.
- Point-in-time view of the index for searching that will never change, even under concurrent indexing, enabling stable user interactions like pagination and drill-down.
- Multiple point-in-time snapshots (commit points) can be indefinitely preserved in the index, even under concurrent indexing, useful for taking hot backups of your Lucene index.
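To make that last point concrete, here is a minimal sketch (assumptions: an existing Directory, StandardAnalyzer for the writer, and a hypothetical copyFile helper standing in for however you ship bytes off the box) of pinning a commit point with Lucene's SnapshotDeletionPolicy and copying its files for a hot backup while indexing continues:

```java
import java.io.IOException;
import java.util.Collection;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.store.Directory;

public class HotBackupSketch {

  // Hypothetical helper: copy one index file to your backup location.
  static void copyFile(Directory dir, String fileName) throws IOException {
    // e.g. dir.openInput(fileName, ...) and stream the bytes elsewhere
  }

  public static void backup(Directory dir) throws IOException {
    SnapshotDeletionPolicy snapshots =
        new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
    IndexWriterConfig config =
        new IndexWriterConfig(new StandardAnalyzer()).setIndexDeletionPolicy(snapshots);
    try (IndexWriter writer = new IndexWriter(dir, config)) {
      // ... index documents, then commit so there is a commit point to snapshot:
      writer.commit();

      IndexCommit commit = snapshots.snapshot();
      try {
        // The snapshotted files will not be deleted, even as indexing and merging continue.
        Collection<String> files = commit.getFileNames();
        for (String file : files) {
          copyFile(dir, file);
        }
      } finally {
        snapshots.release(commit);
      }
    }
  }
}
```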
This write-once design also empowers Lucene to use optimized data-structures and apply powerful compression techniques when writing index files, because Lucene knows the values will not later change for this segment. For example, you never have to tell Lucene that your doc values field needs only 1 byte of storage, nor that the values are sparse, etc.: Lucene figures that out by itself by looking at all values it is about to write to each new segment.
Finally, and the topic of this post: this design enables Lucene to efficiently replicate a search index from one server ("primary") to another ("replica"): in order to sync recent index changes from primary to replica you only need to look at the file names in the index directory, and not their contents. Any new files must be copied, and any files previously copied do not need to be copied again because they are never changed!
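A rough sketch of that idea, diffing the directory listings by file name with Directory.listAll(); a real implementation would also skip the write lock file and deal with files the primary has since deleted, which this sketch ignores:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.lucene.store.Directory;

public class FileNameSync {
  /**
   * Returns the file names present on the primary but not yet on the replica.
   * Because segment files are write-once, a name match is enough: a file the
   * replica already has never needs to be copied again.
   */
  static List<String> filesToCopy(Directory primary, Directory replica) throws IOException {
    Set<String> alreadyHave = new HashSet<>(Arrays.asList(replica.listAll()));
    return Arrays.stream(primary.listAll())
        .filter(name -> !alreadyHave.contains(name))
        .collect(Collectors.toList());
  }
}
```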
Taking advantage of this, we long ago added a replicator module to Lucene that used exactly this approach, and it works well. However, to use those APIs to replicate recent index changes, each time you would like to sync you must first commit your changes on the primary index. This is unfortunately a costly operation, invoking fsync on multiple recently written files, and it greatly increases the sync latency when compared to a local index opening a new NRT searcher.
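The latency gap comes down to which call you make locally: a commit fsyncs the newly written files, while an NRT refresh does not. A minimal sketch, assuming an IndexWriter and SearcherManager are already open:

```java
import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.SearcherManager;

public class RefreshVsCommit {

  /** Commit-based sync: the new files must be fsynced before replicas can copy a revision. */
  static void commitThenSync(IndexWriter writer) throws IOException {
    writer.commit(); // fsyncs multiple recently written files; relatively slow
    // ... at this point a commit-based replicator could publish the revision to replicas
  }

  /** Local NRT refresh: no fsync, so new documents become searchable much sooner. */
  static void nrtRefresh(SearcherManager manager) throws IOException {
    manager.maybeRefresh(); // opens a new near-real-time searcher
  }
}
```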
Document or segment replication?
The usual alternative, and what Elasticsearch and Solr do today, is document replication: every replica indexes each incoming document itself. While this might seem like a natural approach to keeping replicas in sync, there are downsides:
- More costly CPU/IO resource usage across the cluster:
all nodes must do the same redundant indexing and merging work,
instead of just one primary node. This is especially painful when
large merges are running and interfere with concurrent searching,
and is more costly on clusters that need many replicas to support
high query rates. Switching to segment replication alone would give Elasticsearch and Solr a big increase in cluster-wide indexing and searching throughput.
- Risk of inconsistency: ensuring that precisely the same
set of documents is indexed in primary and all
replicas is
tricky and contributed to the problems Elasticsearch has had in
the past losing documents when the network is misbehaving. For example, if one of the replicas throws an
exception while indexing a document, or if a network hiccup
happens, that replica is now missing a document that the other
replicas and primary contain.
- Costly node recovery after down time: when a replica
goes down for a while and then comes back up, it must replay
(reindex) any newly indexed documents that arrived while it was
down. This can easily be a very large number, requiring a large
transaction log and taking a long time to catch up, or it must
fall back to a costly full index copy. In contrast, segment-based
replication only needs to copy the new index files.
- High code complexity: the code to handle the numerous
possible cases where replicas can become out of sync quickly
becomes complex, and handling a primary switch (because the old
primary crashed) especially so.
- No "point in time" consistency: the primary and replicas refresh on their own schedules, so they are all typically searching slightly different and incomparable views of the index.
How does it work?
NRT segment replication has to coordinate with all the concurrent operations of IndexWriter, such as opening new readers, indexing, deleting, segment merging and committing. Fortunately we were able to build on pre-existing Lucene capabilities like NRT readers, to write new segments and identify all their files, and snapshotting, to ensure segment files are not deleted until replicas have finished copying them.
The two main APIs are PrimaryNode, which holds all state for the primary node including a local IndexWriter instance, and ReplicaNode
for the replicas. The replicas act like
an index writer, since they also create and delete index files, and so
they acquire Lucene's index write lock when they start, to detect
inadvertent misuse. When you instantiate each replica, you provide a
pointer to where its corresponding primary is running, for example a
host or IP address and port.
Both the primary and replica nodes expose a SearcherManager so you can acquire and release the
latest searcher at any time. Searching on the primary node also
works, and would match the behavior of all Elasticsearch and Solr
nodes today (since they always do both indexing and searching), but
you might also choose to dedicate the primary node to indexing only.
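On either kind of node, searching follows the usual acquire/try/finally/release pattern against the SearcherManager; a minimal sketch, assuming the Query is built elsewhere:

```java
import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TopDocs;

public class SearchSketch {
  static TopDocs search(SearcherManager manager, Query query) throws IOException {
    IndexSearcher searcher = manager.acquire(); // latest replicated point-in-time view
    try {
      return searcher.search(query, 10);
    } finally {
      manager.release(searcher); // always release so old segments can eventually be closed
    }
  }
}
```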
You use IndexWriter from the primary node to make index
changes as usual, and then when you would like to search recently
indexed documents, you ask the primary node to refresh. Under the
hood, Lucene will open a new NRT reader from its local IndexWriter, gather the index files it references,
and notify all connected replicas. The replicas then compute which
index files they are missing and then copy them over from the primary.
Document deletions, which are normally carried in memory as a bitset directly from IndexWriter to an NRT IndexReader, are instead written through to the
filesystem and copied over as well. All files are first copied to
temporary files, and then renamed (atomically) in the end if all
copies were successful. Lucene's existing end-to-end checksums are
used to validate no bits were flipped in transit by a flaky network
link, or bad RAM or CPU. Finally, the in-memory segments file (a SegmentInfos instance) is serialized on the wire and sent to the replicas, which then deserialize it and open an NRT searcher via the local SearcherManager. The resulting
searcher on the replica is guaranteed to search the exact same
point-in-time view as the primary.
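The replication code performs that copy-verify-rename dance internally, but the pattern looks roughly like the following standalone sketch (the method and file names are illustrative): validate Lucene's end-to-end checksum on the temporary file, and only then atomically move it into place under its real name:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;

public class CopyVerifyRename {
  /**
   * After the raw bytes for fileName have been written to tempName inside indexPath,
   * verify the end-to-end checksum and only then atomically rename the file into place.
   */
  static void verifyAndInstall(Path indexPath, String tempName, String fileName)
      throws IOException {
    try (Directory dir = FSDirectory.open(indexPath);
         IndexInput in = dir.openInput(tempName, IOContext.READONCE)) {
      CodecUtil.checksumEntireFile(in); // throws CorruptIndexException if bits were flipped in transit
    }
    Files.move(indexPath.resolve(tempName), indexPath.resolve(fileName),
        StandardCopyOption.ATOMIC_MOVE);
  }
}
```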
This all happens concurrently with ongoing searches on the replica, and optionally primary, nodes. Those searches see the old searcher until the replication finishes and the new searcher is opened. You can also use Lucene's existing
SearcherLifetimeManager, which tracks each point-in-time searcher using its long version, if
you need to keep older searchers around for a while as well.
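A minimal sketch of that pattern: record the current searcher to get a version token, hand the token to the client, and re-acquire the same point-in-time searcher when the client comes back, for example for the next page of results:

```java
import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherLifetimeManager;
import org.apache.lucene.search.SearcherManager;

public class PointInTimeSketch {

  /** Records the current searcher and returns a token the client can send back later. */
  static long recordCurrent(SearcherManager manager, SearcherLifetimeManager lifetimes)
      throws IOException {
    IndexSearcher searcher = manager.acquire();
    try {
      return lifetimes.record(searcher); // token identifying this point-in-time view
    } finally {
      manager.release(searcher);
    }
  }

  /** Re-acquires the same point-in-time searcher for a follow-on request. */
  static IndexSearcher acquireRecorded(SearcherLifetimeManager lifetimes, long token)
      throws IOException {
    IndexSearcher searcher = lifetimes.acquire(token);
    // May be null if this searcher has since been pruned; the caller must handle that,
    // and must call lifetimes.release(searcher) when finished with it.
    return searcher;
  }
}
```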
The replica and primary nodes both expose independent commit APIs; you can choose to call these based on your durability requirements, and even stagger the commits across nodes to reduce cluster wide search capacity impact.
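One simple way to stagger those commits, sketched here under the assumption that each node knows its own id and the cluster size; the commit Runnable is a placeholder for whatever node-local commit call you use:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StaggeredCommits {
  /**
   * Schedules a periodic commit on this node, offset by nodeId so that not all
   * nodes pay the fsync cost at the same moment.
   */
  static ScheduledExecutorService scheduleCommits(int nodeId, int nodeCount,
      long periodSeconds, Runnable commit) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    long initialDelay = (periodSeconds * nodeId) / nodeCount; // spread nodes across the period
    scheduler.scheduleAtFixedRate(commit, initialDelay, periodSeconds, TimeUnit.SECONDS);
    return scheduler;
  }
}
```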
No transaction log
NRT replication does not include a transaction log: the durability semantics are the same as you get from an IndexWriter and NRT reader on a single JVM.
This means it is your responsibility to be prepared to replay
documents for reindexing since the last commit point, if the whole
cluster crashes and starts up again, or if the primary node crashes
before replicas were able to copy the new point-in-time refresh.
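The module does not help with that replay, but one common approach is to store your external stream position (for example a Kafka offset) in the commit's user data, so that on restart you know where to resume. A sketch, assuming a recent Lucene version where IndexWriter.setLiveCommitData is available and a hypothetical "lastIndexedSeqNo" key:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;

public class ReplayPositionSketch {

  /** Records the external stream position in the next commit's user data. */
  static void commitWithPosition(IndexWriter writer, long lastIndexedSeqNo) throws IOException {
    Map<String, String> userData = new HashMap<>();
    userData.put("lastIndexedSeqNo", Long.toString(lastIndexedSeqNo));
    writer.setLiveCommitData(userData.entrySet());
    writer.commit();
  }

  /** On restart, reads the position back from the latest commit so replay can resume there. */
  static long replayFrom(Directory dir) throws IOException {
    Map<String, String> userData = SegmentInfos.readLatestCommit(dir).getUserData();
    String value = userData.get("lastIndexedSeqNo");
    return value == null ? 0L : Long.parseLong(value);
  }
}
```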
Note that at the filesystem level, there is no difference between the Lucene index for a primary versus replica node, which makes it simple to shut down the old primary and promote one of the replicas to be a new primary.
Merging
Once the primary has finished a merge, and before it installs the merged segment in its index, it uses Lucene's merged segment warming API to give all replicas a chance to pre-copy the merged segment. This means that a merge should never block a refresh, so Lucene will keep fast refreshes even as large merged segments are still copying. Once all running replicas have pre-copied the merge, then the primary installs the merged segment, and after the next refresh, so do all the replicas.
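That merged segment warming API is exposed through IndexWriterConfig.setMergedSegmentWarmer. A sketch of wiring it up, assuming a recent Lucene version where IndexReaderWarmer is a functional interface; preCopyToReplicas is a hypothetical stand-in for whatever your transport does:

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;

public class MergeWarmSketch {

  // Hypothetical helper: ask replicas to pre-copy the just-merged segment's files.
  static void preCopyToReplicas(LeafReader mergedSegmentReader) {
    // ... the network transport is up to you
  }

  static IndexWriterConfig configWithWarmer() {
    return new IndexWriterConfig(new StandardAnalyzer())
        .setMergedSegmentWarmer(reader -> {
          // Called after the merge finishes but before the merged segment is installed
          // in the index, so refreshes are not blocked while the copy runs.
          preCopyToReplicas(reader);
        });
  }
}
```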
We have discussed having replicas perform their own merging, but I suspect that would be a poor tradeoff. Local area networks (e.g. 10 gigabit ethernet) are quickly becoming plenty fast and cheap, and asking a replica to also do merging would necessarily impact search performance. It would also be quite complex to ensure replicas perform precisely the same merges as the primary at the same time, and doing otherwise would break the consistent point-in-time view across replicas.
Abstractions
Lucene's replication module handles only the low-level mechanics; the actual network transport for copying files between nodes is left to you. Lucene also does not provide any distributed leader election algorithms to pick a new primary when the current primary has crashed, nor heuristics to detect a downed primary or replica. But once you pick the new primary, Lucene will take care of having all replicas cut over to it, removing stale partially copied files from the old primary, etc.
Finally, Lucene does not provide any load-balancing to direct queries to the least loaded replica, nor any cluster state to keep track of which node is the primary and which are the replicas, though Apache Zookeeper is useful for such shared distributed state. These parts are all up to you!
Expected failure modes
An especially important case is when the primary node goes down (crashes, loses power or is intentionally killed). In this case, one of the replicas, ideally the replica that was furthest along in copying files from the old primary as decided by a distributed election, is promoted to become the new primary node. All other replicas then switch to the new primary, and in the process must delete any files copied or partially copied from the old primary but not referenced by the new primary. Any documents indexed into the primary but not copied to that replica will need to be indexed again, and this is the caller's responsibility (no transaction log).
If the whole cluster crashes or loses power, on restart you need to determine whichever index (primary or replica) has the "most recent" commit point, and start the primary node on that host and replica nodes on the other hosts. Those replica nodes may need to delete some index files in order to switch to the new primary, and Lucene takes care of that. Finally, you will have to replay any documents that arrived after the last successful commit.
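Determining which index is "most recent" can be done by comparing each node's latest commit, for example by its version, as in this sketch; how the nodes exchange those numbers (and who decides) is up to you:

```java
import java.io.IOException;
import java.nio.file.Path;

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class LatestCommitSketch {
  /**
   * Returns the version of the latest commit in the index at indexPath; the node
   * whose index reports the highest version is the best candidate for promotion.
   */
  static long latestCommitVersion(Path indexPath) throws IOException {
    try (Directory dir = FSDirectory.open(indexPath)) {
      return SegmentInfos.readLatestCommit(dir).getVersion();
    }
  }
}
```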
Other fun cases include: a replica crashing while it was still pre-copying a merge; a replica that is falling behind because it is still copying files from the previous refresh when a new refresh happens; a replica going down and coming back up later after the primary node has also changed.
Downsides
- Very new code: this feature is quite new and also quite complex, and not widely used yet. There are likely still exciting bugs! Patches welcome!
- Slightly slower refresh time: after refreshing on the
primary, including writing document deletions to disk as well, we
must then copy all new files to the replica and open a new searcher there, adding a bit more time before documents are visible for searching on the replica compared to a straight NRT reader from an IndexWriter. If this is really a problem,
you can use the primary node for searching and have refresh
latency very close to what a straight NRT reader provides.
- Index problems might be replicated: if something goes wrong, and the primary somehow writes a broken index file, then that broken file will be replicated to all replicas too. But this is uncommon these days, especially with Lucene's end to end checksums.
Concluding
[I work at Amazon and the postings on this site are my own and don't necessarily represent Amazon's position]
Cool! Having just re-read the post you linked to about losing updates in ES, I'm wondering how one would ensure that writes to a cluster are durable and consistent. In Lucene's NRT replication, is the primary aware of the replicas' states? E.g. if I want a durable cluster that doesn't lose any writes, the server should only acknowledge writes to its client once a quorum of replicas has received them, and I'm wondering if this system will provide any support for that?
Hi Michael,
To offer durable writes with NRT replication you really need to use your own transaction log, or e.g. consume from a Kinesis or Kafka stream which lets you push durability back into that channel. Another option is to indeed delay confirming the writes until a refresh + commit has happened, but that delay could be long, e.g. on the order of 30s or 60s or whatever your refresh + commit interval is. Plus, if you want to be durable to more than one node crashing you'll need to commit in 2 (or more) nodes.
Nice write-up, Mike. (and Hi, hope you're doing well!)
I worry that the search-efficiency objectives might conflict with replication-efficiency ones? One objective is about aiming for consolidation into a few big files and the other is minimising data transfer by replicating smaller files so they are fundamentally opposed?
Has there been any analysis of the overheads involved in repeated-replication of re-written segments?
Hi Mark,
Thanks! That's a great point, and it applies to Lucene indexing and searching on a single box as well, i.e. the tension of working hard during indexing (merging) to make fewer segments so searching is faster.
I haven't done any specific analysis of this wrt NRT replication, but I am curious! However, if you have many replicas, then the 1X indexing cost sees N*X benefits (N = number of replicas), which makes it more compelling than the single indexing + searching machine.
Hi Michael,
I'm facing an issue when using SearchManager in Lucene and I'd like to know where I can get some help. Can you point me to a good forum? Thanks.
Hi Eduardo,
Best bet is to send an email to Lucene's user list (java-user@lucene.apache.org).
Mike