C#(.NET)で密行列積を高速化する

はじめに

C#で書かれたプログラムはC/C++のそれと比べたら演算速度で劣ります。とはいってもPythonC/C++間の差ほど開いてはいません。上手に書けばC#でもそれなりに高速なプログラムが書けるはずだと考えています。今回は密行列積を題材にC#(.NET)で出来る限り高速化をしようと思います。なお、私はHPCをしっかりと勉強したことが無い素人なのでここで紹介するコードもまだまだ高速化の余地があると思います。

行列積の難しさ

行列積は物理シミュレーション、人工知能、ゲーム、数値計算などの様々な分野で必要となる極めて基本的な計算です。しかしながら行列積は非常に計算コストがかかります。定義通りに普通に実装してしまうとコンピュータの理論性能の1%も出ません。そこで普通はHPCの専門家たちが書いた線形代数ライブラリを用います。有名どころではintel MKL、OpenBLAS、ATLASです。intel MKLはその名の通りintelのプロセッサ専用です。OpenBLASとATLASは汎用的なライブラリで、大体のプロセッサで使えます。速度としてはintel MKL > OpenBLAS > ATLASの順になります。但しintel MKLとOpenBLAS間の速度差はそこまで開いてはないうえに、ほとんどのプロセッサで高速に動作することから、OpenBLASの方が広く使われている印象です。numpyでもOpenBLASが内部で使われています。今回はこのようなライブラリに頼らずに高速化していきます。

FLOPS

FLoating-point Operations Per Secondの略称です。これは1秒間に何回の浮動小数点演算ができるのかを表しており、コンピュータの演算性能の指標に用いられます。
試しに行列積の計算速度からコンピュータの実効性能を計測してみます。行列Aと行列Bの行列積は以下のように定義されます。

 A = (a_{ij}) ・・・ 列数がNの行列
 B = (b_{ij}) ・・・ 行数がNの行列
 C = (c_{ij}) ・・・ 行列積(Aの行数 x Bの列数)


 c_{ij} = \sum_{k = 1}^{N}{a_{ik}b_{kj}}

これではわかりにくいと思うので、この定義をC#風の疑似コードに起こしてみます。

var a = new float[A_ROW_NUM, A_COL_NUM];
var b = new float[B_ROW_NUM, B_COL_NUM];
var c = new float[A_ROW_NUM, B_COL_NUM];

for (var i = 0; i < A_ROW_NUM; i++)
                for (var j = 0; j < B_COL_NUM; j++)
                    for (var k = 0; k < B_ROW_NUM; k++)
                        c[i, j] += a[i, k] * b[k, j];

最も内側のループに注目すると、 「c[i, j] +=」の部分で足し算を1回、「a[i, k] * b[k, j]」で掛け算を1回行っています。つまり最も内側のループ内では2回の浮動小数点演算が行われていることになります。そしてこの計算がA_ROW_NUM x B_COL_NUM x B_ROW_NUM 回実行されるので、行列積の計算1回で2 x A_ROW_NUM x B_COL_NUM x B_ROW_NUM 回の浮動小数点演算が実行されます。よって、行列積の計算1回にt秒要した場合、そのコンピュータの実効性能は


 FLOPS =\frac{2 \times A\_ROW\_NUM \times B\_COL\_NUM \times B\_ROW\_NUM}{t}

この値がコンピュータの理論性能に近づけば近づくほど良い行列積のプログラムと考えられます。では理論性能も求めてみます。ここではintel Haswell以降のCPUについて考えます。行列積の「c[i, j] += a[i, k] * b[k, j]」の部分はFMA(後述)という命令を用いて1サイクルで処理可能です。そしてAVXでは256bit幅のレジスタを用いるので、32bitのfloat型の場合は8つの数値を1サイクルで同時に処理できます。よって以下の計算式で計算可能です。


 FLOPS = (クロック周波数{Hz}) \times (物理コア数) \times 2(FMA演算回路の数) \times \frac{256}{32}

この記事ではFLOPSの値を10^9で割ったGFLOPSを用います。

実験方法

全ての要素が0.0 ~ 1.0の一様乱数で初期化された1024 x 1024の正方行列の行列積計算を100回行い、これに要した時間からGFLOPSを求める。

以下が実験に用いるコンピュータの性能です。


(表1) 実験に用いるコンピュータの性能

CPU intel(R) Core(TM) i7-10510U 4コア 8スレッド
ベースクロック : 1.80GHz TB時 :最大4.90GHz
L1キャッシュ : 64K(1コアあたり)
L2キャッシュ : 256K(1コアあたり)
メモリ(RAM) 16GB

Core(TM) i7-10510Uについてネットで調べたのですが、FMA演算回路の数がどうしても分かりませんでした。 仮に1個だとしたらベースクロックにおける理論性能は115.2GFLOPS、TB時には313.6GFLOPSとなります。ですがあくまでも仮定なので参考程度にしてください。
もちろんコンパイラの最適化を有効にして計測します。

実装したコードの説明

実装したコードはここにあります。このプロジェクトには以下の2つのコードファイルが含まれています。

