Communication synchrone
Le module Event de la bibliothèque des processus légers implante
la communication de valeurs quelconques entre deux processus
via des canaux de communication particuliers. La communication effective
de la valeur est synchronisée au moyen d'événements d'émission ou de
réception.
Ce modèle de communication synchronisée sur des événements permet de
transférer sur des canaux typés des valeurs du langage, y compris
des fermetures, des objets ou des événements.
Il est décrit dans [Rep92].
Synchronisation sur événements de communication
Les événements primitifs de communication sont :
-
send c v envoie une valeur v sur le canal c
- receive c réceptionne une valeur sur la canal c
Pour qu'ils réalisent l'action physique qui leur est associée, deux
événements doivent être synchronisés. Pour cela on introduit une
opération de synchronisation (sync) sur les événements.
L'émission et la réception d'une valeur ne sont effectives que lorsque
les deux processus communicants sont en phase. Si
un seul processus cherche à se synchroniser, l'opération est bloquante
et attend que le deuxième processus effectue sa synchronisation. Cela
implique qu'un émetteur cherchant à synchroniser l'envoi d'une valeur
(sync
(send
c
v)) peut se retrouver bloqué en attente d'une
synchronisation du récepteur (sync
(receive
c)).
Valeurs transmises
Les canaux de communication par où transitent les valeurs échangées
sont typés : seules les valeurs du type paramètre du canal y sont
acceptées. Rien n'empêche de créer de nombreux canaux par type de
valeur à communiquer. Comme cette communication s'effectue entre
processus légers Objective CAML, n'importe quelle valeur du langage peut
être envoyée sur un canal de son type. Cela est valable pour les
fermetures, les objets et aussi des événements pour une demande de
synchronisation déportée.
Module Event
Les valeurs encapsulées dans les événements de communication
transitent par des canaux de communication du type abstrait 'a
channel. La fonction de création d'un canal est :
# Event.new_channel
;;
- : unit -> 'a Event.channel = <fun>
Les événements d'émission et de réception sont créés par un appel aux fonctions :
# Event.send
;;
- : 'a Event.channel -> 'a -> unit Event.event = <fun>
# Event.receive
;;
- : 'a Event.channel -> 'a Event.event = <fun>
On peut considérer les fonctions send et receive
comme des constructeurs du type abstrait 'a
event. L'événement construit à partir de l'émission ne conserve pas
une information de type de la valeur à transmettre (type unit
Event.event). Par contre l'événement de réception en tient compte
pour récupérer la valeur pendant une synchronisation. Ces fonctions ne sont
pas bloquantes dans la mesure où la transmission d'une valeur n'est
réalisée qu'au moment de la synchronisation de deux processus par la
fonction :
# Event.sync
;;
- : 'a Event.event -> 'a = <fun>
Cette fonction peut être est bloquante pour l'émetteur et le récepteur.
Il en existe une version non bloquante :
# Event.poll
;;
- : 'a Event.event -> 'a option = <fun>
Cette fonction vérifie qu'un autre processus est en attente de
synchronisation. Si c'est le cas elle réalise les transmissions, et
retourne la valeur Some v si v est la valeur
associée à l'événement, et None sinon. Le message reçu,
extrait par la fonction sync peut être le résultat d'un
processus plus ou moins compliqué mettant en oeuvre d'autres
échanges de messages.
Exemple de synchronisation
On définit trois processus légers, le premier t1 envoie une chaîne
de caractères sur le canal c (fonction g)
partagé par tous les processus. Les deux autres t2 et t3
attendent une valeur sur ce même canal.
Voici les fonctions exécutées par les différents processus :
# let
c
=
Event.new_channel
();;
val c : '_a Event.channel = <abstr>
# let
f
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("-------- avant -------"
^
ids)
;
print_newline()
;
let
e
=
Event.receive
c
in
print_string
("-------- pendant -------"
^
ids)
;
print_newline()
;
let
v
=
Event.sync
e
in
print_string
(v
^
" "
^
ids
^
" "
)
;
print_string
("-------- après -------"
^
ids)
;
print_newline()
;;
val f : unit -> unit = <fun>
# let
g
()
=
let
ids
=
string_of_int
(Thread.id
(Thread.self
()))
in
print_string
("Début de "
^
ids
^
"\n"
);
let
e2
=
Event.send
c
"hello"
in
Event.sync
e2
;
print_string
("Fin de "
^
ids)
;
print_newline
()
;;
val g : unit -> unit = <fun>
Les trois processus sont alors créés et exécutés :
# let
t1,
t2,
t3
=
Thread.create
f
(),
Thread.create
f
(),
Thread.create
g
();;
val t1 : Thread.t = <abstr>
val t2 : Thread.t = <abstr>
val t3 : Thread.t = <abstr>
# Thread.delay
1
.
0
;;
Début de 5
-------- avant -------6
-------- pendant -------6
hello 6 -------- après -------6
-------- avant -------7
-------- pendant -------7
Fin de 5
- : unit = <unknown constructor>
L'émission peut être bloquante. La trace de fin de
t1 s'affiche après les traces de synchronisation de
t2 et t3. Seul un des deux processus t1 ou
t2 est vraiment terminé, comme le montrent les appels
suivants :
# Thread.kill
t1;;
- : unit = ()
# Thread.kill
t2;;
Uncaught exception: Failure("Thread.kill: killed thread")