воскресенье, 20 мая 2018 г.

C++: legacy код и указатели

История одного рефакторинга

Эта забавная история случилась пару лет назад, в ходе рефакторинга одного legacy-кода, который позарез был необходим в современном виде переписанным с микса C/C++ на C++11.

Так получилось, что мы имели на руках код, который был очень склонен к утечкам памяти, был написан в высшей степени небрежно, и иногда крашился. Однако использовать его было необходимо. Подобное не редкость в OpenSource, особенно когда первоначальные авторы забрасывают свои репозитории.

Что ж, давайте посмотрим на код (выбраны только фрагменты, относящиеся к утечке).
           struct GzipContext  
           {  
                z_stream     zstream;  
                unsigned char*     gzipBuffer;  
                unsigned int     checksum;  
                unsigned int     originalSize;  
                unsigned int     compressedSize;  
                unsigned int     sendingOffset;  
                unsigned int     lastChunkSize;  
           } *gzipContext;  
   
  ...  
      gzipContext = (struct GzipContext*) malloc(sizeof(struct GzipContext));  
  ...  
       // (re)allocate the gzipBuffer  
      gzipContext->gzipBuffer = (unsigned char*) realloc(gzipContext->gzipBuffer, 256 + gzipContext->originalSize);  
  ...  
       if(gzipContext)  
      {  
           if(gzipContext->gzipBuffer)  
           {  
                free(gzipContext->gzipBuffer);  
           }  
             
           free(gzipContext);  
           gzipContext = 0;  
      }  

Хотя предполагается, что это C++, тем не менее очевидно, что это низкоуровневый C. Весьма неряшливый. С указателями на указатели итп.

Немного деталей. Проблемное место здесь очевидно - это структура GzipContext, в особенности gzipBuffer, выделяемый в heap как множество указателей на байты, с вытекающей отсюда арифметикой указателей и утечками памяти в случае прерывания исполнения кода до блока, выполняющего освобождение.

Первая, и наиболее очевидная итерация - воспользоваться методами C++ new/delete и слегка переписать код (структура переименована ввиду добавленной поддержки deflate):

           struct CompressContext {  
                z_stream zstream;  
                unsigned char* Buffer;  
                unsigned long checksum;  
                unsigned int originalSize;  
                unsigned int compressedSize;  
                unsigned int sendingOffset;  
                unsigned int lastChunkSize;  
           } *compresscontext;  
   
   ...  
   compresscontext = (struct CompressContext*) operator new(sizeof(struct CompressContext));  
   ...  
   // (re)allocate the Buffer  
   compresscontext->Buffer = new unsigned char[256 + compresscontext->originalSize];  
   ...  
        if(compresscontext) {  
    if (compresscontext->Buffer) {  
     //free(compresscontext->Buffer);  
     delete[] compresscontext->Buffer;  
     compresscontext->Buffer = NULL;  
    }  
    operator delete(compresscontext);  
    compresscontext = 0;  
   }  

Немного лучше, однако, кроме некоторых логических ошибок и синтаксической неопрятности, это не совсемсовсем не C++11. Наиболее неприятно здесь то, что этот код - библиотека, вызываемая другой библиотекой, с не слишком линейной логикой, что не гарантирует того, что освобождение буфера будет всегда выполняться.

В оригинальном коде была еще пара проблем. Инициализация структуры выполнялась посредством memset (что само по себе дурной тон в C++), а освобождение памяти структуры должно было выполняться как минимум еще в одной подпрограмме.

Давайте перепишем так, как оно должно быть. Что нужно учесть? Во-первых, будем инициализировать структуру конструктором (деструктор можно использовать и default), а во-вторых, используем smart pointer. Мы достигнем нескольких целей одновременно. Во-первых, можно будет убрать инициализацию структуры, она будет инициализироваться конструктором автоматически (и избавимся от memset) и полностью освобождаться деструктором при покидании scope. Во-вторых, smart pointer позволяет нам не беспокоиться об освобождении памяти вне зависимости от того, в какой момент мы прерываем выполнение подпрограмм библиотеки.

Note: Конечно, можно попытаться решить проблему в лоб - в качестве буфера использовать вектор unsigned char. Это будет работать и решит проблему утечек, но, во-первых, дает приличный оверхед на обмолот вектора, а во-вторых, неэлегантно и приводит к генерации бОльшего количества кода.

