Techno Time: Index Architecture

Real-time Master/Slave indexing

This article presents the solution we implemented to manage the token indexes that our application's search engines need in order to scale with growing demand, driven by the growth of our user base as well as the increasing number of application servers (the servers on which the application is hosted). First, the original solution is summarized, followed by the new needs that emerged. An overview of the implemented solution is then presented before exploring its details and particularities.

The first solution

In order to allow quick and efficient searches for patients in our database, our users have access to a search engine integrated into the application, which relies on Apache Lucene. This library provides token indexing and full-text search functions based on an application's text content.

1. Update of the search index (RMI)

In this type of architecture, every application server (HR) is responsible for updating its own index folders. Each server is notified by the others whenever a modification originates outside its own environment. This is an elegant small-scale solution whose main purpose is the real-time update of searchable elements.
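
To make the mechanism more concrete, here is a minimal sketch of the kind of RMI contract such an architecture relies on. The interface and method names are hypothetical and only illustrate the idea; they are not the actual Omnimed code.

import java.rmi.Remote;
import java.rmi.RemoteException;

// Hypothetical RMI contract: the server that performed a modification calls this on every
// other application server so that each one can update its own local Lucene index.
public interface IndexUpdateNotifier extends Remote {
    void entityChanged(String entityType, Long entityId) throws RemoteException;
}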

Emergence of new needs

Our constantly evolving application gave rise to two new needs regarding the search solution: one to address the increasing quantity of data available in the application, and a second to support the ever-growing number of users.

First, the increasing quantity of data available in the application makes the ability to quickly search a patient's health record very appealing. Technically, this could be done with keyword searches; however, with the RMI solution, the large volume of information processed on each application server would make the indexing process resource-heavy on every one of them. This would significantly increase the application servers' response time, which we simply cannot allow.

Secondly, to stay quick and efficient as the number of users grows, we are forced to increase the available physical resources and the number of application servers. With the indexing architecture in place, however, the frequent need to update would generate ever more demand, again increasing the application's response time. The probability of corruption during data modification, caused by two different servers accessing the index concurrently, would also increase drastically, which is unacceptable for us.

We sought the best way to answer these two needs while continuing to use Apache Lucene indexing and maintaining real-time updates.

Apache Lucene Master/Slave

Apache Lucene offers other types of architecture than the one we originally used, and some of them allow the number of resources to grow more smoothly while maintaining good server performance. One of these is based on the concept of a master and its slaves (Master/Slave). A master server is dedicated to the indexing operations performed when modifications are made to the database. Meanwhile, the other servers, referred to as slaves, ask the master server whether modifications have been made to the index. If so, the master transmits the modifications to the slaves, which also serve as index readers.
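
As a rough illustration of that cycle (a sketch only, not the Hibernate Search implementation we describe later; paths and the copy strategy are ours for the example), each slave periodically pulls the snapshot published by the master into its local read directory:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Rough sketch of one slave refresh cycle: pull the index snapshot published by the
// master (for example over an NFS mount) into the slave's local read directory.
void refreshFromMaster(Path masterSnapshot, Path localCopy) throws IOException {
    Files.createDirectories(localCopy);
    try (DirectoryStream<Path> files = Files.newDirectoryStream(masterSnapshot)) {
        for (Path file : files) {
            Files.copy(file, localCopy.resolve(file.getFileName()), StandardCopyOption.REPLACE_EXISTING);
        }
    }
    // ...the slave would then re-open its IndexReader so that searches see the new snapshot
}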

2. Usual Master/Slave architecture (Source: C. Bauer and G. King, Hibernate in Action, Manning Publications Co., 2004, 400 p.)

Omnimed's Master/Slave version

Although the basic Lucene Master/Slave option is quite practical for managing large quantities of data, it requires copying the index files, an operation which takes a certain amount of time and thus limits the possibility of real-time indexing (it could even result in data corruption if the copy finishes before the master server has finished writing).

A few solutions for real-time indexing have been proposed elsewhere; however, none of them combines the possibility of increasing the number of servers, real-time indexing, and the use of our current index structures.

To solve these problems, we implemented a solution that combines the Master/Slave architecture suggested by Apache with a temporary local index on each application server. Exploiting the fact that it is highly unlikely for a user to switch from one application server to another while one of the slave servers is being updated, we created a local index on each application server which only keeps the index modifications made since the last update of its slave server. Once the updates have been transferred to the slave servers and are available to all application servers, the temporary index is reset. This gives a user working on a server immediate access to their own modifications.

3. Real-time Master/Slave architecture with temporary local files.

When a user modifies an object on the application server (HR), the local index (Tmp#) is updated. The modification is also sent to the database (BD), which triggers an update of the Master index. Every five minutes, the Master copies its modifications to a shared folder available to the Slaves. This shared folder contains two copies of the indexes, one for writing and one for reading, and their roles swap every five minutes. The Slave servers also have two copies of the indexes, one receiving the incoming modifications and the other being read by the HR servers. Once the incoming modifications have been applied on the Slave, the local temporary folders are reinitialized.
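
The swapping relies on small marker files (current1/current2): whichever marker exists designates the copy that is currently safe to read, while the other copy receives the writes. The few lines below are a minimal sketch of that convention, distilled from the provider code shown further down.

import java.io.File;

// Whichever marker file exists designates the copy that is currently safe to read;
// the other copy is being written, and the roles swap at each refresh cycle.
int activeCopy(File indexDir) {
    if (new File(indexDir, "current1").exists()) {
        return 1;
    }
    if (new File(indexDir, "current2").exists()) {
        return 2;
    }
    return 1; // no marker yet (e.g. first start): default to copy 1
}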

When a search request is made, the HR server looks for results in both Tmp folders as well as in the Slave's readable folder. Duplicates are filtered out, and the returned result set contains every result it should, up to date with the latest modifications.
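
Here is a hedged sketch of that merge, written with plain Lucene rather than our Hibernate Search integration: the three readers (the slave's readable copy plus the two temporary folders) are combined into one MultiReader and duplicates are collapsed on a unique identifier field ("id" is an assumed field name).

import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

// Searches the slave's readable copy and both temporary folders as one logical index,
// keeping only the first hit for each unique identifier.
Set<String> searchAllFolders(IndexReader slaveReader, IndexReader tmpReader1,
                             IndexReader tmpReader2, Query query, int maxHits) throws IOException {
    IndexReader combined = new MultiReader(slaveReader, tmpReader1, tmpReader2);
    IndexSearcher searcher = new IndexSearcher(combined);
    TopDocs hits = searcher.search(query, maxHits);
    Set<String> uniqueIds = new LinkedHashSet<String>(); // preserves ranking order, drops duplicates
    for (ScoreDoc hit : hits.scoreDocs) {
        uniqueIds.add(searcher.doc(hit.doc).get("id")); // "id" is an assumed identifier field
    }
    return uniqueIds;
}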

Implementation of the new solution

This section presents more technical details, such as code snippets and implementation notes, and is aimed primarily at developers and programmers.

Server configuration

To implement this solution, the first step is to configure the servers (Master and Slave) with the proper indexing parameters. We define Hibernate's properties in our sessionFactory to include Hibernate Search:

<bean id="sessionFactory" class="...">
…
    <property name="hibernateProperties">
        <props>
            ...
            <prop key="hibernate.search.default.exclusive_index_use">
                false
            </prop>
            <prop key="hibernate.search.default.directory_provider">
                ${indexProperties.directoryProvider}
            </prop>
            <prop key="hibernate.search.default.reader.strategy">
                ${indexProperties.readerStrategy}
            </prop>
            <prop key="hibernate.search.default.refresh">
                ${indexProperties.refreshTime}
            </prop>
            <prop key="hibernate.search.default.indexBase">
                ${indexProperties.indexBase}
            </prop>
            <prop key="hibernate.search.default.sourceBase">
                ${indexProperties.sourceBase}
            </prop>
            <prop key="hibernate.search.default.server_name">
                ${indexProperties.serverName}
            </prop>
            ...
        </props>
    </property>
</bean>
...

and we define Hibernate's Master/Slave parameters using properties files:

Master server property file:
indexProperties.directoryProvider=org.hibernate.search.store.impl.FSMasterDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=300
indexProperties.serverName=ProdMasterLucene01
indexProperties.readerStrategy=shared

Slave servers property file:
indexProperties.directoryProvider=org.hibernate.search.store.impl.FSSlaveDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=300
indexProperties.serverName=ProdSlaveLucene01
indexProperties.readerStrategy=shared

Application servers property file:
indexProperties.directoryProvider=com.omnimed.shared.seeker.service.impl.FSTmpDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=1
indexProperties.serverName=ProdHRVirgo01
indexProperties.readerStrategy=com.omnimed.shared.seeker.service.impl.MultiFolderReaderProvider
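
The ${indexProperties.*} placeholders in the sessionFactory definition above are resolved from whichever of these per-role files is deployed on the server. Our actual wiring goes through Spring's property placeholder mechanism; the snippet below is only a minimal sketch of reading such a file, and the path is hypothetical.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Minimal sketch: load the per-role index properties deployed on this server.
// The file path is hypothetical; the real application resolves these values through Spring.
Properties loadIndexProperties() throws IOException {
    Properties indexProperties = new Properties();
    InputStream in = new FileInputStream("/etc/omnimed/indexProperties.properties");
    try {
        indexProperties.load(in);
    } finally {
        in.close();
    }
    return indexProperties;
}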

To make our architecture work (i.e., to incorporate the temporary local component), we created two new classes derived from those supplied by Hibernate Search: a directoryProvider, which allows the application server to manage the temporary index, and a readerStrategy, which enables multi-folder searches.

Directory provider

We use our own implementation of a directoryProvider (an object that manages the creation and availability of the folders used for indexing), which creates and deletes local index folders according to the operations of the slave server associated with it. Similarly to the slave directoryProvider (FSSlaveDirectoryProvider), it keeps markers indicating which folders are available for reading. It also keeps a link (NFS mount) to the Master server in order to know the status of its marker, as well as the same kind of link to the Slave server, for the same reason.

At initialization, the folder currently used for reading is identified on the slave server as well as on the application server, and the local index folders and files are created for the application server.

public void initialize(String directoryProviderName, Properties properties, BuildContext context) {
    
    this.properties = properties;
    this.directoryProviderName = directoryProviderName;
    
    // On manual indexing skip read-write check on index directory
    boolean manual = context.getIndexingStrategy().equals("manual");
    
    slaveIndexDir = DirectoryProviderHelper.getVerifiedIndexDir( directoryProviderName, properties, !manual );
    logger.debug("Slave index directory: " + slaveIndexDir.getPath());
    
    sharedIndexDir = DirectoryProviderHelper.getSourceDirectory(directoryProviderName, properties, false);
    logger.debug("Shared index directory: " + sharedIndexDir.getPath());
    
    try {
        indexName = slaveIndexDir.getCanonicalPath();
        
        // Identify which copy is currently readable on the slave (marker files current1/current2);
        // default to copy 1 when no marker exists yet.
        if (new File(slaveIndexDir, "current1").exists()) {
            slaveCurrent = 1;
        } else if (new File(slaveIndexDir, "current2").exists()) {
            slaveCurrent = 2;
        } else {
            slaveCurrent = 1;
        }
        
        // Same check for the shared folder published by the Master.
        if (new File(sharedIndexDir, "current1").exists()) {
            sharedCurrent = 1;
        } else if (new File(sharedIndexDir, "current2").exists()) {
            sharedCurrent = 2;
        } else {
            sharedCurrent = 1;
        }
        
        // Create this application server's two temporary local index folders.
        directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
        directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
    }
    catch (IOException e) {
        throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
    }
}

A recurring task is then scheduled, with a period defined by the refreshTime property.

class UpdateTask extends TimerTask {
    private final ExecutorService executor;
    
    public UpdateTask() {
        executor = Executors.newSingleThreadExecutor();
    }
    
    public void run() {
        try {
            // If the slave marker has flipped since the last run, the modifications now
            // published through the slave can be dropped from the matching temporary folder
            if (new File(slaveIndexDir, "current1").exists() && slaveCurrent == 2) {
                if (sharedCurrent == 1) {
                    String[] fileList = directory2.listAll();
                    for (String fileName : fileList) {
                        directory2.deleteFile(fileName);
                    }
                    directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
                } else if (sharedCurrent == 2) {
                    String[] fileList = directory1.listAll();
                    for (String fileName : fileList) {
                        directory1.deleteFile(fileName);
                    }
                    directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
                }
                slaveCurrent = 1;
            } else if (new File(slaveIndexDir, "current2").exists() && slaveCurrent == 1) {
                if (sharedCurrent == 1) {
                    String[] fileList = directory2.listAll();
                    for (String fileName : fileList) {
                        directory2.deleteFile(fileName);
                    }
                    directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
                } else if (sharedCurrent == 2) {
                    String[] fileList = directory1.listAll();
                    for (String fileName : fileList) {
                        directory1.deleteFile(fileName);
                    }
                    directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
                }
                slaveCurrent = 2;
            }
            
            // Track the shared folder's marker so that writing switches to the matching temporary folder
            if (new File(sharedIndexDir, "current1").exists() && sharedCurrent == 2) {
                sharedCurrent = 1;
                directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
            } else if (new File(sharedIndexDir, "current2").exists() && sharedCurrent == 1) {
                sharedCurrent = 2;
                directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
            }
            
        }
        catch (IOException e) {
            throw new SearchException( "Unable to swap index directory to: " + directoryProviderName, e );
        }
    }
    
    public void stop() {
        executor.shutdownNow();
    }
}
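
For completeness, here is a sketch of how a task like this one can be scheduled; the exact wiring in our provider's start-up code may differ, so take it as an illustration only.

import java.util.Timer;

// Sketch: schedule the update task at the period given by the refreshTime property
// (expressed in seconds, e.g. 300 in the slave configuration above).
void startUpdateTask(long refreshTimeInSeconds) {
    long periodMs = 1000L * refreshTimeInSeconds;
    Timer timer = new Timer(true); // daemon timer so it does not block application shutdown
    timer.schedule(new UpdateTask(), periodMs, periodMs);
}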

Reader strategy

Inspired by the SharingBufferReaderProvider supplied with Hibernate Search, the MultiFolderReaderProvider implements the reading algorithm over the index folders. By creating readers on the temporary folders as well as on the slave folders, it can search both and then combine the returned results to obtain all the information, even information that was just updated. This readerProvider uses local markers indicating which folder is available for reading, as well as the slave server's markers, to synchronize the creation and deletion of readers on the files; these markers also indicate when a local index folder needs to be reinitialized.

public IndexReader openIndexReader() {
    logger.debug( "Opening IndexReader for directoryProvider " + indexName);

    if (directoryProvider.getClass().equals(FSTmpDirectoryProvider.class)) {
        FSDirectory[] tmpDirectory = ((FSTmpDirectoryProvider)directoryProvider).getAllDirectory();
    
        /* We need a step to delete the old readers from the map when the tmp resets */
        if (tmpDirectory[0] != currentTmpDirectory1) {
            PerDirectoryLatestReader readerToDelete = currentReaders.get(currentTmpDirectory1);
            if (readerToDelete != null) {
                allReaders.remove(readerToDelete.current.reader);
                currentReaders.remove(currentTmpDirectory1);
            }
            currentTmpDirectory1 = tmpDirectory[0];
        }
        if (tmpDirectory[1] != currentTmpDirectory2) {
            PerDirectoryLatestReader readerToDelete = currentReaders.get(currentTmpDirectory2);
            if (readerToDelete != null) {
                allReaders.remove(readerToDelete.current.reader);
                currentReaders.remove(currentTmpDirectory2);
            }
            currentTmpDirectory2 = tmpDirectory[1];
        }
        IndexReader[] readers = new IndexReader[NUMBER_OF_DIRECTORY];
        ReaderProvider[] managers = new ReaderProvider[NUMBER_OF_DIRECTORY];

        if (new File(indexDir, "current1").exists()) {
            PerDirectoryLatestReader directoryLatestReader = currentReaders.get( slaveDirectory1 );
            if ( directoryLatestReader == null ) {
                directoryLatestReader = createReader( slaveDirectory1 );
            }
            readers[0] = directoryLatestReader.refreshAndGet();
        } else if (new File(indexDir, "current2").exists()) {
            PerDirectoryLatestReader directoryLatestReader = currentReaders.get( slaveDirectory2 );
            if ( directoryLatestReader == null ) {
                directoryLatestReader = createReader( slaveDirectory2 );
            }
            readers[0] = directoryLatestReader.refreshAndGet();
        } else {
            throw new SearchException("Unable to get the latest directory reader in: " + indexName + ". No marker available.");
        }
    
        PerDirectoryLatestReader tmpDirectoryLatestReader1 = currentReaders.get(currentTmpDirectory1);
        if ( tmpDirectoryLatestReader1 == null ) {
            tmpDirectoryLatestReader1 = createReader( currentTmpDirectory1 );
        }
    
        PerDirectoryLatestReader tmpDirectoryLatestReader2 = currentReaders.get(currentTmpDirectory2);
        if ( tmpDirectoryLatestReader2 == null ) {
            tmpDirectoryLatestReader2 = createReader( currentTmpDirectory2 );
        }
        readers[1] = tmpDirectoryLatestReader1.refreshAndGet();
        readers[2] = tmpDirectoryLatestReader2.refreshAndGet();

        managers[0] = new SharingBufferReaderProvider();
        managers[1] = new SharingBufferReaderProvider();
        managers[2] = new SharingBufferReaderProvider();
        
        logger.debug("Finished opening IndexReader for directoryProvider: " + directoryProvider);
        return ReaderProviderHelper.buildMultiReader( NUMBER_OF_DIRECTORY, readers, managers );
    
    } else {
        // This is the usual way to open indexReaders. It would be used if this reader
        // strategy were combined with a DirectoryProvider other than FSTmpDirectoryProvider.
    
        Directory directory = directoryProvider.getDirectory();
        PerDirectoryLatestReader directoryLatestReader = currentReaders.get( directory );
        // might eg happen for FSSlaveDirectoryProvider or for mutable SearchFactory
        if ( directoryLatestReader == null ) {
            directoryLatestReader = createReader( directory );
        }
        logger.debug("Finished opening IndexReader for index " + directoryProvider);
        return directoryLatestReader.refreshAndGet();
    }
}
public void closeIndexReader(IndexReader multiReader) {
    if ( multiReader == null ) {
        return;
    }
    IndexReader[] readers;
    
    if (multiReader instanceof CacheableMultiReader) {
        CacheableMultiReader castMultiReader = ( CacheableMultiReader ) multiReader;
        readers = ReaderProviderHelper.getSubReadersFromMultiReader(castMultiReader);
    } else {
        throw new AssertionFailure( "Everything should be wrapped in a CacheableMultiReader");
    }
    
    logger.debug( "Closing multiReader: " + multiReader.toString());
    for (int i = 0; i < readers.length; i++) {
        allReaders.get(readers[i]).close();
    }
}
public void initialize(DirectoryBasedIndexManager indexManager, Properties props) {
    this.directoryProvider = indexManager.getDirectoryProvider();
    this.indexName = indexManager.getIndexName();
    this.properties = props;
    
    logger.debug("Creating reader for :" + directoryProvider.getDirectory().toString());

    // Extra steps in the FSTmpDirectoryProvider case: two slave-side readers need to be initialized
    if (directoryProvider.getClass().equals(FSTmpDirectoryProvider.class)) {
        indexDir = ((FSTmpDirectoryProvider)directoryProvider).getIndexDir();
        try {
            slaveDirectory1 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "1"), properties);
            createReader( slaveDirectory1 );
            logger.debug("Creating reader for :" + slaveDirectory1.toString());
            
            slaveDirectory2 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "2"), properties);
            createReader( slaveDirectory2 );
            logger.debug("Creating reader for :" + slaveDirectory2.toString());
            
        } catch (IOException e) {
            throw new SearchException( "Unable to initialize index reader for: " + directoryProvider, e );
        }
        FSDirectory[] tmpDirectory = ((FSTmpDirectoryProvider)directoryProvider).getAllDirectory();
        currentTmpDirectory1 = tmpDirectory[0];
        currentTmpDirectory2 = tmpDirectory[1];
    }
    

    logger.debug( "List all added to the map readers :");
    for (IndexReader reader : allReaders.keySet()) {

        logger.debug("All readers keySet: " + reader.toString());
    }
}
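
Once all of this is configured, nothing special is required at query time: a regular Hibernate Search query transparently goes through the MultiFolderReaderProvider. The sketch below uses assumed entity and field names (Patient, lastName) purely for illustration.

import java.util.List;

import org.apache.lucene.search.Query;
import org.hibernate.Session;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.Search;
import org.hibernate.search.query.dsl.QueryBuilder;

// Sketch of an ordinary full-text query; the multi-folder reading happens underneath.
// "Patient" and "lastName" are assumed names used for illustration.
List<?> searchPatients(Session session, String name) {
    FullTextSession fullTextSession = Search.getFullTextSession(session);
    QueryBuilder qb = fullTextSession.getSearchFactory()
            .buildQueryBuilder().forEntity(Patient.class).get();
    Query luceneQuery = qb.keyword().onField("lastName").matching(name).createQuery();
    return fullTextSession.createFullTextQuery(luceneQuery, Patient.class).list();
}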

Conclusion

The solution we developed on top of the existing Lucene library meets our need for real-time indexing of large quantities of data spread across several servers. It is currently used to index our patient database.

In the current case, since a patient cannot be in two establishments at the same time, it is not critical for the information to be available to all of our users immediately. However, users at the same location can access the modified elements when performing a search (e.g., a secretary who is updating a patient's administrative medical information while the doctor consults the patient's record just before the appointment). By grouping connections to a given server by physical location and by using our architecture, these people get immediate access to the index modifications thanks to the local indexes, without having to wait for a complete update cycle of the slave servers.

This article demonstrates the versatility of Omnimed's programmers and shows how our company invests not only in the development of the application, but also in researching new solutions related to system development.