فهرست منبع

V0.6.17
1、替换了ThreadPool的日子输出方式

Apple 2 ماه پیش
والد
کامیت
bac4399a29
4فایلهای تغییر یافته به همراه483 افزوده شده و 15 حذف شده
  1. 7 2
      common/StdLog/stdlog.h
  2. 14 13
      common/ThreadPool/ThreadPool.cpp
  3. 316 0
      common/ThreadPool/ThreadPool_spdlog.cpp_
  4. 146 0
      common/ThreadPool/ThreadPool_spdlog.h_

+ 7 - 2
common/StdLog/stdlog.h

@@ -1,12 +1,17 @@
 
-#include <string>
-#include <iostream>
+// #include <string>
+
+#include <sstream>
+
 
 
 #ifdef SPDLOG_ACTIVE_LEVEL
 
 #include "spdlog/spdlog.h"
 
+#else
+#include <iostream>
+
 #endif /* SPDLOG_ACTIVE_LEVEL */
 
 

+ 14 - 13
common/ThreadPool/ThreadPool.cpp

@@ -1,5 +1,6 @@
 #include "ThreadPool.h"
-#include "spdlog/spdlog.h"
+// #include "spdlog/spdlog.h"
+#include "StdLog/stdlog.h"
 
 /**
  * @brief Construct a new Thread Pool:: Thread Pool object
@@ -27,7 +28,7 @@ ThreadPool::ThreadPool() :
     /* 创建管理线程,this表示是这个类的成员函数 */
     m_managerThread = std::thread(&ThreadPool::managerThread, this);
 
-    SPDLOG_DEBUG("***** Hello ThreadPool *****");
+    LOG_DEBUG("***** Hello ThreadPool *****");
 
     // /* 创建初始的numThread个线程 */
     // createThread(numThread);
@@ -35,14 +36,14 @@ ThreadPool::ThreadPool() :
 /* 析构函数 */
 ThreadPool::~ThreadPool()
 {
-    SPDLOG_INFO("线程池正在退出...");
+    LOG_INFO("线程池正在退出...");
     /* 将stop置为true */
     {
         std::unique_lock<std::mutex> lock(m_mutexTask);
         m_stop = true;
     }
 
-    SPDLOG_INFO("通知所有工作线程退出...");
+    LOG_INFO("通知所有工作线程退出...");
     /* 发送条件变量,通知所有线程 */
     m_cond_Task.notify_all();
     /* 等待所有的线程退出并回收完成 */
@@ -53,10 +54,10 @@ ThreadPool::~ThreadPool()
         std::this_thread::sleep_for(std::chrono::microseconds(100));
     }
 
-    SPDLOG_INFO("回收管理线程...");
+    LOG_INFO("回收管理线程...");
     /* 先回收管理线程 */
     m_managerThread.join();
-    SPDLOG_INFO("===== 线程池退出完成 =====");
+    LOG_INFO("===== 线程池退出完成 =====");
 }
 
 
@@ -111,7 +112,7 @@ void ThreadPool::worker()
     auto threadID = std::this_thread::get_id();
     std::hash<std::thread::id> hasher;
     size_t id = hasher(threadID);
-    SPDLOG_DEBUG("线程ID:{}退出任务循环", id);
+    LOG_DEBUG("线程ID: " << id << " 退出任务循环");
 
     return;
 }
@@ -128,7 +129,7 @@ void ThreadPool::managerThread()
         /* 获取空闲线程的个数 */
         int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
         /* 判断线程是否够用,是否需要创建新的线程 */
