Cell/B.E. & SpursEngine プログラミング
Home記事一覧BBS

SpursEngineでマンデルブロ集合のフルHD動画作成

SpursEngine Linux SDKのclubライブラリを使ってマンデルブロ集合の動画を作成するプログラムを作りました。4基のSPEでマンデルブロ集合の計算をしながらハードウェアエンコーダーでH.264のMPEG4動画を出力します。





ソースコードのダウンロード:spurs_club_mandel.tar.gz

実行方法

SpursEngineの開発環境は以下のページの方法でインストール済みであることとします。

PCにUbuntu 10.04 Lucid Lynxを「軽量」インストールする

UbuntuにSpursEngine Linux SDKをインストールする

FedoraにSpursEngine Linux SDKをインストールする

このプログラムではH.264のエレメンタリ・ストリーム・ファイルを出力します。MPEG-4コンテナへのMuxはFFmpegで行うので、あらかじめFFmpegをインストールしておきます。

●Ubuntuの場合
(一行で)
sudo apt-get --no-install-recommends install ffmpeg

●Fedoraの場合
su -c 'yum install ffmpeg'

ダウンロードしたソースコードspurs_club_mandel.tar.gzをホームディレクトリに置きます。

●解凍

tar xzf spurs_club_mandel.tar.gz

cd spurs_club_mandel

●コンパイル

make

●実行

./run.sh

●ffmpegの使い方など
音楽をMPEG-4にMuxするには、まずWAVファイルをfaacでm4a形式に変換します。

faac -w -b 160 --no-tns --no-midside -o output.m4a input.wav

その後ffmpegでMuxします。

ffmpeg -y -r 29.97 -i outh264.es -i output.m4a -vcodec copy -acodec copy -f mp4 output.mp4

ソースコード

SPE側プログラム(マンデルブロ集合計算部分):job.cpp
#include <stdio.h>
#include <spurs/spsa/spsa.h>
#include <spurs/club/suf.h>
#include <spu_mfcio.h>
#include <spu_intrinsics.h>
#include "data.h"

#define TIMER_COEFF 16.66667
#define MCL_DMA_TAG_ID 0

// YUV画像のバッファ
uint32_t buf_y[0x400] __attribute__((aligned(128)));
uint16_t buf_u[0x400] __attribute__((aligned(128)));
uint16_t buf_v[0x400] __attribute__((aligned(128)));


// 最内ループのマクロ
#define INNERLOOP() {                                   \
    vb = spu_msub(spu_mul(vf2, va), vb, vy);            \
    va = spu_sub(va2, spu_add(vb2, vx));                \
    va2 = spu_mul(va, va);                              \
    vb2 = spu_mul(vb, vb);                              \
    vtmp = spu_add(va2, vb2);                           \
    flag = spu_cmpgt(vf4, vtmp);                        \
    vcount = spu_sub(vcount, spu_and(vui1, flag));      \
    flagui = spu_extract(spu_gather(flag), 0);          \
    if (flagui == 0) break;}


// ローカルメモリから読み込んで転送完了を待つ
// ls:LS側アドレス ea:ローカルメモリ側実効アドレス size:転送サイズ
void mcl_load_and_wait(void * ls, unsigned int ea, unsigned int size)
{
  unsigned int mask;

  // 読み込みDMA実行
  mfc_get(ls, ea, size, MCL_DMA_TAG_ID, 0, 0);

  // DMAが完了するまで待つ
  mask = mfc_read_tag_mask();
  mfc_write_tag_mask(1 << MCL_DMA_TAG_ID);
  mfc_write_tag_update_all();
  mfc_read_tag_status();
  mfc_write_tag_mask(mask);
}


// ローカルメモリに書き込んで転送完了を待つ
// ls:LS側アドレス ea:ローカルメモリ側実効アドレス size:転送サイズ
void mcl_save_and_wait(void * ls, unsigned int ea, unsigned int size)
{
  unsigned int mask;

  // 書き込みDMA実行
  mfc_put(ls, ea, size, MCL_DMA_TAG_ID, 0, 0);

  // DMAが完了するまで待つ
  mask = mfc_read_tag_mask();
  mfc_write_tag_mask(1 << MCL_DMA_TAG_ID);
  mfc_write_tag_update_all();
  mfc_read_tag_status();
  mfc_write_tag_mask(mask);
}


// マイクロ秒単位のスリープ
void mcl_usleep(unsigned int time)
{
  unsigned int count = (unsigned int)((float)time * TIMER_COEFF);

  // イベントマスクを保存
  uint32_t mask_bak = spu_read_event_mask();

  // イベントをリセット
  spu_write_event_mask(0);
  spu_write_event_ack(~0);

  // SPU Decrementerにタイマー値をセット
  spu_writech(SPU_WrDec, count);

  // デクリメンタイベントをセット
  spu_write_event_mask(MFC_DECREMENTER_EVENT);

  // デクリメンタが0になるまでスリープ
  uint32_t events = spu_read_event_status();
  spu_write_event_ack(events);

  // イベントマスクを復元
  spu_write_event_mask(mask_bak);
}


