Tuesday, May 14, 2013

The Replicator

No, as much as I want to, I don't (yet) have a true Replicator at my disposal to blog about. Nor does it look like I will have one in the near future, even though scientists are making great progress towards it. I have recently made my contribution to the global replication effort though, by adding an index replication module to Lucene. It does not convert energy to mass, nor mass to mass, so I'm not sure it scientifically qualifies as a Replicator at all, but if you are into search index replication with Lucene, read on!

In computer science, replication is defined as "sharing information so as to ensure consistency between redundant resources ... to improve reliability, fault-tolerance, or accessibility" (Wikipedia). When you design a search engine, especially at large scale, replication is one approach to achieving these goals. Index replicas can replace primary nodes that become unavailable (e.g. due to planned maintenance or severe hardware failures), and they can also support higher query loads by having search requests load-balanced across them. Even if you are not building a large-scale search engine, you can use replication to take hot backups of your primary index while searches are running on it.
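To make the load-balancing point concrete, here is a minimal sketch of a front end spreading queries over replicas in round-robin order. The class and replica names are hypothetical; Lucene's replication module only keeps replicas in sync and does not route queries itself.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Lucene): spreads search requests across
 * a fixed set of index replicas in round-robin order.
 */
final class RoundRobinReplicas {
  private final List<String> replicas;
  private final AtomicInteger counter = new AtomicInteger();

  RoundRobinReplicas(List<String> replicas) {
    if (replicas.isEmpty()) {
      throw new IllegalArgumentException("at least one replica is required");
    }
    this.replicas = List.copyOf(replicas); // defensive, immutable copy
  }

  /** Returns the replica that should serve the next search request. */
  String next() {
    // floorMod keeps the index valid even after the counter overflows
    int i = Math.floorMod(counter.getAndIncrement(), replicas.size());
    return replicas.get(i);
  }
}
```

With replicas "a", "b" and "c", successive calls to next() return a, b, c, a, and so on. A real deployment would also skip replicas that are down, which is where the failover use of replicas comes in.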

Lucene's replication framework implements a primary/backup approach, where the primary process performs all indexing operations (updating the index, merging segments etc.), and the backup/replica processes copy over the changes asynchronously. The framework consists of a few key components that control the replication actions:
  • Replicator mediates between the clients and server. The primary process publishes Revisions while the replica processes update to the most recent revision following their own. When a new Revision is published, the previous one is released (unless it is currently being replicated), so that the resources it consumes can be reclaimed (e.g. remove unneeded files).

  • Revision describes a list of files and their metadata. The Revision is responsible for ensuring that the files remain available as long as clients are copying them. For example, IndexRevision takes a snapshot of the index using SnapshotDeletionPolicy, to guarantee the files are not deleted until the snapshot is released.

  • ReplicationClient performs the replication operation on the replica side. It first copies the needed files from the primary server (e.g. the files that the replica is missing) and then invokes the ReplicationHandler to act on the copied files. If any errors occur during the copy process, it restarts the replication session. That way, when the handler is invoked, it is guaranteed that all files were copied safely to the local storage.

  • ReplicationHandler acts on the copied files. IndexReplicationHandler copies the files over to the index directory and then syncs them to ensure they are on stable storage. If any errors occur, it aborts the replication session and cleans up the index directory. Only after successfully copying and syncing the files does it notify a callback that the index has been updated, so that e.g. the application can refresh its SearcherManager or perform whatever tasks it needs on the updated index.
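The publish/release rule in the Replicator bullet, together with the Revision's guarantee that files stay available while they are being copied, boils down to reference counting. The toy model below is hypothetical and in-memory (it is not LocalReplicator's code, and versions are plain longs); it only shows why a revision survives a newer publish until the session replicating it is released.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy, in-memory model of the publish/release rule. All names are hypothetical. */
final class ToyReplicator {
  /** A revision's refCount is 1 while it is the published one, plus 1 per open session. */
  static final class Rev {
    final long version;
    int refCount = 1;
    boolean released = false; // once true, its files could be deleted
    Rev(long version) { this.version = version; }
  }

  private Rev current;
  private final Map<String, Rev> sessions = new HashMap<>();
  private int nextSessionId = 0;

  /** Publishes a newer revision and drops the "published" reference on the previous one. */
  synchronized Rev publish(long version) {
    if (current != null && version <= current.version) {
      throw new IllegalArgumentException("revision must be newer than " + current.version);
    }
    Rev previous = current;
    current = new Rev(version);
    if (previous != null) {
      decRef(previous); // released now unless a session is still copying it
    }
    return current;
  }

  /** Opens a session if a newer revision than the client's exists; otherwise returns null. */
  synchronized String checkForUpdate(long clientVersion) {
    if (current == null || current.version <= clientVersion) {
      return null;
    }
    current.refCount++; // the session pins this revision
    String id = Integer.toString(nextSessionId++);
    sessions.put(id, current);
    return id;
  }

  /** Ends a session; its revision is released once nothing references it anymore. */
  synchronized void release(String sessionId) {
    Rev rev = sessions.remove(sessionId);
    if (rev != null) {
      decRef(rev);
    }
  }

  private void decRef(Rev rev) {
    if (--rev.refCount == 0) {
      rev.released = true;
    }
  }
}
```

Publishing revision 2 while a client is still copying revision 1 leaves revision 1 intact; it is only released when that client's session ends.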
The replication framework supports replicating any type of file. It offers built-in support for replicating a single index (as described above), as well as an index and taxonomy pair (for faceted search) via IndexAndTaxonomyRevision and IndexAndTaxonomyReplicationHandler. To replicate other types of files, you need to implement your own Revision and ReplicationHandler. But be advised: implementing a handler properly is ... tricky!
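To give a feel for why handlers are tricky, here is a hedged sketch of the copy, sync, then notify sequence described for IndexReplicationHandler, written with plain java.nio rather than Lucene's Directory API. CopySyncHandler and install() are hypothetical names, and the sketch assumes none of the incoming file names already exist in the target directory. A real index handler has more to worry about, e.g. the order in which a new commit becomes visible to readers, and syncing the directory entries themselves.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical copy-sync-notify handler; a sketch, not Lucene's IndexReplicationHandler. */
final class CopySyncHandler {
  /**
   * Copies files from workDir into targetDir, fsyncs them, and only then calls
   * callback. On any I/O error, the files installed so far are removed.
   */
  static void install(Path workDir, Path targetDir, List<String> files, Callable<Boolean> callback)
      throws Exception {
    List<Path> installed = new ArrayList<>();
    try {
      for (String name : files) {
        Path dest = targetDir.resolve(name);
        if (Files.exists(dest)) {
          throw new IOException("refusing to overwrite existing file " + dest);
        }
        installed.add(dest); // record first, so a half-written copy is cleaned up too
        Files.copy(workDir.resolve(name), dest);
      }
      // fsync each file so its contents reach stable storage before reporting success
      for (Path p : installed) {
        try (FileChannel ch = FileChannel.open(p, StandardOpenOption.WRITE)) {
          ch.force(true);
        }
      }
    } catch (IOException e) {
      // abort the session: remove everything this call installed, then propagate
      for (Path p : installed) {
        Files.deleteIfExists(p);
      }
      throw e;
    }
    // all new files are durable; let the application refresh its searchers
    callback.call();
  }
}
```

Note that the callback only runs on the success path, mirroring the guarantee that the application is notified after copying and syncing, never before.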

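The following example code shows how to use the replicator on the server side:
// publish a new revision on the server; IndexRevision snapshots the latest
// commit, so the writer must be configured with a SnapshotDeletionPolicy
IndexWriter indexWriter; // the writer used for indexing
Replicator replicator = new LocalReplicator();
indexWriter.commit(); // make the latest changes part of a commit
replicator.publish(new IndexRevision(indexWriter));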
Using the replicator on the client side is a bit more involved, but hey, that's where the important stuff happens!
// client can replicate either via LocalReplicator (e.g. for backups)
// or HttpReplicator (e.g. when the server is located on a different
// node).
Replicator replicator;

// refresh SearcherManager after the index is updated
final SearcherManager searcherManager; // the manager your searches go through
Callable<Boolean> callback = new Callable<Boolean>() {
  public Boolean call() throws Exception {
    // index was updated, refresh manager so new searches see the changes
    searcherManager.maybeRefresh();
    return true;
  }
};

// initialize a matching handler for the published revisions; indexDir is the
// replica's index Directory, i.e. the destination the files are installed into
ReplicationHandler handler = new IndexReplicationHandler(indexDir, callback);

// where should the client store the files before invoking the handler;
// workDir is a local scratch folder, one sub-directory per session
SourceDirectoryFactory factory = new PerSessionDirectoryFactory(workDir);

ReplicationClient client = new ReplicationClient(replicator, handler, factory);

client.updateNow(); // invoke client manually
// -- OR --
client.startUpdateThread(30000, null); // check for updates every 30 seconds (null = default thread name)

So What's Next?

The replication framework currently restarts a replication session upon failures. It would be great if it supported resuming a session instead, by copying only the files that weren't already copied successfully. This could be done e.g. by having both the server and the client compute a checksum for each file, so the client knows which files were copied successfully and which weren't. Patches welcome!
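A hedged sketch of the checksum idea: assuming the server could report a CRC32 per file (it does not today; the map argument stands in for that hypothetical metadata), a resumed session would re-copy only the files that are missing locally or whose checksum differs. ResumePlanner is a made-up name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.CRC32;

/** Hypothetical helper: decides which files a resumed session still has to copy. */
final class ResumePlanner {
  /** CRC32 of a file's full contents. */
  static long crc32(Path file) throws IOException {
    CRC32 crc = new CRC32();
    try (InputStream in = Files.newInputStream(file)) {
      byte[] buf = new byte[8192];
      int n;
      while ((n = in.read(buf)) != -1) {
        crc.update(buf, 0, n);
      }
    }
    return crc.getValue();
  }

  /**
   * serverChecksums maps file name to the CRC32 the server reports (assumed
   * metadata). Returns the files that are missing locally or do not match.
   */
  static List<String> filesToCopy(Map<String, Long> serverChecksums, Path localDir)
      throws IOException {
    List<String> todo = new ArrayList<>();
    for (Map.Entry<String, Long> e : serverChecksums.entrySet()) {
      Path local = localDir.resolve(e.getKey());
      if (!Files.exists(local) || crc32(local) != e.getValue()) {
        todo.add(e.getKey());
      }
    }
    return todo;
  }
}
```

Checksumming a large file costs a full read on both sides, so in practice the server would compute it once per published revision rather than per client.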

Another improvement would be to support resuming at the file level, i.e. not copying parts of a file that were already copied safely. This can be implemented by modifying ReplicationClient to discard the first N bytes of a file that it already copied, but it would be better if the server supported it as well, so that it only sends the required parts. Here too, patches are welcome!
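File-level resume could look roughly like this: the client measures how many bytes it already holds and requests the rest starting at that offset. Here a local FileChannel stands in for server-side byte-range support, which is an assumption; ResumeCopy is hypothetical, and the sketch trusts that the existing prefix is intact (something a per-file checksum could verify).

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper: appends only the missing tail of source to dest. */
final class ResumeCopy {
  /** Returns the number of bytes copied in this call. */
  static long resume(Path source, Path dest) throws IOException {
    try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
         FileChannel out = FileChannel.open(dest,
             StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
      long offset = out.size(); // bytes already copied safely by an earlier attempt
      if (offset > in.size()) {
        throw new IOException("dest is longer than source: " + dest);
      }
      out.position(offset);
      long copied = 0;
      // transferTo may move fewer bytes than requested, so loop until done
      while (offset + copied < in.size()) {
        copied += in.transferTo(offset + copied, in.size() - offset - copied, out);
      }
      return copied;
    }
  }
}
```

If a 10-byte file was interrupted after 4 bytes, resume() transfers only the remaining 6; calling it again on a complete file transfers nothing.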

Yet another improvement is to support peer-to-peer replication. Today, the primary server is the bottleneck of the replication process, since all clients access it to pull files. This can be somewhat mitigated by e.g. a tree network topology, where the primary node is accessed by only a few nodes, which are in turn accessed by other nodes, and so forth. Such a topology, however, is harder to maintain, and very sensitive to the root node going out of action. In peer-to-peer replication, there is no single node from which all clients replicate; instead, each server broadcasts its current revision and can choose any server that has a newer revision available to replicate from. An example of such an implementation over Apache Solr is described here. Hmm ... did I say patches are welcome?
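The peer selection step can be sketched as below, assuming each node has already learned the revision numbers its peers advertise (the broadcast mechanism itself is out of scope, and all names are hypothetical). Picking randomly among the peers holding the newest revision is one way to keep every client from converging on the same node; it is a design choice for this sketch, not something the module does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

/** Hypothetical peer-to-peer source selection. */
final class PeerChooser {
  /**
   * Returns a peer advertising the newest revision, if that revision is newer
   * than ours; otherwise empty (we are already up to date).
   */
  static Optional<String> choose(long myVersion, Map<String, Long> advertised, Random rnd) {
    long newest = myVersion;
    for (long v : advertised.values()) {
      newest = Math.max(newest, v);
    }
    if (newest == myVersion) {
      return Optional.empty();
    }
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, Long> e : advertised.entrySet()) {
      if (e.getValue() == newest) {
        candidates.add(e.getKey());
      }
    }
    // a random pick spreads replication load across all up-to-date peers
    return Optional.of(candidates.get(rnd.nextInt(candidates.size())));
  }
}
```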

Lucene has a new replication module. It's only one day old and can already do so much. You are welcome to use it and help us teach it new things!


  1. There is a problem with a "tree network topology": index-to-search latency suffers (the time from indexing a document on the master replica to the moment the document is present on the leaf nodes of the topology).

    One idea that comes to mind is to stream new index revisions to several replicas at a time. Using UDP multicast, we can send packets to several machines at once, so there is no bottleneck in the topology. Of course there are some downsides: all replicas must replicate the index at the same time, and some packet retransmission logic has to be implemented.

  2. The Replicator supports multiple clients pulling index revisions simultaneously. You're right that in a tree network topology it will take time until the leaf nodes receive the replicas, but that's the problem with that topology in general.

    The Replicator works in a pull mode because that way it doesn't need to maintain any state w.r.t. active clients, or clients that e.g. received an update notification yet crashed while replicating files, etc. Clients are free to come and go as they wish. In many ways it works like source code control systems, where your client does not get notified when someone else has just committed changes to the code.

    Having said that, I think it would be good if the replicator worked in a semi-pull mode, where clients wait for a notification that a new revision exists and then request it (rather than the master replica pushing the new revision to them). That way the clients don't need to continuously poll the master for updates. Also, when clients come back up, they should check with the master for any updates. This could be a nice addition to the module!

  3. I see no problem with polling the master for changes. We have a homegrown replication solution for Lucene based on rsync, which works the same way: replicas just pull the index from the master. The problem we faced, though, is network bandwidth. 20 replicas pulling a partially merged index of about 1.5 GB create quite a heavy demand on the network. This is why multicast index replication looks promising to me: it would save a lot of bandwidth.

  4. I think that UDP, while interesting, would add a lot of complexity to the Replicator code. As UDP is unreliable, the replication code would need to ensure it received all packets of a file, take care of ordering etc. Also, what happens if a client didn't receive a few bytes of a file, and the replication server has already deleted it (since e.g. a new revision was made available)?

    I think the Replicator should be used for recovering old/corrupt/stale nodes, as it's not near-real-time friendly. If your application has any real-time requirements (or even needs short delays between the time a document is indexed and the time it's exposed on all search nodes), it should not use replication, but rather methods such as those implemented by Solr.

    If your application cares less about real-time, yet still needs to have many replicas (e.g. for supporting high query loads), then I agree that replicating files from a single master is not very good. A BroadcastingReplicator might be better (you're welcome to post a patch on Lucene JIRA if you have something even partially working). There's also an interesting approach I read about, doing replication in a BitTorrent-like fashion: http://codeascraft.com/2012/01/23/solr-bittorrent-index-replication/. Here, replication is still done over TCP, but clients replicate from each other rather than bombarding a single master. This could also be a handy improvement to Lucene's Replicator, and I believe it can be implemented over the current APIs.

  5. I have tried to test the implementation with the server running as an HTTP server, but HttpReplicator doesn't support publish. Should I implement my own method, or is there something that I am missing from the documentation?

    1. Indeed HttpReplicator doesn't support publish, because it doesn't make much sense to publish revisions to a remote server. The revision does not hold the actual files, only their metadata. If you publish revisions to a remote server, the server has no way to guarantee the revisions will not be deleted while they are being replicated. Usually the server that publishes revisions is the one that holds the files, and it's done with LocalReplicator.

      If you want to publish revisions to remote servers, e.g. because you intend to support replication by a large number of nodes and want to load-balance the replicating nodes across multiple replication servers, I think it would be better to have two tiers of replication servers: the first tier replicates from a single master, and the second tier replicates from servers in the first tier.

    2. Hi Shai. Thank you very much for the reply. I am probably missing the model behind the API. I would assume (at least that's how I would model the API; I don't mean to criticize your model, it's probably just a misunderstanding on my side) that the server, if it is a remote server, would provide connections to the clients. The clients would then be able to copy all the files (through an HttpReplicator) and back up the index to some path. Are my thoughts correct?

      What I don't get from the API is how the server can provide the HTTP connection to the clients that want to copy the index. Thank you again!

    3. The server can use ReplicationService, embedded in a servlet, to respond to the HTTP requests sent by HttpReplicator. The server also uses a LocalReplicator to manage the revisions. The indexing code on the server calls localReplicator.publish(), and the servlet (through ReplicationService) calls localReplicator.checkForUpdate(), obtain(), etc.

      The clients can use ReplicationClient, with an HttpReplicator, to replicate index changes from the server. The HttpReplicator is given the host:port of the server which manages the index revisions. The code examples above show how it can be done.

    4. Thank you very much, very helpful.

  9. Can someone explain what indexDir is and what workDir is? Which one is the source and which one is the destination?
    I have a Lucene index shared over NFS and want the LocalReplicator to pick it up and update to the latest revision. I got the following message in the replication client's log:
    message: component:ReplicationThread, message:doUpdate(): handlerVersion=d session=null
    The reason is that the session is null.

    The way we are using the replicator is: we have a process which opens a writer, updates the index and closes the writer. Another process, running on a different machine, starts the sync operation.

    When using LocalReplicator, do the producer and consumer need to be in the same process?