123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- #include "ThreadPool.h"
- #include "StdLog/stdlog.h"
- ThreadPool::ThreadPool() :
- m_stop(false)
- {
-
-
- m_threadMaxNum = 256;
- m_threadMiniNum = 2;
- m_threadAddNum = 2;
- m_threadMiniIdle = 1;
- m_threadMaxIdle = 4;
- m_threadRunNum = 0;
- m_threadLiveNum = 0;
- m_threadExitNum = 0;
-
- m_managerThread = std::thread(&ThreadPool::managerThread, this);
- LOG_DEBUG("***** Hello ThreadPool *****");
-
-
- }
- ThreadPool::~ThreadPool()
- {
- LOG_INFO("线程池正在退出...");
-
- {
- std::unique_lock<std::mutex> lock(m_mutexTask);
- m_stop = true;
- }
- LOG_INFO("通知所有工作线程退出...");
-
- m_cond_Task.notify_all();
-
- while (!m_mapThreads.empty())
- {
-
- clearThread();
- std::this_thread::sleep_for(std::chrono::microseconds(100));
- }
- LOG_INFO("回收管理线程...");
-
- m_managerThread.join();
- LOG_INFO("===== 线程池退出完成 =====");
- }
- void ThreadPool::worker()
- {
- m_threadLiveNum++;
- while (true)
- {
-
- std::unique_lock<std::mutex> lock(m_mutexTask);
-
- m_cond_Task.wait(lock, [this]
- { return !m_queue_Tasks.empty() || m_stop ||
- (m_threadExitNum > 0); });
-
- if (m_stop && m_queue_Tasks.empty())
- {
- break;
- }
-
- if (m_threadExitNum.load() > 0 )
- {
- m_threadExitNum--;
-
- if ( m_queue_Tasks.empty())
- {
- break;
- }
- }
-
- std::function<void()> task(std::move(m_queue_Tasks.front()));
- m_queue_Tasks.pop();
- lock.unlock();
- m_threadRunNum++;
-
- task();
- m_threadRunNum--;
- }
-
- m_threadLiveNum--;
-
- {
- std::unique_lock<std::mutex> lock(m_mutexExitThreadID);
- m_exitThreadID.emplace_back(std::this_thread::get_id());
- }
-
-
-
- auto threadID = std::this_thread::get_id();
- std::hash<std::thread::id> hasher;
- size_t id = hasher(threadID);
- LOG_DEBUG("线程ID: " << id << " 退出任务循环");
- return;
- }
- void ThreadPool::managerThread()
- {
- while (!m_stop)
- {
-
- int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
-
-
- if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
- {
- std::unique_lock<std::mutex> lock(m_mutexTask);
- int numTask = (int)m_queue_Tasks.size();
- lock.unlock();
- int numAdd = 0;
- if(numTask > 0)
- {
-
- if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )
- {
-
- numAdd = numTask;
- }
-
- else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)
- {
-
- numAdd = m_threadAddNum.load();
- }
-
- else
- {
- numAdd = m_threadMaxNum - m_threadLiveNum.load();
- }
- }
-
- else
- {
- numAdd = m_threadMiniIdle.load() - num_idle;
- }
-
- if(numAdd > 0)
- {
-
- createThread(numAdd);
- continue;
- }
-
- }
-
-
-
- if (num_idle > m_threadMaxIdle.load())
- {
- int num_Exit = num_idle = m_threadMaxIdle.load();
- if (num_Exit > m_threadAddNum.load())
- {
- num_Exit = m_threadAddNum.load();
- }
- m_threadExitNum.exchange(num_Exit);
- LOG_INFO("有" << m_threadExitNum.load() << "个线程需要退出");
-
- for (int i = 0; i < num_Exit; i++)
- {
- m_cond_Task.notify_one();
- }
- }
-
- clearThread();
-
- std::this_thread::sleep_for(std::chrono::seconds(1));
- }
- LOG_INFO("管理线程退出...");
- }
- void ThreadPool::createThread(int num)
- {
- for (int i = 0; i < num; i++)
- {
-
- std::thread t(&ThreadPool::worker, this);
- m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));
- }
- }
- void ThreadPool::clearThread()
- {
- for(auto& it : m_exitThreadID)
- {
- auto it1 = m_mapThreads.find(it);
- if(it1 != m_mapThreads.end())
- {
- if(it1->second.joinable())
- {
- it1->second.join();
- m_mapThreads.erase(it1);
- }
- }
- }
- m_exitThreadID.clear();
- }
- int ThreadPool::getThreadMaxNum()
- {
- return m_threadMaxNum;
- }
- void ThreadPool::setThreadMaxNum(int num)
- {
- m_threadMaxNum = num;
- }
- int ThreadPool::getThreadMiniNum()
- {
- return m_threadMiniNum;
- }
- void ThreadPool::setThreadMiniNum(int num)
- {
- m_threadMiniNum = num;
- }
- int ThreadPool::getThreadIdleNum()
- {
- return m_threadLiveNum.load() - m_threadRunNum.load();
- }
- int ThreadPool::getThreadRunNum()
- {
- return m_threadRunNum.load();
- }
-
- int ThreadPool::getThreadLiveNum()
- {
- return m_threadLiveNum.load();
- }
- int ThreadPool::getThreadAddNum()
- {
- return m_threadAddNum.load();
- }
- void ThreadPool::setThreadAddNum(int num)
- {
- m_threadAddNum.exchange(num);
- }
- int ThreadPool::getThreadMiniIdle()
- {
- return m_threadMiniIdle.load();
- }
- void ThreadPool::setThreadMiniIdle(int num)
- {
- m_threadMiniIdle.exchange(num);
- }
- int ThreadPool::getThreadMaxIdle()
- {
- return m_threadMaxIdle.load();
- }
- void ThreadPool::setThreadMaxIdle(int num)
- {
- m_threadMaxIdle.exchange(num);
- }
|