// マンデルブロ集合を描画
void job(mandel_params_t * param)
{
  // vector unsigned intの下位1バイトをunsigned intにまとめるためのシャッフルパターン
  vector unsigned char vsp_vtou = {3,7,11,15,128,128,128,128,128,128,128,128,128,128,128,128};
  vector unsigned char vsp_vtos = {128,128,3,11,128,128,128,128,128,128,128,128,128,128,128,128};
  vector unsigned int vui1 = spu_splats(1u);
  vector unsigned int vmaxcount;
  vector float vf2 = spu_splats(2.0f);
  vector float vf4 = spu_splats(4.0f);
  unsigned int i, j, k, xres, yres, vsize, hsize, page, startline, endline, addr_ea_y, addr_ea_u, addr_ea_v, para_my, para_all, frame, max_count, out_ea;
  float xmin, xmax, ymin, ymax, dx, dy, zoom, cx, cy, color_u, color_v;
  // LS上のピクセルデータのポインタ(4バイト単位)
  uint32_t *pix;
  void *ptmp;
  uint16_t *pixu;
  void *ptmpu;
  uint16_t *pixv;
  void *ptmpv;

  xres = param->xres;
  yres = param->yres;
  cx = param->cx;
  cy = param->cy;
  zoom = param->zoom;
  frame = param->frame;
  max_count = param->max_count;
  color_u = param->color_u;
  color_v = param->color_v;
  out_ea = param->out_ea;
  para_my = param->para_my;
  para_all = param->para_all;
  hsize = spurs::YUV_HEADER_WIDTH_HD_I420;
  vsize = spurs::YUV_HEADER_HEIGHT_HD_I420;

  vmaxcount = spu_splats((unsigned int)max_count);

  xmin = cx - zoom * (float)xres / (float)yres;
  xmax = cx + zoom * (float)xres / (float)yres;
  ymin = cy - zoom;
  ymax = cy + zoom;

  dx = (xmax - xmin) / (float)xres;
  dy = (ymax - ymin) / (float)yres;
  startline = para_my;
  endline = yres;

  vector float vxmin = spu_splats(xmin);
  vxmin = spu_insert(xmin + dx, vxmin, 1);
  vxmin = spu_insert(xmin + dx * 2, vxmin, 2);
  vxmin = spu_insert(xmin + dx * 3, vxmin, 3);
  vector float vymin = spu_splats(ymin);
  vector float vdx = spu_splats(dx * 4);
  vector float vdy = spu_splats(dy);

  // バッファをクリア
  for (i = 0; i < hsize / 2; i++)
  {
    vector unsigned int * p;
    p = (vector unsigned int *)(void *)buf_y;
    *p = spu_splats(0x10101010u);
    p += hsize / 32;
    *p = spu_splats(0x10101010u);
    p = (vector unsigned int *)(void *)buf_u;
    *p = spu_splats(0x80808080u);
    p = (vector unsigned int *)(void *)buf_v;
    *p = spu_splats(0x80808080u);
  }

  // ダブルバッファのページをリセット
  page = 0;

  for (i = startline; i < endline; i += para_all)
  {
    // 偶数ライン:0 奇数ライン:1
    unsigned int even = i & 1;

    // 書き込み先アドレス
    addr_ea_y = out_ea + sizeof(spurs::yuv_header_t) + hsize * i ;
    addr_ea_u = out_ea + sizeof(spurs::yuv_header_t) + hsize * vsize + (hsize / 2) * (i / 2);
    addr_ea_v = addr_ea_u + hsize * vsize / 4;

    vector float vy = spu_add(vymin, spu_mul(vdy, spu_convtf(spu_splats(i), 0)));

    // ピクセルバッファのポインタをLS上の汎用バッファの先頭にセット
    pix = buf_y;
    pixu = buf_u;
    pixv = buf_v;

    // ダブルバッファのためのオフセットをプラス
    pix += page * 512;
    pixu += page * 512;
    pixv += page * 512;

    ptmp = pix;
    ptmpu = pixu;
    ptmpv = pixv;

    for (j = 0; j < xres / 4; j++)
    {
      vector float vtmp;
      vector unsigned int flag;
      vector float vx = spu_add(vxmin, spu_mul(vdx, spu_convtf(spu_splats(j), 0)));
      vector float va = vx;
      vector float vb = vy;
      vector float va2 = spu_mul(va, va);
      vector float vb2 = spu_mul(vb, vb);
      vector unsigned int vcount = vmaxcount;

      for (k = 0; k < max_count; k+=8)
      {
        unsigned int flagui;
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
        INNERLOOP();
      }

      // LS上のピクセルバッファにピクセルの値を書き込む
      vector float vcountf = spu_convtf(vcount, 0);
      vector unsigned int vcounty = spu_convtu(spu_mul(vcountf, spu_splats(24556.8f / ((float)max_count * (float)max_count))), 0);
      *pix = spu_extract(spu_shuffle(spu_splats(235u) - vcounty, spu_splats(0u), vsp_vtou), 0);

      // 奇数ラインの時はUVの計算を省略
      if (even == 0)
      {
        // y:16-235 u,v:16-240
        vector unsigned int vcountu = spu_convtu(spu_splats(128.0f) + vcountf * spu_splats(color_u / (float)max_count), 0);
        vector unsigned int vcountv = spu_convtu(spu_splats(128.0f) + vcountf * spu_splats(color_v / (float)max_count), 0);
        *pixu = (unsigned short)spu_extract(spu_shuffle(vcountu, spu_splats(0u), vsp_vtos), 0);
        *pixv = (unsigned short)spu_extract(spu_shuffle(vcountv, spu_splats(0u), vsp_vtos), 0);
      }

      // LS上のピクセルバッファのポインタを進める
      pix++;
      pixu++;
      pixv++;
    }

    // ダブルバッファの前回のDMAが完了するまで待つ
    mfc_write_tag_mask(1 << page);
    mfc_write_tag_update_all();
    mfc_read_tag_status();

    // ローカルメモリのフレームバッファに書き込み
    // 書き込みDMA実行。転送サイズは16バイト単位に補正
    mfc_put(ptmp, addr_ea_y, hsize & ~15, page, 0, 0);
    // 奇数ラインの時はUVの書き込みを省略
    if (even == 0)
    {
      mfc_put(ptmpu, addr_ea_u, (hsize / 2) & ~15, page, 0, 0);
      mfc_put(ptmpv, addr_ea_v, (hsize / 2) & ~15, page, 0, 0);
    }

    // ダブルバッファを切り替え(0と1で交互に)
    page = page ^ 1;
  }

  // 最後のDMAが完了するまで待つ
  mfc_write_tag_mask(1 << page);
  mfc_write_tag_update_all();
  mfc_read_tag_status();
}

SPE側プログラム(SPSA実行制御部分):spe.cpp
#include "job.h"

// パラメーターの情報
param_info_t param_info __attribute__((aligned(128)));

// パラメーター
mandel_params_t param __attribute__((aligned(128)));

// SUFのSPEに返す結果
worker_result_t worker_result __attribute__((aligned(128)));


int main(vector unsigned int arg0, vector unsigned int arg1, vector unsigned int arg2)
{
  // このSPEの通し番号
  unsigned int my_num = spu_extract(arg0, 0);
  // 全SPE数(SPSAの分)
  unsigned int spes_num = spu_extract(arg0, 1);
  // データバッファのea
  unsigned int data_ea = spu_extract(arg0, 2);
  // SUFのSPEに返す結果のea
  unsigned int worker_result_ea = data_ea + sizeof(param_info_t) + sizeof(mandel_params_t) + my_num * sizeof(worker_result_t);

  // 完了したジョブのID
  unsigned int done_job = 0;

  unsigned int run = 1;
  while (run)
  {
    // パラメーター情報を読み込む
    mcl_load_and_wait(&param_info, data_ea, sizeof(param_info_t));
    if (param_info.command == COMMAND_WORK)
    {
      if (counter_greater_than(param_info.job_id, done_job))
      {
        // まだ完了していないジョブを実行

        // パラメーターを読み込む
        unsigned int param_ea = data_ea + sizeof(param_info_t);
        mcl_load_and_wait(&param, param_ea, sizeof(mandel_params_t));
        param.para_all = spes_num;
        param.para_my = my_num;

        if (SUF_SPE_WORK)
        {
          param.para_all += NUM_OF_SPE_CLUB;
        }

        job(&param);

        // SUFのSPEに結果を返す(完了したジョブIDを知らせる)
        done_job = param_info.job_id;
        worker_result.job_id = done_job;
        mfc_put(&worker_result, worker_result_ea, sizeof(worker_result_t), 0, 0, 0);
      }
    }
    else if(param_info.command == COMMAND_QUIT)
    {
      // 終了コマンドが来たらループを抜けて終了
      run = 0;
      break;
    }

    // 少し待ってから繰り返す
    mcl_usleep(WAIT_USLEEP);
  }

  return 0;
}

