[C++] ใช้ CPU ให้เต็มที่ด้วย Boost Thread Pool

เมื่อวานมีงานที่ต้อง process ข้อมูลบางอย่างด้วย C++ เลยพยายามใช้ CPU ให้เต็มที่ทุก cores เท่าที่มีให้มากสุด ซึ่งก็ได้มาท่านึงก็คือใช้ library Boost Thread Pool เข้าช่วย

โจทย์สมมติว่า เรามีข้อมูล n ตัว เราต้องการโปรเซสข้อมูลของการจับคู่ของมูลเช่น

n = [1, 2, 3]

แล้วเรามีฟังก์ชัน process ข้อมูล ที่รับสองพารามิเตอร์แบบนี้

void process(int a, int b)
{
        cout << a << "+" << b << " = " << a + b << "\n";
}

ตัวอย่างโค้ดแบบไม่ได้ใช้ Thread Pool ช่วยจะเป็นแบบนี้

#include <iostream>
#include <vector>

using namespace std;

void process(int a, int b)
{
        cout << a << "+" << b << " = " << a + b << "\n";
}

int main()
{
        vector<int> vs = {1, 2, 3, 4, 5};

        for (const auto &a : vs)
        {
                for (const auto &b : vs)
                {
                        process(a, b);
                }
        }
        return EXIT_SUCCESS;
}

(ป.ล. ของจริงที่ใช้ไม่ใช่แค่ print เลขบวกกัน แต่เมื่อให้เข้าใจง่ายเฉยๆ ต้องจินตนาการว่า process เป็นงานที่คำนวณหนักๆหน่อย :D)

ทีนี้เราอยากให้ตอน process แทนทีจะทำทีละ 1 แบบ sequential เราอยากใช้ thread ช่วยเพื่อให้ใช้ cores CPU ได้เต็มที่ เราเอา Boost มาช่วยได้โดยโค้ดจะเป็นแบบนี้แทน

#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>

#include <iostream>
#include <vector>
#include <thread>
#include <functional>

using namespace std;

void process(boost::mutex &io_mutex, int a, int b)
{
        boost::mutex::scoped_lock scoped_lock(io_mutex);
        cout << a << "+" << b << " = " << a + b << "\n";
}

int main()
{
        // สร้าง thread_pool object เก็บในตัวแปร pool
        // โดยกำหนดให้จำนวน thread เท่ากับ core ของ CPU ของเครื่องที่รัน
        // เช็คได้จาก function hardward_concurrency
        boost::asio::thread_pool pool(thread::hardware_concurrency());

        // สร้าง mutex เพื่อช่วยให้ตอน cout ไม่เกิด race condition ที่ทำให้ผลลัพธ์เพี้ยนๆ
        boost::mutex io_mutex;

        vector<int> vs = {1, 2, 3, 4, 5};

        for (const auto &a : vs)
        {
                for (const auto &b : vs)
                {
                        // ส่งฟังก์ชันไปรันใน thread pool ด้วยฟังก์ชัน post
                        // ซึ่งเราต้องใช้ std::bind ฟังก์ชันกับ parameter ของฟังก์ชันก่อนส่งให้ post ด้วย
                        // และส่ง reference io_mutex เข้าไปด้วยเพื่อใช้ scoped_lock กันแย่งกัน print output
                        boost::asio::post(pool, std::bind(process, std::ref(io_mutex), a, b));
                }
        }

        // เรียก pool.join() เพื่อรอให้ทุก process ทำงานจบก่อนแล้วค่อยจบโปรแกรม
        pool.join();

        return EXIT_SUCCESS;
}

จะเห็นว่าโค้ดนี้มีจุดน่าสนใจเช่นตรง boost::asio::post(pool, std::bind(process, std::ref(io_mutex), a, b)); เราได้ใช้ฟีเจอร์ใหม่ๆของ C++ ที่เอาฟังก์ชันที่ประกาศไว้มาทำให้ pass เป็น value ของ post โดยใช้ std::bind ซึ่งส่งฟังก์ชันกับลิสต์ของพารามิเตอร์ไปให้ ซึ่งค่าไหนเป็น reference ก็ให้ใช้ std::ref ครอบก่อน

นอกจากนั้นเรายังได้ใช้ mutex กับ scoped_lock ของ boost::asio ช่วยในการจัดการ lock ตอนแสดงผลด้วย cout ด้วยเพราะถ้าไม่ทำจะเกิด race condition ตอนแสดงผล จนผลลัพธ์ผิดเพี้ยนไปได้

scoped_lock นั้นใช้วิธี RAII ซึ่งจะ unlock เองตอนจบ scope ทำให้เราไม่ต้องเรียก unlock เองอีกด้วย แต่สร้าง scoped_lock ก็พอแล้ว

ใน C++ ใหม่ๆรองรับ lambda express ด้วย ดังนั้นเดี๋ยวเราจะลองแปลงโค้ดใหม่มาใช้ lambda ดูแทนที่จะใช้ฟังก์ชัน std::bind process โค้ดก็จะได้แบบนี้

#include <boost/asio.hpp>
#include <boost/thread/mutex.hpp>

#include <iostream>
#include <vector>
#include <thread>
#include <functional>

using namespace std;

void process(boost::mutex &io_mutex, int a, int b)
{
        boost::mutex::scoped_lock scoped_lock(io_mutex);
        cout << a << "+" << b << " = " << a + b << "\n";
}

int main()
{
        // สร้าง thread_pool object เก็บในตัวแปร pool
        // โดยกำหนดให้จำนวน thread เท่ากับ core ของ CPU ของเครื่องที่รัน
        // เช็คได้จาก function hardward_concurrency
        boost::asio::thread_pool pool(thread::hardware_concurrency());

        // สร้าง mutex เพื่อช่วยให้ตอน cout ไม่เกิด race condition ที่ทำให้ผลลัพธ์เพี้ยนๆ
        boost::mutex io_mutex;

        vector<int> vs = {1, 2, 3, 4, 5};

        for (const auto &a : vs)
        {
                for (const auto &b : vs)
                {

                        // ส่งฟังก์ชันไปรันใน thread pool ด้วยฟังก์ชัน post
                        // โดยส่ง lambda express ที่เรียก process ข้างใน
                        boost::asio::post(
                            pool,
                            // lambda express syntax
                            // แบบนี้คือ [&] หมายถึงทุกตัวแปรที่ capture จาก scope ด้านนอกใช้เป็น reference
                            // ทั้ง io_mutex, a, b โดยเราไม่ต้องลิสต์เองทีละอัน
                            [&]()
                            {
                                    process(io_mutex, a, b);
                            });
                        // boost::asio::post(pool, std::bind(process, std::ref(io_mutex), a, b));
                }
        }

        // เรียก pool.join() เพื่อรอให้ทุก process ทำงานจบก่อนแล้วค่อยจบโปรแกรม
        pool.join();

        return EXIT_SUCCESS;
}

จะเห็นว่าโค้ด C++ ยุคใหม่ๆ ก็ทำให้อ่านและเขียนง่ายขึ้นเยอะเลย ใครมีงานอะไรที่ต้องใช้ lib c, lib cpp ลองมาเขียน C++ ใช้งานดูก็ไม่ได้ดูยากอีกต่อไปแล้วนะ