Sfoglia il codice sorgente

Implement cpuMask-based near-singleton global Thread Pool

Jules 3 settimane fa
parent
commit
110ae7f645

+ 29 - 30
source/backend/cpu/CPUBackend.cpp

@@ -124,7 +124,7 @@ void CPURuntime::_bindCPUCore() const {
                 break;
         }
     }
-        // Set CPU Affinity
+    // Set CPU Affinity
 #ifdef _OPENMP
     auto threadsNumber = mThreadNumber;
     std::vector<int> result(threadsNumber, 0);
@@ -134,12 +134,14 @@ void CPURuntime::_bindCPUCore() const {
     }
 #endif
 #ifdef MNN_USE_THREAD_POOL
-    ThreadPool::active(mThreadNumber);
-    ThreadPool::enqueue(std::make_pair([&](int i) {
-        MNNSetSchedAffinity(lockCPUIndexes[i].first, lockCPUIndexes[i].second);
-        return 0;
-    }, mThreadNumber), mTaskIndex, mThreadNumber);
-    ThreadPool::deactive(mThreadNumber);
+    if(mThreadPool) {
+        mThreadPool->active();
+        mThreadPool->enqueue(std::make_pair([&](int i) {
+            MNNSetSchedAffinity(lockCPUIndexes[i].first, lockCPUIndexes[i].second);
+            return 0;
+        }, mThreadNumber), mTaskIndex);
+        mThreadPool->deactive();
+    }
 #endif
 }
 