SPE側プログラム(SUF実行制御部分):suf.cpp
#include "job.h"

//#define DEBUG

// 独自ストリーム定義パラメーター
#define MY_STREAM_TYPE_ID (spurs::STREAM_TYPE_USER_DEFINE_BEGIN + 0)
#define MY_FIFO_SIZE 128
#define MY_FIFO_TYPE spurs::FIFO_TYPE_SINGLE

const char * name  __attribute__((aligned(128))) = "MyFilter";

// パラメーターの情報
param_info_t param_info __attribute__((aligned(128)));

// パラメーター
mandel_params_t param __attribute__((aligned(128)));

// Worker SPEから返される結果
worker_result_t worker_result[NUM_OF_SPE_SPSA] __attribute__((aligned(128)));

// バッファ
mcl_cf_param_data_t buf_in[32] __attribute__((aligned(128)));
uint32_t buf_h[32] __attribute__((aligned(128)));

// エンディアン変換
static inline unsigned int letobe(unsigned int ui)
{
  vector unsigned char vsp_endian = {3,2,1,0,7,6,5,4,11,10,9,8,15,14,13,12};
  return spu_extract(spu_shuffle(spu_splats(ui), spu_splats(0u), vsp_endian), 0);
}

void info()
{
  suf::def_name(name);
  suf::def_version(1, 0, 0, 0);
  suf::def_parallel_mode(spurs::FILTER_PARALLEL_MODE_SINGLE | spurs::FILTER_PARALLEL_MODE_TILE_DIVISION);

  // 独自ストリーム定義
  suf::def_stream(MY_STREAM_TYPE_ID, MY_FIFO_SIZE, MY_FIFO_TYPE);

  suf::def_input(MY_STREAM_TYPE_ID, 1, 1);
  suf::def_output(spurs::STREAM_TYPE_I420_HD, 1, 1);
}

void init(uint32_t parallel_fraction)
{
}

void fmain(suf::packet_t * input[], suf::packet_t * output[], uint32_t parallel_fraction)
{
  unsigned int param_info_ea, param_ea, worker_result_ea;

  suf::packet_t * in0 = input[0];
  suf::packet_t * out0 = output[0];

  // param_infoをクリア
  param_info.command = 0;
  param_info.job_id = 0;
  param_info.param_ea = 0;

  // ホストからのパラメーターを取得
  suf::dma::get(&buf_in, in0[0].ea, 128);

  // データバッファのメモリ配置について
  // データのバッファ開始ea: (main.cppのbuffer_eaより取得:buf_in[1].ui)
  // |-パラメーターリスト(param_info_t)
  // |-マンデルブロ集合用パラメーター(mandel_params_t)
  // |-Worker SPEから返される結果(worker_result_t) x NUM_OF_SPE_SPSA
  param_info_ea = buf_in[1].ui;
  param_ea = param_info_ea + sizeof(param_info_t);
  worker_result_ea = param_ea + sizeof(mandel_params_t);

  // パケットからパラメーターを読み込んで他のSPEに渡す
  if (in0[0].ea == 0)
  {
    // このパケットは空
    param_info.command = COMMAND_NONE;
    param_info.job_id = 0;
  }
  else
  {
    param_info.command = COMMAND_WORK;
    param_info.job_id = buf_in[0].ui + 1;
    param_info.param_ea = param_ea;

    param.xres = buf_in[2].ui;
    param.yres = buf_in[3].ui;
    param.cx = buf_in[4].f;
    param.cy = buf_in[5].f;
    param.zoom = buf_in[6].f;
    param.frame = buf_in[7].ui;
    param.max_count = buf_in[8].ui;
    param.color_u = buf_in[9].f;
    param.color_v = buf_in[10].f;
    param.out_ea = out0[0].ea;

    // パラメーターをローカルメモリに書き込む
    mfc_put(&param, param_info.param_ea, sizeof(mandel_params_t), 0, 0, 0);
  }

  // パラメーター情報をローカルメモリに書き込む
  // ※フェンスを付けてパラメーターより後に書き込まれるようにする
  // 他のSPEはこれを読み込んで実行開始
  mfc_putf(&param_info, param_info_ea, sizeof(param_info_t), 0, 0, 0);
  mfc_write_tag_mask(1 << 0);
  mfc_write_tag_update_all();
  mfc_read_tag_status();

  if (SUF_SPE_WORK)
  {
    // 自分もジョブを処理する場合
    param.para_all = NUM_OF_SPE_CLUB + NUM_OF_SPE_SPSA;
    // このSPEの番号は最後
    param.para_my = NUM_OF_SPE_CLUB + NUM_OF_SPE_SPSA - 1;
    job(&param);
  }

  // YUVヘッダ情報を書き込み
  spurs::yuv_header_t * yuv_header;
  yuv_header = (spurs::yuv_header_t *)(void *)&buf_h;
  yuv_header->width = spurs::YUV_HEADER_WIDTH_HD_I420;
  yuv_header->height = spurs::YUV_HEADER_HEIGHT_HD_I420;
  yuv_header->horizontal_size = param.xres;
  yuv_header->vertical_size = param.yres;
  yuv_header->picture_structure = spurs::YUV_HEADER_PICTURE_STRUCTURE_PROGRESSIVE;
  mfc_put(&buf_h, param.out_ea, 128, 0, 0, 0);
  mfc_write_tag_mask(1 << 0);
  mfc_write_tag_update_all();
  mfc_read_tag_status();

  // 他のSPEがこのIDのジョブを終わらせるまで待つ
  while (1)
  {
    suf::dma::get(&worker_result, worker_result_ea, sizeof(worker_result_t) * NUM_OF_SPE_SPSA);
    unsigned int not_yet = 0;
    for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
    {
      if (counter_greater_than(param_info.job_id, worker_result[i].job_id))
      {
        not_yet++;
      }
    }

    if (!not_yet)
    {
      break;
    }

    // 少し待ってから繰り返す
    mcl_usleep(WAIT_USLEEP);
  }

  if (in0[0].flags & suf::PACKET_FLAGS_EOS)
  {
    // パケットがEOSだったらEOSをプッシュして終了
    suf::push_eos(0);

    // 終了コマンドを書き込む
    param_info.command = COMMAND_QUIT;
    param_info.job_id = 0;
    mfc_putf(&param_info, param_info_ea, sizeof(param_info_t), 0, 0, 0);
    mfc_write_tag_mask(1 << 0);
    mfc_write_tag_update_all();
    mfc_read_tag_status();
  }

  // パケットをリリース
  suf::release_input(0, 1);
  suf::release_output(0, 1);
}

