Capsule Techno : Architecture d'indexation

Indexation en temps réel et architecture "Master/Slave"

Cet article présente la solution mise en place pour la gestion des index de recherche dossier médical électronique (DMÉ) Omnimed afin de suivre la croissance du nombre d’utilisateurs et du nombre de serveurs d'application (c’est-à-dire les serveurs sur lesquels l’application est hébergée). Un état de la solution en place à l’origine sera fait, suivi d’une explication de la naissance du besoin et finalement d’un aperçu de la solution établie. Les détails techniques seront exposés à la fin.

La solution d’origine

Afin de permettre une recherche rapide et efficace des patients contenus dans notre base de données, nos utilisateurs ont accès à un moteur de recherche intégré à l’application reposant sur l’architecture de la solution Apache Lucene. Cette librairie permet d’intégrer des fonctions d’indexation et de recherche basées sur le contenu (le texte).

Au départ, nos besoins en terme de recherche étaient principalement limités à la recherche de patients et d’utilisateurs, ce qui faisait en sorte que les opérations de mise à jour de l’index de recherche (les éléments nécessaires à la recherche textuelle) pouvaient aisément être effectuées sans surcharger les serveurs d’application. Il était possible, vu le faible volume d’information à tenir à jour, d’utiliser une solution de communication par Invocation de méthode à distance (Remote Method Invocation (RMI)) afin de maintenir à jour les modifications faites à l’index sur tous les serveurs.

 1. Mise à jour de l'index de recherche (architecture par  RMI )

1. Mise à jour de l'index de recherche (architecture par RMI)

Dans ce type d’architecture, chaque serveur d'application Healthrecord (HR) est responsable de gérer ses propres fichiers d’indexation et de les tenir à jour par l’entremise d’émission et de réception de messages. Il s’agit d’une solution élégante à petite échelle qui a comme avantage principal d’effectuer des mises à jour en temps réel sur les éléments disponibles pour la recherche.

Naissance des besoins

Notre application étant en constante évolution, deux nouveaux besoins se sont développés au fil du temps en lien avec notre solution de recherche, le premier lié à l’augmentation du nombre de données disponibles dans l’application et le second s’imposant par l’augmentation du nombre d’utilisateurs.

En premier lieu, l’augmentation de la quantité de données saisies dans l’application a entraîné le besoin de pouvoir obtenir rapidement certaines informations contenues dans les notes cliniques. Ceci pourrait techniquement être fait par l’utilisation de la recherche par mots-clés, cependant, cette grande quantité d’information fait en sorte que l’indexation serait extrêmement coûteuse pour chaque serveur en terme de ressources. Ceci aurait pour effet de ralentir énormément le temps de réponse des serveurs d'application, ce que nous ne pouvons nous permettre.

En deuxième lieu, afin de demeurer rapide et efficace, le nombre croissant d’utilisateurs nous impose de devoir augmenter la quantité de ressources physiques disponibles et d’augmenter le nombre de serveurs d'application disponibles. Toutefois, l’indexation de nouvelles informations surchargerait de plus en plus les serveurs devant communiquer entre eux constamment pour se mettre à jour, et cela également aurait comme effet de ralentir le temps de réponse de l’application. De plus, la probabilité de concurrence pour la modification d’une même donnée par deux serveurs différents ou plus deviendrait beaucoup trop élevée et le risque de corruption augmenterait radicalement, ce qui est totalement inacceptable. Finalement, Lucene impose également une contrainte de non-lecture lorsque l’index est en train de se faire écrire, ce qui revient à dire que plus il y a de serveurs d’application, plus il y a d’écriture, jusqu’à un point tel où il devient impossible de le consulter.

Nous avons donc réfléchi à la meilleure façon de répondre à ces deux besoins tout en poursuivant notre utilisation de l’indexation Apache Lucene et en conservant la mise à jour en temps réel, et vous présentons ici le résultat de nos réflexions ainsi que la méthode utilisée pour l’implémenter. 

Apache Lucene Master/Slave

Apache Lucene propose un autre type d’architecture différente de celle que nous utilisions permettant une augmentation du nombre de ressources tout en maintenant la rapidité des serveurs d'application. Ce type d’architecture se base sur le concept du maître et de ses esclaves (Master/Slave). Ainsi un serveur « maître » se consacre aux opérations d’indexation suite aux modifications effectuées sur la base de données. Par la suite, d’autres serveurs, soient les « esclaves », viennent s’enquérir auprès du serveur maître des dernières modifications apportées à l’index. Dans l’affirmative d’un changement, le maître transmet les modifications aux  esclaves.

 2. Architecture Master/Slave de base  (Source : . Bauer et G. King, Hibernate in Action, Manning Publication Co, 2004, 400p) 

