Die Boost C++ Bibliotheken


Kapitel 6: Multithreading


Inhaltsverzeichnis

Dieses Buch ist unter einer Creative Commons-Lizenz lizensiert.

Die englische Übersetzung dieses Buchs ist im Buchhandel verfügbar! Es handelt sich dabei um eine aktualisierte Version, die auf den Boost C++ Bibliotheken 1.47.0 vom Juli 2011 basiert. So wurde das Buch im Hinblick auf neue Versionen verschiedener Boost Bibliotheken aktualisiert (zum Beispiel auf Boost.Spirit 2.x, Boost.Signals 2 und Boost.Filesystem 3). Mit insgesamt 38 Boost Bibliotheken werden außerdem mehr Bibliotheken als je zuvor vorgestellt (neu sind zum Beispiel Boost.CircularBuffer, Boost.Intrusive und Boost.MultiArray). Das Buch kann bei Amazon oder anderen Buchhändlern bestellt werden. Zusätzliche Informationen zum Buch finden Sie beim Verlag XML Press.


6.1 Allgemeines

Threads sind sogenannte Programmfäden, mit denen es möglich ist, Funktionen in einem Programm gleichzeitig ablaufen zu lassen. Dies ist zum Beispiel dann wichtig, wenn bekannt ist, dass eine Funktion für eine Berechnung längere Zeit benötigt, eine andere Funktion jedoch nicht zwischenzeitlich warten soll. Dank Threads können Funktionen tatsächlich zeitgleich laufen, so dass keine Funktion auf die andere warten muss.

Wenn ein Programm startet, gibt es standardmäßig nur einen einzigen Thread. Die Funktion main() läuft in diesem einzigen Thread ab, und alle Funktionen, die von main() aufgerufen werden, werden nacheinander ausgeführt. Ein derartiges Programm mit nur einem Thread bezeichnet man als single-threaded.

Programme, die neue Threads erstellen, nennt man multithreaded. Sie besitzen nicht nur wie beschrieben den Vorteil, mehrere Funktionen gleichzeitig laufen lassen zu können. Sie werden auch deswegen immer wichtiger, weil heute in Computern eingebaute Prozessoren häufig mehr als einen Kern besitzen. Da mehrere Kerne gleichzeitig Funktionen ausführen können, entsteht ein gewisser Druck auf Entwickler, die in Computern vorhandenen Rechenkapazitäten entsprechend zu nutzen. Während früher also vor allem dann Threads eingesetzt wurden, wenn man Funktionen gleichzeitig ausführen wollte, werden Entwickler langsam, aber sicher dazu gezwungen, ihre Programme mit Hilfe von Threads so zu strukturieren, dass Funktionen entwickelt werden, die gleichzeitig ausgeführt werden können. Kenntnisse in der Entwicklung von Multithreaded-Programmen werden also aufgrund von Mehrkernprozessoren, wie sie heute verstärkt eingesetzt werden, immer wichtiger.

In diesem Kapitel lernen Sie die Boost C++ Bibliothek Thread kennen, mit der plattformunabhängig Multithreaded-Programme entwickelt werden können.


6.2 Thread Management

Die wichtigste Klasse in dieser Bibliothek ist boost::thread. Sie ist in der Headerdatei boost/thread.hpp definiert und wird verwendet, um einen neuen Thread zu starten. Im folgenden Beispiel sehen Sie, wie sie angewandt wird.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    std::cout << i << std::endl; 
  } 
} 

int main() 
{ 
  boost::thread t(thread); 
  t.join(); 
} 

Dem Konstruktor von boost::thread wird der Name der Funktion übergeben, die als Thread gestartet werden soll. Wenn also im obigen Programm die Variable t erstellt wird, beginnt die Funktion thread() sofort in einem eigenen Thread zu laufen. Die Funktion thread() wird also ab dann gleichzeitig zur Funktion main() ausgeführt.

Weil das Programm wie gewohnt nach Ablauf der Funktion main() beendet werden würde, wird für den soeben gestarteten Thread join() aufgerufen. Es handelt sich dabei um eine blockierende Methode: join() hält den aktuellen Thread an und kehrt erst dann zurück, wenn der Thread, für den join() aufgerufen wurde, beendet wurde. Dies bedeutet für obiges Programm, dass main() solange wartet, bis thread() seine Arbeit getan hat und zurückkehrt.