ホスト側プログラム:main.cpp
#include <iostream>
#include <fstream>
#include <memory>
#include <stdlib.h>
#include <wchar.h>
#include <spurs/spha/spha.h>
#include <spurs/club/club.h>
#include <spurs/common/byte_order.h>
#include <spurs/sp3/sp3_cmdif.h>
#include <sys/time.h>
#include "data.h"

//#define DEBUG

// エンコーダー設定
#define POP_BUFFER_SIZE 0x200000
#define VIDEO_WIDTH 1920
#define VIDEO_HEIGHT 1080
#define BIT_RATE 20000
#define MAX_BIT_RATE 20000
#define PROFILE_ID ST("Baseline Profile")
#define LEVEL_ID ST("Level 4")
#define NUM_B_FRAMES 0
#define PICTURE_STRUCTURE ST("Progressive")
#define PTS_INC 3003

// マンデルブロ集合描画設定
#define FRAMES 1800
#define MAX_COUNT 192

#define COLORU -111.3f
#define COLORV 76.8f
#define ZOOM_START 0.008346f
#define ZOOM_SPEED 0.997f
#define CENTER_X 1.260543f
#define CENTER_Y -0.040396f

/*****************************
#define COLORU 111.3f
#define COLORV -30.0f
#define ZOOM_START 0.005708f
#define ZOOM_SPEED 0.9975f
#define CENTER_X 0.180487f
#define CENTER_Y -0.662772f

#define ZOOM_START 2.0f
#define ZOOM_SPEED 0.994f
#define CENTER_X 0.189705f
#define CENTER_Y -0.65044f
******************************/

// 処理がTIMEOUT us滞ったらタイムアウトにする
#define TIMEOUT 20000

// ファイル名
#define SUF_FILENAME ST("suf.elf")
#define SPE_FILENAME "spe.elf"
#define ES_FILENAME "outh264.es"

// spurs::rc_tを右に28bitシフトして4bitのエラータイプを得る
#define RSHIFT_TYPE_RC 28

// spurs::rc_tのエラータイプ(上位4bit)
#define TYPE_RC_OK 0x0
#define TYPE_RC_WARNING 0x1
#define TYPE_RC_ERROR 0x2
#define TYPE_RC_SPSA_ERROR 0x3
#define TYPE_RC_CODEC_WARNING 0x4
#define TYPE_RC_CODEC_ERROR 0x5
#define TYPE_RC_SUF_WARNING 0x6
#define TYPE_RC_SUF_ERROR 0x7
#define TYPE_RC_INTERNAL_ERROR 0xf


// SPE実行スレッド用構造体
typedef struct
{
  // セッション
  spha_session_t * session;
  // スレッド
  spha_thread_t * thread;
  // SPEのmain()に与えるパラメーター
  uint32_t * param;
  // パラメーターのサイズ
  uint32_t param_size;
} spe_thread_args_t;


// pusher threadのパラメーターの構造体
typedef struct
{
  spurs::i_system * system_if;
  spurs::i_pusher * pusher_if;
  spurs::i_pusher * pusher_fd_if;
  int err;
  unsigned int buffer_ea;
} pusher_thread_args_t;


// popper threadのパラメーターの構造体
typedef struct
{
  spurs::i_system * system_if;
  spurs::i_popper * popper_if;
  int err;
} popper_thread_args_t;


// リトルエンディアンからビッグエンディアンに変換
static inline uint32_t letobe32(uint32_t x)
{
  return (((x & 0x000000ffU) << 24) |
          ((x & 0x0000ff00U) << 8) |
          ((x & 0x00ff0000U) >> 8) |
          ((x & 0xff000000U) >> 24));
}


// リトルエンディアンからビッグエンディアンに変換(64bit)
static inline uint64_t letobe64(uint64_t x)
{
  return (((x & 0x00000000000000ffULL) << 56) |
          ((x & 0x000000000000ff00ULL) << 40) |
          ((x & 0x0000000000ff0000ULL) << 24) |
          ((x & 0x00000000ff000000ULL) << 8) |
          ((x & 0x000000ff00000000ULL) >> 8) |
          ((x & 0x0000ff0000000000ULL) >> 24) |
          ((x & 0x00ff000000000000ULL) >> 40) |
          ((x & 0xff00000000000000ULL) >> 56));
}


// SPHAエラーチェック
// 戻り値: 0:正常 1:エラー
static inline int check_st(spha_status_t st, const char * message)
{
  int error = 0;
  if (st)
  {
    printf("Error: %s\n", message);
    printf("SPHA: %s\n", spha_util_result_code_name(st));
    printf("SP3: %s\n", spha_util_sp3_result_code_name(st));
    error = 1;
  }
  return error;
}


