Cell/B.E. & SpursEngine プログラミング
Home記事一覧BBS

SpursEngine-Host間の転送速度の計測

SpursEngineとHost間のデータ転送速度を計測しました。

今回はSpursEngineのローカルメモリ(128MB)への転送のみで、SPEへのDMA転送は行っていません。SpursEngineの性能計測というよりは、PCI Express x1バスの性能計測に近いテストです。

実行方法

SpursEngine Linux SDKは「PCにSpursEngine Linux SDKをインストールする」の方法でインストール済みであることとします。

端末にコマンドを入力して作業を行います。
テストプログラムをダウンロードします。
wget https://cellspe.matrix.jp/files/spurs_mem_obj_test.tar.gz

解凍します。
tar xzf spurs_mem_obj_test.tar.gz

コンパイルします。
cd spurs_mem_obj_test

make

片方向の転送速度計測
./main

双方向の転送速度計測
./mainrw

計測結果

Host→SpursEngineの転送速度、SpursEngine→Hostの転送速度、Host←→SpursEngine双方向同時の転送速度について複数のスレッドを用いて転送のテストを行いました。
なお、この結果は筆者の古いPCのテスト機1台だけで計測した結果であり、テスト環境によってはこれと大きく結果が異なる可能性もあります。

Host→SpursEngine、SpursEngine→Hostの転送速度(片方向)
プログラム:main.cpp
PCI Express x1での理論値:250MB/s
(参考:PCI Express(Wikipedia))

Threads: 1
Host to SpursEngine: 145.079269 MB/s
SpursEngine to Host: 184.843890 MB/s

Threads: 2
Host to SpursEngine: 162.067934 MB/s
SpursEngine to Host: 196.956343 MB/s

Threads: 4
Host to SpursEngine: 162.874685 MB/s
SpursEngine to Host: 197.653417 MB/s

Threads: 8
Host to SpursEngine: 162.870955 MB/s
SpursEngine to Host: 197.660132 MB/s

Threads: 16
Host to SpursEngine: 162.853548 MB/s
SpursEngine to Host: 198.501865 MB/s


Host←→SpursEngine双方向同時の転送速度
プログラム:mainrw.cpp
PCI Express x1での理論値:500MB/s

Threads (all): 2
Threads (read or write): 1
Host <-read & write-> SpursEngine: 181.106558 MB/s

Threads (all): 4
Threads (read or write): 2
Host <-read & write-> SpursEngine: 192.357914 MB/s

Threads (all): 8
Threads (read or write): 4
Host <-read & write-> SpursEngine: 191.442456 MB/s

Threads (all): 16
Threads (read or write): 8
Host <-read & write-> SpursEngine: 193.129940 MB/s

Threads (all): 32
Threads (read or write): 16
Host <-read & write-> SpursEngine: 193.578570 MB/s

Threads (all): 64
Threads (read or write): 32
Host <-read & write-> SpursEngine: 193.719781 MB/s

いずれも複数スレッドで転送した方が少し速くなるようです。
双方向同時の転送ではPCI Expressの理論上2倍の転送速度になるかと期待したのですが、実際はそれほど速くなりませんでした。

このテストでは複数スレッドでの転送で転送内容が一致することを確認していますが、転送のための関数(spha_data_transfer_to()、spha_data_transfer_from())がスレッドセーフであるかどうかはドキュメントには明記されていないので実際のプログラムでの使用には注意が必要です。

テストプログラム ソースコード

特に面白いプログラムではないのですが、並列処理をpthread_create→pthread_joinを繰り返して行うのではなく、スレッドは一度だけ生成して条件変数を使って同期を行い、処理を軽くしています。親スレッドも子スレッドのタスクの終了を条件変数で待つようにして無駄なCPU負荷を減らしています。

main.cpp
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <time.h>
#include <pthread.h>
#include <spurs/spha/spha.h>
#include <spurs/common/byte_order.h>
#include <spurs/sp3/sp3_cmdif.h>

#define BUFFER_SIZE (64 * 1024 * 1024)

#define COMMAND_RUN_H2A 0
#define COMMAND_RUN_A2H 1
#define COMMAND_SLEEP 2
#define COMMAND_QUIT 3

// スレッド数:1,2,4,8,16,...
#define THREAD_NUM 4

typedef struct
{
  // p2c:親スレッドが子スレッドを起こすためのミューテックス、条件変数
  // c2p:子スレッドが親スレッドを起こすためのミューテックス、条件変数
  pthread_mutex_t * mutex_p2c;
  pthread_mutex_t * mutex_c2p;
  pthread_cond_t * cond_p2c;
  pthread_cond_t * cond_c2p;
  // コマンド
  int command;
  // コマンドのカウンタ
  unsigned int command_counter;
  // 完了したコマンドのカウンタ
  unsigned int done_counter;
  // スレッドID
  int thread;
  // 転送サイズ
  uint32_t transfer_size;
  // SPHAセッション
  spha_session_t * session;
  // メモリオブジェクト
  spha_object_t * buffer_obj;
  // ホスト側バッファ
  char * buffer;
  // 転送開始オフセット
  uint32_t offset;
} thread_args_t;


