C++ Programing for Financial Engineering Online Certificate. Approved for 15 CPE credit hours by GARP

  1. This site uses cookies. By continuing to use this site, you are agreeing to our use of cookies. Learn More.

C++ Multithreading in Boost

Discussion in 'Computing' started by Daniel Duffy, 10/26/10.

  1. Daniel Duffy

    Daniel Duffy C++ author, trainer

    This is a preview of Chapter 18 An Introduction to Thread in the upcoming book From: Introduction to the C++ Boost Libraries – Volume I – Foundations.

    18.1. Introduction and Objectives

    In this chapter we discuss how to create C++ code that make use of multi-core processors. In particular, we introduce the thread concept. A thread is a software entity and it represents an independent unit of execution in a program. We design an application by creating threads and letting them execute separate parts of the program code with the objective of improving the speedup of an application. We define the speedup as a number that indicates how many times faster a parallel program is than its serial equivalent. The formula for the speedup S(p) on a CPU with p processors is:
    S(p)= T(1)/T(p)
    where the factor T(p) is the total execution time on a CPU with p processors. When the speedup equals the number of processors we then say that the speedup is perfectly linear.

    The improved performance of parallel programs comes at a price and in this case we must ensure that the threads are synchronised so that they do not destroy the integrity of the shared data. To this end, Boost.Thread has a number of synchronisation mechanisms to protect the program from data races, and ensuring that the code is thread-safe. We also show how to define locks on objects and data so that only one thread can update the data at any given time.
    When working with Boost.Thread you should use the following header file:
    Code (C++):
    1. #include <boost/thread.hpp>
    and the following namespaces:
    Code (C++):
    1. using namespace boost;
    2. using namespace boost::this_thread;
    This chapter is a gentle introduction to multi-threading. We recommend that you also run the source code that accompanies the book to see how multithreaded code differs from sequential code. In Volume II we use Thread as the basis for the implementation of parallel design patterns (Mattson 2005).

    18.2 An Introduction to Threads
    A process is a collection of resources to enable the execution of program instructions.

    Examples of resources are (virtual) memory, I/O descriptors, run-time stack and signal handlers. It is possible to create a program that consists of a collection of cooperating processes. What is the structure of a process?
    • A read-only area for program instructions.
    • A read-write area for global data.
    • A heap area for memory that we allocate dynamically using the new operator or the malloc system call.
    • A stack where we store the automatic variables of the current procedure.
    Processes have control over their resources. Processes can communicate with each other using IPC (Inter Process Communication) mechanisms and they can be seen as heavyweight units of execution. Context-switching between processes is expensive.
    A thread is a lightweight unit of execution that shares an address space with other threads. A process has one or more threads. Threads share the resources of the in process. The execution context for a thread is the data address space that contains all variables in a program. This includes both global variables and automatic variables in routines as well as dynamically allocated variables. Furthermore, each thread has its own stack within the execution context. Multiple threads invoke their own routines without interfering with the stack frames of other threads.

    18.3 The Life of a Thread
    Each process starts with one thread, the master or main thread. Before a new thread can be used, it must be created. The main thread can have one or more child threads. Each thread executes independently of the other threads.

    What is happening in a thread after it has been created and before it no longer exists? A general answer is that it is either executing or not executing. The latter state may have several causes:
    • It is sleeping.
    • It is waiting on some other thread.
    • It is blocked, that is it is waiting on system resources to perform an input or output operation.
    An application should make the best use of its threads because each thread may run on its own processor and the presence of idle threads is synonymous with resource waste.

    18.3.1 How Threads Communicate
    A multi-threaded application consists of a collection of threads. Each thread is responsible for some particular task in the application. In order to avoid anarchy we need to address a number of important issues:
    • Synchronisation: ensuring that an event in one thread notifies another thread. This is called event synchronisation. This signals the occurrence of an event among multiple threads. Another type of synchronisation is mutual exclusion that gives a thread exclusive access to a shared variable or to some other resource for a certain amount of time. This ensures the integrity of the shared variable when multiple threads attempt to access and modify it. We place a lock on the resource and failure to do this may result in a race condition. This occurs when multiple threads share data and at least one of the threads accesses this data without using a defined synchronisation mechanism.
    • Scheduling: we order the events in a program by imposing some kind of scheduling policy on them. In general, there are more concurrent tasks to be executed than there are processors to run them. The scheduler synchronises access to the different processors on a CPU. Thus the scheduler determines which threads are currently executing on the available processors.
    18.4 What Kinds of Applications are suitable for Multi-Threading?
    The main reason for creating a multi-threaded application is performance and responsiveness. As such, threaded code does not add new functionality to a serial application. There should be compelling reasons for using parallel programming techniques.

    In this section we give an overview of a number of issues to address when developing parallel applications (see Mattson 2005 and Nichols 1996). First, we give a list of criteria that help us determine the categories of applications that could benefit from parallel processing. Second, having determined that a given application should be parallelised we discuss how to analyse and design the application with ’parallelism in mind’.

    18.4.1 Suitable Tasks for Multi-threading
    The ideal situation is when we can design an application that consists of a number of independent tasks in which each task is responsible for its own input, processing and output.

    In practice, however tasks are inter-dependent and we must take this into account.

    Concurrency is a property of software systems in which several computations are executing simultaneously and potentially interacting with each other. We maximise concurrency while we minimise the need for synchronisation. We identify a task that will be a candidate for threading based on the following criteria (Nichols 1996):
    • Its degree of independence from other tasks. Does the task need results or data from other tasks and do other tasks depend on its results? These questions determine the provide/require constraints between tasks. An analysis of these questions will lead us to questions concerning task dependencies and resource sharing.
    • Does a task spend a long time in a suspended state and is it blocked in potentially long waits? Tasks that consume resources are candidates for threads. For example, if we dedicate a thread for I/O operations then our program will run faster instead of having to wait for slow I/O operations to complete.
    • Compute-intensive routines. In many applications we may be able to dedicate threads to tasks with time-consuming calculations. Examples of such calculations are array processing, matrix manipulation and random number generation.
    18.5 The Boost thread class
    This class represents a thread. It has member functions for creating threads, firing up threads, thread synchronisation and notification, and finally changing thread state. We discuss the functionality of the thread class in this chapter.
    There are three constructors in thread:
    • Default constructor.
    • Create a thread with an instance of a callable type (which can be a function object, a global or static function) as argument. This function is run when the thread fires up that is, after thread creation.
    • Create a thread with a callable type and its bound arguments to the thread constructor.
    We now discuss the second option. The callable type – which plays the role of the thread function – can be a free function, a static member function or a function callable object. The thread function has a voidreturn type and when it has finished the thread that called it will stop executing.
    We now discuss some code to show how to create a simple ‘101’ multi-threaded program.
    There are two threads, namely the main thread (in the main() function) and a thread that we explicitly create in main(). The program is simple – each one thread is prints some text on the console.
    The first case is when we create a thread whose thread function is a free (global) function:
    Code (C++):
    1. // Global function called by thread
    2. void GlobalFunction()
    3. {
    4. for (int i=0; i<10; ++i)
    5. {
    6. cout<< i << "Do something in parallel with main method." << endl;
    7. boost::this_thread::yield(); // 'yield' discussed in section 18.6
    8. }
    9. }
    10.  
    We now create a thread with GlobalFuntion() as thread function and we fire the thread up:
    Code (C++):
    1. int main()
    2. {
    3. boost::thread t(&GlobalFunction);
    4.  
    5. for (int i=0; i<10; i++)
    6. {
    7. cout << i <<"Do something in main method."<<endl;
    8. }
    9.  
    10. return 0;
    11. }
    12.  
    Each thread prints information on the console. There is no coordination between the threads and you will get different output each time you run the program. The output depends on the thread scheduler. You can run the program and view the output.
    We now discuss how to create a thread whose thread function is a static member function of a class and that is a functor at the same time.
    Code (C++):
    1. class CallableClass
    2. {
    3. private:
    4. // Number of iterations
    5. int m_iterations;
    6.  
    7. public:
    8.  
    9. // Default constructor
    10. CallableClass()
    11. {
    12. m_iterations=10;
    13. }
    14.  
    15. // Constructor with number of iterations
    16. CallableClass(int iterations)
    17. {
    18. m_iterations=iterations;
    19. }
    20.  
    21. // Copy constructor
    22. CallableClass(const CallableClass& source)
    23. {
    24. m_iterations=source.m_iterations;
    25. }
    26.  
    27. // Destructor
    28. ~CallableClass()
    29. {
    30. }
    31.  
    32. // Assignment operator
    33. CallableClass& operator = (const CallableClass& source)
    34. {
    35. m_iterations=source.m_iterations;
    36. return *this;
    37. }
    38.  
    39. // Static function called by thread
    40. static void StaticFunction()
    41. {
    42. for (int i=0; i < 10; i++)  // Hard-coded upper limit
    43. {
    44. cout<<i<<"Do something in parallel (Static function)."<<endl;
    45. boost::this_thread::yield(); // 'yield' discussed in section 18.6
    46. }
    47. }
    48.  
    49. // Operator() called by the thread
    50. void operator () ()
    51. {
    52. for (int i=0; i<m_iterations; i++)
    53. {
    54. cout<<i<<" - Do something in parallel (operator() )."<<endl;
    55. boost::this_thread::yield(); // 'yield' discussed in section 18.6
    56. }
    57. }
    58.  
    59. };
    60.  
    We can now create threads based on the static member function StaticFunction() and on the fact that CallableClass is a function object:
    Code (C++):
    1. int main()
    2. {
    3. boost::thread t(&CallableClass::StaticFunction);
    4.  
    5. for (int i=0; i<10; i++)
    6. {
    7. cout<<i<<" - Do something in main method."<<endl;
    8. }
    9.  
    10. return 0;
    11. }
    12.  
    and
    Code (C++):
    1. int main()
    2. {
    3. // Using a callable object as thread function
    4. int numberIterations = 20;
    5. CallableClass c(numberIterations);
    6. boost::thread t(c);
    7.  
    8. for (int i=0; i<10; i++)
    9. {
    10. cout<< i <<" – Do something in main method." << endl;
    11. }
    12.  
    13. return 0;
    14. }
    15.  
    Finally, when a thread’s destructor is called then the thread of execution becomes detached and it no longer has an associated boost::thread object. In other words, the member function Thread::detach() is called while the thread function continues running.

    18.6 The Life of a Thread
    In general, a thread is either doing something (running its thread function) or is doing nothing (wait or sleep mode). The state transition diagram is shown in Figure 18.1. The scheduler is responsible for some of the transitions between states:
    • Running: the thread has been created and is already started or is ready to start (this is a runnable state). The scheduler has allocated processor time for the thread.
    • WaitSleepJoin: the thread is waiting for an event to trigger. The thread will be placed in the Running state when this event triggers.
    • Stopped: the thread function has run its course (has completed).
    figure-18-1-thread-lifecycle.png
    Figure 18.1 Thread Lifecycle​

    We now discuss some of the member functions that appear in Figure 18.1. First, we note that multi-tasking is not guaranteed to be preemptive and this can result in possible performance degradation because a thread can be involved in a computationally intensive algorithm. Preemption relates to the ability of the operating system to stop a running thread in favour of another thread. In order to give other threads a chance to run, a running thread may voluntarily give up or yield control. Control is returned as soon as possible. For example, we use the global function yield() in the boost::this_thread namespace.

    As an example, consider a callable object that computes the powers of numbers (this class could be adapted to compute powers of very large matrices which would constitute a computationally intensive algorithm):
    Code (C++):
    1. class PowerClass
    2. {
    3. private:
    4.  
    5. // Version II: m will be a large matrix
    6. int m, n; // Variables for m^n
    7.  
    8. public:
    9. double result; // Public data member for the result
    10.  
    11. // Constructor with arguments
    12. PowerClass(int m, int n)
    13. {
    14. this->m=m; this->n=n;
    15. this->result=0.0;
    16. }
    17.  
    18. // Calculate m^n. Supposes n>=0
    19. void operator () ()
    20. {
    21. result=m; // Start with m^1
    22. for (int i=1; i<n; ++i)
    23. {
    24. result*=m; // result=result*m
    25. boost::this_thread::yield();
    26. }
    27. if (n==0) result=1; // m^0 is always 1
    28. }
    29. };
    30.  
    A thread can put itself to sleep for a certain duration (the units can be a POSIX time duration, hours, minutes, seconds, milliseconds or nanoseconds). We use the sleep option when we wish to give other threads a chance to run and for tasks that fire at regular intervals. The main difference is that with yield the thread gets the processor as soon as possible.
    We give an example to show how to put a thread to sleep. We simulate an animation application by creating a thread whose thread function displays some information, then sleeps only to be awoken again by the scheduler at a later stage. The thread function is modelled as AnimationClass:
    Code (C++):
    1. class AnimationClass
    2. {
    3. private:
    4. boost::thread* m_thread; // The thread runs this object
    5. int m_frame; // The current frame number
    6.  
    7. // Variable that indicates to stop and the mutex to
    8. // synchronise "must stop" on (mutex explained later)
    9. bool m_mustStop;
    10. boost::mutex m_mustStopMutex;
    11.  
    12. public:
    13. // Default constructor
    14. AnimationClass()
    15. {
    16. m_thread=NULL;
    17. m_mustStop=false;
    18. m_frame=0;
    19. }
    20.  
    21. // Destructor
    22. ~AnimationClass()
    23. {
    24. if (m_thread!=NULL) delete m_thread;
    25. }
    26.  
    27. // Start the threa
    28. void Start()
    29. {
    30. // Create thread and start it with myself as argument.
    31. // Pass myself as reference since I don't want a copy
    32. m_thread=new boost::thread(boost::ref(*this));
    33. }
    34.  
    35. // Stop the thread
    36. void Stop()
    37. {
    38. // Signal the thread to stop (thread-safe)
    39. m_mustStopMutex.lock();
    40. m_mustStop=true;
    41. m_mustStopMutex.unlock();
    42.  
    43. // Wait for the thread to finish.
    44. if (m_thread!=NULL) m_thread->join();
    45. }
    46.  
    47. // Display next frame of the animation
    48. void DisplayNextFrame()
    49. {
    50. // Simulate next frame
    51. cout<<"Press <RETURN> to stop. Frame: "<<m_frame++<<endl;
    52. }
    53.  
    54. // Thread function
    55. void operator () ()
    56. {
    57. bool mustStop;
    58.  
    59. do
    60. {
    61. // Display the next animation frame
    62. DisplayNextFrame();
    63.  
    64. // Sleep for 40ms (25 frames/second).
    65. boost::this_thread::sleep(boost::posix_time::millisec(40));
    66.  
    67. // Get the "must stop" state (thread-safe)
    68. m_mustStopMutex.lock();
    69. mustStop=m_mustStop;
    70. m_mustStopMutex.unlock();
    71. }
    72. while (mustStop==false);
    73. }
    74. };
    75.  
    Note that the boost thread is created in the Start() function passing itself as thread function. This function object will loop until we call Stop(). In the current case the main() function will call this member function. The code corresponding to the second thread is:
    Code (C++):
    1. int main()
    2. {
    3. // Create and start the animation class
    4. AnimationClass ac;
    5. ac.Start();
    6.  
    7. // Wait for the user to press return
    8. getchar();
    9.  
    10. // Stop the animation class
    11. cout << "Animation stopping..." << endl;
    12. ac.Stop();
    13. cout << "Animation stopped." << endl;
    14.  
    15. return 0;
    16. }
    17.  
    We note the presence of the variable boost::mutex m_mustStopMutex and the call lock() and unlock() on that variable in the Stop() and operator () functions.We discuss mutexes in section 18.7.
    The next question is: how does a thread ‘wait’ on another thread before proceeding? The answer is to use join() (wait for a thread to finish) or timed_join (wait for a thread to finish for a certain amount of time). The effect in both cases is to put the calling thread into WaitSleepJoin state. It is used when we need to wait on the result of a lengthy calculation.

    To give an example, we revisit the class PowerClass and we use it in main() as follows:
    Code (C++):
    1. int main()
    2. {
    3. int m=2;
    4. int n=200;
    5.  
    6. // Create a m^n calculation object
    7. PowerClass pc(m, n);
    8.  
    9. // Create thread and start m^n calculation in parallel
    10. // Since we read the result from pc, we must pass it as reference,
    11. // else the result will be placed in a copy of pc
    12. boost::thread t(boost::ref(pc));
    13.  
    14. // Do calculation while the PowerClass is calculating m^n
    15. double result=m*n;
    16.  
    17. // Wait till the PowerClass is finished
    18. // Leave this out and the result will be bogus
    19. t.join();
    20.  
    21. // Display result.
    22. cout << "(" << m << "^" << n << ") / (" << m << "*" << n
    23.   << ") = "<<pc.result/result<<endl;
    24. }
    25.  
    Here we see that the main thread does some calculations and it waits until the computationally intensive thread function in PowerClass has completed.

    18.7 Basic Thread Synchronisation
    One of the attention points when writing multi-threaded code is to determine how to organise threads in such a way that access to shared data is done in a controlled manner.

    This is because the order in which threads access data is non-deterministic and this can lead to inconsistent results; called race conditions. A classic example is when two threads attempt to withdraw funds from an account at the same time. The steps in a sequential program to perform this transaction are:
    1. Check the balance (are there enough funds in the account?).
    2. Give the amount to withdraw.
    3. Commit the transaction and update the account.
    When there are two threads involved then steps 1, 2 and 3 will be interleaved which means the threads can update data in a non-deterministic way. For example, the scenario in Figure 18.2 shows that after withdrawing 70 and 90 money units the balance is -60 money units which destroys the invariant condition. This states in this case that the balance may never become negative. Why did this transaction go wrong?

    Thread 1Thread 2balance
    if (70>balance)100
    if (90>balance)100
    balance-=7030
    balance-=90-60
    Figure 18.2 Thread Synchronisation​

    The solution is to ensure that steps 1, 2 and 3 constitute an atomic transaction by which we mean that they are locked by a single thread at any one moment in time. Boost.Thread has a number of classes for thread synchronisation. The first class is called mutex (mutual exclusion) and it allows us to define a lock on a code block and release the lock when the thread has finished executing the code block. To do this, we create an Account class containing an embedded mutex:
    Code (C++):
    1. class Account
    2. {
    3. private:
    4.  
    5. // The mutex to synchronise on
    6. boost::mutex m_mutex;
    7.  
    8. // more...
    9. };
    10.  
    We now give the code for withdrawing funds from an account. Notice the thread-unsafe version (which can lead to race conditions) and the thread-safe version using mutex:
    Code (C++):
    1. // Withdraw an amount (not synchronized). Scary!
    2. void Withdraw(int amount)
    3. {
    4. if (m_balance-amount>=0)
    5. {
    6. // For testing we now give other threads a chance to run
    7. boost::this_thread::sleep(boost::posix_time::seconds(1));
    8.  
    9. m_balance-=amount;
    10. }
    11. else throw NoFundsException();
    12. }
    13.  
    14. // Withdraw an amount (locking using mutex object)
    15. void WithdrawSynchronized(int amount)
    16. {
    17. // Acquire lock on mutex.
    18. // If lock already locked, it waits till unlocked
    19. m_mutex.lock();
    20.  
    21. if (m_balance-amount>=0)
    22. {
    23. // For testing we now give other threads a chance to run
    24. boost::this_thread::sleep(boost::posix_time::seconds(1));
    25.  
    26. m_balance-=amount;
    27. }
    28. else
    29. {
    30. // Release lock on mutex. Forget this and it will hang
    31. m_mutex.unlock();
    32. throw NoFundsException();
    33. }
    34.  
    35. // Release lock on mutex. Forget this and it will hang
    36. m_mutex.unlock();
    37. }
    38.  
    Only one thread has the lock at any time. If another thread tries to lock a mutex that is already locked it will enter the SleepWaitJoin state. Summarising, only one thread can hold a lock on a mutex and the code following the call to mutex.lock() can only be executed by one thread at a given time.
    A major disadvantage of using mutex is that the system will deadlock (‘hang’) if you forget to call mutex.unlock(). For this reason we use the unique_lock<Lockable> adapter class that locks a mutex in its constructor and that unlocks a mutex in its destructor. The new version of the withdraw member function will be:
    Code (C++):
    1. // Withdraw an amount (locking using unique_lock)
    2. void WithdrawSynchronized2(int amount)
    3. {
    4. // Acquire lock on mutex. Will be automatically unlocked
    5. // when lock is destroyed at the end of the function
    6. boost::unique_lock<boost::mutex> lock(m_mutex);
    7. if (m_balance-amount>=0)
    8. {
    9. // For testing we now give other threads a change to run
    10. boost::this_thread::sleep(boost::posix_time::seconds(1));
    11. m_balance-=amount;
    12. }
    13. else throw NoFundsException();
    14. } // Mutex automatically unlocked here
    15.  
    Note that it is not necessary to unlock the mutex in this case.

    18.8 Thread Interruption
    A thread that is in the WaitSleepJoin state can be interrupted by another thread which results in the former thread transitioning into the Running state. To interrupt a thread we call the thread member function interrupt() and then an exception of type thread_interrupted is thrown. We note that interrupt() only works when the thread is in the WaitSleepJoin state. If the thread never enters this state, you should call boost::this_thread::interruption_point() to specify a point where the thread can be interrupted.
    The following function contains a defined interruption point:
    Code (C++):
    1. // The function that will be run by the thread
    2. void ThreadFunction()
    3. {
    4. // Never ending loop. Normally the thread will never finish
    5. while(true)
    6. {
    7. try
    8. {
    9. // Interrupt can only occur in wait/sleep or join operation.
    10. // If you don't do that, call interuption_point().
    11. // Remove this line, and the thread will never be interrupted.
    12. boost::this_thread::interruption_point();
    13. }
    14. catch(const boost::thread_interrupted&)
    15. {
    16. // Thread interruption request received, break the loop
    17. cout<<"- Thread interrupted. Exiting thread."<<endl;
    18. break;
    19. }
    20. }
    21. }
    22.  
    We now use this function in a test program; in this case we start a thread with ThreadFunction() as thread function. We let it run and then we interrupt it.
    Code (C++):
    1. int main()
    2. {
    3. // Create and start the thread
    4. boost::thread t(&ThreadFunction);
    5.  
    6. // Wait 2 seconds for the thread to finish
    7. cout<<"Wait for 2 seconds for the thread to stop."<<endl;
    8. while (t.timed_join(boost::posix_time::seconds(2))==false)
    9. {
    10. // Interupt the thread
    11. cout<<"Thread not stopped, interrupt it now."<<endl;
    12. t.interrupt();
    13. cout<<"Thread interrupt request sent. "
    14. cout<<"Wait to finish for 2 seconds again."<<endl;
    15. }
    16.  
    17. // The thread has been stopped
    18. cout<<"Thread stopped"<<endl;
    19. }
    20.  
    18.9 Thread Notification
    In some cases a thread A needs to wait for another thread B to perform some activity. Boost.Thread provides an efficient way for thread notification:
    • wait(): thread A releases the lock when wait() is called; A then sleeps until another thread B calls notify().
    • notify(): signals a change in an object related to thread B. Then one waiting thread (in this case A) wakes up after the lock has been released.
    • notify_all(): this has the same intent as notify() except that all waiting threads wake up.
    We shall see examples of this mechanism when we discuss synchronising queues and the Producer-Consumer pattern in section 11.

    18.10 Thread Groups
    Boost.Thread contains the class thread_group that supports the creation and management of a group of threads as one entity. The threads in the group are related in some way. The functionality is:
    • Create a new thread group with no threads.
    • Delete all threads in the group.
    • Create a new thread and add it to the group.
    • Remove a thread from the group without deleting the thread.
    • join_all(): call join() on each thread in the group.
    • interrupt_all(): call interrupt() on each thread object in the group.
    • size(): give the number of threads in the group.
    We shall give an example of how to use this class when we discuss the Producer-Consumer pattern in which a producer group writes data (enqueue) to a synchronised queue while threads in a consumer group extract (dequeue) the data from this queue.

    18.11 Shared Queue Pattern
    This pattern is a specialisation of the Shared Data Pattern (Mattson 2005). It is a thread-safe wrapper for the STL queue<T> container. It is a blocking queue because a thread wishing to dequeue the data will go into sleep mode if the queue is empty and it will only call this function when it receives a notify() from another thread. This notification implies that new data is in the queue. The lock is automatically released and waiting threads are notified using a condition variable. A condition variable provides a way of naming an event in which threads have a general interest.
    The interface is:
    Code (C++):
    1. // Queue class that has thread synchronisation
    2. template <typename T>
    3. class SynchronisedQueue
    4. {
    5. private:
    6. std::queue<T> m_queue; // Use STL queue to store data
    7. boost::mutex m_mutex; // The mutex to synchronise on
    8. boost::condition_variable m_cond; // The condition to wait for
    9.  
    10. public:
    11.  
    12. // Add data to the queue and notify others
    13. void Enqueue(const T& data)
    14. {
    15. // Acquire lock on the queue
    16. boost::unique_lock<boost::mutex> lock(m_mutex);
    17.  
    18. // Add the data to the queue
    19. m_queue.push(data);
    20.  
    21. // Notify others that data is ready
    22. m_cond.notify_one();
    23.  
    24. } // Lock is automatically released here
    25.  
    26. // Get data from the queue. Wait for data if not available
    27. T Dequeue()
    28. {
    29.  
    30. // Acquire lock on the queue
    31. boost::unique_lock<boost::mutex> lock(m_mutex);
    32.  
    33. // When there is no data, wait till someone fills it.
    34. // Lock is automatically released in the wait and obtained
    35. // again after the wait
    36. while (m_queue.size()==0) m_cond.wait(lock);
    37.  
    38. // Retrieve the data from the queue
    39. T result=m_queue.front(); m_queue.pop();
    40. return result;
    41.  
    42. } // Lock is automatically released here
    43. };
    44.  
    We now use this class as a data container in the Producer-Consumer pattern.

    18.12 The Producer-Consumer Pattern
    This pattern is useful in a variety of situations. There are many applications of this pattern (POSA 1996, Mattson 2005, GOF 1995). In general, one or more producer agents write information to a synchronised queue while one or more consumer agents extract information from the queue. It is possible to extend the pattern to support multiple queues. The Producer-Consumer Pattern is depicted in Figure 18.3.

    figure-18-3-producer-consumer.png
    Figure 18.3 Producer-Consumer Pattern​

    We create a producer class as follows:
    Code (C++):
    1. // Class that produces objects and puts them in a queue
    2. class Producer
    3. {
    4. private:
    5. int m_id; // The id of the producer
    6. SynchronisedQueue<string>* m_queue; // The queue to use
    7.  
    8. public:
    9.  
    10. // Constructor with id and the queue to use
    11. Producer(int id, SynchronisedQueue<string>* queue)
    12. {
    13. m_id=id;
    14. m_queue=queue;
    15. }
    16.  
    17. // The thread function fills the queue with data
    18. void operator () ()
    19. {
    20. int data=0;
    21. while (true)
    22. {
    23. // Produce a string and store in the queue
    24. string str = "Producer: " + IntToString(m_id) +
    25. " data: " + IntToString(data++);
    26. m_queue->Enqueue(str);
    27. cout<<str<<endl;
    28.  
    29. // Sleep one second
    30. boost::this_thread::sleep(boost::posix_time::seconds(1));
    31. }
    32. }
    33. };
    34.  
    Similarly, the interface for the consumer class is given by:
    Code (C++):
    1. // Class that consumes objects from a queue
    2. class Consumer
    3. {
    4. private:
    5. int m_id; // The id of the consumer
    6. SynchronisedQueue<string>* m_queue; // The queue to use
    7.  
    8. public:
    9. // Constructor with id and the queue to use.
    10. Consumer(int id, SynchronisedQueue<string>* queue)
    11. {
    12. m_id=id;
    13. m_queue=queue;
    14. }
    15.  
    16. // The thread function reads data from the queue
    17. void operator () ()
    18. {
    19. while (true)
    20. {
    21. // Get the data from the queue and print it
    22. cout<<"Consumer "<<IntToString(m_id).c_str()
    23. <<" consumed: ("<<m_queue->Dequeue().c_str();
    24.  
    25. // Make sure we can be interrupted
    26. boost::this_thread::interruption_point();
    27. }
    28. }
    29. };
    30.  
    Finally, the following code creates thread groups for producers and consumers using the thread-group class:
    Code (C++):
    1. #include "Producer.hpp"
    2. #include "Consumer.hpp"
    3.  
    4. using namespace std;
    5.  
    6. int main()
    7. {
    8. // Display the number of processors/cores
    9. cout<<boost::thread::hardware_concurrency()
    10. <<" processors/cores detected."<<endl<<endl;
    11. cout<<"When threads are running, press enter to stop"<<endl;
    12.  
    13. // The number of producers/consumers
    14. int nrProducers, nrConsumers;
    15.  
    16. // The shared queue
    17. SynchronisedQueue<string> queue;
    18.  
    19. // Ask the number of producers
    20. cout<<"How many producers do you want? : ";
    21. cin>>nrProducers;
    22.  
    23. // Ask the number of consumers
    24. cout<<"How many consumers do you want? : ";
    25. cin>>nrConsumers;
    26.  
    27. // Create producers
    28. boost::thread_group producers;
    29. for (int i=0; i<nrProducers; i++)
    30. {
    31. Producer p(i, &queue);
    32. producers.create_thread(p);
    33. }
    34.  
    35. // Create consumers
    36. boost::thread_group consumers;
    37. for (int i=0; i<nrConsumers; i++)
    38. {
    39. Consumer c(i, &queue);
    40. consumers.create_thread(c);
    41. }
    42. // Wait for enter (two times because the return from the
    43. // previous cin is still in the buffer)
    44. getchar(); getchar();
    45.  
    46. // Interrupt the threads and stop them
    47. producers.interrupt_all(); producers.join_all();
    48. consumers.interrupt_all(); consumers.join_all();
    49. }
    50.  
    18.13 Thread Local Storage
    We know that global data is shared between threads. In some cases we may wish to give each thread its own copy of global data. To this end, we call thread_specific_ptr<T> that is a pointer to the data (it is initially set to NULL). Each thread must initialise this pointer by calling reset() and subsequentially the data can be accessed by dereferencing the pointer. The data is automatically deleted when the thread exits.
    Here is an example of a thread function that defines it own copy of global data:
    Code (C++):
    1. // Global data. Each thread has its own value
    2. boost::thread_specific_ptr<int> threadLocalData;
    3.  
    4. // Callable function
    5. void CallableFunction(int id)
    6. {
    7. // Initialise thread local data (for the current thread)
    8. threadLocalData.reset(new int);
    9. *threadLocalData=0;
    10.  
    11. // Do this a number of times
    12. for (int i=0; i<5; i++)
    13. {
    14. // Print value of global data and increase value
    15. cout<<"Thread: "<<id<<" - Value: "<<(*threadLocalData)++<<endl;
    16.  
    17. // Wait one second
    18. boost::this_thread::sleep(boost::posix_time::seconds(1));
    19. }
    20. }
    21.  
    We now initialise the copy of the global data, we also create a thread group and we add a number of threads to it, each one having its own copy of the global data:
    Code (C++):
    1. int main()
    2. {
    3. // Initialise thread local data (for the main thread)
    4. threadLocalData.reset(new int);
    5. *threadLocalData=0;
    6.  
    7. // Create threads and add them to the thread group
    8. boost::thread_group threads;
    9. for (int i=0; i<3; i++)
    10. {
    11. boost::thread* t=new boost::thread(&CallableFunction, i);
    12. threads.add_thread(t);
    13. }
    14.  
    15. // Wait till they are finished
    16. threads.join_all();
    17.  
    18. // Display thread local storage value, should still be zero
    19. cout<<"Main - Value: "<<(*threadLocalData)<<endl;
    20. return 0;
    21. }
    22.  
    18.14 Summary and Conclusions

    We have included a chapter on multi-threading using Boost.Thread. It is now possible to create parallel applications in C++. We see a future for multi-tasking and multi-threading applications and for this reason we decided to give an introduction to the most important functionality in this library.
    Boost.Thread contains low-level operations or implementation mechanisms that we use to design and implement multithreaded applications. It contains the building blocks that can be used with parallel design patterns (see Mattson 2005). We summarise the main steps in the process of creating a multithreaded application:
    1. Finding Concurrency: we decide if a problem is a suitable candidate for a parallel solution. System decomposition based on tasks or data allows us to find potentially concurrent tasks and their dependencies. In particular, we need a way of grouping tasks and ordering the groups in order to satisfy temporal constraints.
    2. An initial design is produced.
    3. Algorithm Structure Design: we elaborate the initial model in order to move it closet to a program. We pay attention to forces such as efficiency, simplicity, portability and scalability. The algorithm structure will be determined by tasks on the one hand or by data on the other hand. Examples of high-level algorithms are Divide and Conquer, Geometric Decomposition and Pipeline.
    4. Supporting Structures Design: in this phase we need to decide how to model program structure and shared data. For example, the program could be designed as a SPMD (Single Program Multiple Data), Master Worker or Loop Parallelism pattern. Possible data structures are Shared Data and Shared Queue whose implementation we discussed in section 18.11.
    5. Implementation Mechanisms: in this phase we deploy the functionality of Boost.Thread to implement the design.
    We discuss this process and its applications in Volume II.
    About the Author:
    Daniel Duffy
    is an author and trainer. His company Datasim specializes in methods and techniques for solving problems in quantitative finance. He is the author of Monte Carlo Frameworks: Building Customisable High-performance C++ Applications and Introduction to C++ for Financial Engineers: An Object-Oriented Approach. For more information on the author, see QuantNet's interview with Daniel Duffy
     
  2. bruce.banner

    bruce.banner Guest

    Excellent article. Please indent your code in the future.

    if (m_thread!=NULL) delete m_thread; // strictly speaking, only "delete m_thread" is required.
     
  3. Daniel Duffy

    Daniel Duffy C++ author, trainer

    This blog was in fact a pdf of a chapter in the book. The other blogs were easier as far as code is concerned. We shall post code soon. All the source code is provided with the book.

    hth
     
  4. Komodo

    Komodo New Member

    Very good. I love the optimizer in UDB. Very well designed.
     
  5. Daniel Duffy

    Daniel Duffy C++ author, trainer

    "Very good. I love the optimizer in UDB. Very well designed. "
    Is the UDB == synchronising queue?
     
  6. Me95mlp

    Me95mlp Guest

    This is a fantastic article, thanks for sharing. I really want to understand what you have explained and would like to ask the following:-

    [Q1] In Section 18.8, you talk about interrupts. In the code fragment you have a try block with a function call

    boost::this_thread::interruption_point();

    Am I right in saying that unless the function

    t.interrupt();
    is invoked from the main function (in this case) this is ignored. So if you removed the t.interrupt() call that thread in the function ThreadFunction will be stuck in a infinite loop?

    [Q2] In Section 18.12, how does the Producer thread break out of its while(true) loop? I see no call to

    boost::this_thread::interruption_point();

    as in the Consumer thread's while(true) loop?

    [Q3] Again in Section 18.12, you make a call to

    boost::this_thread::interruption_point();

    however, you mention in Section 18.8 that this will throw an exception. In the main() function you do not have any catch statements for this exception. Does this mean your program will crash? Admittedly I have not run it (don't have the facilities at the moment), or do the function calls

    // Interrupt the threads and stop them
    producers.interrupt_all(); producers.join_all(); consumers.interrupt_all(); consumers.join_all();

    handle the exception?

    [Q4] Finally, still referring to Section 18.8, if

    boost::this_thread::interruption_point();

    throws an exception, why are they not in a try block in the Consumer and Producer threads?

    Thank you very much for any input.
     
  7. Daniel Duffy

    Daniel Duffy C++ author, trainer

  8. Joe K Nasser

    Joe K Nasser Guest

    can you use multithreading in C# or VB? if so do you have any links?
     
  9. Me95mlp

    Me95mlp Guest

    Many thanks for your reply. I understand your examples a lot better. I must confess that I did not know that an exception thrown by a thread that is not caught does not call "terminate" on the program like on the main thread. Is there a reason for this? What happens to the exception object as a result of not finding a matching "catch" statement? I presume the call stack is unwound (as per usual) and the exeption object is just dumped?

    Thanks in advance.
     
  10. Me95mlp

    Me95mlp Guest

    C# certainly has multithreading functionality. Don't know about VB but I am sure google will know :)
     
  11. ferber

    ferber Guest

    Hi,
    I have a decent knowledge of C & C++. I was wondering if knowledge of gpgpu (eg cuda) would give me an edge when applying for an FE program or for landing a job. Would it help to have some high performance computing research experience?
     
  12. TkOp

    TkOp New Member

    When I runed this code of The Producer-Consumer Pattern on Visual Studio . I received a notification:
    ' IntToString': identifier not found...
    Help me.
     
  13. Daniel Duffy

    Daniel Duffy C++ author, trainer

    ??

    what's the context?
     
  14. TkOp

    TkOp New Member

    I need detailed code of Producer-Consumer problem.
    Can U help me...
    Thank in advance !
     
  15. Daniel Duffy

    Daniel Duffy C++ author, trainer

    In which language?Also search on QN.
     
  16. TkOp

    TkOp New Member

    I have assignment on this topic. I write code in C++ about Producer - Consumer Problem. but not yet completed.
    Help me...
     
  17. Daniel Duffy

    Daniel Duffy C++ author, trainer

    Please post some results and we can discuss it. I cannot spoon-feed.

    How far have you got etc. etc.?

    Please be more clear as this thread is going nowhere in its current form.
     
  18. TkOp

    TkOp New Member

    Sorry. i'm from Vietnamese. my vocabulary is poor. so it's hard to say clearly...It's relly annoys you.:cry:
    But i really need source code about Shared Queue in C++. Are you post it...?
    Many thanks.
     
  19. Daniel Duffy

    Daniel Duffy C++ author, trainer

    It's not the English that is the problem. I did not mean to offend.

    Please see section 18.11 of my article at the beginning of this thread? Can you see the code?
     
    TkOp likes this.
  20. Daniel Duffy

    Daniel Duffy C++ author, trainer