// spurs::rc_tをチェック
// 戻り値: 0:正常 1:ワーニング
// エラーの場合は強制終了
static inline int check_rc(spurs::rc_t rc)
{
  int error = 0;
  switch (rc >> RSHIFT_TYPE_RC)
  {
    case TYPE_RC_OK:
      break;
    case TYPE_RC_WARNING:
      error = 1;
      wprintf(L"RC_WARNING: 0x%08x\n", rc);
      break;
    case TYPE_RC_ERROR:
      error = 2;
      wprintf(L"RC_ERROR: 0x%08x\n", rc);
      throw 1;
      break;
    case TYPE_RC_SPSA_ERROR:
      error = 2;
      wprintf(L"RC_SPSA_ERROR: 0x%08x\n", rc);
      throw 1;
      break;
    case TYPE_RC_CODEC_WARNING:
      error = 1;
      wprintf(L"RC_CODEC_WARNING: 0x%08x\n", rc);
      break;
    case TYPE_RC_CODEC_ERROR:
      error = 2;
      wprintf(L"RC_CODEC_ERROR: 0x%08x\n", rc);
      throw 1;
      break;
    case TYPE_RC_SUF_WARNING:
      error = 1;
      wprintf(L"RC_SUF_WARNING: 0x%08x\n", rc);
      break;
    case TYPE_RC_SUF_ERROR:
      error = 2;
      wprintf(L"RC_SUF_ERROR: 0x%08x\n", rc);
      throw 1;
      break;
    case TYPE_RC_INTERNAL_ERROR:
      error = 2;
      wprintf(L"RC_INTERNAL_ERROR: 0x%08x\n", rc);
      throw 1;
      break;
    default:
      break;
  }

  if (error == 2)
  {
    exit(EXIT_FAILURE);
  }

  return error;
}


// 時間計測用関数。Tick値を返す(マイクロ秒)
static inline long long mcl_get_tick_count(void)
{
  struct timeval timeprof;
  gettimeofday(&timeprof, NULL);
  return ((long long)timeprof.tv_sec * (long long)1000000 + (long long)timeprof.tv_usec);
}


class mandel_club
{
private:
  spurs::i_system * system_if;
  spurs::i_pusher * pusher_if;
  spurs::i_pusher * pusher_fd_if;
  spurs::i_popper * popper_if;
  spurs::i_filter * filter_filter_if;
  spurs::i_task * codec_task_if;
  spurs::i_task * filter_task_if;
  spurs::i_parameter * system_parameter_if;
  spurs::i_parameter * filter_parameter_if;
  spurs::i_parameter * codec_parameter_if;
  spurs::i_exception_log * system_log_if;
  spurs::i_exception_log * filter_log_if;
  spurs::i_exception_log * codec_log_if;
  pusher_thread_args_t pusher_thread_arg;
  popper_thread_args_t popper_thread_arg;
  pthread_t pusher_th;
  pthread_t popper_th;
  int err;
  spha_session_t session;
  uint32_t access_attributes;
  spha_object_t buffer_obj_elf;
  spha_object_t buffer_obj;
  // SPEのmain()に与えるパラメーター
  uint32_t arg[NUM_OF_SPE_SPSA][12];
  spha_thread_t spe_th[NUM_OF_SPE_SPSA];
  spe_thread_args_t spe_arg[NUM_OF_SPE_SPSA];
  pthread_t pth[NUM_OF_SPE_SPSA];
  char * buffer_elf;
  char * buffer_data;
  uint32_t buffer_ea, buffer_ea_elf;

  static void * pusher_thread(void * arg);
  static void * popper_thread(void * arg);
  static void * spe_run_thread(void * arg);

protected:

public:
  IUnknown * system_obj;
  IUnknown * pusher_obj;
  IUnknown * pusher_fd_obj;
  IUnknown * popper_obj;
  IUnknown * codec_obj;
  IUnknown * filter_obj;

  mandel_club();
  ~mandel_club();
  void run();
  void init_spha();
  void init_club();
  void release_spha();
  void release_club();
};


mandel_club::mandel_club()
{
  system_if = NULL;
  pusher_if = NULL;
  pusher_fd_if = NULL;
  popper_if = NULL;
  filter_filter_if = NULL;
  codec_task_if = NULL;
  filter_task_if = NULL;
  system_parameter_if = NULL;
  filter_parameter_if = NULL;
  codec_parameter_if = NULL;
  system_log_if = NULL;
  filter_log_if = NULL;
  codec_log_if = NULL;
  err = 0;
  access_attributes = SP3_OBJ_ATTR_HOST_READABLE | SP3_OBJ_ATTR_HOST_WRITABLE | SP3_OBJ_ATTR_HOST_EXECUTABLE | SP3_OBJ_ATTR_SPE_READABLE | SP3_OBJ_ATTR_SPE_WRITABLE | SP3_OBJ_ATTR_SPE_EXECUTABLE | SP3_OBJ_ATTR_SPE_MAPPED_READABLE | SP3_OBJ_ATTR_SPE_MAPPED_WRITABLE;
  buffer_elf = NULL;
  for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
  {
    spe_th[i] = NULL;
    pth[i] = NULL;
  }
}


mandel_club::~mandel_club()
{
  release_club();
  release_spha();
}


// SPE実行スレッド
void * mandel_club::spe_run_thread(void * arg)
{
  spe_thread_args_t * spearg = (spe_thread_args_t *)arg;

  // SPEスレッドを実行
  spha_resume_spe_thread(*(spearg->session), *(spearg->thread), SP3_SPE_THREAD_RESUME_FLAG_BLOCKING, SP3_SPE_THREAD_RESUME_DEFAULT_ENTRY, spearg->param, spearg->param_size, NULL, NULL, NULL);

  // スレッド終了
  pthread_exit(NULL);
}


// pusherスレッド
// パラメーターのパケットをpushする
void * mandel_club::pusher_thread(void * arg)
{
  pusher_thread_args_t * a = (pusher_thread_args_t *)arg;
  uint32_t * data;
  spurs::frame_description_t * fd;
  uint32_t data_size = 128;
  uint32_t fd_size = sizeof(spurs::frame_description_t);
  int err = posix_memalign((void **)&data, 128, data_size);
  err = posix_memalign((void **)&fd, 128, fd_size);

  mcl_cf_param_data_t zoom, cx, cy, color_u, color_v;
  zoom.f = ZOOM_START;
  cx.f = CENTER_X;
  cy.f = CENTER_Y;
  uint64_t pts = 0;

  unsigned int j = 0;
  unsigned int i = 0;
  while ((i < FRAMES) && (a->system_if->get_status() == spurs::STATUS_RUNNING))
  {
    float v = (float)i / (float)FRAMES;
    color_u.f = COLORU * v + COLORV * (1 - v);
    color_v.f = COLORV * v + COLORU * (1 - v);

    if (a->pusher_if->has_space() && a->pusher_fd_if->has_space())
    {
      // パラメーターをセットしてpush
      data[0] = letobe32(i); // ジョブID
      // SUFのSPEとSPSAのSPEで共有させるメモリ領域
      data[1] = letobe32(a->buffer_ea);
      data[2] = letobe32(VIDEO_WIDTH);
      data[3] = letobe32(VIDEO_HEIGHT);
      data[4] = letobe32(cx.ui);
      data[5] = letobe32(cy.ui);
      data[6] = letobe32(zoom.ui);
      data[7] = letobe32(i);
      data[8] = letobe32(MAX_COUNT);
      data[9] = letobe32(color_u.ui);
      data[10] = letobe32(color_v.ui);
      // Frame Descriptionパケットのパラメーター
      pts += PTS_INC; // Presentation Time Stamp
      fd->pts = letobe64(pts & 0x1ffffffffULL);
      fd->flag = letobe32(spurs::FRAME_DESCRIPTION_FLAG_PTS);
      // パケットをプッシュ
      check_rc(a->pusher_if->push(data, data_size, NULL, 0));
      check_rc(a->pusher_fd_if->push(fd, fd_size, NULL, 0));
      //printf("host push %d\n", i);
      zoom.f = zoom.f * ZOOM_SPEED;
      i++;
      j = 0;
    }
    else
    {
      j++;
      if (j > TIMEOUT)
      {
        wprintf(L"ERROR. PUSHER TIMEOUT at frame: %d\n", i);
        a->err = 1;
        break;
      }
    }
  }

  free(data);
  free(fd);

  // 終了パケットをプッシュ
  a->pusher_if->push_eos();
  a->pusher_fd_if->push_eos();

  // スレッド終了
  pthread_exit(NULL);
}