Финальный вариант:


           struct CompressContext {  
                z_stream zstream;  
                std::unique_ptr<unsigned char[]> Buffer;  
                unsigned long checksum;  
                std::size_t originalSize;  
                std::size_t compressedSize;  
                std::size_t sendingOffset;  
                std::size_t lastChunkSize;  
   
                CompressContext() : checksum(crc32(0L, Z_NULL, 0)), originalSize(0), compressedSize(0), sendingOffset(0), lastChunkSize(0) {  
                     zstream.zalloc = Z_NULL;  
                     zstream.zfree = Z_NULL;  
                     zstream.opaque = Z_NULL;  
                }  
           };  
   
           CompressContext compresscontext;  
   
   ...
   std::size_t v_bufSize = 12 + compresscontext.originalSize*1.01; /* Buffers size by Zlib */
   /* Allocate the Buffer and initialize it */  
   compresscontext.Buffer = std::unique_ptr<unsigned char[]>{ new unsigned char[v_bufSize] };  
   

Здесь также исправлена логическая ошибка с размером выделенного буфера (в соответствие с документацией библиотеки ZLib).

Таким образом, мы избежали оверхеда в случае использования вектора, инициализируем структуру конструктором, автоматически освобождаем буфер вне зависимости от того, в какой момент прерываем обработку, полностью избавились от утечек памяти и избыточных указателей и привели код в соответствие с C++11 (полный итоговый код полностью избавлен от рудиментов C).

Чтобы было понятно, речь идет вот об этом коде.

четверг, 26 апреля 2018 г.

Unbound: Использование DNS-over-TLS в версии 1.7.1

Анонсирован пре-релиз версии 1.7.1. Согласно анонсу автора, хотя это и maintenance release, в нем добавлена функциональность DNS-over-TLS, которая поддерживается Quad9 и Clouflare DNS.

Небольшое лирическое отступление. Пока ломаются копья об выработку стандартов шифрования DNS, а автор dnscrypt со своим хипстерским демаршем упёрся в Go, наплевав на почти половину платформ, где нативного Go нет физически (стыдливо умолчим, что выкладывание исходников без Makefile это вообще за гранью добра и зла), сервисы потихоньку пилят свои публичные реализации, пытаясь де-факто сделать их стандартами по методу Корпорации добра. Так как на серверах мы нуждаемся в чистом DNS для поддержки наших сетей, нам приходится изыскивать методы решения данной задачи, так как дракидискуссии с разработчиками СПО контропродуктивны и не ведут абсолютно ни к какому результату, кроме дурной репутации склочника. В этой связи, мне удалось, после получаса работы, сделать такое решение на базе только Unbound и вышеуказанных публичных DNS.

Сначала надо обновить (ну или собрать с нуля) Unbound 1.7.1rc1:


 # 32 bit Solaris Studio  
 export CC=`which CC`  
 export cc=`which cc`  
 ./configure --prefix=/usr/local --with-conf-file=/usr/local/etc/unbound/unbound.conf --with-username=unbound --with-ssl=/opt/csw --with-libevent=/usr/local --with-libexpat=/opt/csw --without-pthreads --with-solaris-threads --enable-tfo-client --disable-dsa --with-pidfile=/tmp/unbound.pid 'CFLAGS=-xO5 -lmtmalloc' 'CPPFLAGS=-I/opt/csw/include'  
   
 # 64 bit Solaris Studio  
 ./configure --prefix=/usr/local --with-conf-file=/usr/local/etc/unbound/unbound.conf --with-username=unbound --with-ssl=/opt/csw --with-libevent=/usr/local --with-libexpat=/opt/csw --without-pthreads --with-solaris-threads --enable-tfo-client --with-pidfile=/tmp/unbound.pid 'CFLAGS=-xO5 -m64 -lmtmalloc' 'CPPFLAGS=-I/opt/csw/include' 'LDFLAGS=-m64 -L/opt/csw/lib/64'  
 make && make strip && make install  
   
 # 64 bit GCC  
 ./configure --prefix=/usr/local --with-conf-file=/usr/local/etc/unbound/unbound.conf --with-username=unbound --with-ssl=/opt/csw --with-libevent=/usr/local --with-libexpat=/opt/csw --without-pthreads --with-solaris-threads --enable-tfo-client --disable-dsa --with-pidfile=/tmp/unbound.pid 'CFLAGS=-O3 -m64' 'CPPFLAGS=-I/opt/csw/include' 'LDFLAGS=-m64 -L/opt/csw/lib/64'  
   
 # 64 bit GCC LibreSSL  
 #./configure --prefix=/usr/local --with-conf-file=/usr/local/etc/unbound/unbound.conf --with-username=unbound --with-ssl=/usr/local --with-libevent=/usr/local --with-libexpat=/opt/csw --without-pthreads --with-solaris-threads --enable-tfo-client --disable-dsa --with-pidfile=/tmp/unbound.pid 'CFLAGS=-O3 -m64' 'CPPFLAGS=-I/usr/local/include -I/usr/local/include' 'LDFLAGS=-m64 -L/usr/local/lib -L/opt/csw/lib/64'  
   
 gmake && gmake strip && gmake install  