-        // SPDLOG_DEBUG("***** 判断是否需要添加线程 *****");
+        // LOG_DEBUG("***** 判断是否需要添加线程 *****");
         if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
         {
             std::unique_lock<std::mutex> lock(m_mutexTask);
@@ -163,7 +164,7 @@ void ThreadPool::managerThread()
             
             if(numAdd > 0)
             {
-                // SPDLOG_INFO("需要添加{}个线程", numAdd);
+                // LOG_INFO("需要添加{}个线程", numAdd);
                 createThread(numAdd);
                 continue;   /* 直接下一个循环,无需检查需不需要销毁线程 */
             }
@@ -171,7 +172,7 @@ void ThreadPool::managerThread()
         }
 
         /* 判断空闲线程是否过多,是否需要销毁几个线程 */
-        // SPDLOG_DEBUG("***** 判断是否需要销毁线程 *****");
+        // LOG_DEBUG("***** 判断是否需要销毁线程 *****");
         /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */
         if (num_idle > m_threadMaxIdle.load())
         {
@@ -182,7 +183,7 @@ void ThreadPool::managerThread()
             }
             m_threadExitNum.exchange(num_Exit);
 
-            SPDLOG_INFO("有{}个线程需要退出", m_threadExitNum.load());
+            LOG_INFO("有" << m_threadExitNum.load() << "个线程需要退出");
             /* 唤醒需要退出的num_idle个线程 */
             for (int i = 0; i < num_Exit; i++)
             {
@@ -192,10 +193,10 @@ void ThreadPool::managerThread()
         /* 回收退出的线程 */
         clearThread();
 
-        // SPDLOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
+        // LOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
         std::this_thread::sleep_for(std::chrono::seconds(1));
     }
