C++11 - Bellek modeli ve atomik türler

c++11

#1

C++ programlama dili yeni gelen standartlarla beraber belirsiz durum (undefined behaviour) diyebileceğimiz durumları en aza indirmeyi amaçladığını daha önceki yazılarda az çok değinmiştim. C++11’ le gelen en büyük yeniliklerden biri de multi-thread kod yazıldığında memory ordering veya cpu ordering gibi durumlardan kaynaklabilecek belirsiz durumları en aza indirgemeyi amaçlayan standart bir bellek modeli sunmasıdır.

Performans kaygısıyla derleyici veya işlemciniz sizin adınıza bazı değişiklikler yapabilir. Yani aslında tam olarak sizin yazdığınız kodun assembly karşılığını elde edemeyebilirsiniz. Örneğin,

int count = 5;
while(count)
   cout << "Beast is Back!" << endl;

Yukarıdaki koda baktığımızda öncelikle bellekte bir değişken yaratılması ve onun register’a yüklenip işlemci tarafından okunmasını beklemekteyiz. Ama aslında olan derleyicinin bu koda bakıp döngüyü while(true) şekline dönüştürerek çalışma zamanında performans kazanımını sağlar. Tabii ki derleyicinin en iyileme (optimization) seçeneklerinin açık olduğunu farzediyorum. Yukarıdaki kodu gcc 7.3 ile derlendiğinde iki farklı durumda ortaya çıkan işlemci komutları aşağıdaki gibidir.

/// varsayılan şekilde derlendiğinde
https://godbolt.org/g/LnsuZ7

/// -O3 ile derlendiğinde
https://godbolt.org/g/RiCUkU

Threadler arasında sağladığı bölünmezlik garantisi ile bellekte tutarlılık (consistency) sağlanmasına yardımcı olan std::atomic şablon sınıfıdır. Yani diyelim ki bir threadA int türünden bir a değişkenini okurken o sırada threadB tam bu okuma işlemi sırasında a değişkeninin bellekteki değerini değiştirebilir. Bu da tam da yarış durumudur (race condition). Eğer değişkenimiz atomic olarak tanımlansaydı okuma veya yazma işlemi bitmeden bölünmemesi garanti altında olacaktı.

   
    std::atomic<int> a = 0;  // ekrana 200000 olarak yazılması garanti altındadır.
    // int a = 0;  ekrana ne yazılacağı meçhul, belirsizlik durumu (undefined bahviour)

    std::thread writeThread( [&] () {
                                  for(int i = 0; i < 100000; i++) {
                                        a++;
                                  }
                              }
    );

    std::thread readThread( [&] () {
                                 for(int i = 0; i < 100000; i++) {
                                        a++;
                                 }
                             }
    );

    writeThread.join();
    readThread.join();

    cout << a << endl;

atomic şablon sınıfı türünden değişkenlerin erişimi(load) ve yazımı(store) sırasında bellek erişim biçimiyle beraber threadler arasında bellek paylaşımlarını yönetebileceğimiz özellikler getirdi modern C++. Bu özellikler ve getirdiği standartlarla beraber mutex, condition variable gibi özelliklerle elde ettiğimiz threadler arası senkronizasyonu elde etme imkanı sağlıyor hem de daha az bir maliyetle.

memory_order_seq_cst:
Inter-thread değişkenlerin tamamı eğer ‘memory_order_seq_cst’ parametresiyle sıralanmışsa tüm threadlerin üzerinde anlaştığı global bir sıralama vardır. Tüm load ve store operasyonları da kendi aralarında da sıralıdır.
Örneğin,

   
           atomic<int> x, y;

Thread 1                 Thread 2
x.store(10);             cout << y.load() << endl;
y.store(20);             cout << x.load() << endl;

Yukarıdaki sıralamaya göre y değişkenini load ettiğimizde x değişkenininde store edilmesi garanti altındadır. Kısaca ekrana 20 ve 10 yazılabilir. Ne yazılamaz dersek 20, 0 yazılma ihtimali yoktur çünkü ‘memory_order_seq_cst’ sıralı tutarlılığı(sequential concsistency) garanti eder. Ayrıca her iki threadde de store ve load operasyonları sıralıdır. Yani bellekte x store edilmeden y store edilmez aynı şekilde load işleci için de geçerlidir. Unutmadan ekleyelim varsayılan bellek sıralama seçeneğidir.