Matrix.cs ・・・ Matrixクラスと行列積の実装が記述されている
Program.cs ・・・ メインメソッドとプログラムの性能測定に関するコードが記述されている

Matrixクラスについて説明します。Matrixクラスは行列を表すクラスで、内部にDATAという名前のfloat型の1次元配列を保有しています。この配列には行列の各要素が列優先方式で配置されています。以下がMatrixクラスの疑似コードです。

    public class Matrix     
    {
        public int RowNum { get; }
        public int ColumnNum { get; }

        readonly float[] DATA;

        public float this[int i, int j]
        {
            get { return this.DATA[i + j * this.RowNum]; }
            set { this.DATA[i + j * this.RowNum] = value; }
        }

        public Matrix(int rowNum, int colNum)
        {
            this.RowNum = rowNum;
            this.ColumnNum = colNum;
            this.DATA = new float[this.RowNum * this.ColumnNum];
        }

        public Matrix(int rowNum, int colNum, params float[] elements)
        {
            this.RowNum = rowNum;
            this.ColumnNum = colNum;
            this.DATA = Transpose(elements, this.ColumnNum, this.RowNum);
        }

        public void Init(float num = 0.0f)
        {
            for (var j = 0; j < this.ColumnNum; j++)
                for (var i = 0; i < this.RowNum; i++)
                    this[i, j] = num;
        }

        public override string ToString()
        {
             // 文字列に変換する
        }

        public override bool Equals(object obj)
        {
             // 行列同士の比較を行う
        }
    }

行列の各要素にはインデクサー経由でアクセスします。実際は直接1次元配列にアクセスしたほうが速いですが、今回は見やすさのためにこのようにしました。
行列積の実装はこのMatrixクラス内の静的メンバ関数として実装されています。

単純実装

        public static void Mult_0(Matrix left, Matrix right, Matrix product)    
        {
            for (var i = 0; i < left.RowNum; i++)
                for (var j = 0; j < right.ColumnNum; j++)
                    for (var k = 0; k < right.RowNum; k++)
                        product[i, j] += left[i, k] * right[k, j];
        }

これは行列積の定義を元に素直にコードに起こした実装です。
このコードを実際に実行すると以下の結果となりました。


(表2) 単純実装の性能

1回の行列積の平均実行時間 2654.66 ms
性能 0.81 GFLOPS

更にSystem.Threading.Tasks.Parallel.Forメソッドを用いて並列化してみます。

        public static void ParallelMult_0(Matrix left, Matrix right, Matrix product)
        {
            Parallel.For(0, left.RowNum, (i) =>
            {
                for (var j = 0; j < right.ColumnNum; j++)
                    for (var k = 0; k < right.RowNum; k++)
                        product[i, j] += left[i, k] * right[k, j];
            });
        }

結果は以下の通りです。


(表3) 単純実装の性能(並列化)

1回の行列積の平均実行時間 1563.18 ms
性能 1.37 GFLOPS

多少はマシになりましたがだいぶ遅いです。遅い原因を考えていきましょう。まずはkのループに注目します。先ほど説明した通り、Matrixクラスでは行列の各要素を列優先方式で1次元配列に格納しています。以下のMatrixクラスのインデクサーを見てください。

        public float this[int i, int j]
        {
            get { return this.DATA[i + j * this.RowNum]; }
            set { this.DATA[i + j * this.RowNum] = value; }
        }

このコードからleft[i, k]の部分が連続アクセスではなく、left.RowNumの分だけ飛びながらアクセスしていることが分かります。これが行列積計算を遅くしている原因の一つであると考えられます。また、product[i, j]の部分に注目します。product[i, j]のアクセスにはkは関与していないため、kのループが回っている間はproductのアクセス先は変化しません。故にk = 0の処理が終わるまでk = 1の処理に進むことができないのです。これをループの前後に依存関係があるといいます。コンパイラは依存関係が無い処理をパイプライン処理に置き換えて並列に実行してくれるのでこのような依存関係はコンパイラの最適化を妨げます。これら2つの問題点を次のループ交換法で解決します。

ループ交換法

ループ交換法はその名の通りループの順番を交換する手法です。先ほどの行列積の実装において、i、j、kそれぞれのループの順番は行列積の結果に影響を与えません。故にこれらの順番を入れ替えることで、行列に連続でアクセスできるようになり、さらに依存関係も無くすことができます。以下のようにループを交換します。

        public static void Mult_1(Matrix left, Matrix right, Matrix product)    
        {
            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];
        }

先ほどのコードではi、j、kの順番でしたが、このコードではj、k、iの順番になっています。このようにすることでproduct、left、rightのそれぞれについて、出来る限り連続でアクセスできるようになっています。また最も内側にiのループがあるので、パイプライン処理によってi = 0の処理が終わるのを待たずして、i = 1の処理に進めます。以下が結果です。


(表4) ループ交換法の性能

1回の行列積の平均実行時間 1523.18 ms
性能 1.41 GFLOPS