// popperスレッド
// エンコード結果をpopしてファイルに保存する
void * mandel_club::popper_thread(void * arg)
{
  popper_thread_args_t * a = (popper_thread_args_t *)arg;
  char * data;
  spurs::frame_description_t * fd;
  uint32_t data_size = POP_BUFFER_SIZE;
  uint32_t fd_size = sizeof(spurs::frame_description_t);
  int err = posix_memalign((void **)&data, 128, data_size);
  err = posix_memalign((void **)&fd, 128, fd_size);

  unsigned int j = 0;
  unsigned int i = 0;

  // h264エレメンタリストリーム出力ファイルをオープン
  std::ofstream fout;
  fout.open(ES_FILENAME, std::ios::binary);

  // エンコーダーからのパケットをpopしてファイルに書き込み
  while ((!a->popper_if->is_eos()) && (a->system_if->get_status() == spurs::STATUS_RUNNING))
  {
    if (a->popper_if->has_packet())
    {
      data_size = POP_BUFFER_SIZE;
      fd_size = sizeof(spurs::frame_description_t);
      check_rc(a->popper_if->pop(fd, &fd_size, data, &data_size));
      fout.write(data, data_size);
      printf("host pop %d\n", i);
      i++;
      j = 0;
    }
    else
    {
      j++;
      if (j > TIMEOUT)
      {
        wprintf(L"ERROR. POPPER TIMEOUT at frame: %d\n", i);
        a->err = 1;
        break;
      }
    }
  }

  // ストリーム処理終了
  if (a->system_if->get_status() == spurs::STATUS_RUNNING)
  {
    check_rc(a->system_if->stop());
  }

  // ファイルをクローズ
  fout.close();

  free(data);
  free(fd);

  // スレッド終了
  pthread_exit(NULL);
}


void mandel_club::init_spha()
{
  // ---------- SPHA/SPSA初期化 ----------
  // セッションの作成
  check_st(spha_create_session(NULL, &session), "create");

  // 通信路を確立
  check_st(spha_connect_session(session), "connect");

  // SPEプログラムファイルをオープン
  std::ifstream fin;
  fin.open(SPE_FILENAME, std::ios::binary);

  // プログラムサイズを調べる
  fin.seekg(0, std::ios::end);
  uint32_t spe_elf_size = fin.tellg();
  fin.seekg(0, std::ios::beg);

  // プログラムサイズを128の倍数に揃える
  spe_elf_size = (spe_elf_size + 127) & ~127;

  // ホスト側バッファを確保(128バイト・アライン)
  int err = posix_memalign((void **)&buffer_elf, 128, spe_elf_size);
  if (err)
  {
    abort();
  }

  // SPEプログラムファイルをロード
  fin.read(buffer_elf, spe_elf_size);
  fin.close();

  uint32_t buffer_size = 0x1000;
  // ホスト側データ用バッファを確保
  err = posix_memalign((void **)&buffer_data, 128, buffer_size);
  if (err)
  {
    abort();
  }
  for (unsigned int i = 0; i < buffer_size; i++)
  {
    buffer_data[i] = 0;
  }

  // SpursEngine側ローカルメモリ領域の確保
  // プログラム用
  spha_create_memory(session, access_attributes, 0, 0, spe_elf_size, 0, &buffer_obj_elf);
  // データ用
  spha_create_memory(session, access_attributes, 0, 0, buffer_size, 0, &buffer_obj);

  // メモリオブジェクトをマップ
  spha_map_memory(session, buffer_obj_elf, 0, 0, 0, spe_elf_size, 0, &buffer_ea_elf);
  spha_map_memory(session, buffer_obj, 0, 0, 0, buffer_size, 0, &buffer_ea);

  // SpursEngine側ローカルメモリに転送
  uint32_t transfer_size = spe_elf_size;
  spha_data_transfer_to(session, buffer_obj_elf, 0, buffer_elf, &transfer_size);
  transfer_size = buffer_size;
  spha_data_transfer_to(session, buffer_obj, 0, buffer_data, &transfer_size);

  // SPEプログラムローダに与える引数をセット
  // エンディアン変換して渡す
  uint32_t loader_arg[4];
  // プログラムデータの実効アドレス
  loader_arg[0] = letobe32(buffer_ea_elf);
  // プログラムデータのサイズ
  loader_arg[1] = letobe32(spe_elf_size);
  loader_arg[2] = 0;
  loader_arg[3] = 0;

  for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
  {
    // SPEスレッドを作成
    spha_create_spe_thread(session, access_attributes, 0, SP3_SPE_LOADER_DEFAULT, loader_arg, sizeof(loader_arg), &spe_th[i]);
  }

  for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
  {
    // SPEの通し番号
    arg[i][0] = spurs_convert_ui32((uint32_t)i);
    // 全SPE数
    arg[i][1] = spurs_convert_ui32(NUM_OF_SPE_SPSA);
    // データバッファのea
    arg[i][2] = spurs_convert_ui32(buffer_ea);

    // SPEスレッドを実行
    spe_arg[i].session = &session;
    spe_arg[i].thread = &spe_th[i];
    spe_arg[i].param = arg[i];
    spe_arg[i].param_size = sizeof(arg);

    pthread_create(&pth[i], NULL, &spe_run_thread, &spe_arg[i]);
  }
}


