Précédent Index Suivant

Synchronisation des processus

C'est dans le cadre de processus partageant une même zone mémoire que le mot << concurrence >> prend tout son sens : les divers processus impliqués sont en concurrence pour l'accès à cette unique ressource qu'est la mémoire2. Au problème du partage des ressources vient s'ajouter celui du manque de maîtrise de l'alternance et du temps d'exécution des processus concurrents.

Le système qui gère l'ensemble des processus peut à tout moment interrompre un calcul en cours. Ainsi lorsque deux processus coopèrent, ils doivent pouvoir garantir l'intégrité des manipulations de certaines données partagées. Pour cela, un processus doit pouvoir rester maître de ces données tant qu'il n'a pas achevé un calcul ou toute autre opération (par exemple, une acquisition sur un périphérique). Pour garantir l'exclusivité d'accès aux données à un seul processus, on met en oeuvre un mécanisme dit d'exclusion mutuelle.

Section critique et exclusion mutuelle

Les mécanismes d'exclusion mutuelle sont implantés à l'aide de données particulières appelées verrous. Les opérations sur les verrous sont limitées à leur création, leur pose et leur libération. Un verrou est la plus petite donnée partagée par un ensemble de processus concurrents. Sa manipulation est toujours exclusive. À la notion d'exclusivité de manipulation d'un verrou s'ajoute celle d'exclusivité de détention : seul le processus qui a pris un verrou peut le libérer ; si d'autres processus veulent à leur tour disposer de ce verrou, ils doivent en attendre la libération par le processus détenteur.

Module Mutex

Le module Mutex est utilisé pour créer des verrous entre processus en relation d'exclusion mutuelle sur une zone mémoire. Nous allons illustrer leur utilisation par deux petits exemples classiques de concurrence.

Les fonctions de création, prise et libération des verrous sont :

# Mutex.create ;;
- : unit -> Mutex.t = <fun>
# Mutex.lock ;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock ;;
- : Mutex.t -> unit = <fun>


Il existe une variante à la prise de verrou qui n'est pas bloquante :

# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
Si le verrou est déjà pris, la fonction retourne false. Dans l'autre cas, la fonction prend le verrou et retourne true.

Les philosophes orientaux

Cette petite histoire, due à Dijkstra, illustre un pur problème de partage de ressources. Elle se raconte ainsi : << Cinq philosophes orientaux partagent leur temps entre l'étude et la venue au réfectoire pour manger un bol de riz. La salle affectée à la sustentation des philosophes ne comprend qu'une seule table ronde sur laquelle sont disposés un grand plat de riz (toujours plein) cinq bols et cinq baguettes. >>




Figure 19.1 : la table des philosophes orientaux


Comme on le voit sur la figure 19.1, un philosophe qui prend les deux baguettes autour de son bol empêche ses voisins d'en faire autant. Dès qu'il dépose l'une de ses baguettes, son voisin, affamé, peut s'en emparer. Le cas échéant, ce dernier devra attendre que l'autre baguette soit disponible. Les baguettes sont ici les ressources partagées.

Pour simplifier les choses, on supposera que chaque philosophe a des habitudes et vient toujours à la même place. On modélise les cinq baguettes par autant de verrous stockés dans un vecteur b.

# let b =
let b0 = Array.create 5 (Mutex.create()) in
for i=1 to 4 do b0.(i) <- Mutex.create() done;
b0 ;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]


Manger et méditer sont simulés par une suspension des processus.

# let mediter = Thread.delay
and manger = Thread.delay ;;
val mediter : float -> unit = <fun>
val manger : float -> unit = <fun>


On modélise un philosophe par une fonction qui exécute à l'infini la séquence des actions de la petite histoire de Dijkstra. La prise d'une baguette est simulée par l'obtention d'un verrou, ainsi un seul philosophe à la fois peut détenir une baguette. On introduit un petit temps de réflexion entre la prise et le dépôt de chacune des deux baguettes ainsi que quelques affichages traçant l'activité du philosophe.