// 子スレッド
void * thread_child(void * arg)
{
  thread_args_t * t_arg = (thread_args_t *)arg;
  unsigned int quitflag = 0;
  unsigned int command_counter = 0;
  uint32_t transfer_size;
  struct timespec ts;

  while (1)
  {
    // command_counterが更新されていたらcommandを実行
    if (command_counter != t_arg->command_counter)
    {
      command_counter = t_arg->command_counter;

      switch (t_arg->command)
      {
        case COMMAND_RUN_H2A:
          // HostからSpursEngineへ転送
          transfer_size = t_arg->transfer_size;
          spha_data_transfer_to(*(t_arg->session), *(t_arg->buffer_obj), t_arg->offset, t_arg->buffer, &transfer_size);
          break;
        case COMMAND_RUN_A2H:
          // SpursEngineからHostへ転送
          transfer_size = t_arg->transfer_size;
          spha_data_transfer_from(*(t_arg->session), *(t_arg->buffer_obj), t_arg->offset, t_arg->buffer, &transfer_size);
          break;
        case COMMAND_QUIT:
          quitflag = 1;
          break;
        case COMMAND_SLEEP:
          break;
        default:
          break;
      }

      // スリープする前に親スレッドが次のコマンドをbroadcastするのを防ぐため
      // done_counterを更新してからスリープするまでp2c mutexをロックする。
      // p2c mutexは次のpthread_cond_timedwaitで自動解放される
      pthread_mutex_lock(t_arg->mutex_p2c);

      pthread_mutex_lock(t_arg->mutex_c2p);

      // コマンド実行が完了したのでdone_counterを更新する
      t_arg->done_counter = command_counter;

      // 親スレッドを起こす
      pthread_cond_broadcast(t_arg->cond_c2p);
      pthread_mutex_unlock(t_arg->mutex_c2p);
    }

    if (quitflag)
    {
      // 終了時にはp2cをアンロックする
      pthread_mutex_unlock(t_arg->mutex_p2c);
      break;
    }

    // 親スレッドから起こされるまでスリープ(タイムアウト1秒)
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += 1;
    pthread_cond_timedwait(t_arg->cond_p2c, t_arg->mutex_p2c, &ts);
    pthread_mutex_unlock(t_arg->mutex_p2c);
  }

  // スレッド終了
  pthread_exit(NULL);
}


// 子スレッドのコマンド終了を待つ
void wait_for_completion(pthread_mutex_t * mutex_c2p, pthread_cond_t * cond_c2p, thread_args_t * t_arg)
{
  struct timespec ts;

  while(1)
  {
    pthread_mutex_lock(mutex_c2p);

    // 全ての子スレッドのコマンド終了をチェックする
    int busyflag = 0;
    for (int i = 0; i < THREAD_NUM; i++)
    {
      if (t_arg[i].done_counter != t_arg[i].command_counter)
      {
        busyflag++;
      }
    }

    if (busyflag == 0)
    {
      pthread_mutex_unlock(mutex_c2p);
      break;
    }
    else
    {
      // まだ終わっていなければ
      // 子スレッドから起こされるまでスリープ(タイムアウト1秒)
      clock_gettime(CLOCK_REALTIME, &ts);
      ts.tv_sec += 1;
      pthread_cond_timedwait(cond_c2p, mutex_c2p, &ts);
    }

    pthread_mutex_unlock(mutex_c2p);
  }
}


// 時間計測用関数。Tick値を返す(マイクロ秒)
static inline long long mcl_get_tick_count(void)
{
  struct timeval timeprof;
  gettimeofday(&timeprof, NULL);
  return ((long long)timeprof.tv_sec * (long long)1000000 + (long long)timeprof.tv_usec);
}