Beachten Sie, dass Sie über eine Variable wie t auf einen Thread zugreifen können, um zum Beispiel mit join() auf dessen Beendigung zu warten, der Thread aber auch dann weiterlaufen würde, wenn der Gültigkeitsbereich von t enden und die Variable zerstört werden würde. Ein Thread ist anfangs an eine Variable vom Typ boost::thread gebunden, läuft aber auch dann weiter, wenn die Variable nicht mehr existiert. Sie können sogar eine Methode detach() aufrufen, mit der Sie eine Variable vom Typ boost::thread von einem Thread entkoppeln können. Danach können Sie zum Beispiel nicht mehr join() aufrufen, weil die Variable dann keinen Thread mehr repräsentiert.

Sie können in einem Thread grundsätzlich all das machen, was Sie auch in jeder beliebigen anderen Funktion machen können. Letztendlich ist der Thread nichts anderes als eine Funktion - nur eben mit der Besonderheit, dass sie zeitgleich zu anderen Funktionen ausgeführt wird. Im obigen Programm zum Beispiel werden in einer Schleife fünf Zahlen auf die Standardausgabe ausgegeben. Damit die Datenausgabe nicht zu schnell erfolgt, wird in jedem Schleifendurchgang die Funktion wait() aufgerufen, mit der die Ausführung des aktuellen Threads für jeweils eine Sekunde angehalten wird. Die Funktion wait() greift dazu auf eine freistehende Funktion namens sleep() zu, die ebenfalls aus Boost.Thread stammt und im Namensraum boost::this_thread definiert ist.

Die Funktion sleep() erwartet eine Zeitspanne oder einen Zeitpunkt, bis wann der aktuelle Thread angehalten werden soll. Indem ein Objekt vom Typ boost::posix_time::seconds übergeben wird, wird eine Zeitspanne definiert, für die der aktuelle Thread angehalten wird. Die Klasse boost::posix_time::seconds stammt hierbei aus der Bibliothek Boost.DateTime, auf die von Boost.Thread zugegriffen wird, weil sie zur Verarbeitung von Zeitangaben entwickelt wurde.

Während Sie im obigen Programm gesehen haben, wie ein Thread auf einen anderen warten kann, lernen Sie im Folgenden sogenannte Unterbrechungspunkte kennen, mit Hilfe derer ein Thread einen anderen unterbrechen kann.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

void thread() 
{ 
  try 
  { 
    for (int i = 0; i < 5; ++i) 
    { 
      wait(1); 
      std::cout << i << std::endl; 
    } 
  } 
  catch (boost::thread_interrupted&) 
  { 
  } 
} 

int main() 
{ 
  boost::thread t(thread); 
  wait(3); 
  t.interrupt(); 
  t.join(); 
} 

Wenn Sie die Methode interrupt() aufrufen, wird der entsprechende Thread unterbrochen. Unterbrochen bedeutet, dass im Thread eine Ausnahme vom Typ boost::thread_interrupted geworfen wird. Dies geschieht aber nur dann, wenn der Thread einen Unterbrechungspunkt erreicht.

Der alleinige Aufruf von interrupt() bewirkt nichts, wenn ein Thread keine Unterbrechungspunkte enthält. Beim Aufruf von interrupt() wird also nicht sofort eine Ausnahme vom Typ boost::thread_interrupted geworfen. Stattdessen überprüft der Thread in den bereits erwähnten Unterbrechungspunkten, ob interrupt() aufgerufen wurde - ist dies der Fall, wird eine Ausnahme vom Typ boost::thread_interrupted geworfen.

Boost.Thread definiert eine Reihe von Unterbrechungspunkten. Ein Unterbrechungspunkt ist zum Beispiel sleep(). Da sleep() aufgrund der Schleife in thread() fünfmal aufgerufen wird, überprüft der Thread fünfmal, ob er unterbrochen werden soll. Zwischen den Aufrufen von sleep() kann der Thread im obigen Programm also nicht unterbrochen werden, und es wird keine Ausnahme vom Typ boost::thread_interrupted geworfen.

Wenn Sie das obige Programm laufen lassen, stellen Sie fest, dass nur mehr drei Zahlen auf die Standardausgabe ausgegeben werden. Der Grund ist, dass in der Funktion main() nach drei Sekunden interrupt() aufgerufen wird. Damit wird thread() nach drei Sekunden unterbrochen, und es wird eine Ausnahme vom Typ boost::thread_interrupted geworfen. Diese wird zwar im Thread abgefangen, ohne dass etwas im catch-Block geschieht. Weil aber dann die Funktion thread() zurückkehrt, endet der Thread - und damit auch das gesamte Programm, da main() lediglich mit join() auf das Ende des Threads wartet.