2. Architecture Master/Slave de base  (Source : . Bauer et G. King, Hibernate in Action, Manning Publication Co, 2004, 400p) 

Master/Slave version Omnimed

Bien que pratique pour indexer un grand volume de données, l’option Master/Slave de Lucene nécessite de copier des fichiers, une opération qui demande un certain temps et qui limite la possibilité de faire de l’indexation en temps réel (il pourrait y avoir des corruptions de données si le temps de copie est plus court que le temps d’écriture du maître).

Quelques solutions pour pallier à ce problème sont proposées ici :

Cependant, aucune ne combine à la fois la possibilité d’augmenter sans impact le nombre de serveurs, la capacité d’indexer en temps réel et l’utilisation de notre index actuel.

Afin de répondre à nos besoins, nous avons implémenté une solution qui combine l’architecture Master/Slave suggérée par Apache ainsi qu’une méthode d’indexation locale temporaire à même un serveur d'application. En exploitant le fait qu’il est improbable qu’un utilisateur change de serveur d'application pendant que les serveurs esclaves sont mis à jour, nous avons créé un index local sur chaque serveur d’application (HR) qui conserve uniquement les modifications locales d’index effectuées depuis la dernière mise à jour du serveur esclave associé. Lorsque les mises à jour sont acheminées aux serveurs esclaves et disponibles pour tous les serveurs d'application, l’index temporaire est réinitialisé. Cela permet à une personne travaillant sur un serveur d’avoir accès à ses modifications immédiatement et de continuer à travailler même si les esclaves n’ont pas encore été mis à jour. Le schéma ci-dessous représente le processus:

 3. Architecture Master/Slave en temps réel avec fichiers locaux temporaires.

3. Architecture Master/Slave en temps réel avec fichiers locaux temporaires.

Lors d'une modification sur un serveur d'application (HR), on écrit la modification d'index sur un fichier temporaire (Tmp#). La modification est écrite dans la base de données (BD), qui déclenche une mise à jour de l'index maître (Master). À toutes les cinq minutes, le Master copie ses modifications d'indexes sur le dossier partagé (Shared) dans le dossier disponible pour l'écriture (indiqué par un jeton dans le dossier). Ce dossier contient deux copie des indexes, une pour l'écriture et l'autre pour la lecture par les esclaves (Slaves), et ils changent de rôle à toutes les cinq minutes. Les serveurs esclaves ont également deux copies des indexes, une disponible en écriture lorsque les modifications arrivent du maitre, et l'autre disponible pour la lecture par les serveurs HR. Lorsque les modifications initialement effectuées sur une serveur HR sont disponible à la lecture dans l'index du slave, il est réinitialisé.

Pour la recherche, lorsqu'une requête est effectuée, le serveur HR consulte ses deux copies d'indexes locaux ainsi que la copie d'indexes disponible à la lecture du serveur esclave. Les résultats en double sont épurés et tous les éléments devant être retournés par la recherche sont disponibles.

Implémentation de la nouvelle solution

Cette section devient plus technique et contient des éléments de code et d'implémentation de la solution, et s'adresse à un public initié à la programmation.

Configuration des serveurs

Afin d’implémenter cette solution, la première étape est de configurer correctement les serveurs (serveur Master et serveurs Slave) afin d’avoir les bons paramètres d’indexation. Pour ce faire, nous définissons les propriétés d’Hibernate dans notre sessionFactory pour inclure Hibernate search :

<bean id="sessionFactory" class="...">
…
    <property name="hibernateProperties">
        <props>
            ...
            <prop key="hibernate.search.default.exclusive_index_use">
                false
            </prop>
            <prop key="hibernate.search.default.directory_provider">
                ${indexProperties.directoryProvider}
            </prop>
            <prop key="hibernate.search.default.reader.strategy">
                ${indexProperties.readerStrategy}
            </prop>
            <prop key="hibernate.search.default.refresh">
                ${indexProperties.refreshTime}
            </prop>
            <prop key="hibernate.search.default.indexBase">
                ${indexProperties.indexBase}
            </prop>
            <prop key="hibernate.search.default.sourceBase">
                ${indexProperties.sourceBase}
            </prop>
            <prop key="hibernate.search.default.server_name">
                ${indexProperties.serverName}
            </prop>
            ...
        </props>
    </property>
</bean>
...

et définissons les paramètres de Hibernate Master/Slave à l’aide de fichiers de propriétés :