void mandel_club::init_club()
{
  // ---------- club初期化 ----------
  // systemクラスインスタンスを生成
  // ※ここでSPHAで作成したセッションを指定しないとSHPA/SPSAとメモリ領域を共有できないので注意
  check_rc(spurs::create_system(&system_obj, session));

  // pusherクラスインスタンスを生成
  check_rc(spurs::create_pusher(&pusher_obj));
  check_rc(spurs::create_pusher(&pusher_fd_obj));

  // popperクラスインスタンスを生成
  check_rc(spurs::create_popper(&popper_obj));

  // codecクラスインスタンスを生成
  // (H264エンコーダーコーデックを指定)
  check_rc(spurs::create_codec(&codec_obj, spurs::CODEC_TYPE_SPURS_H264_ENCODER_HD, NULL));
  //check_rc(spurs::create_codec(&codec_obj, spurs::CODEC_TYPE_SPURS_MPEG2_ENCODER_HD, NULL));

  // filterクラスインスタンスを生成
  check_rc(spurs::create_filter(&filter_obj, NULL));

  // systemのi_parameterインターフェースを取得
  check_rc(system_obj->QueryInterface(spurs::IID_PARAMETER, (void **)&system_parameter_if));

  // 使用するSPEの数をセット
  check_rc(system_parameter_if->set_uint32(ST("Number of SPEs"), NUM_OF_SPE_CLUB));

#ifdef DEBUG
  // デバッグ用
  check_rc(system_parameter_if->set_bool(ST("Debug Kernel"), 1));
#endif // DEBUG

  // systemのi_parameterインターフェースを解放
  system_parameter_if->Release();

  // i_systemインターフェースを取得
  check_rc(system_obj->QueryInterface(spurs::IID_SYSTEM, (void **)&system_if));

  // filterのi_filterインターフェースを取得
  check_rc(filter_obj->QueryInterface(spurs::IID_FILTER, (void **)&filter_filter_if));
  // SUF ELFファイルをロード
  check_rc(filter_filter_if->load(SUF_FILENAME));
  // filterのi_filterインターフェースを解放
  filter_filter_if->Release();

  // filterのi_parameterインターフェースを取得
  check_rc(filter_obj->QueryInterface(spurs::IID_PARAMETER, (void **)&filter_parameter_if));

  if (NUM_OF_SPE_CLUB > 1)
  {
    // filterを並列実行モードにする
    check_rc(filter_parameter_if->set_enum(ST("Parallel Mode"), ST("Tile Division")));
    check_rc(filter_parameter_if->set_uint32(ST("Parallel Divisor"), NUM_OF_SPE_CLUB));
  }

  // filterのi_parameterインターフェースを解放
  filter_parameter_if->Release();

  // i_pusherインターフェースを取得
  check_rc(pusher_obj->QueryInterface(spurs::IID_PUSHER, (void **)&pusher_if));
  check_rc(pusher_fd_obj->QueryInterface(spurs::IID_PUSHER, (void **)&pusher_fd_if));
  // i_popperインターフェースを取得
  check_rc(popper_obj->QueryInterface(spurs::IID_POPPER, (void **)&popper_if));
  // filterのi_taskインターフェースを取得
  check_rc(filter_obj->QueryInterface(spurs::IID_TASK, (void **)&filter_task_if));
  // codecのi_taskインターフェースを取得
  check_rc(codec_obj->QueryInterface(spurs::IID_TASK, (void **)&codec_task_if));
  // パイプラインを接続
  // pusher -> filter -+-> codec -> popper
  // pusher_fd --------+
  check_rc(system_if->connect(pusher_if->get_output_pin(), filter_task_if->get_input_pin(0)));
  check_rc(system_if->connect(filter_task_if->get_output_pin(0), codec_task_if->get_input_pin(0)));
  check_rc(system_if->connect(pusher_fd_if->get_output_pin(), codec_task_if->get_input_pin(1)));
  check_rc(system_if->connect(codec_task_if->get_output_pin(0), popper_if->get_input_pin()));

  // codecのi_parameterインターフェースを取得
  check_rc(codec_obj->QueryInterface(spurs::IID_PARAMETER, (void **)&codec_parameter_if));

  // codecのパラメーターを設定
  check_rc(codec_parameter_if->set_uint32(ST("Horizontal Size"), VIDEO_WIDTH));
  check_rc(codec_parameter_if->set_uint32(ST("Vertical Size"), VIDEO_HEIGHT));
  check_rc(codec_parameter_if->set_uint32(ST("Bit Rate"), BIT_RATE));
  check_rc(codec_parameter_if->set_uint32(ST("Max Bit Rate"), MAX_BIT_RATE));
  check_rc(codec_parameter_if->set_enum(ST("Profile Identification"), PROFILE_ID));
  check_rc(codec_parameter_if->set_uint32(ST("Number of Consecutive B-Frames"), NUM_B_FRAMES));
  check_rc(codec_parameter_if->set_enum(ST("Picture Structure"), PICTURE_STRUCTURE));
  check_rc(codec_parameter_if->set_enum(ST("Level Identification"), LEVEL_ID));
  codec_parameter_if->Release();

  // パラメーター、パイプラインを確定
  check_rc(system_if->allocate());

  // codecのi_taskインターフェースを解放
  codec_task_if->Release();
  // filterのi_taskインターフェースを解放
  filter_task_if->Release();
  // i_pusherインターフェースを解放
  pusher_if->Release();
  pusher_fd_if->Release();
  // i_popperインターフェースを解放
  popper_if->Release();
}


void mandel_club::release_spha()
{
  // ---------- SPHA/SPSA解放  ----------
  // SPE実行スレッドの終了を待つ
  for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
  {
    pthread_join(pth[i], NULL);
  }

  // SPEスレッドの破棄
  for (int i = 0; i < NUM_OF_SPE_SPSA; i++)
  {
    spha_delete_spe_thread(session, spe_th[i]);
  }

  // メモリオブジェクトをアンマップ
  spha_unmap_memory(session, buffer_ea_elf, 0);
  spha_unmap_memory(session, buffer_ea, 0);

  // SpursEngine側ローカルメモリ領域を解放
  spha_delete_object(session, buffer_obj_elf);
  spha_delete_object(session, buffer_obj);

  // セッションの通信路を切断
  spha_close_session(session);

  // セッションの破棄
  spha_delete_session(session);

  // ホスト側バッファを解放
  free(buffer_elf);
  free(buffer_data);
}