Проверьте конфигурационные опции, пути библиотек могут быть иными в вашей системе. Убедитесь, что установлен libevent - это dependency.

Следующий шаг - конфигурация. Первое - необходим CA bundle. Я рекомендую взять его с Мозиллы и обновлять кроном раз в неделю.

Далее. Надо добавить (или изменить) в unbound.conf следующие строки:

В секцию server:

      # Certificates used to authenticate connections made upstream.  
      # tls-cert-bundle: ""  
      tls-cert-bundle: "/usr/local/squid/etc/ca-bundle.crt"   

В секцию forward-zone:

 forward-zone:  
  name: "."  
  forward-addr: 1.1.1.1@853#cloudflare-dns.com  
  forward-addr: 1.0.0.1@853#cloudflare-dns.com  
  forward-addr: 9.9.9.9@853#dns.quad9.net  
  forward-addr: 149.112.112.112@853#dns.quad9.net  
  forward-tls-upstream: yes  

Важно: Обратите внимание на 853 порт, который надо задать явно (во всяком случае, в rc1) и на псевдокомментарии - они необходимы для аутентификации на серверах апстрима.

Несколько соображений на закуску. 853 порт в принципе светится у провайдера и может быть заблокирован. В случае блокировки его придется туннелить, для чего есть параметр:

      # upstream connections use TCP only (and no UDP), "yes" or "no"  
      # useful for tunneling scenarios, default no.  
      # tcp-upstream: no  

Рекомендую, в случае проблем, проверить доступность 853 порта телнетом с сервера. Остальное очевидно. Порт должен быть разрешен на граничном роутере. Если это так, а доступа нет - тогда используется туннелинг.