Fichier de propriétés du serveur maître :
indexProperties.directoryProvider=org.hibernate.search.store.impl.FSMasterDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=300
indexProperties.serverName=ProdMasterLucene01
indexProperties.readerStrategy=shared
Fichier de propriétés des serveurs esclaves :
indexProperties.directoryProvider=org.hibernate.search.store.impl.FSSlaveDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=300
indexProperties.serverName=ProdSlaveLucene01
indexProperties.readerStrategy=shared
Fichier de propriétés des serveurs d'application :
indexProperties.directoryProvider=com.omnimed.shared.seeker.service.impl.FSTmpDirectoryProvider
indexProperties.indexBase=/var/lucene/indexes
indexProperties.sourceBase=/var/MasterLucene/indexes
indexProperties.refreshTime=1
indexProperties.serverName=ProdHRVirgo01
indexProperties.readerStrategy=com.omnimed.shared.seeker.service.impl.MultiFolderReaderProvider

Afin que cette architecture fonctionne (incorporation d’une composante locale temporaire), nous avons créé deux nouvelles classes dérivées de celles fournies par Apache Lucene : un directoryProvider permettant au serveur d'application de gérer leur index temporaire ainsi qu’un readerStrategy qui permet de rechercher dans plus d’un fichier d’index. 

Directory provider

Dans notre implémentation, le directoryProvider (objet qui gère la création et la disponibilité des dossiers pour l’indexation) permet de créer des dossiers d’indexation locaux et de les supprimer en fonction des opérations sur le serveur esclave qui lui est associé. Un peu comme le directoryProvider de type slave (FSSlaveDirectoryProvider), il fonctionne à deux dossiers et conserve un marqueur indiquant quel dossier est disponible pour l’écriture pendant que l’autre est disponible pour la lecture. Il garde également un lien (par mount NFS) vers le serveur Master afin de connaître l’état de son marqueur, ainsi qu’un lien vers le serveur slave, pour la même raison.

À l’initialisation, le provider identifie le dossier utilisé pour lire sur le serveur esclave ainsi que celui utilisé sur le serveur d'application, et il crée les fichiers d’index pour le serveur d'application. 

public void initialize(String directoryProviderName, Properties properties, BuildContext context) {
    
    this.properties = properties;
    this.directoryProviderName = directoryProviderName;
    
    // On manual indexing skip read-write check on index directory
    boolean manual = context.getIndexingStrategy().equals("manual");
    
    slaveIndexDir = DirectoryProviderHelper.getVerifiedIndexDir( directoryProviderName, properties, !manual );
    logger.debug("Slave index directory: " + slaveIndexDir.getPath());
    
    sharedIndexDir = DirectoryProviderHelper.getSourceDirectory(directoryProviderName, properties, false);
    logger.debug("Shared index directory: " + sharedIndexDir.getPath());
    
    
    try {
        indexName = slaveIndexDir.getCanonicalPath();
        
        if (new File(slaveIndexDir, "current1").exists()) {
            slaveCurrent = 1;
        } else if (new File(slaveIndexDir, "current2").exists()) {
            slaveCurrent = 2;
        } else {
            slaveCurrent = 1;
        }
        
        if (new File(sharedIndexDir, "current1").exists()) {
            sharedCurrent = 1;
        } else if (new File(sharedIndexDir, "current2").exists()) {
            sharedCurrent = 2;
        } else {
            sharedCurrent = 1;
        }
        
        directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
        directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
    }
    catch (IOException e) {
        throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
    }
}

Une tâche automatique est créée avec comme temps de répétition celui défini par la propriété refreshTime

class UpdateTask extends TimerTask {
    private final ExecutorService executor;
    
    public UpdateTask() {
        executor = Executors.newSingleThreadExecutor();
    }
    