Boost.Thread definiert rund zehn Unterbrechungspunkte, zu denen wie eben gesehen die Funktion sleep() zählt. Dank dieser Unterbrechungspunkte lassen sich relativ einfach und zeitnah Threads unterbrechen. Dadurch, dass immer erst ein Unterbrechungspunkt erreicht werden muss, um zu überprüfen, ob eine Ausnahme vom Typ boost::thread_interrupted geworfen werden muss, sind Unterbrechungspunkte aber nicht zwangsläufig die jeweils beste Wahl.

Abschließend sollen noch zwei Funktionen vorgestellt werden, um eine Vorstellung davon zu bekommen, welche sonstigen nützlichen Funktionen Boost.Thread anzubieten hat.

#include <boost/thread.hpp> 
#include <iostream> 

int main() 
{ 
  std::cout << boost::this_thread::get_id() << std::endl; 
  std::cout << boost::thread::hardware_concurrency() << std::endl; 
} 

Über den Namensraum boost::this_thread können verschiedene freistehende Funktionen aufgerufen werden, die sich auf den aktuellen Thread beziehen. sleep() hatten Sie schon kennengelernt. Eine weitere Funktionen ist get_id(): Sie gibt eine Nummer zurück, mit der der aktuelle Thread identifiziert werden kann. get_id() steht auch als Methode für die Klasse boost::thread zur Verfügung.

Die Methode hardware_concurrency(), die als statische Methode zur Klasse boost::thread gehört, gibt die Zahl der Threads zurück, die tatsächlich dank verfügbarer Prozessoren oder Prozessorkerne gleichzeitig ausgeführt werden können. So wird zum Beispiel auf Computern mit den heute weit verbreiteten Doppelkernprozessoren der Wert 2 zurückgegeben. Auf diese Weise kann zum Beispiel recht einfach ermittelt werden, wie viele Threads in einem Multithreaded-Programm maximal eingesetzt werden sollten.


6.3 Synchronisation

Während der Einsatz mehrerer Threads die Performance eines Programms erhöhen kann, erhöht sich üblicherweise auch die Komplexität. Denn wenn mehrere Funktionen dank Threads gleichzeitig ausgeführt werden, müssen Zugriffe dieser Threads auf globale Ressourcen, die also mehreren Threads zur Verfügung stehen, synchronisiert werden. Die Synchronisation von Threads kann in größeren Programmen sehr schwierig sein und erfordert viel Detailarbeit. Im Folgenden lernen Sie die Klassen kennen, die Boost.Thread zur Synchronisation von Threads zur Verfügung stellt.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    mutex.lock(); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    mutex.unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

Multithreaded-Programme verwenden zur Synchronisation Objekte, die Mutex genannt werden. Boost.Thread stellt entsprechend verschiedene Mutex-Klassen zur Verfügung, von denen die einfachste boost::mutex ist. Das Grundprinzip eines Mutex ist: Wenn ein Thread einen Mutex in Beschlag nimmt, können andere Threads den gleichen Mutex erst dann ihrerseits in Beschlag nehmen, wenn der Mutex wieder freigegeben wurde. Auf diese Weise können Threads gezwungen werden zu warten, bis ein anderer Thread bestimmte Operationen ausgeführt und anschließend einen Mutex wieder freigegeben hat.

Im obigen Programm wird ein globales Objekt mutex vom Typ boost::mutex verwendet. Dieser Mutex wird innerhalb der for-Schleife in der Funktion thread() kurz vor dem Zugriff auf die Standardausgabe std::cout in Beschlag genommen. Dies geschieht über den Aufruf der Methode lock(). Nachdem eine Meldung auf die Standardausgabe ausgegeben wurde, wird der Mutex mit unlock() wieder freigegeben.

In der Funktion main() wird nun die Funktion thread() in zwei Threads gestartet. Jeder dieser beiden Threads zählt nun innerhalb einer for-Schleife bis fünf und gibt in jedem Schleifendurchgang eine Meldung auf die Standardausgabe aus. Das Problem ist jedoch, dass die Standardausgabe ein globales Objekt ist, das von beiden Threads geteilt wird. Laut dem C++ Standard gibt es keine speziellen Garantien, dass die Standardausgabe std::cout von mehreren Threads gleichzeitig verwendet werden kann. Deswegen muss der Zugriff auf die Standardausgabe synchronisiert werden, was bedeutet: Zu jedem Zeitpunkt im Programm darf nur ein einziger Thread auf std::cout zugreifen.

Indem nun beide Threads vor dem Zugriff auf die Standardausgabe versuchen, den gleichen Mutex in Beschlag zu nehmen, ist garantiert, dass höchstens ein Thread auf die Standardausgabe zugreift. Egal, welcher Thread erfolgreich lock() aufruft - der andere Thread muss warten, bis unlock() aufgerufen wurde.