なんと逐次実行で先ほどの単純実装を並列化したものに匹敵しています。
ではこの実装も並列化してみます。

            Parallel.For(0, right.ColumnNum, (j) =>
            {
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];
            });

結果は以下の通りです。


(表5) ループ交換法の性能(並列化)

1回の行列積の平均実行時間 795.24 ms
性能 2.7 GFLOPS

こんな単純な変更でも大きくパフォーマンスに影響します。

ループアンローリング

次にループアンローリングを行います。ループアンローリングとはその名の通りループを展開する手法です。以下のコードを見てください。

        public static void Mult_2(Matrix left, Matrix right, Matrix product)    
        {
            const int UNROLL_STEP_NUM = 4;

            var j_rest = right.ColumnNum % UNROLL_STEP_NUM;
            var k_rest = right.RowNum % UNROLL_STEP_NUM;

            for (var j = 0; j < right.ColumnNum - j_rest; j += UNROLL_STEP_NUM)
            {
                var j_1 = j + 1;
                var j_2 = j + 2;
                var j_3 = j + 3;
                for (var k = 0; k < right.RowNum - k_rest; k += UNROLL_STEP_NUM)
                {
                    var k_1 = k + 1;
                    var k_2 = k + 2;
                    var k_3 = k + 3;
                    var right_00 = right[k, j]; 
                    var right_01 = right[k_1, j]; 
                    var right_02 = right[k_2, j]; 
                    var right_03 = right[k_3, j];
                    var right_10 = right[k, j_1]; 
                    var right_11 = right[k_1, j_1];
                    var right_12 = right[k_2, j_1]; 
                    var right_13 = right[k_3, j_1];
                    var right_20 = right[k, j_2]; 
                    var right_21 = right[k_1, j_2]; 
                    var right_22 = right[k_2, j_2]; 
                    var right_23 = right[k_3, j_2];
                    var right_30 = right[k, j_3]; 
                    var right_31 = right[k_1, j_3]; 
                    var right_32 = right[k_2, j_3];
                    var right_33 = right[k_3, j_3];

                    for (var i = 0; i < left.RowNum; i++)
                    {
                        var left_0 = left[i, k];
                        var left_1 = left[i, k + 1];
                        var left_2 = left[i, k + 2];
                        var left_3 = left[i, k + 3];

                        product[i, j] += left_0 * right_00 
                                              + left_1 * right_01 
                                              + left_2 * right_02 
                                              + left_3 * right_03;

                        product[i, j_1] += left_0 * right_10 
                                                 + left_1 * right_11 
                                                 + left_2 * right_12 
                                                 + left_3 * right_13;

                        product[i, j_2] += left_0 * right_20 
                                                 + left_1 * right_21 
                                                 + left_2 * right_22 
                                                 + left_3 * right_23;

                        product[i, j_3] += left_0 * right_30 
                                                 + left_1 * right_31 
                                                 + left_2 * right_32 
                                                 + left_3 * right_33;
                    }
                }
            }

            // 端数処理
            for (var j = right.ColumnNum - j_rest; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product.DATA[i + j * product.RowNum] += left.DATA[i + k * left.RowNum] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = right.RowNum - k_rest; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product.DATA[i + j * product.RowNum] += left.DATA[i + k * left.RowNum] * right[k, j];
        }

この実装ではj, k, iそれぞれのループ回数が先ほどの2つの実装の\frac{1}{UNROLL\_STEP\_NUM}となっています。そして一番内側のループではUNROLL_STEP_NUM個の式が並んでいます。これはUNROLL_STEP_NUM回のループを展開した形となります。このようにする利点は主に以下の2つがあります。

1つ目 : ループの条件分岐が少なくなる(具体的には条件分岐が\frac{1}{UNROLL\_STEP\_NUM}にまで削減される)

2つ目 : レジスタのデータを再利用できる

2つ目の利点について説明します。CPUが演算を行うとき、演算対象の値はレジスタに格納されます。上のコードの最も内側のループ内には4つの式がありますが、そのすべてについて left[i, k], left[i, k_1], left[i, k_2], left[i, k_3]が含まれています。故に4つの式を計算する際にレジスタの値を再利用できるのです。またこのコードではrightの各要素も一時変数に格納し再利用をするようにしています。
このコードを実際に実行した結果は以下の通りです。UNROLL_STEP_NUMは4としました。


(表6) 4段ループアンローリングの性能

1回の行列積の平均実行時間 559.25 ms
性能 3.84 GFLOPS

更に並列化を行ったコードとその結果です。

        public static void ParallelMult_2(Matrix left, Matrix right, Matrix product)    
        {
            const int UNROLL_STEP_NUM = 4;

            var j_blocks = right.ColumnNum / STEP_SIZE;
            var j_rest = right.ColumnNum % UNROLL_STEP_NUM;
            var k_rest = right.RowNum % UNROLL_STEP_NUM;

            Parallel.For(0, j_blocks, (block) =>
            {
                var j = block * STEP_SIZE;
                var j_1 = j + 1;
                var j_2 = j + 2;
                var j_3 = j + 3;
                for (var k = 0; k < right.RowNum - k_rest; k += UNROLL_STEP_NUM)
                {
                    var k_1 = k + 1;
                    var k_2 = k + 2;
                    var k_3 = k + 3;
                    var right_00 = right[k, j]; 
                    var right_01 = right[k_1, j]; 
                    var right_02 = right[k_2, j]; 
                    var right_03 = right[k_3, j];
                    var right_10 = right[k, j_1]; 
                    var right_11 = right[k_1, j_1];
                    var right_12 = right[k_2, j_1]; 
                    var right_13 = right[k_3, j_1];
                    var right_20 = right[k, j_2]; 
                    var right_21 = right[k_1, j_2]; 
                    var right_22 = right[k_2, j_2]; 
                    var right_23 = right[k_3, j_2];
                    var right_30 = right[k, j_3]; 
                    var right_31 = right[k_1, j_3]; 
                    var right_32 = right[k_2, j_3];
                    var right_33 = right[k_3, j_3];

                    for (var i = 0; i < left.RowNum; i++)
                    {
                        var left_0 = left[i, k];
                        var left_1 = left[i, k + 1];
                        var left_2 = left[i, k + 2];
                        var left_3 = left[i, k + 3];

                        product[i, j] += left_0 * right_00 
                                              + left_1 * right_01 
                                              + left_2 * right_02 
                                              + left_3 * right_03;

                        product[i, j_1] += left_0 * right_10 
                                                 + left_1 * right_11 
                                                 + left_2 * right_12 
                                                 + left_3 * right_13;

                        product[i, j_2] += left_0 * right_20 
                                                 + left_1 * right_21 
                                                 + left_2 * right_22 
                                                 + left_3 * right_23;

                        product[i, j_3] += left_0 * right_30 
                                                 + left_1 * right_31 
                                                 + left_2 * right_32 
                                                 + left_3 * right_33;
                    }
                }
            });

            // 端数処理
            for (var j = right.ColumnNum - j_rest; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product.DATA[i + j * product.RowNum] += left.DATA[i + k * left.RowNum] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = right.RowNum - k_rest; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product.DATA[i + j * product.RowNum] += left.DATA[i + k * left.RowNum] * right[k, j];
        }


(表7) 4段ループアンローリングの性能(並列化)

1回の行列積の平均実行時間 199.89 ms
性能 10.74 GFLOPS

単純実装を並列化したもの(表3)と比べると、10倍近く高速化しました。これでも理論性能には程遠いですが、レジスタを効率よく利用できるようになってきました。

タイリング(キャッシュブロッキング)

さらなる高速化を目指してタイリングという手法を用います。タイリングとは演算対象の行列をいくつかの部分行列に分けて計算する手法です。以下のコードを見てください。

        public static void Mult_3(Matrix left, Matrix right, Matrix product)    
        {    
                const int BLOCK_SIZE = 64;       
                const int UNROLL_STEP_NUM = 4;

                var j_rest = right.ColumnNum % BLOCK_SIZE;
                var k_rest = right.RowNum % BLOCK_SIZE;
                var i_rest = left.RowNum % BLOCK_SIZE;
                for (var jj = 0; jj < right.ColumnNum - j_rest; jj += BLOCK_SIZE)
                    for (var kk = 0; kk < right.RowNum - k_rest; kk += BLOCK_SIZE)
                        for (var ii = 0; ii < left.RowNum - i_rest; ii += BLOCK_SIZE)
                        {
                            for (var dj = 0; dj < BLOCK_SIZE; dj += UNROLL_STEP_NUM)
                            {
                                var j = jj + dj;
                                var j_1 = j + 1;
                                var j_2 = j + 2;
                                var j_3 = j + 3;
                                for (var dk = 0; dk < BLOCK_SIZE; dk += UNROLL_STEP_NUM)
                                {
                                    var k = kk + dk;
                                    var k_1 = k + 1;
                                    var k_2 = k + 2;
                                    var k_3 = k + 3;
                                    var right_00 = right[k, j]; 
                                    var right_01 = right[k_1, j]; 
                                    var right_02 = right[k_2, j]; 
                                    var right_03 = right[k_3, j];
                                    var right_10 = right[k, j_1]; 
                                    var right_11 = right[k_1, j_1];
                                    var right_12 = right[k_2, j_1]; 
                                    var right_13 = right[k_3, j_1];
                                    var right_20 = right[k, j_2]; 
                                    var right_21 = right[k_1, j_2]; 
                                    var right_22 = right[k_2, j_2]; 
                                    var right_23 = right[k_3, j_2];
                                    var right_30 = right[k, j_3]; 
                                    var right_31 = right[k_1, j_3]; 
                                    var right_32 = right[k_2, j_3];
                                    var right_33 = right[k_3, j_3];
                                    for (var di = 0; di < BLOCK_SIZE; di++)
                                    {
                                        var i = ii + di;
                                        var left_0 = left[i, k];
                                        var left_1 = left[i, k + 1];
                                        var left_2 = left[i, k + 2];
                                        var left_3 = left[i, k + 3];

                                        product[i, j] += left_0 * right_00 
                                                            + left_1 * right_01 
                                                            + left_2 * right_02 
                                                            + left_3 * right_03;

                                        product[i, j_1] += left_0 * right_10 
                                                                + left_1 * right_11 
                                                                + left_2 * right_12 
                                                                + left_3 * right_13;

                                        product[i, j_2] += left_0 * right_20 
                                                                + left_1 * right_21 
                                                                + left_2 * right_22 
                                                                + left_3 * right_23;

                                        product[i, j_3] += left_0 * right_30 
                                                                + left_1 * right_31 
                                                                + left_2 * right_32 
                                                                + left_3 * right_33;
                                    }
                                }
                            }
                        }

                // 端数処理
                for (var j = right.ColumnNum - j_rest; j < right.ColumnNum; j++)
                    for (var k = 0; k < right.RowNum; k++)
                        for (var i = 0; i < left.RowNum; i++)
                            product[i, j] += left[i, k] * right[k, j];

                for (var j = 0; j < right.ColumnNum; j++)
                    for (var k = right.RowNum - k_rest; k < right.RowNum; k++)
                        for (var i = 0; i < left.RowNum; i++)
                            product[i, j] += left[i, k] * right[k, j];

                for (var j = 0; j < right.ColumnNum; j++)
                    for (var k = 0; k < right.RowNum; k++)
                        for (var i = left.RowNum - i_rest; i < left.RowNum; i++)
                            product[i, j] += left[i, k] * right[k, j];
        }


このコードでは行列のタイリングと4段のループアンローリングを行っています。BLOCK_SIZEは対象の行列をBLOCK_SIZE x BLOCK_SIZEの部分行列に分割することを表しています。このようにすると嬉しい理由はメモリの階層構造にあります。以下の図を見てください。

f:id:KALMIA:20210301193814j:plain
(図2)メモリの階層構造

この図はメモリの階層構造を表しており、上の階層であればある程、高速で容量が少ないです。上から順に説明します。

レジスタ ・・・ 数B単位の小領域。 演算対象のデータが格納される。

キャッシュメモリ ・・・L1キャッシュ、L2キャッシュ、L3キャッシュの三階層に分かれており、L1キャッシュが最もアクセス速度が速く、L3キャッシュが最もアクセス速度が遅い。
CPUにもよるが、一般的にはL1キャッシュとL2キャッシュは各コアごとに独立に存在し、L3キャッシュは全コアで共有。
容量はL1キャッシュが各コアごとに数十KiB単位、L2キャッシュが各コアごとに数百KiB単位、L3キャッシュは全コア共有で数MiB単位。

メインメモリ ・・・ アクセス速度は非常に遅いが大容量。数GiB~数TiBの容量がある。

A x B = Cの行列積計算において、Bのデータは何度も再利用することになります。しかし、行列のサイズが大きいとキャッシュメモリに乗り切らないので、何度もメインメモリやL3キャッシュからデータを持ってくる必要があります。これが大きなロスとなります。一度キャッシュに乗せたデータはキャッシュに乗っている間に再利用しておきたいのです。故にタイリングではL2キャッシュに乗り切るサイズに行列を分割して行列積を計算します。キャッシュの利用量は \frac{BLOCK\_SIZE \times BLOCK\_SIZE \times sizeof(float) \times 3}{1024}KiBと概算できますが、あくまでも概算なので色々とBLOCK_SIZEの値を変えながら良さそうな値を探します。今回はBLOCK_SIZEを64としました。性能は以下の通りです。


(表8) 64 x 64タイリング + 4段ループアンローリングの性能

1回の行列積の平均実行時間 637.80 ms
性能 3.37 GFLOPS

4段ループアンローリング単体の性能と大差ないです。並列化してみると結果は以下のようになりました。


(表9) 64 x 64タイリング + 4段ループアンローリングの性能(並列化)

1回の行列積の平均実行時間 194.96 ms
性能 11.01 GFLOPS

こちらも性能は4段ループアンローリング単体とほぼ同じです。あまり高速化した感じがありませんが、これ以上ブロックサイズを増やしたり、逆に減らしたりすると性能が落ちるのでこれが限界でした。

組み込み関数を用いた高速化

FLOPSの説明の際に行列積の「product[i, j] += left[i, k] * right[k, j]」の部分はFMAという命令で1サイクルで可能だと述べました。FMAとはFused Multiply Addの略称で日本語では融合積和演算といいます。積和演算とは以下の式で表される演算です。


 a \times b + c

「product[i, j] += left[i, k] * right[k, j]」は書き換えると「product[i, j] = left[i, k] * right[k, j] + product[i, j]」となり積和演算を行っていることが分かります。これを1命令で行うので融合積和演算と呼ぶのです。1命令で行える利点は高速に処理できることはもちろん、値の精度が落ちにくいことも挙げられます。積和演算を普通にソフトウェア実装した場合、 a \times bを処理した後にcを加えます。その際、 a \times bの結果を一時的にレジスタに保存するので丸め誤差が発生します。しかし、1サイクルでまとめて計算すれば、 a \times bの結果を一時的に保存する必要が無いので丸め誤差の影響を小さくできます。intel CPUではAVX2(Haswell以降)と同時にFMA演算回路*1が搭載されました。ちなみにAVX2と同時に搭載されたこともあって、FMAはAVX2の仕様の中に入っていると勘違いされることもあるようですが両者は別々の仕様です。
さて、このFMAですが実行時に使われているかどうかはアセンブリを見れば分かります。先ほどの64 x 64タイリング + 4段ループアンローリングのコードをVisual Studioで逆アセンブルしてみましょう。その結果が以下のスクリーンショットです。

f:id:KALMIA:20210304222721p:plain
(図2) 実際に積和計算を行っている部分の逆アセンブル結果

xmmとはレジスタの名前で大きさは128bitです。C#のfloat型は32bitなのでxmmに格納されている値は4次元の実数ベクトルとなります。vmulssはベクトルの各要素を掛け合わせる命令で、vaddssはベクトルの各要素を加算する命令です。つまり、FMAを用いずに積和演算を積と和に分割して計算しています。コンパイラの最適化では必ずしもプログラマーが使って欲しい命令を使ってくれるとは限りません。ですのでプログラマーが明示的にどの命令を用いるのかをコードに記述する必要があります。それを実現するのが組み込み関数です。FMA命令を用いたいのであれば、_mm_fmadd_pdもしくは_mm256_fmadd_pdという関数を用います。両者は128bitのxmmレジスタを用いるのか256bitのymmレジスタを用いるかの違いです。元々C#ではサードパーティー製のライブラリを用いなければ組み込み関数を使うことはできませんでしたが、.NET Core 3.0から公式でサポートされました。System.Runtime.Intrinsics.X86名前空間Fma.MultiplyAddメソッドが_mm_fmadd_pd、_mm256_fmadd_pd関数に相当します。またSystem.Runtime.Intrinsics名前空間にはVector128とVector256という構造体があり、前者は128bitのベクトル、後者は256bitのベクトルを表します。float型を用いる場合は、Vector128が4次元実数ベクトル、Vector256が8次元実数ベクトルとなります。Vector256を用いると256bitのymmレジスタを用いて計算してくれるので、今回はそちらを用います。以下がFMAを用いたコードです。

        public static unsafe void Mult_4(Matrix left, Matrix right, Matrix product)   
        {
            const int BLOCK_SIZE = 64;
            const int AVX_FLOAT_VEC_DIM = 8;

            var j_rest = right.ColumnNum % BLOCK_SIZE;
            var k_rest = right.RowNum % BLOCK_SIZE;
            var i_rest = left.RowNum % BLOCK_SIZE;

            fixed (float* leftPtr = left.DATA)
            fixed (float* productPtr = product.DATA)
            {
                for (var jj = 0; jj < right.ColumnNum - j_rest; jj += BLOCK_SIZE)
                    for (var kk = 0; kk < right.RowNum - k_rest; kk += BLOCK_SIZE)
                        for (var ii = 0; ii < left.RowNum - i_rest; ii += BLOCK_SIZE)
                        {
                            for (var dj = 0; dj < BLOCK_SIZE; dj++)
                            {
                                var j = jj + dj;
                                for (var dk = 0; dk < BLOCK_SIZE; dk++)
                                {
                                    var k = kk + dk;
                                    var rightReg = Vector256.Create(right[k, j]);
                                    float* p0 = &leftPtr[ii + k * left.RowNum];
                                    float* p1 = &productPtr[ii + j * product.RowNum];
                                    var leftReg_0 = Avx.LoadVector256(&p0[0]);
                                    var productReg_0 = Avx.LoadVector256(&p1[0]);
                                    productReg_0 = Fma.MultiplyAdd(leftReg_0, rightReg, productReg_0);

                                    var leftReg_1 = Avx.LoadVector256(&p0[AVX_FLOAT_VEC_DIM]);
                                    var productReg_1 = Avx.LoadVector256(&p1[AVX_FLOAT_VEC_DIM]);
                                    productReg_1 = Fma.MultiplyAdd(leftReg_1, rightReg, productReg_1);

                                    var leftReg_2 = Avx.LoadVector256(&p0[2 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_2 = Avx.LoadVector256(&p1[2 * AVX_FLOAT_VEC_DIM]);
                                    productReg_2 = Fma.MultiplyAdd(leftReg_2, rightReg, productReg_2);

                                    var leftReg_3 = Avx.LoadVector256(&p0[3 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_3 = Avx.LoadVector256(&p1[3 * AVX_FLOAT_VEC_DIM]);
                                    productReg_3 = Fma.MultiplyAdd(leftReg_3, rightReg, productReg_3);

                                    var leftReg_4 = Avx.LoadVector256(&p0[4 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_4 = Avx.LoadVector256(&p1[4 * AVX_FLOAT_VEC_DIM]);
                                    productReg_4 = Fma.MultiplyAdd(leftReg_4, rightReg, productReg_4);

                                    var leftReg_5 = Avx.LoadVector256(&p0[5 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_5 = Avx.LoadVector256(&p1[5 * AVX_FLOAT_VEC_DIM]);
                                    productReg_5 = Fma.MultiplyAdd(leftReg_5, rightReg, productReg_5);

                                    var leftReg_6 = Avx.LoadVector256(&p0[6 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_6 = Avx.LoadVector256(&p1[6 * AVX_FLOAT_VEC_DIM]);
                                    productReg_6 = Fma.MultiplyAdd(leftReg_6, rightReg, productReg_6);

                                    var leftReg_7 = Avx.LoadVector256(&p0[7 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_7 = Avx.LoadVector256(&p1[7 * AVX_FLOAT_VEC_DIM]);
                                    productReg_7 = Fma.MultiplyAdd(leftReg_7, rightReg, productReg_7);

                                    Avx.Store(&p1[0], productReg_0);
                                    Avx.Store(&p1[AVX_FLOAT_VEC_DIM], productReg_1);
                                    Avx.Store(&p1[2 * AVX_FLOAT_VEC_DIM], productReg_2);
                                    Avx.Store(&p1[3 * AVX_FLOAT_VEC_DIM], productReg_3);
                                    Avx.Store(&p1[4 * AVX_FLOAT_VEC_DIM], productReg_4);
                                    Avx.Store(&p1[5 * AVX_FLOAT_VEC_DIM], productReg_5);
                                    Avx.Store(&p1[6 * AVX_FLOAT_VEC_DIM], productReg_6);
                                    Avx.Store(&p1[7 * AVX_FLOAT_VEC_DIM], productReg_7);
                                }
                            }
                        }
            }

            // 端数処理
            for (var j = right.ColumnNum - j_rest; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = right.RowNum - k_rest; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = left.RowNum - i_rest; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];
        }

Avx.LoadメソッドとAvx.Storeメソッドは引数にポインタしかとれないのでunsafeコードとなります。そして性能は以下のようになります。


(表10) 64 x 64タイリング + FMA

1回の行列積の平均実行時間 123.69 ms
性能 17.36 GFLOPS

大幅な性能向上につながりました。更に以下のように並列化してみます。

        public static unsafe void ParallelMult_4(Matrix left, Matrix right, Matrix product)
        {
            const int BLOCK_SIZE = 64;
            const int AVX_FLOAT_VEC_DIM = 8;

            var jBlockNum = right.ColumnNum / BLOCK_SIZE;
            var j_rest = right.ColumnNum % BLOCK_SIZE;
            var k_rest = right.RowNum % BLOCK_SIZE;
            var i_rest = left.RowNum % BLOCK_SIZE;

            Parallel.For(0, jBlockNum, PARALLEL_OPTIONS, (blockId) =>
            {
                fixed (float* leftPtr = left.DATA)
                fixed (float* productPtr = product.DATA)
                {
                    var jj = blockId * BLOCK_SIZE;
                    for (var kk = 0; kk < right.RowNum - k_rest; kk += BLOCK_SIZE)
                        for (var ii = 0; ii < left.RowNum - i_rest; ii += BLOCK_SIZE)
                            for (var dj = 0; dj < BLOCK_SIZE; dj++)
                            {
                                var j = jj + dj;
                                for (var dk = 0; dk < BLOCK_SIZE; dk++)
                                {
                                    var k = kk + dk;
                                    var rightReg = Vector256.Create(right[k, j]);
                                    float* p0 = &leftPtr[ii + k * left.RowNum];
                                    float* p1 = &productPtr[ii + j * product.RowNum];

                                    var leftReg_0 = Avx.LoadVector256(&p0[0]);
                                    var productReg_0 = Avx.LoadVector256(&p1[0]);
                                    var leftReg_1 = Avx.LoadVector256(&p0[AVX_FLOAT_VEC_DIM]);
                                    var productReg_1 = Avx.LoadVector256(&p1[AVX_FLOAT_VEC_DIM]);
                                    var leftReg_2 = Avx.LoadVector256(&p0[2 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_2 = Avx.LoadVector256(&p1[2 * AVX_FLOAT_VEC_DIM]);
                                    var leftReg_3 = Avx.LoadVector256(&p0[3 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_3 = Avx.LoadVector256(&p1[3 * AVX_FLOAT_VEC_DIM]);
                                    var leftReg_4 = Avx.LoadVector256(&p0[4 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_4 = Avx.LoadVector256(&p1[4 * AVX_FLOAT_VEC_DIM]);
                                    var leftReg_5 = Avx.LoadVector256(&p0[5 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_5 = Avx.LoadVector256(&p1[5 * AVX_FLOAT_VEC_DIM]);
                                    var leftReg_6 = Avx.LoadVector256(&p0[6 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_6 = Avx.LoadVector256(&p1[6 * AVX_FLOAT_VEC_DIM]);
                                    var leftReg_7 = Avx.LoadVector256(&p0[7 * AVX_FLOAT_VEC_DIM]);
                                    var productReg_7 = Avx.LoadVector256(&p1[7 * AVX_FLOAT_VEC_DIM]);

                                    productReg_0 = Fma.MultiplyAdd(leftReg_0, rightReg, productReg_0);
                                    productReg_1 = Fma.MultiplyAdd(leftReg_1, rightReg, productReg_1);
                                    productReg_2 = Fma.MultiplyAdd(leftReg_2, rightReg, productReg_2);
                                    productReg_3 = Fma.MultiplyAdd(leftReg_3, rightReg, productReg_3);
                                    productReg_4 = Fma.MultiplyAdd(leftReg_4, rightReg, productReg_4);
                                    productReg_5 = Fma.MultiplyAdd(leftReg_5, rightReg, productReg_5);
                                    productReg_6 = Fma.MultiplyAdd(leftReg_6, rightReg, productReg_6);
                                    productReg_7 = Fma.MultiplyAdd(leftReg_7, rightReg, productReg_7);

                                    Avx.Store(&p1[0], productReg_0);
                                    Avx.Store(&p1[AVX_FLOAT_VEC_DIM], productReg_1);
                                    Avx.Store(&p1[2 * AVX_FLOAT_VEC_DIM], productReg_2);
                                    Avx.Store(&p1[3 * AVX_FLOAT_VEC_DIM], productReg_3);
                                    Avx.Store(&p1[4 * AVX_FLOAT_VEC_DIM], productReg_4);
                                    Avx.Store(&p1[5 * AVX_FLOAT_VEC_DIM], productReg_5);
                                    Avx.Store(&p1[6 * AVX_FLOAT_VEC_DIM], productReg_6);
                                    Avx.Store(&p1[7 * AVX_FLOAT_VEC_DIM], productReg_7);
                                }
                            }
                }
            });
            // 端数処理
            for (var j = right.ColumnNum - j_rest; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = right.RowNum - k_rest; k < right.RowNum; k++)
                    for (var i = 0; i < left.RowNum; i++)
                        product[i, j] += left[i, k] * right[k, j];

            for (var j = 0; j < right.ColumnNum; j++)
                for (var k = 0; k < right.RowNum; k++)
                    for (var i = left.RowNum - i_rest; i < left.RowNum; i++)
        }

性能は以下の通りです。


(表11) 64 x 64タイリング + FMA (並列化)

1回の行列積の平均実行時間 37.98 ms
性能 56.54 GFLOPS

一番最初に示した単純実装の並列化(表3)と比べると50倍以上高速化しました。

おまけ(OpenBLAS)

OpenBLASを用いて高速化してみます。C#(.NET)を用いるという主旨から少し外れるのでおまけとしました。下のコードがOpenBLASを用いた行列積です。

        public static void Mult_OpenBLAS(Matrix left, Matrix right, Matrix product)
        {
            cblas_sgemm(BLASOrder.ColMajor, 
                        BLASTranspose.NoTrans, BLASTranspose.NoTrans, 
                        left.RowNum, right.ColumnNum, right.RowNum,
                        1.0f, left.DATA, left.RowNum, 
                        right.DATA, right.RowNum, 
                        0.0f, product.DATA, product.RowNum);
        }

        // OpenBLAS
        const string OPENBLAS_PATH = "OpenBLAS\\libopenblas.dll";

        public enum BLASOrder
        {
            RowMajor = 101,
            ColMajor = 102
        }

        public enum BLASTranspose
        {
            NoTrans = 111,
            Trans = 112,
            ConjTrans = 113,
            ConjNoTrans = 114
        }

        [DllImport(OPENBLAS_PATH)]
        static extern void cblas_sgemm(
               BLASOrder order,
               BLASTranspose transa, BLASTranspose transb,
               int m, int n, int k,
               float alpha, float[] a, int ldA,
               float[] b, int ldB,
               float beta, float[] c, int ldC);

cblas_sgemmの各引数の意味はググれば分かるので割愛します。性能は以下のようになりました。


(表12) OpenBLAS

1回の行列積の平均実行時間 24.65 ms
性能 87.12 GFLOPS

性能が段違いです。なお、OpenBLASはデフォルトで並列演算を行います。

まとめ

.NETに組み込み関数が実装されたおかげで、C#でも十分高速なコードが書けるようになってきました。もちろん、本当に速度を追求するならC/C++を用いたほうが速いですが、C#C++に比べて扱いやすく安全な言語です。unsafeコードを書くこともできますが、それを書かざるをえない状況は限定されます。故に開発のしやすさと速度を兼ね備えているという点ではC#は十分に有能な言語だと考えています。
近年ではGPUを映像処理以外の分野で用いるGPGPUが流行っていますが、.NETにはそれを行える機能は実装されておらず、C#GPGPUを行う場合はサードパーティ製のライブラリを用いるか、CUDAやOpenCLのラッパーを自分で書く必要があります。個人的には今後の.NETにGPGPU関連の機能が追加されることを期待しています。

*1:厳密には搭載されたのは3オペランドのFMA3です。その前には4オペランドのFMA4が存在していました。