    public void run() {
        try {
            // Verification for deleting the right tmp
            if (new File(slaveIndexDir, "current1").exists() && slaveCurrent == 2) {
                if (sharedCurrent == 1) {
                    String[] fileList = directory2.listAll();
                    for (String fileName : fileList) {
                        directory2.deleteFile(fileName);
                    }
                    directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
                } else if (sharedCurrent == 2) {
                    String[] fileList = directory1.listAll();
                    for (String fileName : fileList) {
                        directory1.deleteFile(fileName);
                    }
                    directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
                }
                slaveCurrent = 1;
            } else if (new File(slaveIndexDir, "current2").exists() && slaveCurrent == 1) {
                if (sharedCurrent == 1) {
                    String[] fileList = directory2.listAll();
                    for (String fileName : fileList) {
                        directory2.deleteFile(fileName);
                    }
                    directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
                } else if (sharedCurrent == 2) {
                    String[] fileList = directory1.listAll();
                    for (String fileName : fileList) {
                        directory1.deleteFile(fileName);
                    }
                    directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
                }
                slaveCurrent = 2;
            }
            
            // Verification for the variation of the writing (following the shared current)
            if (new File(sharedIndexDir, "current1").exists() && sharedCurrent == 2) {
                sharedCurrent = 1;
                directory1 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_1"), properties );
            } else if (new File(sharedIndexDir, "current2").exists() && sharedCurrent == 1) {
                sharedCurrent = 2;
                directory2 = DirectoryProviderHelper.createFSIndex( new File(slaveIndexDir, "tmp_" + getServerName() + "_2"), properties );
            }
            
        }
        catch (IOException e) {
            throw new SearchException( "Unable to swap index directory to: " + directoryProviderName, e );
        }
    }
    
    public void stop() {
        executor.shutdownNow();
    }
}

Reader strategy

S’inspirant du SharingBufferReaderProvider fourni de base par Apache Lucene, le MultiFolderReaderProvider implémente l’algorithme de lecture dans les dossiers d’indexation. En créant des lecteurs dans les fichiers temporaires ainsi que dans les fichiers esclaves, il est possible d’effectuer les recherches dans les deux index, et ensuite de combiner les résultats retournés pour obtenir toute l’information, même celle qui vient tout juste d’être mise à jour. Ce readerProvider utilise les jetons locaux indiquant quel dossier est valide pour la lecture ainsi que les jetons du serveur esclave afin de synchroniser la création et la suppression de lecteur dans les fichiers.  Cela indique également quand un fichier d’index local est prêt a être réinitialisé.