# let philosophe i =
let ii = (i+1) mod 5
in while true do
mediter 3. ;
Mutex.lock b.(i);
Printf.printf "Le philosophe (%d) prend sa baguette gauche" i ;
Printf.printf " et médite encore un peu\n";
mediter 0.2;
Mutex.lock b.(ii);
Printf.printf "Le philosophe (%d) prend sa baguette droite\n" i;
manger 0.5;
Mutex.unlock b.(i);
Printf.printf "Le philosophe (%d) repose sa baguette gauche" i;
Printf.printf " et commence déjà à méditer\n";
mediter 0.15;
Mutex.unlock b.(ii);
Printf.printf "Le philosophe (%d) repose sa baguette droite\n" i
done ;;
val philosophe : int -> unit = <fun>


On pourra tester ce petit programme en exécutant :

for i=0 to 4 do ignore (Thread.create philosophe i) done ;
while true do Thread.delay 5. done ;;


On suspend, dans la boucle infinie while, le processus principal de façon à augmenter les chances des processus philosophes d'entrer en action. On utilise dans la boucle d'activité des philosophes des délais d'attente dont la durée est choisie aléatoirement dans le but de créer un peu de disparité dans l'exécution parallèle des processus.

Problèmes de la solution naïve
Il peut arriver une chose terrible à nos philosophes : ils arrivent tous en même temps et se saisissent de leur baguette gauche. Dans ce cas nous sommes dans une situation d'inter-blocage. Plus aucun philosophe ne pourra manger ! Nous sommes dans un cas de famine.

Pour éviter cela, les philosophes peuvent reposer une baguette s'il n'arrivent pas à prendre la deuxième. Cela est fort courtois, mais permet à deux philosophes de se liguer contre un troisième pour l'empêcher de se sustenter en ne relâchant les baguettes que si l'autre voisin les a prises. Il existe diverses solutions à ce problème. L'une d'elle fait l'objet de l'exercice page ??.

Producteurs et consommateurs I

Le couple producteurs-consommateurs est un exemple classique de la programmation concurrente. Un groupe de processus, désignés comme les producteurs, est chargé d'emmagasiner des données dans une file d'attente ; un second groupe, les consommateurs, est chargé de les déstocker. Chaque intervenant exclut les autres.

Nous implantons ce schéma en utilisant une file d'attente partagée entre les producteurs et les consommateurs. Pour garantir le bon fonctionnement de l'ensemble, la file d'attente est manipulée en exclusion mutuelle afin de garantir l'intégrité des opérations d'ajout et de retrait.

f est la file d'attente partagée, et m  le verrou d'exclusion mutuelle.

# let f = Queue.create () and m = Mutex.create () ;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>


Nous divisons l'activité d'un producteur en deux parties : créer un produit (fonction produire) et stocker un produit (fonction stocker). Seule l'opération de stockage a besoin du verrou.

# let produire i p d =
incr p ;
Thread.delay d ;
Printf.printf "Le producteur (%d) a produit %d\n" i !p ;
flush stdout ;;
val produire : int -> int ref -> float -> unit = <fun>

# let stocker i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajouté son %d-ième produit\n" i !p ;
flush stdout ;
Mutex.unlock m ;;
val stocker : int -> int ref -> unit = <fun>


Le code du producteur est une boucle sans fin de création et de stockage. Nous introduisons un délai aléatoire d'attente à la fin de chaque itération afin d'obtenir un effet de désynchronisation dans l'exécution.

# let producteur i =
let p = ref 0 and d = Random.float 2.
in while true do
produire i p d ;
stocker i p ;
Thread.delay (Random.float 2.5)
done ;;
val producteur : int -> unit = <fun>


La seule opération du consommateur est le retrait d'un élément de la file en prenant garde à l'existence effective d'un produit.

# let consommateur i =
while true do
Mutex.lock m ;
( try
let ip, p = Queue.take f
in Printf.printf "Le consommateur(%d) " i ;
Printf.printf "a retiré le produit (%d,%d)\n" ip p ;
flush stdout ;
with
Queue.Empty ->
Printf.printf "Le consommateur(%d) " i ;
print_string "est reparti les mains vides\n" ) ;
Mutex.unlock m ;
Thread.delay (Random.float 2.5)
done ;;
val consommateur : int -> unit = <fun>