void mandel_club::release_club()
{
  // ---------- club解放  ----------
  // i_systemインターフェースを解放
  system_if->Release();

#ifdef DEBUG
  // ログの取得
  check_rc(system_obj->QueryInterface(spurs::IID_EXCEPTION_LOG, (void **)&system_log_if));
  check_rc(filter_obj->QueryInterface(spurs::IID_EXCEPTION_LOG, (void **)&filter_log_if));
  check_rc(codec_obj->QueryInterface(spurs::IID_EXCEPTION_LOG, (void **)&codec_log_if));
  spurs::rc_t log_rcs[128];
  spurs::time_t log_times[128];
  const tchar_t * log_msgs[128];
  uint32_t log_num;
  log_num = 128;
  system_log_if->get_log(log_rcs, log_times, log_msgs, &log_num);
  for (unsigned int i = 0; i < log_num; i++)
  {
    wprintf(L"system: %s\n", log_msgs[i]);
  }
  log_num = 128;
  filter_log_if->get_log(log_rcs, log_times, log_msgs, &log_num);
  for (unsigned int i = 0; i < log_num; i++)
  {
    wprintf(L"filter: %s\n", log_msgs[i]);
  }
  log_num = 128;
  codec_log_if->get_log(log_rcs, log_times, log_msgs, &log_num);
  for (unsigned int i = 0; i < log_num; i++)
  {
    wprintf(L"codec: %s\n", log_msgs[i]);
  }
  system_log_if->Release();
  filter_log_if->Release();
  codec_log_if->Release();
#endif // DEBUG

  // codecクラスインスタンスを解放
  codec_obj->Release();

  // popperクラスインスタンスを解放
  popper_obj->Release();

  // pusherクラスインスタンスを解放
  pusher_obj->Release();
  pusher_fd_obj->Release();

  // systemクラスインスタンスを解放
  system_obj->Release();
}


void mandel_club::run()
{
  // SPHA/SPSAの初期化
  init_spha();

  // clubの初期化
  init_club();

  // ストリーム処理開始
  check_rc(system_if->run());

  // pusher用インターフェースを取得
  check_rc(system_obj->QueryInterface(spurs::IID_SYSTEM, (void **)&pusher_thread_arg.system_if));
  check_rc(pusher_obj->QueryInterface(spurs::IID_PUSHER, (void **)&pusher_thread_arg.pusher_if));
  check_rc(pusher_fd_obj->QueryInterface(spurs::IID_PUSHER, (void **)&pusher_thread_arg.pusher_fd_if));
  pusher_thread_arg.err = 0;
  pusher_thread_arg.buffer_ea = buffer_ea;

  // pusher thread実行
  pthread_create(&pusher_th, NULL, (void* (*)(void*))pusher_thread, &pusher_thread_arg);

  // popper用インターフェースを取得
  check_rc(system_obj->QueryInterface(spurs::IID_SYSTEM, (void **)&popper_thread_arg.system_if));
  check_rc(popper_obj->QueryInterface(spurs::IID_POPPER, (void **)&popper_thread_arg.popper_if));
  popper_thread_arg.err = 0;

  // popper thread実行
  pthread_create(&popper_th, NULL, (void* (*)(void*))popper_thread, &popper_thread_arg);

  // コーデックカーネル終了を待つ
  check_rc(system_if->wait_spe_thread());

  if (err || pusher_thread_arg.err || popper_thread_arg.err)
  {
    // エラー時はスレッドを強制終了
    pthread_cancel(pusher_th);
    pthread_cancel(popper_th);
  }
  else
  {
    // スレッドの終了を待つ
    pthread_join(pusher_th, NULL);
    pthread_join(popper_th, NULL);
  }

  // pusher用インターフェースを解放
  pusher_thread_arg.system_if->Release();
  pusher_thread_arg.pusher_if->Release();
  pusher_thread_arg.pusher_fd_if->Release();

  // popper用インターフェースを解放
  popper_thread_arg.popper_if->Release();
}


int main(int argc, char ** argv)
{
  mandel_club * t = new mandel_club();

  long long time_s = mcl_get_tick_count();

  t->run();

  printf("Done. %f sec\n", (float)(mcl_get_tick_count() - time_s) / 1000000);

  delete t;

  return 0;
}

プログラム解説

clubライブラリは複数のSPEで並列実行することができるようになっていますが、後述の不具合のため、clubライブラリ側のSUFを担当するSPEは1基のみとし、他の3基はSPSAライブラリから起動するようにしています。SUFのSPEはホストとの通信と他3基のSPEの制御を行い、また自分自身も演算に加わっています。

SUFのSPEと他の3基のSPEとの同期はMutex等は用いずに受け渡しの変数の領域を独立させて一意のジョブIDの更新を確認するという方法で行っています。ジョブIDの確認は若干ウェイトを入れてポーリングで行っています。

プログラミング上の注意点など

●clubライブラリで複数のSPEを使用した場合の不具合について

clubライブラリでSUFに複数のSPEを使用した時、不規則にストールしてしまう不具合が発生しました。
この不具合はSDKの公式対応環境であるCentOS 5.2上でclubサンプルプログラムを若干改造して複数のSPEで動かすようにしたものを実行した場合でも同様に発生したのでclubライブラリ自体の問題かも知れません。
単体のSPEでSUFを実行した場合は問題が出ないので、clubライブラリで複数のSPEを使用したい場合は、このプログラムのようにSPSAライブラリで制御したSPEと連携させるといいでしょう。

●SUFでFIFOブロック数を2以上にした場合の不具合

入出力のFIFOブロックを複数持たせればパイプライン的に効率よくパケットを処理できるはずなのですが、FIFOブロック数を2以上にした場合、不具合が発生します。
色々テストして調べてみたところ、ドキュメント(SRKj_APLSW10_FilterPlatform_Users_Guide_Rev0-2.pdf)の仕様では複数のFIFOブロックを使用する設定の場合、パケットのないFIFOブロックは空き(packet_t構造体のeaに0がセットされ、それで空きであることを判定する)となるだけでインデックスは存在しているはずですが、実装では空きブロックのインデックスは前に詰められる形でfmain()のinput[]引数に入っているようです。このため、空きブロックが発生した場合に各pinに届いたパケットのインデックスを正しく取得できず、事実上FIFOブロックは1つの設定でしか使用できません。

●SPSAライブラリとclubライブラリを連動させる時の注意点

SPSAライブラリとclubライブラリを連動させる場合、まずSPSAライブラリを初期化してセッションを作成し、そのセッションIDをclubライブラリの初期化時(create_system)に指定する必要があります。こうしないとメモリ領域の共有を行うことができません。(別領域にマップされるので同じ実効アドレスを指定しても異なるデータが見えてしまいます)