memory_order_relaxed:
Atomic olmayan bir değişkenle yapılan sıralamalar gibidir. Herhangi bir bellek bariyeri (memory barrier) söz konusu değildir. Relax çalışır :slight_smile:. Yalnız hiçbir sıralama garantisi yoktur dersek de doğru olmaz tıpkı single thread uygulamalardaki gibi aynı thread içerisindeki load ve store operasyonlarında happens-before sıralaması vardır. Fakat load operasyonları store operasyonlarını rassal bir sıralama da görür. Yukarıdaki örneğe dönersek, 20-10, 20-0, 0-0, 0-10 gibi tüm ihtimaller olasıdır.

memory_order_acquire - memory_order_release:
memory_order_seq_cst daki gibi üzerinde anlaşılan global bir sıralama yoktur ama memory_order_relaxed sıralamasındaki gibi de değildir. Aynı thread içerisinde yine happens-before ilişkisiyle beraber acquire-release operasyonları arasında da synchronize-with ilişkisi vardır. Aslında bir anlamda memory_order_seq_cst in daha hafif halidir.

   
           atomic<int> x, y;

Thread 1                                                                 Thread 2
x.store(10, memory_order_release);             cout << y.load(memory_order_acquire) << endl;
y.store(20, memory_order_release);             cout << x.load(memory_order_acquire) << endl;

Burda release ve acquire operasyonları ikili (pairwise) olarak senkronize olarak çalışır. Eğer herhangi bir şekilde bu sekronizasyon kurulmadıysa relaxed olarak çalışır.

İhtiyacım olan single producer-single consumer lock-free queueyu şuradaki örneği alarak ihtiyaçlarım doğrultusunda geliştirdim. Fena da olmadı.

Yaptığım testlerde mutex ile implemente edilen versiyonu ile lock-free versiyonu arasında ciddi performans farkı var. Tabi bu fark ringbuffer kaç kez tur attığıyla orantılı olarak büyüyor. Kasıtlı olarak rakam vermiyorum ki herkes kendi bilgisayarında çalıştırıp farkı görsün.

template <class T, size_t m_size>
class GenericRingBuffer
{
public :

    GenericRingBuffer()
    {
        m_write.store(0, std::memory_order::memory_order_relaxed);
        m_read.store(0, std::memory_order::memory_order_relaxed);
    }

    void push(T val)
    {
        while(!try_push(val));
    }

    void pop(T *val)
    {
        while(!try_pop(val));
    }

    void pop_bulk(T *dst, size_t len)
    {
        while(!try_pop_bulk(dst, len));

    }

    void push_bulk(T *src, size_t len)
    {
        while(!try_push_bulk(src, len));
    }

private:

    bool try_push(T val)
    {
        const auto currentWHead = m_write.load(std::memory_order_relaxed);
        const auto nextWHead = currentWHead + 1;
        if (currentWHead - m_read.load(std::memory_order_acquire) <= m_size - 1) {
            m_buffer[currentWHead % m_size] = val;
            m_write.store(nextWHead, std::memory_order_release);
            return true;
        }

        return false;
    }

    bool try_pop(T *pval)
    {
        const auto currentRHead = m_read.load(std::memory_order_relaxed);
        const auto nextRHead = currentRHead + 1;

        if (currentRHead == m_write.load(std::memory_order_acquire)) {
            return false;
        }

        *pval = m_buffer[currentRHead % m_size];
        m_read.store(nextRHead, std::memory_order_release);

        return true;
    }


    bool try_pop_bulk(T *dst, size_t len)
    {
        const auto currentRHead = m_read.load(std::memory_order_relaxed);
        const auto currentWHead = m_write.load(std::memory_order_acquire);

        if (currentWHead >= currentRHead) {
            if (currentWHead -  currentRHead < len)
                return false;
        }

        else if (currentWHead < currentRHead){
            if ((m_size - currentRHead) + currentWHead < len)
                return false;
        }

        size_t nextRHead = (currentRHead + len) % m_size;
        if (nextRHead < currentRHead) {
            memcpy(dst, m_buffer + currentRHead, (m_size - currentRHead) * sizeof(T));
            memcpy(dst + (m_size - currentRHead), m_buffer, nextRHead * sizeof(T));

        }

        else {
            memcpy(dst, m_buffer + currentRHead, len);
        }

        m_read.store(nextRHead, std::memory_order_release);

        return true;
    }