Le programme de test suivant crée quatre producteurs et quatre consommateurs.

for i = 0 to 3 do
ignore (Thread.create producteur i);
ignore (Thread.create consommateur i)
done ;
while true do Thread.delay 5. done ;;


Attente et synchronisation

La relation d'exclusion mutuelle n'est pas assez fine pour décrire la synchronisation entre processus. Il n'est pas rare que le travail d'un processus dépende de la réalisation d'une action par un autre processus, modifiant par là une certaine condition. Il est alors souhaitable que les processus puissent communiquer le fait que cette condition a pu changer, indiquant aux processus en attente de la tester de nouveau. Les différents processus sont alors en relation d'exclusion mutuelle avec communication.

Dans l'exemple précédent, un consommateur, plutôt que repartir les mains vides pourrait attendre qu'un producteur vienne réapprovisionner le stock. Ce dernier pourra signaler au consommateur en attente qu'il y a quelque chose à emporter. Le modèle de l'attente sur condition d'une prise de verrou est connu sous le nom de sémaphore.

Sémaphores
Un sémaphore est une variable entière s ne pouvant prendre que des valeurs positives (ou nulles). Une fois s initialisée, les seules opérations admises sont : wait(s) et signal(s), notées respectivement P(s) et V(s). Elles sont définies ainsi, s correspond au nombre de ressources d'un type donné. Un sémaphore ne prenant que les valeurs 0 ou 1 est appelé sémaphore binaire.

Module Condition

Les fonctions du module Condition implantent des primitives de mise en sommeil et de réveil de processus sur un signal. Un signal, dans ce cadre, est une variable partagée par l'ensemble des processus. Son type est abstrait et les fonctions de manipulation sont :
create
: unit -> Condition.t qui crée un nouveau signal.
signal
: Condition.t -> unit qui réveille l'un des processus en attente du signal.
broadcast
: Condition.t -> unit qui réveille l'ensemble des processus en attente du signal.
wait
: Condition.t -> Mutex.t -> unit qui suspend le processus appelant sur le signal passé en premier argument. Le second argument est un verrou utilisé pour protéger la manipulation du signal. Il est libéré, puis repositionné à chaque exécution de la fonction.

Producteurs et consommateurs (2)

Nous reprenons l'exemple des producteurs et consommateurs en utilisant le mécanisme des variables de condition pour mettre en attente un consommateur arrivant alors que le magasin est vide.

Pour réaliser la synchronisation entre attente d'un consommateur et production, on déclare :

# let c = Condition.create () ;;
val c : Condition.t = <abstr>


On modifie la fonction de stockage du producteur en lui ajoutant l'émission d'un signal :

# let stocker2 i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajoute son %d-ième produit\n" i !p ;
flush stdout ;
Condition.signal c ;
Mutex.unlock m ;;
val stocker2 : int -> int ref -> unit = <fun>
# let producteur2 i =
let p = ref 0 in
let d = Random.float 2.
in while true do
produire i p d;
stocker2 i p;
Thread.delay (Random.float 2.5)
done ;;
val producteur2 : int -> unit = <fun>


L'activité du consommateur se fait alors en deux temps : attendre qu'un produit soit disponible puis emporter le produit. Le verrou est pris quand l'attente prend fin et il est rendu dès que le consommateur a emporté son produit. L'attente se fait sur la variable c.