@@ -147,17 +149,14 @@ void CPURuntime::_resetThreadPool() {
     mThreadNumber = std::max(1, mThreadNumber);
     mThreadNumber = std::min(mThreadNumber, MAX_THREAD_NUMBER);
 #ifdef MNN_USE_THREAD_POOL
-    ThreadPool::releaseWorkIndex(mTaskIndex);
-    auto cpuInfo = MNNGetCPUInfo();
-    if (mThreadNumber > 1) {
-        int systemThreadNumber = (int)cpuInfo->cpuNumber;
-        if (systemThreadNumber == 0) {
-            systemThreadNumber = mThreadNumber;
-        }
-        mThreadNumber = ALIMIN(ThreadPool::init(systemThreadNumber), mThreadNumber);
+    if (mThreadPool) {
+        mThreadPool->releaseWorkIndex(mTaskIndex);
     }
     if (mThreadNumber > 1) {
-        mTaskIndex = ThreadPool::acquireWorkIndex();
+        mThreadNumber = ALIMIN(ThreadPool::init(mThreadNumber, mCpuMask, mThreadPool), mThreadNumber);
+        if (mThreadPool) {
+            mTaskIndex = mThreadPool->acquireWorkIndex();
+        }
         if (-1 == mTaskIndex) {
             MNN_ERROR("The ThreadPool has been used to MNN_THREAD_POOL_MAX_TASKS, can't use thread pool\n");
             mThreadNumber = 1;
@@ -235,8 +234,9 @@ void CPURuntime::onReset(int numberThread, const BackendConfig* config, bool ful
         }
     }
     mThreadNumber = numberThread;
-    _resetThreadPool();
     _validateCpuIds();
+    mCpuMask = MNNGetCPUMask(mCpuIds);
+    _resetThreadPool();
 }
 
 CPURuntime::CPURuntime(const Backend::Info& info) {
@@ -258,8 +258,9 @@ CPURuntime::CPURuntime(const Backend::Info& info) {
         mFlags = info.user->flags;
         mCpuIds = info.user->cpuIds;
     }
-    _resetThreadPool();
     _validateCpuIds();
+    mCpuMask = MNNGetCPUMask(mCpuIds);
+    _resetThreadPool();
 #ifdef LOG_VERBOSE
     MNN_PRINT("create CPURuntime:%p\n", this);
 #endif
@@ -267,7 +268,9 @@ CPURuntime::CPURuntime(const Backend::Info& info) {
 
 CPURuntime:: ~ CPURuntime() {
 #ifdef MNN_USE_THREAD_POOL
-    ThreadPool::releaseWorkIndex(mTaskIndex);
+    if(mThreadPool) {
+        mThreadPool->releaseWorkIndex(mTaskIndex);
+    }
 #endif
 }
 float CPURuntime::onGetMemoryInMB() {
@@ -406,9 +409,9 @@ void CPURuntime::onGabageCollect(int level) {
 void CPURuntime::onConcurrencyBegin() const {
 #ifdef MNN_USE_THREAD_POOL
     if (mTaskIndex >= 0) {
-        if (mThreadOpen == 0) {
+        if (mThreadOpen == 0 && mThreadPool) {
             // mThreadOpen 0 -> 1, open ThreadPool
-            ThreadPool::active(mThreadNumber);
+            mThreadPool->active();
         }
         mThreadOpen++;
     }
@@ -427,8 +430,8 @@ void CPURuntime::onConcurrencyEnd() const {
         MNN_ASSERT(mThreadOpen > 0);
         mThreadOpen--;
         mThreadOpen = mThreadOpen < 0 ? 0 : mThreadOpen;
-        if (0 == mThreadOpen) {
-            ThreadPool::deactive(mThreadNumber);
+        if (0 == mThreadOpen && mThreadPool) {
+            mThreadPool->deactive();
         }
     }
 #endif
@@ -464,6 +467,9 @@ CPUBackend::CPUBackend(const CPURuntime* runtime, BackendConfig::PrecisionMode p
     mMemory = memory;
     mRuntime = const_cast<CPURuntime*>(runtime);
     mThreadNumber = mRuntime->mThreadNumber;
+#ifdef MNN_USE_THREAD_POOL
+    mThreadPool = mRuntime->mThreadPool;
+#endif
     // Compute Group Rate
     do {
         if (mThreadNumber <= 1 || mRuntime->mPower == BackendConfig::Power_Low) {
@@ -520,13 +526,6 @@ CPUBackend::CPUBackend(const CPURuntime* runtime, BackendConfig::PrecisionMode p
         mCacheGroup[i].reset(new CPUResizeCache);
     }
     mCache = mCacheGroup[0].get();
-#if 0
-#ifndef MNN_FORBIT_MULTI_THREADS
-    if (initThreadNumber > 0) {
-        mInitWorkQueue.reset(new WorkerThread(initThreadNumber));
-    }
-#endif
-#endif
 }
 
 CPUBackend::~CPUBackend() {

+ 10 - 10
source/backend/cpu/CPUBackend.hpp

@@ -17,6 +17,10 @@
 #include "core/BufferAllocator.hpp"
 #include "MNN_generated.h"
 
+#ifdef MNN_USE_THREAD_POOL
+#include "ThreadPool.hpp"
+#endif
+
 #ifdef MNN_KLEIDIAI_ENABLED
 #include "arm/mnn_kleidiai.h"
 #endif
@@ -45,11 +49,6 @@ public:
     virtual void onConcurrencyEnd() const override;
     virtual bool onCheckInfo(Backend::Info& info) const override;
 
-#ifdef MNN_USE_THREAD_POOL
-    inline bool multiThreadValid() const {
-        return mThreadOpen;
-    }
-#endif
     SingleBufferWithAllocator* buffer(int index) const;
     BufferAllocator* createDynamicBufferAlloctor(int index) const;
 
@@ -60,9 +59,11 @@ private:
     mutable std::shared_ptr<EagerBufferAllocator> mStaticAllocator;
     int mThreadNumber;
     std::vector<int> mCpuIds;
+    unsigned long mCpuMask;
 #ifdef MNN_USE_THREAD_POOL
     mutable int mTaskIndex = -1;
     mutable int mThreadOpen = 0;
+    ThreadPool* mThreadPool = nullptr;
 #endif
     BackendConfig::MemoryMode mMemory;
     BackendConfig::PowerMode mPower;
@@ -151,11 +152,6 @@ public:
     inline int threadNumber() const {
         return mThreadNumber;
     }
-#ifdef MNN_USE_THREAD_POOL
-    inline bool threadOpen() const {
-        return mRuntime->mThreadOpen > 0;
-    }
-#endif
 
     BufferAllocator* getBufferAllocator(bool defer_allocator = true) const {
         return mDmaInfo->mCurrentDynamicAllocator;
@@ -175,6 +171,7 @@ public:
 
 #ifdef MNN_USE_THREAD_POOL
     inline int taskIndex() const {return mRuntime->mTaskIndex;}
+    inline ThreadPool* threadPool() const {return mRuntime->mThreadPool;}
 #endif
     static void initCreatorMap();
     static size_t getBytes(const Backend* backend, const Tensor* output);
@@ -189,6 +186,9 @@ protected:
 private:
     mutable std::shared_ptr<WorkerThread> mInitWorkQueue;
     int mThreadNumber;
+#ifdef MNN_USE_THREAD_POOL
+    ThreadPool* mThreadPool = nullptr;
+#endif
     std::vector<std::pair<float, int>> mGroupWithComputeRate;
     float mComputeI = 0.f;
 

+ 1 - 1
source/backend/cpu/CPURuntime.cpp

@@ -148,7 +148,7 @@ int MNNSetSchedAffinity(const int* cpuIDs, int size) {
     return 0;
 }
 
-unsigned long MNNGetCPUMask(const std::vector<int>& cpuIds) {
+cpu_mask_t MNNGetCPUMask(const std::vector<int>& cpuIds) {
     /**
      * [cpu_set_t](https://man7.org/linux/man-pages/man3/CPU_SET.3.html) is a
      * statically-sized CPU set. See `CPU_ALLOC` for dynamically-sized CPU sets.

+ 2 - 2
source/backend/cpu/CPURuntime.hpp

@@ -25,10 +25,10 @@ struct MNNCPUInfo {
     std::vector<CPUGroup> groups;
     int cpuNumber = 0;
 };
-
+using cpu_mask_t = unsigned long;
 int MNNSetSchedAffinity(const int* cpuIDs, int size);
 int MNNGetCurrentPid();
-unsigned long MNNGetCPUMask(const std::vector<int>& cpuIds);
+cpu_mask_t MNNGetCPUMask(const std::vector<int>& cpuIds);
 const MNNCPUInfo* MNNGetCPUInfo();
 
 #endif /* CPUInfo_hpp */

+ 43 - 60
source/backend/cpu/ThreadPool.cpp

@@ -8,41 +8,43 @@
 #ifdef MNN_USE_THREAD_POOL
 #include "backend/cpu/ThreadPool.hpp"
 #include <string.h>
+#include <unordered_map>
 #include <MNN/MNNDefine.h>
+#include "ThreadPool.hpp"
 
 #define MNN_THREAD_POOL_MAX_TASKS 2
 namespace MNN {
-ThreadPool* ThreadPool::gInstance = nullptr;
+static std::unordered_map<long int, ThreadPool*> gInstances;
 static std::mutex gInitMutex;
-int ThreadPool::init(int number) {
-    if (1 >= number) {
-        return 1;
+int ThreadPool::init(int numberThread, unsigned long cpuMask, ThreadPool*& threadPool) {
+    if (1 >= numberThread) {
+        numberThread = 1;
     }
     std::lock_guard<std::mutex> _l(gInitMutex);
-    if (nullptr != gInstance) {
-        if (gInstance->number() < number) {
-            return gInstance->number();
-        }
+
+    if (gInstances.find(cpuMask) == gInstances.end()){
+        gInstances[cpuMask] = new ThreadPool(numberThread);
     }
-    if (nullptr == gInstance) {
-        gInstance = new ThreadPool(number);
+    threadPool = gInstances[cpuMask];
+    if (gInstances[cpuMask]->numberThread() < numberThread){
+        return gInstances[cpuMask]->numberThread();
     }
-    return number;
+    return numberThread;
 }
+
 void ThreadPool::destroy() {
     std::lock_guard<std::mutex> _l(gInitMutex);
-    if (nullptr != gInstance) {
-        delete gInstance;
-        gInstance = nullptr;
+    for (auto i= gInstances.begin(); i != gInstances.end(); i++){
+        if (i->second){
+            delete i->second;
+        }
     }
+    gInstances.clear();
 }
 
 ThreadPool::ThreadPool(int numberThread) {
     mNumberThread = numberThread;
-    mActiveCount.resize(numberThread);
-    for (int i=0; i<numberThread; ++i) {
-        mActiveCount[i] = new std::atomic_int(0);
-    }
+    mActiveCount  = 0;
     mTaskAvailable.resize(MNN_THREAD_POOL_MAX_TASKS);
     mTasks.resize(MNN_THREAD_POOL_MAX_TASKS);
     for (int t = 0; t < mTasks.size(); ++t) {
@@ -55,7 +57,7 @@ ThreadPool::ThreadPool(int numberThread) {
         int threadIndex = i;
         mWorkers.emplace_back([this, threadIndex]() {
             while (!mStop) {
-                while (*mActiveCount[threadIndex] > 0) {
+                while (mActiveCount > 0) {
                     for (int i = 0; i < MNN_THREAD_POOL_MAX_TASKS; ++i) {
                         if (*mTasks[i].second[threadIndex]) {
                             mTasks[i].first.first(threadIndex);
@@ -65,7 +67,7 @@ ThreadPool::ThreadPool(int numberThread) {
                     std::this_thread::yield();
                 }
                 std::unique_lock<std::mutex> _l(mQueueMutex);
-                mCondition.wait(_l, [this, threadIndex] { return mStop || *mActiveCount[threadIndex] > 0; });
+                mCondition.wait(_l, [this] { return mStop || mActiveCount > 0; });
             }
         });
     }
@@ -85,82 +87,63 @@ ThreadPool::~ThreadPool() {
             delete c;
         }
     }
-    for (int i=0; i<mActiveCount.size(); ++i) {
-        delete mActiveCount[i];
-    }
 }
 
 int ThreadPool::acquireWorkIndex() {
-    if (nullptr == gInstance) {
-        return -1;
-    }
-    std::lock_guard<std::mutex> _l(gInstance->mQueueMutex);
+    std::lock_guard<std::mutex> _l(mQueueMutex);
     for (int i = 0; i < MNN_THREAD_POOL_MAX_TASKS; ++i) {
-        if (gInstance->mTaskAvailable[i]) {
-            gInstance->mTaskAvailable[i] = false;
+        if (mTaskAvailable[i]) {
+            mTaskAvailable[i] = false;
             return i;
         }
     }
     return -1;
 }
 void ThreadPool::releaseWorkIndex(int index) {
-    if (nullptr == gInstance) {
-        return;
-    }
     if (index < 0 || index >= MNN_THREAD_POOL_MAX_TASKS) {
         return;
     }
-    std::lock_guard<std::mutex> _l(gInstance->mQueueMutex);
-    gInstance->mTaskAvailable[index] = true;
+    std::lock_guard<std::mutex> _l(mQueueMutex);
+    mTaskAvailable[index] = true;
 }
 
-void ThreadPool::active(int threadNumber) {
-    if (nullptr == gInstance) {
-        return;
-    }
+void ThreadPool::active() {
     {
-        std::lock_guard<std::mutex> _l(gInstance->mQueueMutex);
-        for (int i=0; i<threadNumber; ++i) {
-            (*gInstance->mActiveCount[i])++;
-        }
+        std::lock_guard<std::mutex> _l(mQueueMutex);
+        mActiveCount++;
     }
-    gInstance->mCondition.notify_all();
+    mCondition.notify_all();
 }
-void ThreadPool::deactive(int threadNumber) {
-    if (nullptr == gInstance) {
-        return;
-    }
-    for (int i=0; i<threadNumber; ++i) {
-        (*gInstance->mActiveCount[i])--;
-    }
+void ThreadPool::deactive() {
+    mActiveCount--;
 }
 
-void ThreadPool::enqueue(TASK&& task, int index, int threadNumber) {
+void ThreadPool::enqueue(TASK&& task, int index) {
     if (1 >= task.second || 0 > index) {
         for (int i = 0; i < task.second; ++i) {
             task.first(i);
         }
         return;
     }
-    MNN_ASSERT(nullptr != gInstance);
-    gInstance->enqueueInternal(std::move(task), index, threadNumber);
+    enqueueInternal(std::move(task), index);
 }
-void ThreadPool::enqueueInternal(TASK&& task, int index, int threadNumber) {
-    if (threadNumber <= 1) {
+void ThreadPool::enqueueInternal(TASK&& task, int index) {
+    if (mActiveCount == 0) {
         for (int i = 0; i < task.second; ++i) {
             task.first(i);
         }
         return;
     }
     int workSize = task.second;
-    if (workSize > threadNumber) {
+    if (workSize > mNumberThread) {
         mTasks[index].first = std::make_pair(
-            [workSize, &task, threadNumber, this](int tId) {
-                for (int v = tId; v < workSize; v += threadNumber) {
+            [workSize, &task, this](int tId) {
+                for (int v = tId; v < workSize; v += mNumberThread) {
                     task.first(v);
                 }
-            },threadNumber);
-        workSize = threadNumber;
+            },
+            mNumberThread);
+        workSize = mNumberThread;
     } else {
         mTasks[index].first = std::move(task);
     }

+ 10 - 11
source/backend/cpu/ThreadPool.hpp

@@ -22,25 +22,24 @@ class MNN_PUBLIC ThreadPool {
 public:
     typedef std::pair<std::function<void(int)>, int> TASK;
 
-    int number() const {
+    int numberThread() const {
         return mNumberThread;
     }
-    static void enqueue(TASK&& task, int index, int threadNumber);
+    void enqueue(TASK&& task, int index);
 
-    static void active(int threadNumber);
-    static void deactive(int threadNumber);
+    void active();
+    void deactive();
 
-    static int acquireWorkIndex();
-    static void releaseWorkIndex(int index);
+    int acquireWorkIndex();
+    void releaseWorkIndex(int index);
 
-    static int init(int number);
+    static int init(int numberThread, unsigned long cpuMask, ThreadPool*& threadPool);
     static void destroy();
 
 private:
-    void enqueueInternal(TASK&& task, int index, int threadNumber);
+    void enqueueInternal(TASK&& task, int index);
 
-    static ThreadPool* gInstance;
-    ThreadPool(int number = 0);
+    ThreadPool(int numberThread = 0);
     ~ThreadPool();
 
     std::vector<std::thread> mWorkers;
@@ -52,7 +51,7 @@ private:
     std::mutex mQueueMutex;
 
     int mNumberThread            = 0;
-    std::vector<std::atomic_int*> mActiveCount;
+    std::atomic_int mActiveCount = {0};
 };
 } // namespace MNN
 #endif

+ 2 - 1
source/core/Concurrency.h

@@ -28,7 +28,8 @@
     }                                                              \
     ;                                                              \
     auto cpuBn = (CPUBackend*)backend();                           \
-    MNN::ThreadPool::enqueue(std::move(task), cpuBn->taskIndex(), cpuBn->threadOpen() ? cpuBn->threadNumber() : 1); \
+    auto thrPl = cpuBn->threadPool();                              \
+    thrPl->enqueue(std::move(task), cpuBn->taskIndex());           \
     }
 
 #else

+ 7 - 6
test/core/ThreadPoolTest.cpp

@@ -20,18 +20,19 @@ public:
         std::vector<std::thread> threads;
         for (int i = 0; i < 10; ++i) {
             threads.emplace_back([i]() {
-                int number = MNN::ThreadPool::init(10 - i);
+                MNN::ThreadPool* threadPool = nullptr;
+                MNN::ThreadPool::init(10 - i, 0, threadPool);
                 // initializer
-                auto workIndex = ThreadPool::acquireWorkIndex();
+                auto workIndex = threadPool->acquireWorkIndex();
                 FUNC_PRINT(workIndex);
-                ThreadPool::active(number);
+                threadPool->active();
                 auto func = [](int index) {
                     FUNC_PRINT(index);
                     std::this_thread::yield();
                 };
-                ThreadPool::enqueue(std::make_pair(std::move(func), 10), workIndex, number);
-                ThreadPool::deactive(number);
-                ThreadPool::releaseWorkIndex(workIndex);
+                threadPool->enqueue(std::make_pair(std::move(func), 10), workIndex);
+                threadPool->deactive();
+                threadPool->releaseWorkIndex(workIndex);
             });
         }
         for (auto& t : threads) {