基于C++11的线程池实现

线程池

    线程池是对一组线程的管理,可以复用已有线程,并不会执行每次任务的时候新开线程,这在效率上和系统开支上都有极大的好处。以往线程池往往用在服务端开发上,目前随着移动端开发的兴起成熟,端上的复杂逻辑实现往往也需要用到线程池。拿Android开发举例,请求网络的基础框架往往会有一个线程池来管理请求和重用线程,加载图片的基础框架也往往需要用到线程池来管理忙于加载图片的工作线程。Android开发是基于Java语言的,Java语言提供了完备的线程池支持,看一段代码。

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }

   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }

    Java中的线程池提供了有限的几个对外接口,使用上十分方便。如今C++11标准赋予了C++语言同样的能力,下面结合之前几篇文章中介绍的C++11的新特性,提供一个基于C++11的线程池实现,使用起来同样如Java一样简单方便。直接看代码

C++11版本的线程池实现

#include <thread>
#include <mutex>
#include <vector>
#include <deque>
#include <condition_variable>
#include <atomic>

namespace stdx
{
    class ThreadPool
    {
    private:
        std::size_t m_pool_size;
        std::size_t m_pool_size_max_growth;
        enum
        {
            DEFAULT_THREAD_POOL_COUNT = 5,
            DEFAULT_THREAD_MAX_COUNT = 24,
            DEFAULT_THREAD_POOL_COUNT_THRESHOLD = 128
        };
        std::vector<std::thread> m_threads;
        std::deque<std::function<void()> > m_threadfuncs;

        std::condition_variable m_cv;
        std::mutex m_mutex;

        std::atomic_int m_atmoic_working_couter;
        std::atomic_bool m_atmoic_quit_sign;

        std::atomic_bool m_atmoic_fifo_lifo;

        using LOCK = std::unique_lock<std::mutex>;

        void init()
        {
            for (int i = 0; i < m_pool_size; ++i)
            {
                pushback_thread();
            }
        }

        //should invoked in lockguard
        void pushback_thread()
        {
            m_threads.emplace_back([this]() -> void
                                   {
                                       while (!m_atmoic_quit_sign.load())
                                       {
                                           std::function<void()> fn;
                                           try
                                           {
                                               LOCK lock(m_mutex);
                                               m_cv.wait(lock,
                                                         [this]() -> bool
                                                         {
                                                             return !m_threadfuncs.empty() ||
                                                                    m_atmoic_quit_sign.load();
                                                         }
                                               );
                                               if (m_atmoic_quit_sign.load())
                                               {
                                                   break;
                                               }
                                               if (!m_threadfuncs.empty())
                                               {
                                                   if (m_atmoic_fifo_lifo.load())
                                                   {
                                                       fn = std::move(m_threadfuncs.front());
                                                       m_threadfuncs.pop_front();
                                                   }
                                                   else
                                                   {
                                                       fn = std::move(m_threadfuncs.back());
                                                       m_threadfuncs.pop_back();
                                                   }

                                               }
                                               lock.unlock();
                                               ++m_atmoic_working_couter;
                                               if (fn)
                                               {
                                                   fn();
                                               }
                                               --m_atmoic_working_couter;
                                           }
                                           catch (...)
                                           {
                                               std::cout << "catch exp:" << __LINE__ << std::endl;
                                           }

                                       }
                                   });
        }

        void uninit()
        {
            for (auto &thread : m_threads)
            {
                if (thread.joinable())
                {
                    thread.join();
                }
            }
            m_threads.clear();
        }

        //should invoked in lockguard
        bool has_idle()
        {
            return m_atmoic_working_couter.load() < m_threads.size();
        }

    public:
        ThreadPool() : ThreadPool(DEFAULT_THREAD_POOL_COUNT, DEFAULT_THREAD_MAX_COUNT)
        {

        }