UPDATE В текущей версии Unbound, включая описываемую, есть критический баг forward-zone: в случае блокировки/смерти forward-addr (даже если это не первый peer, даже если остальные пиры работают), резолвинг DNS рекурсора в целом прекращается. Багрепорт автору отправлен, баг подтвержден и пофикшен, включен в текущую ветку и будет окончательно исправлен в релизе 1.7.1. Для текущего 1.7.1rc1 есть патч:

 Index: services/outside_network.c  
 ===================================================================  
 --- services/outside_network.c     (revision 4669)  
 +++ services/outside_network.c     (working copy)  
 @@ -1301,8 +1301,8 @@  
       w->ssl_upstream = sq->ssl_upstream;  
       w->tls_auth_name = sq->tls_auth_name;  
  #ifndef S_SPLINT_S  
 -     tv.tv_sec = timeout;  
 -     tv.tv_usec = 0;  
 +     tv.tv_sec = timeout/1000;  
 +     tv.tv_usec = (timeout%1000)*1000;  
  #endif  
       comm_timer_set(w->timer, &tv);  
       if(pend) {  
 @@ -1812,7 +1812,12 @@  
       }  
       if(sq->tcp_upstream || sq->ssl_upstream) {  
         struct timeval now = *sq->outnet->now_tv;  
 -       if(now.tv_sec > sq->last_sent_time.tv_sec ||  
 +       if(error==NETEVENT_TIMEOUT) {  
 +         if(!infra_rtt_update(sq->outnet->infra, &sq->addr,  
 +            sq->addrlen, sq->zone, sq->zonelen, sq->qtype,  
 +            -1, sq->last_rtt, (time_t)now.tv_sec))  
 +            log_err("out of memory in TCP exponential backoff.");  
 +       } else if(now.tv_sec > sq->last_sent_time.tv_sec ||  
            (now.tv_sec == sq->last_sent_time.tv_sec &&  
            now.tv_usec > sq->last_sent_time.tv_usec)) {  
            /* convert from microseconds to milliseconds */  
 @@ -1822,7 +1827,7 @@  
            log_assert(roundtime >= 0);  
            /* only store if less then AUTH_TIMEOUT seconds, it could be  
             * huge due to system-hibernated and we woke up */  
 -          if(roundtime < TCP_AUTH_QUERY_TIMEOUT*1000) {  
 +          if(roundtime < 60000) {  
              if(!infra_rtt_update(sq->outnet->infra, &sq->addr,  
                 sq->addrlen, sq->zone, sq->zonelen, sq->qtype,  
                 roundtime, sq->last_rtt, (time_t)now.tv_sec))  
 @@ -1863,18 +1868,26 @@  
  static int  
  serviced_tcp_send(struct serviced_query* sq, sldns_buffer* buff)  
  {  
 -     int vs, rtt;  
 +     int vs, rtt, timeout;  
       uint8_t edns_lame_known;  
       if(!infra_host(sq->outnet->infra, &sq->addr, sq->addrlen, sq->zone,  
            sq->zonelen, *sq->outnet->now_secs, &vs, &edns_lame_known,  
            &rtt))  
            return 0;  
 +     sq->last_rtt = rtt;  
       if(vs != -1)  
            sq->status = serviced_query_TCP_EDNS;  
       else      sq->status = serviced_query_TCP;  
       serviced_encode(sq, buff, sq->status == serviced_query_TCP_EDNS);  
       sq->last_sent_time = *sq->outnet->now_tv;  
 -     sq->pending = pending_tcp_query(sq, buff, TCP_AUTH_QUERY_TIMEOUT,  
 +     if(sq->tcp_upstream || sq->ssl_upstream) {  
 +          timeout = rtt;  
 +          if(rtt >= 376 && rtt < TCP_AUTH_QUERY_TIMEOUT)  
 +               timeout = TCP_AUTH_QUERY_TIMEOUT;  
 +     } else {  
 +          timeout = TCP_AUTH_QUERY_TIMEOUT;  
 +     }  
 +     sq->pending = pending_tcp_query(sq, buff, timeout,  
            serviced_tcp_callback, sq);  
       return sq->pending != NULL;  
  }  
 Index: services/outside_network.h  
 ===================================================================  
 --- services/outside_network.h     (revision 4669)  
 +++ services/outside_network.h     (working copy)  
 @@ -376,7 +376,7 @@  
       int retry;  
       /** time last UDP was sent */  
       struct timeval last_sent_time;  
 -     /** rtt of last (UDP) message */  
 +     /** rtt of last message */  
       int last_rtt;  
       /** do we know edns probe status already, for UDP_EDNS queries */  
       int edns_lame_known;  
 @@ -456,7 +456,7 @@  
  * checks id.  
  * @param sq: serviced query.  
  * @param packet: wireformat query to send to destination. copied from.  
 - * @param timeout: in seconds from now.  
 + * @param timeout: in milliseconds from now.  
  *  Timer starts running now. Timer may expire if all buffers are used,  
  *  without any query been sent to the server yet.  
  * @param callback: function to call on error, timeout or reply.  
 Index: util/net_help.h  
 ===================================================================  
 --- util/net_help.h     (revision 4669)  
 +++ util/net_help.h     (working copy)  
 @@ -73,10 +73,10 @@  
  /** set RCODE bits in uint16 flags */  
  #define FLAGS_SET_RCODE(f, r) (f = (((f) & 0xfff0) | (r)))  
   
 -/** timeout in seconds for UDP queries to auth servers. */  
 -#define UDP_AUTH_QUERY_TIMEOUT 4  
 -/** timeout in seconds for TCP queries to auth servers. */  
 -#define TCP_AUTH_QUERY_TIMEOUT 30  
 +/** timeout in milliseconds for UDP queries to auth servers. */  
 +#define UDP_AUTH_QUERY_TIMEOUT 3000  
 +/** timeout in milliseconds for TCP queries to auth servers. */  
 +#define TCP_AUTH_QUERY_TIMEOUT 3000  
  /** Advertised version of EDNS capabilities */  
  #define EDNS_ADVERTISED_VERSION     0  
  /** Advertised size of EDNS capabilities */  
   

Патч накатывается обычным образом посредством patch -p0 и Unbound перекомпилируется.

вторник, 27 марта 2018 г.

C++: std::condition_variable vs std::this_thread::sleep_for


Все видели конструкции, вынесенные в заголовок. std::this_thread::sleep_for используется для реализации всевозможных вариаций на тему поллинга и спинлоков в 9 случаях из 10.

Я не могу сказать, что std::this_thread::sleep_for - это всегда плохо. Нет, конечно. Если речь идет о каком-то второстепенном процессе, с большим интервалом выполнения - его можно использовать.

Но, в случае критичных по скорости участков кода - использование std::this_thread::sleep_for чревато, как минимум потерей латентности и, в целом, снижением масштабирования. А иногда и повышенным использованием CPU.

Давайте рассмотрим одну очень хорошую библиотеку thread pool. На мой взгляд, это лучшая из имеющихся на GitHub библиотек данного назначения.

Она всем хороша. Кроме одного. В ней в worker.hpp используется std::this_thread::sleep_for:
 template <typename Task, template<typename> class Queue>  
 inline void Worker<Task, Queue>::threadFunc(size_t id, Worker* steal_donor)  
 {  
   *detail::thread_id() = id;  
   
   Task handler;  
   
   while (m_running_flag.load(std::memory_order_relaxed))  
   {  
     if (m_queue.pop(handler) || steal_donor->steal(handler))  
     {  
       try  
       {  
         handler();  
       }  
       catch(...)  
       {  
         // suppress all exceptions  
       }  
     }  
     else  
     {  
       std::this_thread::sleep_for(std::chrono::milliseconds(1));  
     }  
   }  
 }  
   

Казалось бы, ничего страшного. 1 миллисекунда ожидания в случае, если очередь заданий пуста.

Однако, все не так радужно.



Первое. Когда поток заданий иссякает - воркеры начинают с этой периодичностю тыкаться в очередь, что дает постоянную загрузку CPU от 0.04 до 0.3% (в зависимости от платформы).

Второе. Когда поток заданий неравномерный - латентность прыгает от 3 до 15 миллисекунд (зависит от платформы, ОС, нагрузки, числа ядер, режима планировщика итп.)

Кроме того, std::this_thread::sleep_for помещает поток не всегда в очередь спящих.

Попробуем это исправить.


 --- worker.hpp     Thu Mar 22 23:20:12 2018  
 +++ worker.hpp     Sat Mar 24 22:28:13 2018  
 @@ -2,6 +2,8 @@  
    
  #include <atomic>  
  #include <thread>  
 +#include <condition_variable>  
 +#include <mutex>  
    
  namespace tp  
  {  
 @@ -78,6 +80,8 @@  
    Queue<Task> m_queue;  
    std::atomic<bool> m_running_flag;  
    std::thread m_thread;  
 +  std::mutex m_conditional_mutex;  
 +  std::condition_variable m_conditional_lock;  
  };  
    
    
 @@ -121,6 +125,7 @@  
  inline void Worker<Task, Queue>::stop()  
  {  
    m_running_flag.store(false, std::memory_order_relaxed);  
 +  m_conditional_lock.notify_all();  
    m_thread.join();  
  }  
    
 @@ -140,6 +145,7 @@  
  template <typename Handler>  
  inline bool Worker<Task, Queue>::post(Handler&& handler)  
  {  
 +  m_conditional_lock.notify_one();  
    return m_queue.push(std::forward<Handler>(handler));  
  }  
    
 @@ -171,7 +177,8 @@  
      }  
      else  
      {  
 -      std::this_thread::sleep_for(std::chrono::milliseconds(1));  
 +      std::unique_lock<std::mutex> lock(m_conditional_mutex);  
 +      m_conditional_lock.wait(lock);  
      }  
    }  
  }  
   

Воспользуемся std::condition_variable.

std::condition_variable при запросе ожидания сразу отправляет поток в очередь планировщика sleep, не расходуя ресурсы процессора. Приведенный выше патч делает следующее. Каждый воркер, обнаруживая, что очередь заданий пуста, отправляется спать. И пробуждается по выполнении метода post(). (Ну и, разумеется, при шатдауне пула все воркеры пробуждаются перед завершением).

Что мы получаем?


Получаем мы трехкратное улучшение латентности пула при непостянной нагрузке заданиями, меньшее использование CPU и вот такой график латентности пула:


Слева на графике - оригинальный worker.hpp. Справа - тот же пул после наложения вышеприведенного патча.

std::condition_variable имеет еще одно преимущество. Несмотря на использование mutex, все операции с ней атомарны и не вызывают заметного повышения wait.

Тесты показали, что данный патч не нарушает алгоритмической логики thread pool и не вызывает сериализации.

пятница, 2 марта 2018 г.

С++: Thread affinity, thread pools и performance

Thread pool и affinity

Пулы тредов используются весьма часто. Очевидный факт - бОльшая часть имеет тредов больше, чем физических ядер и это имеет смысл: в пул часто отправляются разнородные задачи, просто ради отзывчивости. В этих ситуациях лучше отдать аффинити на откуп планировщику ОС, и, разумеется, здесь речь не идет о кратном ускорении производительности пропорционально числу ядер/степени параллелизма пула (и не может идти. Как максимум это мультитаскинг).

Однако, если мы говорим о серверах, и о ситуациях, когда thread pool используется для параллельного выполнения однотипных задач - AIO, например, или некий процессинг - то в подобных случаях ожидается именно кратное ускорение за счет распараллеливания.

Конечно, здесь вступает в игру механизм синхронизации - как параллелящихся задач, так и самого пула (обычно его очереди). Мьютексы сами по себе дают точки сериализации и приводят к тому, что выигрыш от паралеллизма не кратен числу тредов/числу физических ядер.

Но речь в данном случае не о реализациях пулов тредов, а об affinity.

В общей ситуации ничего подобного не делается и нередки случаи, когда потоки пула оказываются на одном ядре по несколько штук. В результате, то, что подразумевалось как выполняющееся параллельно, по факту выполняется последовательно.

Приведу один пример.

Допустим, у вас 8 ядер. Вы запускаете процесс, создающий пул из 8 тредов, и на эти 8 тредов запускаете потоком одинаковые задачи. 

Пока треды распределены по ядрам более-менее равномерно - все в шоколаде. Степень параллелизма 6-8. Если вы хорошо написали сами выполняющиеся задачи - то есть они в достаточной степени автономны и не требуют синхронизации, а реализация пула тредов у вас lock-free - выигрыш очевиден.

Представьте себе теперь, что произошел ой - и по причине либо программной ошибки в вашем коде, либо по причине перегрузки ОС - все треды пула оказались на одном ядре.

Представили?

Это трындец. Треды, которые должны быть параллельными - по факту выполняются на одном ядре последовательно.

Что в итоге?
А в итоге восьмикратное (минимум) замедление (минимум потому, что собственные механизмы очереди пула дополнительно замедлены тем, что сошлись на одном ядре вместе с выполняющимися процессами).

В подобной ситуации thread affinity спасет гигантов мысли.

Одно маленькое НО: это надо реализовывать на уровне библиотеки thread pool.

Thread pool with affinity

В предыдущей статье приведен очень простенький пример, который мало применим на практике. На самом деле нам нужно выполнять processor binding на уровне библиотеки thread pool.

Для начала, постановка задачи.

Есть однотипные задачи (параллельный процессинг слабо связанных данных), есть пул, размеры которого по умолчанию равны числу физических ядер сервера, есть необходимость обеспечить минимальную (и одинаковую) латентность выполнения каждой отдельно взятой задачи.

То есть, нам необходимо обеспечить привязку тредов по ядрам с использованием Round-Robin.

Задача кажется очевидной, однако несколько осложняется тем, что не во всех системах идентификаторы ядер идут по порядку - 0,1,2,3,4,5 ...... и так далее.

Например:





Это SPARC.

То есть, если в сервере несколько материнских плат, то нумерация ядер может идти и не по порядку.

Что ж, ничего сложного. Немного видоизменим алгоритм для Солярис, только и всего.

Фрагмент библиотеки thread pool:
 #ifdef AFFINITY  
 #if defined __sun__  
 #include <sys/types.h>  
 #include <sys/processor.h>  
 #include <sys/procset.h>  
 #include <unistd.h>     /* For sysconf */  
 #elif __linux__  
 #include <cstdio>     /* For fprintf */  
 #include <sched.h>  
 #endif  
 #endif  
   
 ...  
   
  // Init thread pool  
  void init() {  
   #if (defined __sun__ || defined __linux__) && defined AFFINITY    
   std::size_t v_cpu = 0;  
   std::size_t v_cpu_max = std::thread::hardware_concurrency() - 1;  
   #endif  
   
   #if defined __sun__ && defined AFFINITY  
   std::vector<processorid_t> v_cpu_id;     /* Struct for CPU/core ID */  
   
   processorid_t i, cpuid_max;  
   cpuid_max = sysconf(_SC_CPUID_MAX);  
   for (i = 0; i <= cpuid_max; i++) {  
     if (p_online(i, P_STATUS) != -1)     /* Get only online cores ID */  
       v_cpu_id.push_back(i);  
   }  
   #endif  
   
   for (std::size_t i = 0; i < m_threads.size(); ++i) {  
   
      #if (defined __sun__ || defined __linux__) && defined AFFINITY  
      if (v_cpu > v_cpu_max) {  
           v_cpu = 0;  
      }  
   
      #ifdef __sun__  
      processor_bind(P_LWPID, P_MYID, v_cpu_id[v_cpu], NULL);  
      #elif __linux__  
      cpu_set_t mask;  
      CPU_ZERO(&mask);  
      CPU_SET(v_cpu, &mask);  
      pthread_t thread = pthread_self();  
      if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &mask) != 0) {  
           fprintf(stderr, "Error setting thread affinity\n");  
      }  
      #endif  
   
      ++v_cpu;  
      #endif  
   
    m_threads[i] = std::thread(ThreadWorker(this, i));  
   }  
  }  
   

То есть мы запрашиваем у системы фактические ID онлайн-ядер процессоров, заталкиваем их в вектор, и потом, пробегая по вектору, задаем привязку каждого треда к ядру.

Все просто.

В результате наша программа с 8 тредами будет распределена по ядрам вот таким образом:


lwp id 25633/1: 19 - это родительский процесс, запускающий пул.

Результаты тестирования показали 3-5-кратное улучшение латентности в сравнении с пулом без affinity в данной конкретной задаче (с 15 мс до 3 мс среднего времени отклика).

вторник, 27 февраля 2018 г.

CPU Affinity в программах C++11

У Эли Бендерского есть хороший блог на тему тредов и сабжа. Однако там не подчеркивается несколько специфических моментов, а исходники неполны.

Исправим это.

Что стоит знать про cpu affinity вообще и в программах C++11 в частности (thread-aware, разумеется. Нет тредов - нет аффинити).

  1. Для C/C++ не существует portable решения по биндингу тредов к ядрам/процессорам. Оно всегда будет OS-specific.
  2. Написать подобное решение для в C++ можно, однако это будет куча директив препроцессора, распознающего различные ОС и библиотеки (как в примере ниже).
  3. В большинстве случаев cpu affinity не имеет смысла. Например, в случаях, когда планировщик ОС, работающей на голом железе, равномерно раскидывает треды по ядрам/процессорам сам по себе. Обычно affinity используют в действительно сложных конфигурациях и случаях оптимизации, когда ОС невовремя вытесняет треды с ядер и тому подобных ситуациях. Однако при использовании специализированных серверов с фиксированной нагрузкой, или когда необходимо обеспечивать некую постоянную латентность программы/сервиса - affinity может помочь.
Отполированный вариант демонстрационного кода для программной установки affinity перед запуском треда в C++11 (с включением всех необходимых библиотек и препроцессорных директив):

tools_gethwc.h:

 #ifndef TOOLS_GETHWC_H  
 #define TOOLS_GETHWC_H  
 #if !__cplusplus >= 201103L || !__cplusplus >= 199711L  
  #error This program needs at least a C++11 compliant compiler  
 #endif  
 #if defined __sun__  
 #include <sys/types.h>  
 #include <sys/processor.h>  
 #include <sys/procset.h>  
 #elif __linux__  
 #include <sched.h>  
 #endif  
 #include <cstddef>     /* For std::size_t */  
 #include <iostream>  
 #include <thread>  
 #include <mutex>  
 #include <vector>  
 #include <chrono>  
 #endif  

tools_gethwc.cc:

 #include "tools_gethwc.h"  
 std::mutex g_iomutex;  
 #if defined __sun__ || defined __Linux__  
 static inline void set_cpu(int cpu) {  
 #ifdef __sun__  
      processor_bind(P_LWPID,P_MYID, cpu, NULL);  
 #elif __linux__  
      cpu_set_t mask;  
      CPU_ZERO(&mask);  
      CPU_SET(cpu, &mask);  
      pthread_t thread = pthread_self();  
      if (pthread_setaffinity_np(thread, sizeof(cpu_set_t), &mask) != 0) {  
           fprintf(stderr, "Error setting thread affinity\n");  
      }  
 #endif  
 }  
 #endif  
 void thr_exec() {  
      {  
           std::lock_guard<std::mutex> lock(g_iomutex);  
           #ifdef __sun__  
           std::cout << "Thread #" << std::this_thread::get_id() << " is running on CPU/core: " << getcpuid() << std::endl;  
           #elif __Linux__  
           std::cout << "Thread #" << std::this_thread::get_id() << " is running on CPU/core: " << sched_getcpu() << std::endl;  
           #else  
           std::cout << "Thread #" << std::this_thread::get_id() << " is running" << std::endl;  
           #endif  
      }  
      std::this_thread::sleep_for(std::chrono::milliseconds(200));  
 }  
 int main(int argc, char* argv[])  
 {  
      if (argc > 1) {  
           if (std::string(argv[1]) == "-H" || std::string(argv[1]) == "-h" || std::string(argv[1]) == "-?") {  
                std::cout << "This is utility to get and check hardware concurrency." << std::endl;  
                std::cout << "To get it just run tool without arguments." << std::endl;  
                exit(0);  
           }  
      }  
      std::size_t v_num_cpus = std::thread::hardware_concurrency();  
      std::vector<std::thread> v_threads(v_num_cpus);  
      std::cout << "Launching " << v_num_cpus << " threads" << std::endl;  
      for (std::size_t i = 0; i < v_num_cpus; ++i) {  
           #if defined __sun__ || defined __linux__  
           set_cpu(i);     /* Affinity */  
           #endif  
           v_threads[i] = std::thread([]() { thr_exec(); });  
      }  
      for (auto& t : v_threads) {  
           t.join();  
      }  
      return 0;  
 }  

Для компиляции использовались следующие команды:

      # Build get hardware concurrency  
      g++ -pthread -O3 -std=c++11 -m64 -c -o tools_gethwc.o tools_gethwc.cc -I.  
      g++ -pthread -s -m64 -o tools_gethwc tools_gethwc.o  

После компиляции можно запустить бинарник и убедиться, что треды биндятся по ядрам строго последовательно и не более, чем по одному на ядро (как и было запрограммировано):
 # tools_gethwc  
 Launching 8 threads  
 Thread #2 is running on CPU/core: 0  
 Thread #3 is running on CPU/core: 1  
 Thread #4 is running on CPU/core: 2  
 Thread #5 is running on CPU/core: 3  
 Thread #6 is running on CPU/core: 4  
 Thread #7 is running on CPU/core: 5  
 Thread #8 is running on CPU/core: 6  
 Thread #9 is running on CPU/core: 7  

На линукс тоже работает:

 # ./tools_gethwc   
 Launching 4 threads  
 Thread #140300913096448 is running on CPU/core: 0  
 Thread #140300904703744 is running on CPU/core: 1  
 Thread #140300896311040 is running on CPU/core: 2  
 Thread #140300887918336 is running on CPU/core: 3  

Идея с функцией set_cpu() стырена в исходниках PostgreSQL :) Или в Плазме. Которая, говорят, уже даже не падает :)

Код различает Solaris/Linux и использует соответствующие платформенно-специфичные вызовы. Для платформ, отличных от этих двух, affinity не выполняется (и не компилируется соответствующий код) и запуск тредов производится как получится.