# let attendre2 i =
Mutex.lock m ;
while Queue.length f = 0 do
Printf.printf "Le consommateur (%d) attend\n" i ;
Condition.wait c m
done ;;
val attendre2 : int -> unit = <fun>
# let emporter2 i =
let ip, p = Queue.take f in
Printf.printf "Le consommateur (%d) " i ;
Printf.printf "emporte le produit (%d, %d)\n" ip p ;
flush stdout ;
Mutex.unlock m ;;
val emporter2 : int -> unit = <fun>
# let consommateur2 i =
while true do
attendre2 i;
emporter2 i;
Thread.delay (Random.float 2.5)
done ;;
val consommateur2 : int -> unit = <fun>
Notons qu'il n'est plus besoin de vérifier l'existence effective d'un produit puisqu'un consommateur doit attendre l'existence d'un produit dans la file d'attente pour que sa suspension prenne fin. Vu que la fin de son attente correspond à la prise du verrou, il ne court pas le risque de se faire dérober le nouveau produit avant de l'avoir emporté.

Lecteurs et écrivain

Voici un autre exemple classique de processus concurrents dans lequel les agents n'ont pas le même comportement vis-à-vis de la donnée partagée.

Un écrivain et des lecteurs opèrent sur une donnée partagée. L'action du premier est susceptible de rendre momentanément les données incohérentes alors que les seconds n'ont qu'une action passive. La difficulté vient du fait que nous ne souhaitons pas interdire à plusieurs lecteurs de consulter la donnée simultanément. Une solution à ce problème est de gérer un compteur dénombrant les lecteurs en train d'accéder aux données. L'écriture ne sera acceptable que si le nombre de lecteurs est nul.

La donnée est symbolisée par l'entier data qui prendra la valeur 0 ou 1. La valeur 0 signifie que la donnée est prête pour la lecture :

# let data = ref 0 ;;
val data : int ref = {contents=0}


Les opérations sur le compteur n sont protégées par le verrou m :

# let n = ref 0 ;;
val n : int ref = {contents=0}
# let m = Mutex.create () ;;
val m : Mutex.t = <abstr>
# let cpt_incr () = Mutex.lock m ; incr n ; Mutex.unlock m ;;
val cpt_incr : unit -> unit = <fun>
# let cpt_decr () = Mutex.lock m ; decr n ; Mutex.unlock m ;;
val cpt_decr : unit -> unit = <fun>
# let cpt_signal () = Mutex.lock m ;
if !n=0 then Condition.signal c ;
Mutex.unlock m ;;
val cpt_signal : unit -> unit = <fun>


Les lecteurs mettent à jour le compteur et émettent le signal c lorsque plus aucun lecteur n'est présent. Ils indiquent par là à l'écrivain qu'il peut entrer en action.

# let c = Condition.create () ;;
val c : Condition.t = <abstr>
# let lire i =
cpt_incr () ;
Printf.printf "Le lecteur (%d) lit (donnée=%d)\n" i !data ;
Thread.delay (Random.float 1.5) ;
Printf.printf "Le lecteur (%d) a fini sa lecture\n" i ;
cpt_decr () ;
cpt_signal () ;;
val lire : int -> unit = <fun>

# let lecteur i = while true do lire i ; Thread.delay (Random.float 1.5) done ;;
val lecteur : int -> unit = <fun>


L'écrivain tente de bloquer le compteur pour interdire aux lecteurs d'accéder à la donnée partagée. Mais il ne peut le faire que si le compteur est nul, sinon il attend le signal lui indiquant que c'est le cas.

# let ecrire () =
Mutex.lock m ;
while !n<>0 do Condition.wait c m done ;
print_string "L'écrivain écrit\n" ; flush stdout ;
data := 1 ; Thread.delay (Random.float 1.) ; data := 0 ;
Mutex.unlock m ;;
val ecrire : unit -> unit = <fun>

# let ecrivain () =
while true do ecrire () ; Thread.delay (Random.float 1.5) done ;;
val ecrivain : unit -> unit = <fun>


On crée pour tester ces fonctions un écrivain et six lecteurs.

ignore (Thread.create ecrivain ());
for i=0 to 5 do ignore(Thread.create lecteur i) done;
while true do Thread.delay 5. done ;;


Cette solution garantit que ni l'écrivain ni les lecteurs n'ont accès aux données en même temps. Par contre, rien ne garantit que l'écrivain puisse un jour remplir son office, nous sommes confrontés là encore à un cas de famine.


Précédent Index Suivant