        ThreadPool(std::size_t count_begin, std::size_t count_max_growth) : m_pool_size(count_begin),
                                                                            m_pool_size_max_growth(count_max_growth),
                                                                            m_atmoic_working_couter(0),
                                                                            m_atmoic_quit_sign(false),
                                                                            m_atmoic_fifo_lifo(true)
        {
            if (m_pool_size > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
            {
                m_pool_size = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
            }
            if (m_pool_size_max_growth > DEFAULT_THREAD_POOL_COUNT_THRESHOLD)
            {
                m_pool_size_max_growth = DEFAULT_THREAD_POOL_COUNT_THRESHOLD;
            }
            init();
        }

        ~ThreadPool()
        {
            quit();
            uninit();
        }

        ThreadPool(const ThreadPool &) = delete;

        ThreadPool(ThreadPool &&) = delete;

        void operator=(const ThreadPool &) = delete;

        void operator=(ThreadPool &&) = delete;
        ///////////////////////////////////////////////////////////////

        template<typename Fn, typename ...Params>
        void commit(Fn &&fn, Params &&... params)
        {
            try
            {
                LOCK lock(m_mutex);

                if (!has_idle() && m_threads.size() < m_pool_size_max_growth)
                {
                    pushback_thread();
                }

                m_threadfuncs.emplace_back(
                        std::bind(std::forward<Fn>(fn), std::forward<Params>(params)...)
                );
                m_cv.notify_one();
                lock.unlock();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }
        }

        void quit()
        {
            m_atmoic_quit_sign.store(true);
            try
            {
                LOCK lock(m_mutex);
                m_cv.notify_all();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }
        }

        std::size_t get_pending_count()
        {
            std::size_t count = 0;
            try
            {
                LOCK lock(m_mutex);
                count = m_threadfuncs.size();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }

            return count;
        }

        std::size_t get_working_thread_count()
        {
            std::size_t count = 0;
            try
            {
                LOCK lock(m_mutex);
                count = m_threads.size();
            }
            catch (...)
            {
                std::cout << "catch exp:" << __LINE__ << std::endl;
            }

            return count;
        }

        int get_working_count()
        {
            return m_atmoic_working_couter.load();
        }

        void set_execute_fifo_lifo(bool b)
        {
            m_atmoic_fifo_lifo.store(b);
        }


    };
}

测试代码如下

class ThreadPoolTest
{
#define SLEEP(x) std::this_thread::sleep_for(x)
    using TIMESECONDS = std::chrono::seconds;
    using TIMEMILLI = std::chrono::milliseconds;
public:
    static void callback(std::string str, float f, const char *sztmp)
    {
        std::cout << "commit2:" << str << " " << f << sztmp << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
    }

    struct functor
    {
        void operator()()
        {
            std::cout << "commit3:" << "nothing" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(3));
        }
    };

    template<typename CALLBACK>
    static void thread_callback(int a, CALLBACK fn)
    {
        std::cout << "commit 5:" << a << std::endl;
        fn();
    }

    static void execute()
    {
        stdx::ThreadPool thread_pool(1, 5);
//        thread_pool.set_execute_fifo_lifo(false);

        thread_pool.commit(
                [](int a, char b) -> int
                {
                    std::cout << "commit1:" << a << " " << b << std::endl;
                    SLEEP(TIMESECONDS(3));
                    return 12;
                },
                123,
                'c'
        );

        SLEEP(TIMEMILLI (10));

        thread_pool.commit(
                callback,
                std::string("456"),
                1.1f,
                "789"
        );

        thread_pool.commit(
                functor()
        );

        auto lambdacallback = []() -> void
        {
            std::cout << "thread rans end" << std::endl;
        };
        thread_pool.commit(thread_callback<decltype(lambdacallback)>,
                           999,
                           lambdacallback
        );

        for (int i = 0; i < 10; ++i)
        {
            thread_pool.commit(
                    [=]
                    {
                        std::cout << "commitn:" << i << std::endl;
                        if (i == 9)
                        {
                            SLEEP(TIMESECONDS(0));
                        }
                    }
            );
        }

        SLEEP(TIMESECONDS(1));

        std::cout << "idle?:" << thread_pool.has_idle() << std::endl;
        std::cout << "pending count:" << thread_pool.get_pending_count() << std::endl;

        SLEEP(TIMESECONDS(3));
        std::cout << "working count:" << thread_pool.get_working_count() << std::endl;

        SLEEP(TIMESECONDS(5));
    }
};

    上面的代码在windows平台,mac平台,android平台均通过了测试,iOS平台没有进行测试但问题应该不大。ThreadPool类管理了若干线程和线程要执行的函数,线程执行函数是一个队列可以指定为先进先出和后进先出两种模式。测试代码中用了普通函数、functor、lambda作为传入参数对commit进行测试,依照个人使用习惯各种线程函数方式均支持,并且后续线程函数需要的参数可以任意。如果需要线程函数执行完后回调通知,则可以将回调函数作为参数传递给线程函数,上面的测试代码也包含了这种情况。
    

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,077评论 25 707
  • 尊敬的老师亲爱的同学们大家好,我是11组的高俊平,我来自美丽的冰城——哈尔滨。有幸遇到易效能遇到叶老师完全...
    gao816阅读 105评论 0 0
  • 亲爱的7组家人们,早安。我是63号陈小平。很感恩从天使3.0进阶到4.0,继续践行努力并过上线上人生,跟一班满满正...
    0be604d101a2阅读 233评论 2 7
  • 我看过很多电影,比如《哈利波特》、《速度与激情》、《生化危机》……但没有任何一部电影比这部电影更让我印象深刻了...
    影子_8b27阅读 184评论 0 0