int main(int argc, char ** argv)
{
  static pthread_mutex_t  mutex_p2c;
  static pthread_mutex_t  mutex_c2p;
  static pthread_cond_t  cond_p2c;
  static pthread_cond_t  cond_c2p;
  static pthread_t thread[THREAD_NUM];
  static thread_args_t t_arg[THREAD_NUM];
  spha_session_t session;
  unsigned long long tick_s, tick_e;

  printf("Threads: %d\n", THREAD_NUM);

  // ミューテックス、条件変数の初期化
  pthread_mutex_init(&mutex_p2c, NULL);
  pthread_mutex_init(&mutex_c2p, NULL);
  pthread_cond_init(&cond_p2c, NULL);
  pthread_cond_init(&cond_c2p, NULL);

  // 子スレッドの生成
  for (int i = 0; i < THREAD_NUM; i++)
  {
    t_arg[i].mutex_p2c = &mutex_p2c;
    t_arg[i].mutex_c2p = &mutex_c2p;
    t_arg[i].cond_p2c = &cond_p2c;
    t_arg[i].cond_c2p = &cond_c2p;
    t_arg[i].command = COMMAND_SLEEP;
    t_arg[i].command_counter = 1;
    t_arg[i].done_counter = 0;
    t_arg[i].thread = i;
  }
  for (int i = 0; i < THREAD_NUM; i++)
  {
    pthread_create(&(thread[i]), NULL, thread_child, &(t_arg[i]));
  }

  // 子スレッドの起動を待つ
  wait_for_completion(&mutex_c2p, &cond_c2p, t_arg);

  // セッションの作成
  spha_create_session(NULL, &session);

  // 通信路を確立
  spha_connect_session(session);

  // SpursEngine側ローカルメモリ領域の確保
  spha_object_t buffer_obj;
  spha_create_memory(session, 0, 0, 0, BUFFER_SIZE, 0, &buffer_obj);

  // メモリオブジェクトをマップ
  uint32_t buffer_ea;
  spha_map_memory(session, buffer_obj, 0, 0, 0, BUFFER_SIZE, 0, &buffer_ea);

  // ホスト側バッファを確保(128バイト・アライン)
  char * buffer1;
  char * buffer2;
  int err = posix_memalign((void **)&buffer1, 128, BUFFER_SIZE);
  if (err)
  {
    abort();
  }
  err = posix_memalign((void **)&buffer2, 128, BUFFER_SIZE);
  if (err)
  {
    abort();
  }

  // 一度転送させる(キャッシュ等の影響を排除して計測するため)
  uint32_t transfer_size = BUFFER_SIZE;
  spha_data_transfer_to(session, buffer_obj, 0, buffer1, &transfer_size);
  transfer_size = BUFFER_SIZE;
  spha_data_transfer_from(session, buffer_obj, 0, buffer2, &transfer_size);

  // バッファ初期化
  printf("Initializing Buffer...\n");
  for (int i = 0; i < BUFFER_SIZE; i++)
  {
    *(buffer1 + i) = rand() >> 24;
  }

  printf("Transfering...\n");

  tick_s = mcl_get_tick_count();

  // ホストからSpursEngineへ転送
  pthread_mutex_lock(&mutex_p2c);
  for (int i = 0; i < THREAD_NUM; i++)
  {
    t_arg[i].command = COMMAND_RUN_H2A;
    t_arg[i].command_counter++;
    t_arg[i].session = &session;
    t_arg[i].buffer_obj = &buffer_obj;
    t_arg[i].transfer_size = BUFFER_SIZE / THREAD_NUM;
    t_arg[i].offset = i * t_arg[i].transfer_size;
    t_arg[i].buffer = buffer1 + t_arg[i].offset;
  }
  pthread_cond_broadcast(&cond_p2c);
  pthread_mutex_unlock(&mutex_p2c);

  wait_for_completion(&mutex_c2p, &cond_c2p, t_arg);

  tick_e = mcl_get_tick_count();
  printf("Host to SpursEngine: %f MB/s\n", (float)BUFFER_SIZE / (float)(tick_e - tick_s) * 0.953674);

  tick_s = mcl_get_tick_count();

  // SpursEngineからホストへ転送
  pthread_mutex_lock(&mutex_p2c);
  for (int i = 0; i < THREAD_NUM; i++)
  {
    t_arg[i].command = COMMAND_RUN_A2H;
    t_arg[i].command_counter++;
    t_arg[i].session = &session;
    t_arg[i].buffer_obj = &buffer_obj;
    t_arg[i].transfer_size = BUFFER_SIZE / THREAD_NUM;
    t_arg[i].offset = i * t_arg[i].transfer_size;
    t_arg[i].buffer = buffer2 + t_arg[i].offset;
  }
  pthread_cond_broadcast(&cond_p2c);
  pthread_mutex_unlock(&mutex_p2c);

  wait_for_completion(&mutex_c2p, &cond_c2p, t_arg);

  tick_e = mcl_get_tick_count();
  printf("SpursEngine to Host: %f MB/s\n", (float)BUFFER_SIZE / (float)(tick_e - tick_s) * 0.953674);

  // メモリオブジェクトをアンマップ
  spha_unmap_memory(session, buffer_ea, 0);

  // SpursEngine側ローカルメモリ領域を解放
  spha_delete_object(session, buffer_obj);

  // セッションの通信路を切断
  spha_close_session(session);

  // セッションの破棄
  spha_delete_session(session);

  // 子スレッドを終了
  pthread_mutex_lock(&mutex_p2c);
  for (int i = 0; i < THREAD_NUM; i++)
  {
    t_arg[i].command = COMMAND_QUIT;
    t_arg[i].command_counter++;
  }
  pthread_cond_broadcast(&cond_p2c);
  pthread_mutex_unlock(&mutex_p2c);

  for (int i = 0; i < THREAD_NUM; i++)
  {
    pthread_join(thread[i], NULL);
  }

  // 転送内容チェック
  printf("Checking...\n");
  for (int i = 0; i < BUFFER_SIZE; i++)
  {
    if (*(buffer1 + i) != *(buffer2 + i))
    {
      printf("bad data at %d buffer1:%d buffer2:%d\n", i, *(buffer1 + i), *(buffer2 + i));
    }
  }

  // ホスト側バッファを解放
  free(buffer1);
  free(buffer2);

  printf("Done.\n");

  return 0;
}