Diese für Mutexe typische Vorgehensweise - das in Beschlagnehmen und wieder Freigeben - wird durch verschiedene Datentypen in Boost.Thread unterstützt. Anstatt lock() und unlock() selbst aufzurufen, kann zum Beispiel die Klasse boost::lock_guard verwendet werden.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::lock_guard<boost::mutex> lock(mutex); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

Im obigen Programm wird lock() für den Mutex im Konstruktor und unlock() im Destruktor von boost::lock_guard automatisch aufgerufen. Der Zugriff ist also wie zuvor beim expliziten Aufruf von lock() und unlock() synchronisiert. Die Klasse boost::lock_guard ist demnach ein weiteres Beispiel für das RAII-Prinzip, das Sie im Kapitel 2, Smart Pointers kennengelernt haben.

Neben boost::mutex und boost::lock_guard bietet Boost.Thread weitere Klassen an, die verschiedene Spielarten der Synchronisation unterstützen. Eine wichtige Klasse ist dabei boost::unique_lock, die im Vergleich zu boost::lock_guard eine Reihe nützlicher Methoden anbietet und nicht nur aus einem Konstruktor und Destruktor besteht.

#include <boost/thread.hpp> 
#include <iostream> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::timed_mutex mutex; 

void thread() 
{ 
  for (int i = 0; i < 5; ++i) 
  { 
    wait(1); 
    boost::unique_lock<boost::timed_mutex> lock(mutex, boost::try_to_lock); 
    if (!lock.owns_lock()) 
      lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1)); 
    std::cout << "Thread " << boost::this_thread::get_id() << ": " << i << std::endl; 
    boost::timed_mutex *m = lock.release(); 
    m->unlock(); 
  } 
} 

int main() 
{ 
  boost::thread t1(thread); 
  boost::thread t2(thread); 
  t1.join(); 
  t2.join(); 
} 

Im obigen Beispiel werden verschiedenen Methoden eingesetzt, um die Features von boost::unique_lock vorzustellen. Der Einsatz dieser Features macht in diesem Beispiel nicht unbedingt Sinn - so war der Einsatz von boost::lock_guard im vorherigen Beispiel ausreichend und zielführend. Dieses Beispiel dient lediglich zur Veranschaulichung, was mit boost::unique_lock möglich ist.

Die Klasse boost::unique_lock bietet mehrere Konstruktoren an. Je nach Konstruktor sieht der Versuch, einen Mutex in Beschlag zu nehmen, anders aus. Der Konstruktor von boost::unique_lock, dem lediglich ein Mutex übergeben wird, ruft lock() auf und wartet, bis der Mutex in Beschlag genommen werden konnte. Dieser Konstruktor funktioniert also genauso wie der von boost::lock_guard.

Wird der Konstruktor aufgerufen, dem als zweiter Parameter ein Wert vom Typ boost::try_to_lock_t übergeben werden kann, wird für den Mutex nicht lock(), sondern try_lock() aufgerufen. Diese Methode gibt einen Wert vom Typ bool zurück: true, wenn der Mutex in Beschlag genommen werden konnte, false, wenn nicht. Der Unterschied zu lock() ist also, dass try_lock() sofort zurückkehrt und keine blockierende Methode ist, die darauf wartet, dass der Mutex in Beschlag genommen werden kann.

Im obigen Programm wird dem Konstruktor von boost::unique_lock als zweiter Parameter boost::try_to_lock übergeben. Über die Methode owns_lock() kann anschließend herausgefunden werden, ob der Mutex in Beschlag genommen werden konnte oder nicht. Ist dies nicht der Fall - owns_lock() gibt dann false zurück - wird auf eine weitere Methode der Klasse boost::unique_lock zugegriffen: Mit timed_lock() kann für eine bestimmte Zeit darauf gewartet werden, einen Mutex in Beschlag zu nehmen. Dadurch, dass im obigen Beispiel bis zu einer Sekunde gewartet wird, sollte der aktuelle Thread dann auf jeden Fall den Mutex in Beschlag genommen haben.

Sie haben anhand dieses Beispiels auch gesehen, dass es grundsätzlich drei verschiedene Möglichkeiten gibt, einen Mutex in Beschlag zu nehmen: Mit lock() wird solange gewartet, bis der Mutex in Beschlag genommen werden konnte. Mit try_lock() wird der Mutex nur dann in Beschlag genommen, wenn er gerade frei ist - ansonsten gibt die Methode false zurück, wartet aber nicht. Und mit timed_lock() wird für eine bestimmte Zeit versucht, den Mutex in Beschlag zu nehmen. Auch hier wird über den Rückgabewert vom Typ bool ersichtlich, ob der Mutex innerhalb der vorgegebenen Zeit in Beschlag genommen werden konnte oder nicht.