-    SPDLOG_INFO("管理线程退出...");
+    LOG_INFO("管理线程退出...");
 }
 
 /**

+ 316 - 0
common/ThreadPool/ThreadPool_spdlog.cpp_

@@ -0,0 +1,316 @@
+#include "ThreadPool.h"
+#include "spdlog/spdlog.h"
+
+/**
+ * @brief Construct a new Thread Pool:: Thread Pool object
+ *        构造函数,从这里创建线程,和Linux C的线程池不同,Linux C的线程池数组管理的是线程ID,
+ *        而C++的线程数组直接存储的就是线程函数体,里面有个function的变量,指向任务队列中任务函数
+ * 
+ * @param numThread 
+ */
+ThreadPool::ThreadPool() : 
+    m_stop(false)
+{
+
+    /* 初始化变量 */
+    // m_threadMaxNum = std::thread::hardware_concurrency();   /* 根据CPU核心数规定线程数目 */
+    m_threadMaxNum = 256;
+    m_threadMiniNum = 2;
+    m_threadAddNum = 2;
+    m_threadMiniIdle = 1;
+    m_threadMaxIdle = 4;
+
+    m_threadRunNum = 0;
+    m_threadLiveNum = 0;
+    m_threadExitNum = 0;
+
+    /* 创建管理线程,this表示是这个类的成员函数 */
+    m_managerThread = std::thread(&ThreadPool::managerThread, this);
+
+    SPDLOG_DEBUG("***** Hello ThreadPool *****");
+
+    // /* 创建初始的numThread个线程 */
+    // createThread(numThread);
+}
+/* 析构函数 */
+ThreadPool::~ThreadPool()
+{
+    SPDLOG_INFO("线程池正在退出...");
+    /* 将stop置为true */
+    {
+        std::unique_lock<std::mutex> lock(m_mutexTask);
+        m_stop = true;
+    }
+
+    SPDLOG_INFO("通知所有工作线程退出...");
+    /* 发送条件变量,通知所有线程 */
+    m_cond_Task.notify_all();
+    /* 等待所有的线程退出并回收完成 */
+    while (!m_mapThreads.empty())
+    {
+        /* 管理线程自己退出了,所以要手动清空容器 */
+        clearThread();
+        std::this_thread::sleep_for(std::chrono::microseconds(100));
+    }
+
+    SPDLOG_INFO("回收管理线程...");
+    /* 先回收管理线程 */
+    m_managerThread.join();
+    SPDLOG_INFO("===== 线程池退出完成 =====");
+}
+
+
+/* 工作线程函数 */
+void ThreadPool::worker()
+{
+    m_threadLiveNum++;
+    while (true)
+    {
+        /* 等待任务队列中有任务 */
+        std::unique_lock<std::mutex> lock(m_mutexTask);
+        /* 这里的wait第二个参数是lamba表达式,被唤醒后再次检查条件是否满足 */
+        m_cond_Task.wait(lock, [this]
+                               { return !m_queue_Tasks.empty() || m_stop ||
+                                        (m_threadExitNum > 0); });
+        /* 任务队列中有任务了,条件变量被唤醒了,先判断是不是需要结束线程 */
+        if (m_stop && m_queue_Tasks.empty())
+        {
+            break;
+        }
+        /* 判断是不是需要销毁多余的线程 */
+        if (m_threadExitNum.load() > 0 )
+        {
+            m_threadExitNum--;
+            /* 再次判断有没有新任务,有就不退出 */
+            if ( m_queue_Tasks.empty())
+            {
+                break;
+            }
+        }
+        /* 取出任务,执行任务 */
+        std::function<void()> task(std::move(m_queue_Tasks.front()));
+        m_queue_Tasks.pop();  /* 取出的任务出队 */
+        lock.unlock();              /* 解锁任务队列 */
+        m_threadRunNum++;     /* 更新线程状态数 */
+        /* 开始执行任务 */
+        task();
+        m_threadRunNum--;         /* 更新线程状态数 */
+    }
+
+    /* 线程结束 */
+    m_threadLiveNum--;
+    /* 将自身ID加入到退出列表中 */
+    {
+        std::unique_lock<std::mutex> lock(m_mutexExitThreadID);
+        m_exitThreadID.emplace_back(std::this_thread::get_id());
+    }
+    /* 使用流将线程ID转换成字符串,不然fmt无法打印
+     * 这里通过hash转换成了size_t */
+    // std::stringstream ss;
+    // ss << std::this_thread::get_id();
+    auto threadID = std::this_thread::get_id();
+    std::hash<std::thread::id> hasher;
+    size_t id = hasher(threadID);
+    SPDLOG_DEBUG("线程ID:{}退出任务循环", id);
+
+    return;
+}
+
+
+/**
+ * @brief 管理者线程,维护线程创建或死亡
+ * 
+ */
+void ThreadPool::managerThread()
+{
+    while (!m_stop)
+    {
+        /* 获取空闲线程的个数 */
+        int num_idle = m_threadLiveNum.load() - m_threadRunNum.load();
+        /* 判断线程是否够用,是否需要创建新的线程 */
+        // SPDLOG_DEBUG("***** 判断是否需要添加线程 *****");
+        if ((num_idle < m_threadMiniIdle.load()) && (m_threadLiveNum.load() < m_threadMaxNum) )
+        {
+            std::unique_lock<std::mutex> lock(m_mutexTask);
+            int numTask = (int)m_queue_Tasks.size();            /* 获取任务队列中的任务个数 */
+            lock.unlock();
+            int numAdd = 0;
+            if(numTask > 0)
+            {
+                /* 任务数 + 存在的线程个数是否大于最大线程数 */
+                if( numTask + m_threadLiveNum.load() <= m_threadMaxNum )
+                {
+                    /* 创建numTask个线程 */
+                    numAdd = numTask;
+                }
+                /* 默认添加的个数 + 存在的线程数是否大于最大线程数 */
+                else if ( (m_threadAddNum.load() + m_threadLiveNum.load()) <= m_threadMaxNum)
+                {
+                    /* 创建m_threadAddNum个线程 */
+                    numAdd = m_threadAddNum.load();
+                }
+                /* 能添加几个线程就添加几个线程 */
+                else
+                {
+                    numAdd = m_threadMaxNum - m_threadLiveNum.load();
+                }
+            }
+            /* 空闲线程数低于设置的最小空闲线程数 */
+            else
+            {
+                numAdd = m_threadMiniIdle.load() - num_idle;
+            }
+            
+            if(numAdd > 0)
+            {
+                // SPDLOG_INFO("需要添加{}个线程", numAdd);
+                createThread(numAdd);
+                continue;   /* 直接下一个循环,无需检查需不需要销毁线程 */
+            }
+            
+        }
+
+        /* 判断空闲线程是否过多,是否需要销毁几个线程 */
+        // SPDLOG_DEBUG("***** 判断是否需要销毁线程 *****");
+        /* 由于没规定每次销毁的线程个数,所以这里使用m_threadAddNum作为每次销毁的标准个数 */
+        if (num_idle > m_threadMaxIdle.load())
+        {
+            int num_Exit = num_idle = m_threadMaxIdle.load();
+            if (num_Exit > m_threadAddNum.load())
+            {
+                num_Exit = m_threadAddNum.load();
+            }
+            m_threadExitNum.exchange(num_Exit);
+
+            SPDLOG_INFO("有{}个线程需要退出", m_threadExitNum.load());
+            /* 唤醒需要退出的num_idle个线程 */
+            for (int i = 0; i < num_Exit; i++)
+            {
+                m_cond_Task.notify_one();
+            }
+        }
+        /* 回收退出的线程 */
+        clearThread();
+
+        // SPDLOG_INFO("线程池中的线程实例个数:{}", m_threads.size());
+        std::this_thread::sleep_for(std::chrono::seconds(1));
+    }
+    SPDLOG_INFO("管理线程退出...");
+}
+
+/**
+ * @brief 创建新的线程
+ *        注意:这里只能使用lambda表达式,或者将do_work变成全局函数,emplace_back会调用thread构造函数将lambda表达式构造成一个std::thread实例
+ *        lambda表达式里是子线程,外面是主线程
+ * 
+ */
+void ThreadPool::createThread(int num)
+{
+    for (int i = 0; i < num; i++)
+    {
+        /* 创建线程,传入工作函数 */
+        std::thread t(&ThreadPool::worker, this);
+        m_mapThreads.insert(std::make_pair( t.get_id(), std::move(t) ));
+    }
+}
+
+/**
+ * @brief 删除线程池中失效的线程实例,使用递归的方法遍历全部
+ * 
+ */
+void ThreadPool::clearThread()
+{
+    for(auto& it : m_exitThreadID)
+    {
+        auto it1 = m_mapThreads.find(it);
+        if(it1 != m_mapThreads.end())
+        {
+            if(it1->second.joinable())
+            {
+                it1->second.join();
+                m_mapThreads.erase(it1);
+            }
+        }
+    }
+    m_exitThreadID.clear();
+}
+
+
+
+/* 获取线程池最大线程的个数 */
+int ThreadPool::getThreadMaxNum()
+{
+    return m_threadMaxNum;
+}
+
+/* 设置线程池最大线程的个数 */
+void ThreadPool::setThreadMaxNum(int num)
+{
+    m_threadMaxNum = num;
+}
+
+/* 获取线程池最大线程的个数 */
+int ThreadPool::getThreadMiniNum()
+{
+    return m_threadMiniNum;
+}
+
+/* 设置线程池最大线程的个数 */
+void ThreadPool::setThreadMiniNum(int num)
+{
+    m_threadMiniNum = num;
+}
+
+/* 获取线程池空闲线程的个数 */
+int ThreadPool::getThreadIdleNum()
+{
+    return m_threadLiveNum.load() - m_threadRunNum.load();
+}
+
+/* 获取线程池正在运行的线程个数 */
+int ThreadPool::getThreadRunNum()
+{
+    return m_threadRunNum.load();
+}
+
+ /* 获取线程池现存的线程个数 */
+int ThreadPool::getThreadLiveNum()
+{
+    return m_threadLiveNum.load();
+}
+
+/* 线程池每次创建线程的个数 */
+int ThreadPool::getThreadAddNum()
+{
+    return m_threadAddNum.load();
+}
+
+/* 设置线程池每次创建线程的个数 */
+void ThreadPool::setThreadAddNum(int num)
+{
+    m_threadAddNum.exchange(num);
+}
+
+/* 线程池最小空闲线程的个数 */
+int ThreadPool::getThreadMiniIdle()
+{
+    return m_threadMiniIdle.load();
+}
+
+/* 设置线程池最小空闲线程的个数 */
+void ThreadPool::setThreadMiniIdle(int num)
+{
+    m_threadMiniIdle.exchange(num);
+}
+
+/* 线程池最大空闲线程的个数 */
+int ThreadPool::getThreadMaxIdle()
+{
+    return m_threadMaxIdle.load();
+}
+
+/* 设置线程池最大空闲线程的个数 */
+void ThreadPool::setThreadMaxIdle(int num)
+{
+    m_threadMaxIdle.exchange(num);
+}