public IndexReader openIndexReader() {
    logger.debug( "Opening IndexReader for directoryProvider " + indexName);

    if (directoryProvider.getClass().equals(FSTmpDirectoryProvider.class)) {
        FSDirectory[] tmpDirectory = ((FSTmpDirectoryProvider)directoryProvider).getAllDirectory();
    
        /* We need a step to delete the old readers from the map when the tmp resets */
        if (tmpDirectory[0] != currentTmpDirectory1) {
            PerDirectoryLatestReader readerToDelete = currentReaders.get(currentTmpDirectory1);
            if (readerToDelete != null) {
                allReaders.remove(readerToDelete.current.reader);
                currentReaders.remove(currentTmpDirectory1);
            }
            currentTmpDirectory1 = tmpDirectory[0];
        }
        if (tmpDirectory[1] != currentTmpDirectory2) {
            PerDirectoryLatestReader readerToDelete = currentReaders.get(currentTmpDirectory2);
            if (readerToDelete != null) {
                allReaders.remove(readerToDelete.current.reader);
                currentReaders.remove(currentTmpDirectory2);
            }
            currentTmpDirectory2 = tmpDirectory[1];
        }
        IndexReader[] readers = new IndexReader[NUMBER_OF_DIRECTORY];
        ReaderProvider[] managers = new ReaderProvider[NUMBER_OF_DIRECTORY];

        if (new File(indexDir, "current1").exists()) {
            PerDirectoryLatestReader directoryLatestReader = currentReaders.get( slaveDirectory1 );
            if ( directoryLatestReader == null ) {
                directoryLatestReader = createReader( slaveDirectory1 );
            }
            readers[0] = directoryLatestReader.refreshAndGet();
        } else if (new File(indexDir, "current2").exists()) {
            PerDirectoryLatestReader directoryLatestReader = currentReaders.get( slaveDirectory2 );
            if ( directoryLatestReader == null ) {
                directoryLatestReader = createReader( slaveDirectory2 );
            }
            readers[0] = directoryLatestReader.refreshAndGet();
        } else {
            throw new SearchException("Unable to get the latest directory reader in: " + indexName + ". No marker available.");
        }
    
        PerDirectoryLatestReader tmpDirectoryLatestReader1 = currentReaders.get(currentTmpDirectory1);
        if ( tmpDirectoryLatestReader1 == null ) {
            tmpDirectoryLatestReader1 = createReader( currentTmpDirectory1 );
        }
    
        PerDirectoryLatestReader tmpDirectoryLatestReader2 = currentReaders.get(currentTmpDirectory2);
        if ( tmpDirectoryLatestReader2 == null ) {
            tmpDirectoryLatestReader2 = createReader( currentTmpDirectory2 );
        }
        readers[1] = tmpDirectoryLatestReader1.refreshAndGet();
        readers[2] = tmpDirectoryLatestReader2.refreshAndGet();

        managers[0] = new SharingBufferReaderProvider();
        managers[1] = new SharingBufferReaderProvider();
        managers[2] = new SharingBufferReaderProvider();
        
        logger.debug("Finished opening IndexReader for directoryProvider: " + directoryProvider);
        return ReaderProviderHelper.buildMultiReader( NUMBER_OF_DIRECTORY, readers, managers );
    
    } else {
        // This is the usual way to open indexReaders. It would be used 
        // if this reader strategy is combined to a different DirectoryProvider then
        // FSTmpDirectoryProvider
    
        Directory directory = directoryProvider.getDirectory();
        PerDirectoryLatestReader directoryLatestReader = currentReaders.get( directory );
        // might eg happen for FSSlaveDirectoryProvider or for mutable SearchFactory
        if ( directoryLatestReader == null ) {
            directoryLatestReader = createReader( directory );
        }
        logger.debug("Finished opening IndexReader for index " + directoryProvider);
        return directoryLatestReader.refreshAndGet();
    }
}
public void closeIndexReader(IndexReader multiReader) {
    if ( multiReader == null ) {
        return;
    }
    IndexReader[] readers;
    
    if (multiReader instanceof CacheableMultiReader) {
        CacheableMultiReader castMultiReader = ( CacheableMultiReader ) multiReader;
        readers = ReaderProviderHelper.getSubReadersFromMultiReader(castMultiReader);
    } else {
        throw new AssertionFailure( "Everything should be wrapped in a CacheableMultiReader");
    }
    
    logger.debug( "Closing multiReader: " + multiReader.toString());
    for (int i = 0; i< readers.length; i++) {
        allReaders.get(readers[i]).close();
    }
}
public void initialize(DirectoryBasedIndexManager indexManager, Properties props) {
    this.directoryProvider = indexManager.getDirectoryProvider();
    this.indexName = indexManager.getIndexName();
    this.properties = props;
    
    logger.debug("Creating reader for :" + directoryProvider.getDirectory().toString());

    // More step in case of FSTmpDirectoryProvider because of the need to initialize two readers
    if (directoryProvider.getClass().equals(FSTmpDirectoryProvider.class)) {
        indexDir = ((FSTmpDirectoryProvider)directoryProvider).getIndexDir();
        try {
            slaveDirectory1 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "1"), properties);
            createReader( slaveDirectory1 );
            logger.debug("Creating reader for :" + slaveDirectory1.toString());
            
            slaveDirectory2 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "2"), properties);
            createReader( slaveDirectory2 );
            logger.debug("Creating reader for :" + slaveDirectory2.toString());
            
        } catch (IOException e) {
            throw new SearchException( "Unable to initialize index reader for: " + directoryProvider, e );
        }
        FSDirectory[] tmpDirectory = ((FSTmpDirectoryProvider)directoryProvider).getAllDirectory();
        currentTmpDirectory1 = tmpDirectory[0];
        currentTmpDirectory2 = tmpDirectory[1];
    }
    

    logger.debug( "List all added to the map readers :");
    for (IndexReader reader : allReaders.keySet()) {

        logger.debug("All readers keySet: " + reader.toString());
    }
}

Conclusion

La solution que nous avons développée à partir d’une solution existante de Lucene comble notre besoin d’indexer un large volume de données disponibles sur plusieurs serveurs, tout en permettant une mise à jour en temps réel. Cette solution est actuellement utilisée pour l’indexation de notre base de données de patient.

Dans notre problème, nous considérons qu’un patient ne peut pas être à deux établissements en même temps, et qu’il n’est pas pertinent que les mises à jour soient immédiatement disponibles sur tous les serveurs d’application en même temps. Cependant, les utilisateurs d’un même lieu peuvent vouloir accéder aux éléments modifiés pour faire une recherche (ex. une secrétaire qui fait une modification aux informations médico-administratives du patient en même temps qu’un médecin qui consulte le dossier du patient avant que celui-ci ne le rencontre). En regroupant par lieu physique les connexions sur un même serveur et en utilisant notre architecture, il est possible pour ces personnes d’avoir accès aux modifications immédiatement grâce aux index locaux, sans devoir attendre qu’un cycle complet de mise à jour des esclaves soit complété.

Cette démonstration présente bien à quel point les développeurs chez Omnimed sont polyvalents et que notre entreprise s’investit non seulement dans le développement de notre application, mais aussi dans la recherche de nouvelle solution liée au développement informatique.