Während die Klasse boost::mutex sowohl lock() als auch try_lock() anbietet, wird timed_lock() lediglich von der Mutex-Klasse boost::timed_mutex unterstützt. Das ist der Grund, warum diese Klasse im obigen Programm verwendet werden muss. Würde timed_lock() nicht verwendet werden, könnte wie im vorherigen Beispiel der Mutex vom Typ boost::mutex sein.

So wie boost::lock_guard gibt auch der Destruktor von boost::unique_lock einen Mutex wieder frei. Sie haben bei boost::unique_lock aber auch die Möglichkeit, ihn manuell über unlock() freizugeben. Sie können sogar so wie im obigen Beispiel mit release() den Mutex vom Lock entkoppeln - der Destruktor von boost::unique_lock gibt dann den Mutex nicht mehr frei, da er den Mutex nicht mehr besitzt. Sie sind dann logischerweise gezwungen, per unlock() den Mutex selbst wieder freizugeben.

Die Klasse boost::unique_lock ist ein sogenannter exklusiver Lock. Das bedeutet, dass jeweils nur ein Thread mit dieser Klasse einen Mutex in Beschlag nehmen kann und andere Threads warten müssen, bis der Mutex wieder freigegeben wurde. Neben exklusiven Locks gibt es auch nicht-exklusive Locks. Boost.Thread bietet hierfür die Klasse boost::shared_lock an, die zusammen mit einem Mutex vom Typ shared_mutex verwendet werden muss. Im folgenden Beispiel wird Ihnen gezeigt, wie dieser nicht-exklusive Lock eingesetzt werden kann.

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

void wait(int seconds) 
{ 
  boost::this_thread::sleep(boost::posix_time::seconds(seconds)); 
} 

boost::shared_mutex mutex; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::shared_mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    lock.unlock(); 
    wait(1); 
  } 
} 

void print() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    std::cout << random_numbers.back() << std::endl; 
  } 
} 

int sum = 0; 

