/// \file PriorityThreadQueue.h /// \brief This file contains the definition of the class PriorityThreadQueue. If the macro PTQVERBOSE is defined as an integer not equal /// to 0 then PriorityThreadQueue will print verbose output. #ifndef PriorityThreadQueue_h #define PriorityThreadQueue_h #include #include #include #include #include #include #ifndef PTQVERBOSE /// This macro causes verbose output to pre printed to std::cerr. If the user desires this output, PTQVERBOSE should be defined /// to a non zero number, before PriorityThreadQueue.h is included. #define PTQVERBOSE 0 #endif /// \brief This struct provides a way to stop a thread, and start it again from another thread. All of the attributes of this class /// are non-copyable, so the "typedef boost::shared_ptr< boost_thread_control > p_boost_thread_control" was defined to allow /// containers to to reference and control groups of these objects. class boost_thread_control { boost::mutex _mutex; boost::mutex::scoped_lock _lock; boost::condition _condition; boost_thread_control() : _mutex(), _lock( _mutex), _condition() { } friend class ThreadControler; friend class PriorityThreadQueue; }; /// \brief This typedef was defined to allow containers to reference and control groups of objects of type thread_control. typedef boost::shared_ptr< boost_thread_control > p_boost_thread_control; /// \brief This struct provides a way to stop and start individual threads. /// /// Assumes: _id is a unique id corresponding to a specific thread.
/// Description: This struct provides a way to stop and start individual threads. struct ThreadControler { /// This is a unique id corresponding to a specific thread. This value is defined by a call to PriorityThreadQueue::register_thread. int _id; /// This is the priority of the thread. This is initialized at the time the ThreadControler is defined. At present /// it can not be changed afater the ThreadControler object is defined. This value is defined by a call to /// PriorityThreadQueue::register_thread. int _priority; /// This boolian is true, if the execution of the thread is currently controled by an object of class PriorityThreadQueue. /// Control of the thread is passed to the PriorityThreadQueue, when the individual thread calls the function request_cpu. bool _is_controled; /// This boolian is true, if the execution of the thread is currently controled by an object of class PriorityThreadQueue, /// and if the controling object is allowing the thread to execute. bool _is_executing; /// This is a boost::shared_ptr to an object of type boost_thread_control. It is contained in a shared pointer so that /// objects of type ThreadControler are copyable, and can be manipulated by class std::priority_queue p_boost_thread_control _control; /// Assumes: Id is unique, and corresponds to a specific thread. This function is called only by /// PriorityThreadQueue::register_thread.
/// Calls: the constructor boost_thread_control::boost_thread_control
/// Description: This contructs a an object to reference and control an individual thread. This function /// is called by PriorityThreadQueue::register_thread. The constructed objects are stored in PriorityThreadQueue::_map_threads. /// They are referenced as pointers in PriorityThreadQueue::_queue_executing_threads and PriorityThreadQueue::_queue_waiting_threads. ThreadControler( int Id, int Priority) : _id(Id), _priority( Priority), _is_controled(false), _is_executing(false), _control( new boost_thread_control) { } /// \brief This pauses execution of the thread, corresponding to "_id"
/// /// Assumes: This is called from the thread corresponding to "_id", and is called from PriorityThreadQueue::request_cpu
/// Changes: This stops execution of the thread corresponding to "_id"
/// Description: This pauses execution of the thread, corresponding to "_id". It is called from /// PriorityThreadQueue::request_cpu. Exection is resumed when another thread of higher prioirty calls PriorityThreadQueue::release_cpu, /// thus causing the PriorityThreadQueue::_queue_executing_threads to unwind and call ThreadControler::notify for all the threads /// which no longer need cpu. void wait() { _control->_condition.wait( _control->_lock); } /// \brief This function causes the thread, corresonding to "_id" to resume execution.
/// /// Assumes: ThreadControler::wait was called previously for by the thread corresponding to "_id". This function is called /// by ThreadControler::release_cpu, as it unwinds the ThreadControler::_queue_executing_threads, for all the threads which are /// done needing cpu.
/// Changes: allows the thread corresponding to "_id" to resume execution.
/// Description: This function causes the thread, corresonding to "_id" to resume execution. void notify() { _control->_condition.notify_one(); } public: /// \brief This is the copy constructor which allows the STL library to manipulate objects of class ThreadControler /// /// Assumes: Id corresponds to a specific thread. This function is called only by /// member functions of std::map from PriorityThreadQueue::_map_threads, and std::priority_queue /// from PriorityThreadQueue::_queue_waiting_threads and PriorityThreadQueue::_queue_executing_threads.
/// Calls: The copy constructor for boost::shared_ptr, in order to copy the information for objects of /// class boost_thread_cotrol
/// Description: This function allows std::map, and std::priority_queue to manipulate objects of type ThreadControler. ThreadControler( const ThreadControler &rhs) : _id(rhs._id), _priority(rhs._priority), _is_controled( rhs._is_controled), _is_executing( rhs._is_executing), _control( rhs._control) { } /// \brief This is the equal operator which allows the STL library to manipulate objects of class ThreadControler /// /// Assumes: Id corresponds to a specific thread. This function is called only by /// member functions of std::map from PriorityThreadQueue::_map_threads, and std::priority_queue /// from PriorityThreadQueue::_queue_waiting_threads and PriorityThreadQueue::_queue_executing_threads.
/// Calls: The copy constructor for boost::shared_ptr, in order to copy the information for objects of /// class boost_thread_cotrol
/// Description: This function allows std::map, and std::priority_queue to manipulate objects of type ThreadControler. ThreadControler& operator=( const ThreadControler &rhs) { _id = rhs._id; _priority = rhs._priority; _is_executing = rhs._is_executing; _is_controled = rhs._is_controled; _control = rhs._control; return *this; } }; /// \brief This class provides the comparision function for the priority queues. struct CompareThreadControler { bool operator () (const ThreadControler* lhs, const ThreadControler* rhs) { return lhs->_priority < rhs->_priority; } }; /// \brief This class manages a set of threads. Each thread registers with a call to PriorityThreadQueue::register_thread. /// Each thread request cpu access with a call to PriorityThreadQueue::request_cpu, and releases CPU privaleges with a call /// to PriorityThreadQueue::release_cpu class PriorityThreadQueue { /// This map contains the thread Controler object for each thread. This container is filled by the calls to /// PriorityThreadQueue::register_thread. std::map _map_threads; /// This queue references the set of ThreadControler's which are currently executing. std::priority_queue< ThreadControler*, std::vector, CompareThreadControler> _queue_executing_threads; /// This queue references the set of ThreadControler's which are waiting for permission to execute. std::priority_queue< ThreadControler*, std::vector, CompareThreadControler> _queue_waiting_threads; /// This mutex prevents a race condition for the different threads calling this object. boost::mutex _mutex; public: /// \brief This function registers the thread with the object of class PriorityThreadQueue. This function creates /// an object of class ThreadControler and inserts it into PriorityThreadQueue::_map_threads /// /// Assumes: each thread has a unique id, each thread calls register thread only once
/// Description: adds thread to list
/// Returns : 0 on success, -1 on error
int register_thread( int id, int priority) { boost::mutex::scoped_lock _lock( _mutex); if (_map_threads.find( id) != _map_threads.end()) { return -1; } //ThreadControler tc( id, priority, requesting_thread_mutex); _map_threads.insert( std::pair (id , ThreadControler( id, priority))); return 0; } /// \brief This function changes the ThreadControler object for thread "id", to ThreadControler::_is_controled = true. If the /// priority of thread "id" is greater than or equal to _queue_executing_threads.top()->_priority then the thread continues /// execution and ThreadControler::_is_executing is set to true. If it is less than this number then the thread is haulted. /// /// Assumes: id is in map of thread id's, request_cpu is not called iteratively, or put another way, is_controled != true
/// Changes: local containers, may stop requesting thread
/// Returns: -1 on error, 0 if thread executes, 1, if thread waits
/// Description: This function changes the ThreadControler object for thread "id", to ThreadControler::_is_controled = true. /// If the priority of thread "id" is greater than or equal to _queue_executing_threads.top()->_priority then the thread continues /// execution and ThreadControler::_is_executing is set to true, and a pointer to the object of type ThreadControler is pushed onto /// PriorityThreadQueue::_queue_executing_threads. If this priority has insufficient priority then the thread is haulted with a call to /// ThreadControler::wait, and a pointer to the ThreadControler object is pushed onto PriorityThreadQueue::_queue_waiting_threads. int request_cpu( int id) { std::map::iterator pt; { boost::mutex::scoped_lock _lock( _mutex); pt = _map_threads.find( id); if (pt == _map_threads.end()) return -1; // alternatively this could be a std::logic_error or a std::domain_error if (pt->second._is_controled) return -1; // alternatively this could be a std::logic_error or a std::domain_error // Logical Table Analysis // case 1 _executing_priority_queue.empty() // case 2 pt->_priority == _executing_priority_queue.top()._priority // case 3 pt->_priority > _executing_priority_queue.top()._priority // set pt->is_executing = true // push pt onto executing queue // return 0 // case 4 pt->_priority < _executing_priority_queue.top()._priority // set pt->is_executing = false // push pt onto waiting queue // unlock _mutex // call pt->second.wait() pt->second._is_controled = true; if (_queue_executing_threads.empty() || pt->second._priority >= _queue_executing_threads.top()->_priority) { pt->second._is_executing = true; #if PTQVERBOSE std::cerr << "thread with id = " << pt->second._id << " will execute" << endl; #endif _queue_executing_threads.push( &pt->second); return 0; } else if (pt->second._priority < _queue_executing_threads.top()->_priority) { pt->second._is_executing = false; #if PTQVERBOSE std::cerr << "thread with id = " << pt->second._id << " will wait" << endl; #endif _queue_waiting_threads.push( &pt->second); } } /// this is done outside the scope of _lock, so as to prevent a deadlock for _lock(_mutex) pt->second.wait(); return 1; } /// \brief This function tells the PriorityThreadQueue object that thread "id" will no longer need CPU privileges, and that /// lower priority threads can now execute. /// /// Assumes: Thread "id" has called PriorityThreadQueue::register_thread, and PriorityThreadQueue::request_cpu
/// Changes: PriorityThreadQueue::_queue_executing_threads, PriorityThreadQueue::_queue_waiting_threads
/// Description: This function tells the PriorityThreadQueue object that thread "id" will no longer need CPU privileges, /// and that lower priority threads can now execute. int release_cpu( int id) { boost::mutex::scoped_lock _lock( _mutex); std::map::iterator pt; pt = _map_threads.find( id); #if PTQVERBOSE cerr << "id = " << id << endl; cerr << "_queue_executing_threads.top()->_id = " << id << endl; cerr << "_queue_executing_threads.top()->_is_executing = "; if (_queue_executing_threads.top()->_is_executing) cerr << "true"; else cerr << "false"; cerr << endl; #endif // alternatively any of these 3 error conditions could be a std::logic_error or a std::domain_error if (pt == _map_threads.end()) return -1; if (!pt->second._is_controled) return -1; if (!pt->second._is_executing) return -1; pt->second._is_executing = false; pt->second._is_controled = false; // case 1 id == _queue_executing_threads.top()->_id // unwind queue // if executing queue is empty, or new top priority is less than // previous top priority then push threads from waiting to executing // case 2 id != _queue_executing_threads.top()->_id // do nothing if (id == _queue_executing_threads.top()->_id) { while (!_queue_executing_threads.empty() && _queue_executing_threads.top()->_is_executing==false) { #if PTQVERBOSE cerr << "calling _queue_executing_threads.pop()" << endl; #endif _queue_executing_threads.pop(); } } #if PTQVERBOSE cerr << "_queue_waiting_threads.empty() = "; if (_queue_waiting_threads.empty()) cerr << "true"; else cerr << "false"; cerr << endl; #endif while (!_queue_waiting_threads.empty() && (_queue_executing_threads.empty() || _queue_waiting_threads.top()->_priority >= _queue_executing_threads.top()->_priority)) { #if PTQVERBOSE cerr << "calling _queue_executing_threads.push" << endl; #endif _queue_executing_threads.push( _queue_waiting_threads.top()); _queue_waiting_threads.top()->_is_executing = true; _queue_waiting_threads.top()->notify(); // this lets the thread start executing again #if PTQVERBOSE cerr << "calling _queue_waiting_threads.pop() for id = " << _queue_waiting_threads.top()->_id << endl; #endif _queue_waiting_threads.pop(); } #if PTQVERBOSE cerr << "returning 0 in release_cpu" << endl; #endif return 0; } }; #endif