+ 146 - 0
common/ThreadPool/ThreadPool_spdlog.h_

@@ -0,0 +1,146 @@
+#ifndef THREADPOOL_H
+#define THREADPOOL_H
+
+#include <list>
+#include <thread>
+#include <condition_variable>
+#include <queue>
+#include <functional>
+#include <atomic>
+#include <map>
+#include <future>
+
+// #include "spdlog/spdlog.h"
+// #include "fmt/std.h"
+
+/**
+ * @brief   1、使用map存储工作线程,线程id作为键
+ *          2、采用可变参数模板函数添加任务到任务队列中
+ *          3、线程池采用单例模式
+ *          4、有两种添加工作线程的方式,一种带有返回值,一种不带返回值
+ * 
+ * 使用方式:
+ *     1、非成员函数 
+ *          void func(int a, int b) { std::cout << a + b << std::endl; }
+ *          CPPTP.add_task(func, 1, 2);
+ *     2、成员函数 
+ *          void A::func(int a, int b) { std::cout << a + b << std::endl; }
+ *          A a;
+ *          CPPTP.add_task(&A::func, &a, 1, 2);
+ * 
+ */
+
+#define CPPTP ThreadPool::getInstance()
+
+class ThreadPool
+{
+private:
+    ThreadPool();
+    ThreadPool(const ThreadPool &tp) = delete;
+    ThreadPool &operator=(const ThreadPool &tp) = delete;
+
+
+public:
+    
+    static ThreadPool& getInstance()
+    {
+        static ThreadPool tp;
+        return tp;
+    }
+
+    ~ThreadPool();
+
+    /**
+     * @brief 向任务队列添加任务函数
+     * 
+     * @tparam F 函数(指针?)
+     * @tparam Args 参数包
+     * @param f 万能引用
+     * @param args 万能引用
+     */
+    template <typename F, typename... Args>
+    void add_task(F &&f, Args &&...args)
+    {
+        /* 将函数参数和函数绑定 */
+        std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
+        /* 作用域是用于解锁lock */
+        {
+            std::unique_lock<std::mutex> lock(m_mutexTask);
+            m_queue_Tasks.emplace(std::move(task));         /* 入队 */
+        }
+        /* 唤醒一个线程 */
+        m_cond_Task.notify_one();
+    }
+
+    /* 添加带有返回值的任务函数
+     * future内部定义了一个类型std::result_of<F(Args...)>::Type */
+    template<typename F, typename... Args>
+    auto add_task_with_ret(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
+    {
+        /* 将类型去一个别名 */
+        using  resultType = typename std::result_of<F(Args...)>::type;
+        /* 将传入的函数与参数绑定并包装成一个可调用的指针,使用智能指针管理 */
+        auto task = std::make_shared<std::packaged_task<resultType()>>(
+            /* 使用forward完美转发,防止右值变成了左值 */
+            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
+        );
+        /* 获取future对象,用于获取返回值 */
+        std::future<resultType> res = task->get_future();
+        {
+            std::unique_lock<std::mutex> lock(m_mutexTask);
+            /* 使用lamba表达式调用封装的函数 */
+            m_queue_Tasks.emplace([task](){
+                (*task)();
+            });
+        }
+        /* 解锁条件变量 */
+        m_cond_Task.notify_one();
+        /* 返回值,函数调用完成后,可以通过这个返回值获取任务的返回值 */
+        return res;
+    }
+
+    /******  获取当前线程池在运行的线程个数,空闲线程的个数  ******/
+    int getThreadMaxNum();                      /* 获取线程池最大线程的个数 */
+    void setThreadMaxNum(int num);              /* 设置线程池最大线程的个数 */
+    int getThreadMiniNum();                     /* 获取线程池最小线程的个数 */
+    void setThreadMiniNum(int num);             /* 设置线程池最小线程的个数 */
+    int getThreadIdleNum();                     /* 获取线程池空闲线程的个数 */
+    int getThreadRunNum();                      /* 获取线程池正在运行的线程个数 */
+    int getThreadLiveNum();                     /* 获取线程池现存的线程个数 */
+    int getThreadAddNum();                      /* 线程池每次创建线程的个数 */
+    void setThreadAddNum(int num);              /* 设置线程池每次创建线程的个数 */
+    int getThreadMiniIdle();                    /* 线程池最小空闲线程的个数 */
+    void setThreadMiniIdle(int num);            /* 设置线程池最小空闲线程的个数 */
+    int getThreadMaxIdle();                     /* 线程池最大空闲线程的个数 */
+    void setThreadMaxIdle(int num);             /* 设置线程池最大空闲线程的个数 */
+
+private:
+    void worker();                              /* 工作线程函数 */
+    void managerThread();                       /* 维护线程池的线程,如提前创建线程,销毁线程 */
+    void createThread(int num);                 /* 创建新的线程 */
+    void clearThread();                         /* 清除失效的线程实例 */
+
+private:
+    class OneThread;                                    /* 类声明 */
+    std::map<std::thread::id, std::thread> m_mapThreads;/* 线程数组,维护线程池 */
+    std::list<std::thread::id> m_exitThreadID;          /* 已经退出任务函数的线程ID */
+    std::queue<std::function<void(void)>> m_queue_Tasks;/* 任务队列,任务函数都是无返回值的函数 */
+    std::condition_variable m_cond_Task;                /* 条件变量,没有任务的时候阻塞住 */
+
+    bool m_stop;                                        /* 线程是否停止 */
+    std::mutex m_mutexExitThreadID;                     /* 互斥锁,搭配环境变量使用,对已经退出的线程ID上锁 */
+    std::mutex m_mutexTask;                             /* 互斥锁,搭配环境变量使用,对任务队列上锁 */
+    std::thread m_managerThread;                        /* 管理线程 */
+
+    int m_threadMaxNum = 256;                               /* 默认最大线程数 */
+    int m_threadMiniNum = 5;                                /* 默认线程池中最小线程数 */
+    std::atomic<int> m_threadAddNum;                        /* 每次需要添加的线程个数 */
+    std::atomic<int> m_threadMiniIdle;                      /* 如果空闲线程小于这个数,就添加m_ThreadAddNum个线程 */
+    std::atomic<int> m_threadMaxIdle;                       /* 最大空闲的线程个数,超过这个个数一定时间,就会销毁多余的 */
+
+    std::atomic<int> m_threadRunNum;                        /* 正在运行的线程个数 */
+    std::atomic<int> m_threadLiveNum;                       /* 现有的线程个数 */
+    std::atomic<int> m_threadExitNum;                       /* 需要销毁的线程个数 */
+};
+
+#endif /* THREADPOOL_H */