void count() 
{ 
  for (int i = 0; i < 3; ++i) 
  { 
    wait(1); 
    boost::shared_lock<boost::shared_mutex> lock(mutex); 
    sum += random_numbers.back(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  boost::thread t3(count); 
  t1.join(); 
  t2.join(); 
  t3.join(); 
  std::cout << "Summe: " << sum << std::endl; 
} 

Nicht-exklusive Locks vom Typ boost::shared_lock können dann verwendet werden, wenn Threads lediglich lesend auf eine Ressource zugreifen. Ein Thread, der eine Ressource verändert und daher schreibend auf sie zugreift, benötigt einen exklusiven Lock. Das sollte einleuchtend sein: Threads, die lediglich lesend auf eine Ressource zugreifen, merken schließlich gar nicht, dass eine Ressource zeitgleich in einem anderen Thread gelesen wird. Nicht-exklusive Locks können daher einen Mutex mit anderen nicht-exklusiven Locks teilen.

Im obigen Beispiel greifen die beiden Funktionen print() und count() lesend auf random_numbers zu. Während print() die letzte Zahl in random_numbers auf die Standardausgabe ausgibt, addiert count() sie zur Variablen sum hinzu. Weil beide Funktionen random_numbers nicht ändern, können sie gleichzeitig auf diese Variable zugreifen. Deswegen wird der Zugriff auf random_numbers mit einem nicht-exklusiven Lock vom Typ boost::shared_lock synchronisiert.

In der Funktion fill() jedoch wird ein exklusiver Lock vom Typ boost::unique_lock benötigt, da in dieser Funktion neue Zufallszahlen in den Container random_numbers eingefügt werden. Damit der Mutex freigegeben wird, bevor in der for-Schleife der Funktion fill() eine Sekunde gewartet wird, wird explizit unlock() aufgerufen. Die Funktion wait() wird im Gegensatz zum vorherigen Beispiel nicht zu Beginn, sondern am Ende der for-Schleife aufgerufen, damit der Container random_numbers auf alle Fälle eine Zufallszahl enthält, bevor print() und count() zum ersten Mal auf den Container zugreifen. In diesen Funktionen wird daher wait() zu Beginn der for-Schleifen aufgerufen.

Wenn Ihnen die Aufrufe von wait() an den unterschiedlichen Stellen in den for-Schleifen nicht geheuer sind, haben Sie Recht: Je nachdem, welche Threads wann und wie schnell vom Prozessor ausgeführt werden, kann die Reihenfolge durcheinander kommen. Mit Hilfe sogenannter Bedingungsvariablen können die Threads jedoch so synchronisiert werden, dass Zahlen sofort dann, wenn sie dem Container random_numbers hinzugefügt wurden, in einem anderen Thread verarbeitet werden.

#include <boost/thread.hpp> 
#include <iostream> 
#include <vector> 
#include <cstdlib> 
#include <ctime> 

boost::mutex mutex; 
boost::condition_variable_any cond; 
std::vector<int> random_numbers; 

void fill() 
{ 
  std::srand(static_cast<unsigned int>(std::time(0))); 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    random_numbers.push_back(std::rand()); 
    cond.notify_all(); 
    cond.wait(mutex); 
  } 
} 

void print() 
{ 
  std::size_t next_size = 1; 
  for (int i = 0; i < 3; ++i) 
  { 
    boost::unique_lock<boost::mutex> lock(mutex); 
    while (random_numbers.size() != next_size) 
      cond.wait(mutex); 
    std::cout << random_numbers.back() << std::endl; 
    ++next_size; 
    cond.notify_all(); 
  } 
} 

int main() 
{ 
  boost::thread t1(fill); 
  boost::thread t2(print); 
  t1.join(); 
  t2.join(); 
} 

Im obigen Programm wurden die Funktionen wait() und count() entfernt. Threads warten also nicht mehr pro Schleifendurchgang eine Sekunde, sondern arbeiten so schnell wie möglich. Außerdem wird keine Summe mehr gebildet - Zahlen werden lediglich auf die Standardausgabe ausgegeben.

Damit die Verarbeitung der Zufallszahlen nun nicht durcheinander kommt, müssen die Threads mit Hilfe von Bedingungsvariablen synchronisiert werden. Diese ermöglichen es, Bedingungen threadübergreifend zu überprüfen.

Die Funktion fill() generiert wie zuvor pro Schleifendurchgang eine Zufallszahl und speichert sie im Container random_numbers. Dazu muss wie zuvor ein exklusiver Lock verwendet werden, damit ein anderer Thread nicht gleichzeitig auf random_numbers zugreift, während in diesen Container eine neue Zufallszahl eingefügt wird. Am Ende der for-Schleife in der Funktion fill() wird aber nun nicht mehr einfach nur eine Sekunde gewartet, sondern auf eine Bedingungsvariable zugegriffen und die Methode notify_all() aufgerufen. Diese Methode weckt quasi alle Threads auf, in denen für die gleiche Bedingungsvariable wait() aufgerufen wurde und die daher auf eine Benachrichtigung, wie sie durch notify_all() ausgelöst wird, warten.

Wenn Sie sich die for-Schleifen in der Funktion print() ansehen, stellen Sie fest, dass dort für die gleiche Bedingungsvariable, die in der Funktion fill() verwendet wird, wait() aufgerufen wird. Wenn der Thread durch einen Aufruf von notify_all() geweckt wird, versucht er, den Mutex in Beschlag zu nehmen. Dies gelingt logischerweise nur, nachdem der Mutex in der Funktion fill() freigegeben wurde.

Der Trick ist, dass der Aufruf von wait() gleichzeitig den entsprechenden Mutex freigibt, der als Parameter übergeben wird. Da am Ende der for-Schleife in der Funktion fill() für die gleiche Bedingungsvariable, für die gerade eben notify_all() aufgerufen wurde, nun wait() aufgerufen wird, wird der Mutex freigegeben. Das heißt, die Funktion fill() wartet nun, bis jemand anderes für die Bedingungsvariable notify_all() aufruft. Dies geschieht dann in print(), nachdem in dieser Funktion die neue Zufallszahl auf die Standardausgabe ausgegeben wurde.

Beachten Sie, dass der Aufruf von wait() in der Funktion print() außerdem in einer while-Schleife erfolgt: Es könnte zum Beispiel sein, dass der Thread, der die Funktion fill() ausführt, zuerst eine Zufallszahl im Container random_numbers speichert, bevor in print() wait() für die Bedingungsvariable aufgerufen wird. Damit die Funktion print() trotzdem die erste bereits zu random_numbers hinzugefügte Zufallszahl verarbeiten, wird die Anzahl der im Container gespeicherten Zufallszahlen mit dem nächsten erwarteten Wert verglichen.

Obiges Programm funktioniert auch problemlos, wenn Sie die Locks nicht in, sondern vor die for-Schleifen setzen. Das macht genaugenommen auch mehr Sinn, da in diesem Fall die Locks nicht in jedem Schleifendurchgang initialisiert und zerstört werden. Weil der Mutex jeweils beim Aufruf von wait() freigegeben wird, ist es nicht notwendig, die Locks am Ende jedes Schleifendurchgangs zu zerstören, um auf diese Weise den Mutex freizugeben.


6.4 Thread Local Storage

Thread Local Storage, abgekürzt TLS, ist ein Speicherbereich, auf den jeweils nur ein Thread Zugriff hat. Sie können sich TLS-Variablen als globale Variablen vorstellen, die aber eben nicht global im gesamten Programm sind, sondern nur pro Thread. Welchen Sinn derartige Variablen haben, soll Ihnen anhand des folgenden Beispiels gezeigt werden.

#include <boost/thread.hpp> 
#include <iostream> 
#include <cstdlib> 
#include <ctime> 

void init_number_generator() 
{ 
  static bool done = false; 
  if (!done) 
  { 
    done = true; 
    std::srand(static_cast<unsigned int>(std::time(0))); 
  } 
} 

boost::mutex mutex; 

void random_number_generator() 
{ 
  init_number_generator(); 
  int i = std::rand(); 
  boost::lock_guard<boost::mutex> lock(mutex); 
  std::cout << i << std::endl; 
} 

int main() 
{ 
  boost::thread t[3]; 

  for (int i = 0; i < 3; ++i) 
    t[i] = boost::thread(random_number_generator); 

  for (int i = 0; i < 3; ++i) 
    t[i].join(); 
} 

Im obigen Programm werden drei Threads gestartet, die jeweils eine Zufallszahl auf die Standardausgabe ausgeben. Die Funktion random_number_generator() greift dazu auf die im C++ Standard definierte Funktion std::rand() zu. Der Zufallsgenerator, auf dem diese Funktion basiert, muss jedoch mit std::srand() initialisiert werden. Andernfalls würde das Programm jedesmal die gleiche Zufallszahl ausgeben.

Die Initialisierung des Zufallsgenerators findet in init_number_generator() statt. Dies geschieht mit Hilfe der Funktion std::time(), die die aktuelle Zeit zurückgibt. Dadurch, dass dieser Wert bei jedem Programmstart ein anderer ist, ist sichergestellt, dass der Zufallsgenerator jeweils mit einem anderen Wert initialisiert wird und somit neue Zufallszahlen ausgibt. Da der Zufallsgenerator nur einmal initialisiert werden muss, verwendet init_number_generator() außerdem eine statische Variable done als Schalter.

Wenn Sie obiges Programm mehrmals laufen lassen, stellen Sie fest, dass jedesmal zwei der drei Zufallszahlen gleich sind. Das Programm hat einen Fehler: Der Zufallsgenerator, auf dem std::rand() basiert, muss nämlich für jeden Thread initialisiert werden, in dem std::rand() verwendet wird. Die Funktion init_number_generator() ist demnach falsch implementiert, da dort tatsächlich nur ein einziges Mal im Programm std::srand() aufgerufen wird. Mit Hilfe von TLS kann der Fehler behoben werden.

#include <boost/thread.hpp> 
#include <iostream> 
#include <cstdlib> 
#include <ctime> 

void init_number_generator() 
{ 
  static boost::thread_specific_ptr<bool> tls; 
  if (!tls.get()) 
    tls.reset(new bool(false)); 
  if (!*tls) 
  { 
    *tls = true; 
    std::srand(static_cast<unsigned int>(std::time(0))); 
  } 
} 

boost::mutex mutex; 

void random_number_generator() 
{ 
  init_number_generator(); 
  int i = std::rand(); 
  boost::lock_guard<boost::mutex> lock(mutex); 
  std::cout << i << std::endl; 
} 

int main() 
{ 
  boost::thread t[3]; 

  for (int i = 0; i < 3; ++i) 
    t[i] = boost::thread(random_number_generator); 

  for (int i = 0; i < 3; ++i) 
    t[i].join(); 
} 

Die statische Variable done wurde nun durch eine TLS-Variable tls ersetzt. Sie basiert auf der Template-Klasse boost::thread_specific, die mit dem Datentyp bool instantiiert ist. Die neue Variable tls funktioniert grundsätzlich genauso wie done: Es handelt sich um einen Schalter, mit dem kontrolliert wird, ob eine Initialisierung des Zufallsgenerators bereits erfolgte. Der entscheidende Unterschied ist jedoch, dass der Wert, den tls speichert, nur im jeweiligen Thread verfügbar ist. Auch wenn die statische Variable tls nur einmal existiert, so existiert ein in tls gespeicherter Wert nur im jeweiligen Thread und ist für andere Threads nicht sichtbar.

Nachdem eine Variable vom Typ boost::thread_specific_ptr erstellt wurde, kann sie gesetzt werden. Die Klasse boost::thread_specific_ptr erwartet nun aber keine bool-Variable, sondern die Adresse einer bool-Variablen. Indem die Methode reset() aufgerufen wird, kann die Adresse einer bool-Variablen in tls gespeichert werden. Im obigen Beispiel wird eine bool-Variable dynamisch reserviert und die Adresse, die von new zurückgegeben wird, in tls gespeichert. Damit dies nur einmal geschieht und nicht jeder erneute Aufruf von init_number_generator() die Variable tls neu setzt, wird vorher mit get() überprüft, ob bereits eine Adresse in der Variablen tls gespeichert ist.

Da boost::thread_specific_ptr eine Adresse speichert, verhält sich diese Klasse wie ein Zeiger. So sind auch die Operatoren operator*() und operator->() überladen worden, um den Umgang mit einem Objekt vom Typ boost::thread_specific_ptr zu vereinfachen. So wird im obigen Programm über *tls überprüft, ob der Schalter momentan auf true oder false gesetzt ist. In Abhängigkeit davon wird der Zufallsgenerator initialisiert oder eben nicht.

Die Klasse boost::thread_specific_ptr ermöglicht also, die Adresse eines Objekts im aktuellen Thread zu speichern und dann auch nur im aktuellen Thread auf die gleiche Adresse wieder zuzugreifen. Während also in einem Thread bereits eine Adresse in einer Variablen vom Typ boost::thread_specific_ptr gespeichert sein kann, ist die Variable möglicherweise in einem anderen Thread noch nicht gesetzt.

Wenn Sie das obige Programm laufen lassen, stellen Sie wohlmöglich zu Ihrer Überraschung fest, dass trotz der TLS-Variable nicht drei unterschiedliche Zufallszahlen ausgegeben werden, sondern alle drei Zahlen gleich sind. Das liegt daran, dass alle drei Threads zur gleichen Zeit starten und die Zufallsgeneratoren daher alle mit dem gleichen Zeitpunkt initialisiert werden, der jeweils von std::time() zurückgegeben wird. Lassen Sie das Programm jedoch mehrmals hintereinander laufen, stellen Sie fest, dass sich die Zufallszahlen ändern. Die Zufallsgeneratoren werden in allen drei Threads also richtig initialisiert, wenn auch eben lediglich mit der aktuellen Zeit, die in allen drei Threads natürlich immer die gleiche ist.


6.5 Aufgaben

Sie können die Lösungen zu allen Aufgaben in diesem Buch als ZIP-Datei erwerben.

  1. Restrukturieren Sie folgendes Programm, um die Summe der Zahlen, die in der for-Schleife verarbeitet werden, mit Hilfe von zwei Threads zu berechnen. Da viele Prozessoren heute zwei Kerne besitzen und sich ein derartiger Mehrkernprozessor möglicherweise auch in Ihrem Computer befindet, wird durch die Verwendung von Threads die Ausführungsgeschwindigkeit des Programms erhöht.

    #include <boost/date_time/posix_time/posix_time.hpp> 
    #include <boost/cstdint.hpp> 
    #include <iostream> 
    
    int main() 
    { 
      boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time(); 
    
      boost::uint64_t sum = 0; 
      for (int i = 0; i < 1000000000; ++i) 
        sum += i; 
    
      boost::posix_time::ptime end = boost::posix_time::microsec_clock::local_time(); 
      std::cout << end - start << std::endl; 
    
      std::cout << sum << std::endl; 
    } 
  2. Verallgemeinern Sie das Programm aus Aufgabe 1, indem jeweils genauso viele Threads verwendet werden wie vom Prozessor gleichzeitig ausgeführt werden können. So sollen zum Beispiel vier Threads verwendet werden, wenn das Programm auf einem Computer mit einem Prozessor mit vier Kernen ausgeführt wird.

  3. Ändern Sie folgendes Programm, indem Sie die Funktion thread() zweimal als Thread in main() starten. Das Programm soll daraufhin zweimal die Summe berechnen und auf den Bildschirm ausgeben. Sie dürfen dabei die Implementation der drei Funktionen calculate(), print() and thread() ändern, nicht aber die Funktionsköpfe. Alle drei Funktionen sollen auch nach Ihren Änderungen keinen Parameter erwarten und keinen Rückgabewert besitzen.

    #include <iostream> 
    
    int sum = 0; 
    
    void calculate() 
    { 
      for (int i = 0; i < 1000; ++i) 
        sum += i; 
    } 
    
    void print() 
    { 
      std::cout << sum << std::endl; 
    } 
    
    void thread() 
    { 
      calculate(); 
      print(); 
    } 
    
    int main() 
    { 
      thread(); 
    }