Synchronisation des processus
C'est dans le cadre de processus partageant une même zone mémoire
que le mot << concurrence >> prend tout son sens : les divers processus
impliqués sont en concurrence pour l'accès à cette unique
ressource qu'est la mémoire2. Au problème du partage de
ressource vient s'ajouter celui du manque de maîtrise de
l'alternance et du temps d'exécution des processus concurrents.
Le système qui gère l'ensemble des processus, peut à tout moment
interrompre un calcul en cours. Ainsi
lorsque deux processus coopèrent, ils doivent pouvoir garantir
l'intégrité des manipulations de certaines données
partagées. Pour celà, un processus doit pouvoir rester maître de
ces données tant qu'il n'a pas achevé un calcul ou toute autre
opération (par exemple, une acquisition sur un périphérique).
Pour garantir l'exclusivité d'accès aux données
à un seul processus, on met en oeuvre un mécanisme dit d'exclusion mutuelle.
Section critique et exclusion mutuelle
Les mécanismes d'exclusion mutuelle sont implantés à l'aide de
données particulières appelées verrous. Les opérations
sur les verrous sont limitées à leur création, leur pose et leur
libération. Un verrou est la plus petite donnée partagée par un
ensemble de processus concurrents. Sa manipulation est toujours
exclusive. À la notion d'exclusivité de manipulation d'un verrou
s'ajoute celle d'exclusivité de détention : seul le processus
qui a pris un verrou peut le libérer ; si d'autres processus
veulent à leur tour disposer de ce verrou, ils doivent en attendre
la libération par le processus détenteur.
Le module Mutex est utiliser pour créer des verrous
entre processus en relation d'exclusion mutuelle
sur une zone mémoire.
Module Mutex
Les fonctions du module Mutex implantent la gestion des
verrous. Nous allons illustrer leur utilisation par deux petits
exemples classiques de concurrence.
Une fois un verrou créé, un thread peut le prendre et le relâcher.
# Mutex.create ;;
- : unit -> Mutex.t = <fun>
# Mutex.lock ;;
- : Mutex.t -> unit = <fun>
# Mutex.unlock ;;
- : Mutex.t -> unit = <fun>
La fonction lock tente de prendre un verrou et y parvient si
il est libre. Dans le cas contraire, le processus est suspendu tant
que ce verrou n'a pas été relâché. Un seul processus ne peut
tenir un verrou dans le même temps.
Il existe une variante à la prise de verrou qui n'est pas bloquante.
# Mutex.try_lock;;
- : Mutex.t -> bool = <fun>
Si le verrou est déjà pris, la fonction retourne false. Dans
l'autre cas, la fonction prend le verrou et retourne true.
Les philosophes orientaux
Cette petite histoire, due à Dijkstra, illustre un pur problème de
partage de ressources.
Cinq philosophes orientaux partagent leur temps entre l'étude et la
venue au réfectoire pour manger un bol de riz. La salle affectée à
la sustentation des philosophes ne comprend qu'une seule table ronde sur
laquelle sont disposés un grand plat de riz (toujours plein) cinq
bols et cinq baguettes.
Figure 4.1 : La table des philosophes orientaux
<<<<<<< PC.tex
Comme on le voit sur cette figure, un philosophe qui prend les deux
baguettes autour de son bol empêche ses voisins d'en
faire autant. Dès qu'il dépose l'une de ses baguettes, son voisin,
affamé, peut s'en emparer. Le cas échéant, ce dernier devra
attendre que l'autre baguette soit disponible.
Pour simplifier les choses, on supposera que chaque philosophe a des habitudes
et vient
toujours à la même place. On modélise les cinq baguettes par
autant de verrous stockés dans un vecteur b. Ainsi, un seul
philosophe à la fois peut détenir une baguette. Manger et
méditer sont simulés par une suspension des processus. On
introduit un petit temps de réflexion entre la prise et le dépôt
de chacune des deux baguettes.
# let b =
let b0 = Array.create 5 (Mutex.create()) in
for i=1 to 4 do b0.(i) <- Mutex.create() done;
b0 ;;
val b : Mutex.t array = [|<abstr>; <abstr>; <abstr>; <abstr>; <abstr>|]
# =======
Comme on le voit sur la figure \ref{fig-philosophes}, un philosophe
qui prend les deux baguettes autour de son bol empêche ses voisins
imm\'ediats d'en faire autant. Dès qu'il dépose l'une de ses
baguettes, son voisin, affamé, peut s'en emparer. Le cas échéant, ce
dernier devra attendre que l'autre baguette soit disponible.
%Pour simplifier les choses, on supposera que chaque philosophe a des habitudes
%et vient toujours à la même place.
On modélise les philosophes par cinq processus et les cinq baguettes
par autant de verrous. Un philosophe qui s'empare d'une baguette est
un processus qui prend un verrou. Ainsi, un seul philosophe à la fois
peut détenir une baguette donner. La méditation et la mastication sont
simulées par une suspension d'un processus. On introduit un petit
delai entre la prise et le dép\^ot d'une baguette.
\begin{caml}
let b = let b0 = Array.create 5 (Mutex.create())
in Array.map (fun _ -> Mutex.create ()) b0 ; b0 ;;
Characters 0-7:
Syntax error
# >>>>>>> 1.23
let mediter t = Thread.delay (Random.float t)
let manger = mediter ;;
Characters 0-7:
Syntax error
# let philosophe i =
let ii = (i+1) mod 5
in while true do
mediter 3. ;
Mutex.lock b.(i);
Printf.printf "Le philosophe (%d) prend sa baguette gauche" i ;
Printf.printf " et médite encore un peu\n";
mediter 0.2;
Mutex.lock b.(ii);
Printf.printf "Le philosophe (%d) prend sa baguette droite\n" i;
manger 0.5;
Mutex.unlock b.(i);
Printf.printf "Le philosophe (%d) repose sa baguette gauche" i;
Printf.printf " et commence déjà à méditer\n";
mediter 0.15;
Mutex.unlock b.(ii);
Printf.printf "Le philosophe (%d) repose sa baguette droite\n" i
done ;;
Characters 69-76:
Unbound value mediter
On pourra tester ce petit programme en exécutant :
for i=0 to 4 do ignore (Thread.create philosophe i) done ;
while true do Thread.delay 5. done ;;
On suspend, dans la boucle infinie while, le processus
principal de façon à augmenter les chances des processus
philosophes d'entrer en action.
On utilise dans la boucle d'activité des
philosophes des délais d'attente entre certaines opérations. Ces
délais, dont la durée est choisie aléatoirement, dans une
certaine limite, on pour but de créer un peu de disparité dans
l'exécution parallèle des processus.
Problèmes de la solution naïve
Il peut arriver une chose terrible à nos philosophes : ils arrivent
tous en même temps et se saisissent de leur baguette gauche. Dans ce
cas nous sommes dans une situation dite d'inter-blocage. Plus
aucun philosophe ne pourra manger ! Nous sommes dans un cas de famine.
Pour éviter cela, les philosophes peuvent reposer une baguette
s'il n'arrivent pas à prendre la deuxième.
Cela est fort courtois, mais permet
à deux philosophes
de se liguer contre un troisième pour l'empêcher de se sustenter
en ne relâchant les baguettes que si l'autre voisin les a prises.
Il existe diverses solutions à ce
problème. L'une d'elle fait l'objet de l'exercice page X.
Producteurs et consommateurs I
Le couple producteurs-consommateurs est un exemple
classique de la programmation concurrente. Un groupe de processus,
les producteurs,
est chargé d'emmagasiner des données dans une file d'attente ; un second
groupe, les consommateurs, est chargé de les déstocker. Chaque
intervenant exclut les autres.
Nous implantons ce schéma en utilisant une file d'attente partagée
entre les producteurs et les consommateurs. Pour garantir le bon
fonctionnement de l'ensemble, la file d'attente est manipulée en
exclusion mutuelle afin de garantir l'intégrité des opérations
d'ajout et de retrait.
f est la file d'attente partagée, et m le verrou
d'exclusion mutuelle.
# let f = Queue.create () and m = Mutex.create () ;;
val f : '_a Queue.t = <abstr>
val m : Mutex.t = <abstr>
Nous divisons l'activité d'un producteur en deux parties : créer
un produit (fonction produire) et stocker un produit
(fonction stocker). Seule l'opération de stockage a besoin
du verrou.
# let produire i p d =
incr p ;
Thread.delay d ;
Printf.printf "Le producteur (%d) a produit %d\n" i !p ;
flush stdout ;;
val produire : int -> int ref -> float -> unit = <fun>
# let stocker i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajouté son %d-ieme produit\n" i !p ;
flush stdout ;
Mutex.unlock m ;;
val stocker : int -> int ref -> unit = <fun>
Le code du producteur est une boucle sans fin de création et de
stockage. Nous introduisons un délai aléatoire d'attente à la
fin de chaque itération afin d'obtenir un effet de
désynchronisation dans l'exécution.
# let producteur i =
let p = ref 0 and d = Random.float 2.
in while true do
produire i p d ;
stocker i p ;
Thread.delay (Random.float 2.5)
done ;;
val producteur : int -> unit = <fun>
La seule opération du consommateur est le retrait d'un élément de
la file prenant garde à l'existence effective d'un produit.
# let consommateur i =
while true do
Mutex.lock m ;
( try
let ip, p = Queue.take f
in Printf.printf "Le consommateur(%d) " i ;
Printf.printf "a retiré le produit (%d,%d)\n" ip p ;
flush stdout ;
with
Queue.Empty ->
Printf.printf "Le consommateur(%d) " i ;
print_string "est reparti les mains vides\n" ) ;
Mutex.unlock m ;
Thread.delay (Random.float 2.5)
done ;;
val consommateur : int -> unit = <fun>
Le programme de test suivant crée quatre producteurs et quatre consommateurs.
for i = 0 to 3 do
ignore (Thread.create producteur i);
ignore (Thread.create consommateur i)
done ;
while true do Thread.delay 5. done ;;
Attente et synchronisation
La relation d'exclusion mutuelle n'est pas assez fine pour
décrire la synchronisation entre threads.
Il n'est pas rare que le travail d'un processus dépende
de la réalisation d'une action par un autre processus, modifiant par là une
certaine condition.
Il est alors souhaitable que les processus puissent
communiquer le fait que cette condition ait pu changer, indiquant
aux processus en attente de la tester de nouveau.
Les différents prrcessus sont alors en relation d'exclusion mutuelle avec communication.
Dans l'exemple précédent, un consommateur, plutôt que repartir les
mains vides pourrait attendre qu'un producteur vienne réapprovisionner
le stock. Ce dernier pourra signaler au consommateur en attente qu'il
y a quelque chose à emporter.
Le modèle de l'attente sur condition d'une prise de verrou est connu sous
le nom de sémaphore.
Sémaphores
Un sémaphore est une variable
entière s ne pouvant prendre que des valeurs positives (ou nulles).
Une fois s initialisé, les seules opérations admises
sont : wait(s) et signal(s), notées respectivement P(s) et V(s).
Elles sont définies ainsi, s correspond au nombre de ressources d'un type donné.
-
wait(s) : si s > 0 alors s := s -1, sinon l'exécution du
processus ayant appelé wait(s) est suspendue.
- signal(s) : si un processus a été suspendu lors d'une exécution antérieure
d'un wait(s) alors le réveiller, sinon s := s + 1.
Un sémaphore ne prenant que les valeurs 0 ou 1 est appelé
sémaphore binaire.
Module Condition
Les fonctions du module Condition implantent des primitives
de mise en sommeil et de réveil de processus sur un signal. Un
signal, dans ce cadre, est une variable partagée par l'ensemble des
processus. Son type est abstrait et les fonctions de manipulation
sont :
-
create
- : unit -> Condition.t qui crée
un nouveau signal.
- signal
- : Condition.t -> unit qui
réveille l'un des processus en attente du signal.
- broadcast
- : Condition.t -> unit qui
réveille l'ensemble des processus en attente du signal.
- wait
- : Condition.t -> Mutex.t -> unit
qui suspend le processus appelant sur le signal passé en premier
argument. Le second argument est un verrou utilisé pour protéger
la manipulation du signal. Il est libéré, puis repositionné à
chaque exécution de la fonction.
Producteurs et consommateurs (2)
Nous reprenons l'exemple des producteurs et consommateurs en
utilisant le mécanisme des variable de condition pour mettre en
attente un consommateur arrivant alors que le magasin est vide.
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
# let stocker2 i p =
Mutex.lock m ;
Queue.add (i,!p) f ;
Printf.printf "Le producteur (%d) a ajoute son %d-ème produit\n" i !p ;
flush stdout ;
Condition.signal c ;
Mutex.unlock m ;;
val stocker2 : int -> int ref -> unit = <fun>
# let producteur2 i =
let p = ref 0 in
let d = Random.float 2.
in while true do
produire i p d;
stocker2 i p;
Thread.delay (Random.float 2.5)
done ;;
val producteur2 : int -> unit = <fun>
L'activité du consommateur se fait alors en deux temps :
attendre qu'un produit soit disponible puis emporter le produit. Le
verrou est pris quand l'attente prend fin et il est rendu dès
que le consommateur a emporté son produit. L'attente se fait sur
la variable c.
# let attendre2 i =
Mutex.lock m ;
while Queue.length f = 0 do
Printf.printf "Le consommateur (%d) attend\n" i ;
Condition.wait c m
done ;;
val attendre2 : int -> unit = <fun>
# let emporter2 i =
let ip, p = Queue.take f in
Printf.printf "Le consommateur (%d) " i ;
Printf.printf "emporte le produit (%d, %d)\n" ip p ;
flush stdout ;
Mutex.unlock m ;;
val emporter2 : int -> unit = <fun>
# let consommateur2 i =
while true do
attendre2 i;
emporter2 i;
Thread.delay (Random.float 2.5)
done ;;
val consommateur2 : int -> unit = <fun>
Notons qu'il n'est plus besoin de vérifier l'existence effective d'un
produit puisqu'un consommateur doit attendre l'existence d'un produit
dans la file d'attente pour que sa suspension prenne fin. Vu que la
fin de son attente correspond à la prise du verrou, il ne court pas
le risque de se faire dérober le nouveau produit avant de l'avoir
emporté.
Lecteurs et écrivain
Voici un autre exemple classique de processus concurrents dans lequel
les agents n'ont pas le même comportement vis-à-vis de la donnée
partagée.
Un écrivain et des lecteurs opèrent sur une donnée
partagée. L'action du premier est susceptible de rendre
momentanément les données incohérentes alors que les seconds
n'ont qu'une action passive. La difficulté vient du fait que nous ne
souhaitons pas interdire à plusieurs lecteurs de consulter la
donnée simultanément.
Une solution à ce problème est de gérer un compteur n
dénombrant les lecteurs en train d'accéder aux
données. L'écriture ne sera acceptable que si le nombre de
lecteurs est nul.
# let data = ref 0 ;;
val data : int ref = {contents=0}
# let n = ref 0 ;;
val n : int ref = {contents=0}
# let m = Mutex.create () ;;
val m : Mutex.t = <abstr>
# let c = Condition.create () ;;
val c : Condition.t = <abstr>
La donnée est symbolisée par un entier valant 0 lorsqu'elle est
cohérente.
Les opérations sur le compteur sont protégées par son verrou.
# let cpt_incr () = Mutex.lock m ; incr n ; Mutex.unlock m ;;
val cpt_incr : unit -> unit = <fun>
# let cpt_decr () = Mutex.lock m ; decr n ; Mutex.unlock m ;;
val cpt_decr : unit -> unit = <fun>
# let cpt_signal () = Mutex.lock m ;
if !n=0 then Condition.signal c ;
Mutex.unlock m ;;
val cpt_signal : unit -> unit = <fun>
Les lecteurs mettent à jour le compteur et n'émettent le signal
indiquant qu'ils ont libéré la donnée que si plus aucun lecteur
n'est présent.
# let lire i =
cpt_incr () ;
Printf.printf "Le lecteur (%d) lit (donnée=%d)\n" i !data ;
Thread.delay (Random.float 1.5) ;
Printf.printf "Le lecteur (%d) a fini sa lecture\n" i ;
cpt_decr () ;
cpt_signal () ;;
val lire : int -> unit = <fun>
# let lecteur i = while true do lire i ; Thread.delay (Random.float 1.5) done ;;
val lecteur : int -> unit = <fun>
L'écrivain tente de bloquer le compteur pour interdire aux lecteurs
d'accéder à la donnée partagée. Mais il ne peut le faire que
si le compteur est nul, sinon l'écrivain attend le signal lui
indiquant que c'est le cas.
# let ecrire () =
Mutex.lock m ;
while !n<>0 do Condition.wait c m done ;
print_string "L'écrivain écrit\n" ; flush stdout ;
data := 1 ; Thread.delay (Random.float 1.) ; data := 0 ;
Mutex.unlock mn ;;
Characters 206-208:
Unbound value mn
# let ecrivain () =
while true do ecrire () ; Thread.delay (Random.float 1.5) done ;;
Characters 36-42:
Unbound value ecrire
On crée pour tester ces fonctions un écrivain et six lecteurs.
ignore (Thread.create ecrivain ());
for i=0 to 5 do ignore(Thread.create lecteur i) done;
while true do Thread.delay 5. done ;;
Cette solution garantit que l'écrivain et les lecteurs n'ont pas accès
aux données en même temps. Mais rien ne garantit que l'écrivain
puisse un jour remplir son office, nous sommes là encore confronter
à un cas de famine.