    bool try_push_bulk(T *src, size_t len)
    {
        const auto currentWHead = m_write.load(std::memory_order_relaxed);
        const auto currentRHead = m_read.load(std::memory_order_acquire);

        if (currentWHead > currentRHead) {
            if ((m_size - currentWHead) + currentRHead < len)
                return false;
        }

        else if (currentWHead < currentRHead) {
            if (currentRHead -  currentWHead < len)
                return false;
        }

        const auto nextWHead = (currentWHead + len) % m_size;
        if (nextWHead < currentWHead) {
            memcpy(m_buffer + currentWHead, src, (m_size - currentWHead) * sizeof(T));

            memcpy(m_buffer, src + (m_size - currentWHead),
                   nextWHead);
        }

        else
            memcpy(m_buffer + currentWHead, src, len);

        m_write.store(nextWHead, std::memory_order_release);
        return true;
    }

private :
    char cachePad_1[64];
    std::atomic<size_t> m_write;
    char cachePad_2[64];
    std::atomic<size_t> m_read;
    T m_buffer[m_size];
};

Mutexli versiyon ise;

template <class T, size_t m_size>
class GenericRingBuffer
{
public :

    GenericRingBuffer() 
    {
        m_write = 0;
        m_read = 0;
    }

    void clear()
    {
        std::lock_guard<std::mutex> lock(mtx);
        m_read = 0;
        m_write = 0;

    }

    bool push(T val)
    {
        std::lock_guard<std::mutex> lock(mtx);
        const auto nextWHead = m_write + 1;
        if (m_write - m_read <= m_size - 1) {
            m_buffer[m_write % m_size] = val;
            m_write = nextWHead;
            return true;
        }

        return false;
    }

    bool pop(T *val)
    {
        std::lock_guard<std::mutex> lock(mtx);
        const auto nextRHead = m_read + 1;

        if (m_read == m_write) {
            return false;
        }

        *val = m_buffer[m_read % m_size];
        m_read = nextRHead;

        return true;
    }

    bool pop_bulk(T *dst, size_t len)
    {
        std::lock_guard<std::mutex> lock(mtx);

        if (m_write >= m_read) {
            if (m_write -  m_read < len)
                return false;
        }

        else if (m_write < m_read){
            if ((m_size - m_read) + m_write < len)
                return false;
        }

        size_t nextRHead = (m_read + len) % m_size;
        if (nextRHead < m_read) {
            memcpy(dst, m_buffer + m_read, (m_size - m_read) * sizeof(T));
            memcpy(dst + (m_size - m_read), m_buffer, nextRHead * sizeof(T));

        }

        else {
            memcpy(dst, m_buffer + m_read, len);
        }

        m_read = nextRHead;

        return true;

    }

    bool push_bulk(T *src, size_t len)
    {
        std::lock_guard<std::mutex> lock(mtx);

        if (m_write > m_read) {
            if ((m_size - m_write) + m_read < len)
                return false;
        }

        else if (m_write < m_read) {
            if (m_read -  m_write < len)
                return false;
        }

        const auto nextWHead = (m_write + len) % m_size;

        if (nextWHead < m_write) {
            memcpy(m_buffer + m_write, src, (m_size - m_write) * sizeof(T));

            memcpy(m_buffer, src + (m_size - m_write),
                   nextWHead);
        }

        else
            memcpy(m_buffer + m_write, src, len);

        m_write = nextWHead;
        return true;

    }


private :
    size_t m_write;
    size_t m_read;
    T m_buffer[m_size];
    std::mutex mtx;
};

Bu da test kodu;

int main (int argc, char** argv)
{
    auto start = chrono::steady_clock::now();

    char a[10] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k'};

    GenericRingBuffer<char, 1024> grb;

    std::thread write_thread( [&] () {
                                  for(int i = 0; i < 2000; i++) {
                                      grb.push_bulk(a, 10);
                                  }
                              }
    );

    std::thread read_thread( [&] () {
                                 for(int i = 0; i < 2000; i++) {
                                     char b[10];
                                     grb.pop_bulk(b, 10);
                                 }
                             }
    );

    write_thread.join();
    read_thread.join();


    auto end = chrono::steady_clock::now();
    cout << chrono::duration <double, milli> (end - start).count() << " ms " << endl;

    return 0;
}

Şunu da şuraya bırakayım:

Not: Burada verilen kodlar eğitim amaçlıdır hiçbir şekilde ticari bir garanti verilmemektedir. Kullananın kendi sorumluluğundadır.
#The End


#4

Merhaba,

Bilgiler için teşekkürler. Deneyip direk olarak çalıştıramayanlar için header başlık dosyaları ve namespace std kısmını eklemelere gerekiyor. Bazı kısımlarda cout direk kullanılmış.(Test kodunda)

#include <iostream>
#include <atomic>
#include <cstring>
#include <chrono>
#include <thread>

using namespace std;