Divide&Conquer над алгоритмом Штрассена

image

Привіт друзі! Будучи студентами одного відомого освітнього проекту, ми з bo_0m, після вступної лекції за курсом Поглиблене програмування на Java, отримали своє перше домашнє завдання. Необхідно було реалізувати програму, яка б перемножала матриці. І все б нічого, та так співпало, що на наступному тижні мала відбутися конференція Joker, і наш викладач вирішив скасувати з такої нагоди заняття, подарувавши нам кілька годин вільного п'ятничного вечора. Не пропадати ж часу дарма! Раз ніхто не квапить, то можна підійти до справи творчо.

Welcome, under the hood ↓

Перше, що приходить в голову
Напевно кожного студента технічного вузу доводилося множити матриці. Алгоритм був завжди один, а саме, простенький кубічний метод множення. Та й як би це не звучало, але даний спосіб не так-то вже й поганий (для розмірності матриць менше 100).

Всі ми з цього починали:

for (int i = 0; i < A. rows(); i++) {
for (int j = 0; j < A. columns(); j++) {
for (int k = 0; k < B. columns(); k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}

Забігаючи вперед, скажу, що ми будемо використовувати модифікований варіант із застосуванням транспонування. Про таку модифікацію добре написано тут, та й не тільки про неї.

Гаразд, поїхали далі!

Алгоритм Штрассена
Можливо, не всі знають, але автор алгоритму Фолькер Штрассен не тільки живий, але і активно викладає, так само будучи почесним професором кафедри математики і статистики Констанцський університету. Обов'язково почитайте про цю людину хоча б на вікі.
Трошки теорії з Вікіпедії:

Нехай A і B — дві (n*n)-матриці, причому n — степінь числа 2. Тоді можна розбити кожну матрицю A і B на чотири ((n/2)*(n/2))-матриці і через них висловити твір матриць A і B:
image
Визначимо нові елементи:
image
Таким чином, нам потрібно всього 7 множень на кожному етапі рекурсії. Елементи матриці C виражаються з Pk за формулами:
image
Рекурсивний процес продовжується n разів, до тих пір, поки розмір матриць Ci,j не стане досить малим, далі використовують звичайний метод множення матриць. Це роблять із-за того, що алгоритм Штрассена втрачає ефективність у порівнянні зі звичайним на малих матрицях в силу більшого числа складань.
let's go to practice!

Для реалізації алгоритму Штрассена нам знадобляться додаткові функції. Як було сказано вище, алгоритм працює лише з квадратними матрицями, розмірність яких дорівнює ступеню 2, тому наведемо вихідні матриці до такого виду.

Для цього була реалізована функція, що визначає нову розмірність:

private static int log2(int x) {
int result = 1;
while ((x >>= 1) != 0) result++;
return result;
}

//******************************************************************************************

private static int getNewDimension(int[][] a, int[][] b) {
return 1 << log2(Collections.max(Arrays.asList(a.length, a[0].length, b[0].length)));
// Л - Лаконічно
}

І функція, яка розширює матрицю до потрібного розміру:

private static int[][] addition2SquareMatrix(int[][] a, int n) {
int[][] result = new int[n][n];

for (int i = 0; i < a.length; i++) {
for (int j = 0; j < a[i].length; j++) {
result[i][j] = a[i][j];
}
}
return result;
}

Тепер вихідні матриці задовольняють вимогам для реалізації алгоритму Штрассена. Також нам знадобиться функція, яка дозволить розбити матрицю розміром n*n на чотири матриці (n/2)*(n/2) і зворотна для відновлення матриці:

private static void splitMatrix(int[][] a, int[][] a11, int[][] a12, int[][] a21, int[][] a22) {
int n = a.length >> 1;

for (int i = 0; i < n; i++) {
System.arraycopy(a[i], 0, a11[i], 0, n);
System.arraycopy(a[i], n, a12[i], 0, n);
System.arraycopy(a[i + n], 0, a21[i], 0, n);
System.arraycopy(a[i + n], n, a22[i], 0, n);
}
}

//******************************************************************************************

private static int[][] collectMatrix(int[][] a11, int[][] a12, int[][] a21, int[][] a22) {
int n = a11.length;
int[][] a = new int[n << 1][n << 1];

for (int i = 0; i < n; i++) {
System.arraycopy(a11[i], 0, a[i], 0, n);
System.arraycopy(a12[i], 0, a[i], n, n);
System.arraycopy(a22[i], 0, a[i + n], n, n);
}
return a;
}

Ось ми і дісталися до самого цікавого, основна функція множення матриць алгоритмом Штрассена виглядає наступним чином:

Алгоритм Штрассена
private static int[][] multiStrassen(int[][] a, int[][] b, int n) {
if (n < = 64) {
return multiply(a, b);
}

n = n >> 1;

int[][] a11 = new int[n][n];
int[][] a12 = new int[n][n];
int[][] a21 = new int[n][n];
int[][] a22 = new int[n][n];

int[][] b11 = new int[n][n];
int[][] b12 = new int[n][n];
int[][] b21 = new int[n][n];
int[][] b22 = new int[n][n];

splitMatrix(a, a11, a12, a21, a22);
splitMatrix(b, b11, b12, b21, b22);

int[][] p1 = multiStrassen(summation(a11, a22), summation(b11, b22), n);
int[][] p2 = multiStrassen(summation(a21, a22), b11, n);
int[][] p3 = multiStrassen(a11, subtraction(b12, b22), n);
int[][] p4 = multiStrassen(a22, subtraction(b21, b11), n);
int[][] p5 = multiStrassen(summation(a11, a12), b22, n);
int[][] p6 = multiStrassen(subtraction(a21, a11), summation(b11, b12), n);
int[][] p7 = multiStrassen(subtraction(a12, a22), summation(b21, b22), n);

int[][] c11 = summation(summation(p1, p4), subtraction(p7, p5));
int[][] c12 = summation(p3, p5);
int[][] c21 = summation(p2 p4);
int[][] c22 = summation(subtraction(p1, p2), summation(p3, p6));

return collectMatrix(c11, c12, c21, c22);
}


На цьому можна було б і закінчити. Реалізований алгоритм працює домашка виконана, але допитливі розуми жадають дорослий perfomance. Хай буде з нами Java 7.

Пора розпаралелити
Java 7 надає прекрасний API для розпаралелювання рекурсивних завдань. З її виходом з'явилося одне з доповнень до пакетів java.util.concurrent — реалізація парадигми Divide and Conquer — Fork-Join. Ідея полягає в наступному: рекурсивно розбиваємо завдання на підзавдання, вирішуємо, а потім об'єднуємо результати. Більш детально з даною технологією можна ознайомитися в документации.

Подивимося як легко і ефективно можна застосувати цю парадигму до нашого алгоритму Штрассена.

Реалізація алгоритму з Fork/Join
private static class myRecursiveTask extends RecursiveTask<int[][]> {
private static final long serialVersionUID = -433764214304695286L;
int n;
int[][] a;
int[][] b;

public myRecursiveTask(int[][] a, int[][] b, int n) {
this.a = a;
this.b = b;
this.n = n;
}

@Override
protected int[][] compute() {
if (n < = 64) {
return multiply(a, b);
}

n = n >> 1;

int[][] a11 = new int[n][n];
int[][] a12 = new int[n][n];
int[][] a21 = new int[n][n];
int[][] a22 = new int[n][n];

int[][] b11 = new int[n][n];
int[][] b12 = new int[n][n];
int[][] b21 = new int[n][n];
int[][] b22 = new int[n][n];

splitMatrix(a, a11, a12, a21, a22);
splitMatrix(b, b11, b12, b21, b22);

myRecursiveTask task_p1 = new myRecursiveTask(summation(a11,a22),summation(b11,b22),n);
myRecursiveTask task_p2 = new myRecursiveTask(summation(a21,a22),b11,n);
myRecursiveTask task_p3 = new myRecursiveTask(a11,subtraction(b12,b22),n);
myRecursiveTask task_p4 = new myRecursiveTask(a22,subtraction(b21,b11),n);
myRecursiveTask task_p5 = new myRecursiveTask(summation(a11,a12),b22,n);
myRecursiveTask task_p6 = new myRecursiveTask(subtraction(a21,a11),summation(b11,b12),n);
myRecursiveTask task_p7 = new myRecursiveTask(subtraction(a12,a22),summation(b21,b22),n);

task_p1.fork();
task_p2.fork();
task_p3.fork();
task_p4.fork();
task_p5.fork();
task_p6.fork();
task_p7.fork();

int[][] p1 = task_p1.join();
int[][] p2 = task_p2.join();
int[][] p3 = task_p3.join();
int[][] p4 = task_p4.join();
int[][] p5 = task_p5.join();
int[][] p6 = task_p6.join();
int[][] p7 = task_p7.join();

int[][] c11 = summation(summation(p1, p4), subtraction(p7, p5));
int[][] c12 = summation(p3, p5);
int[][] c21 = summation(p2 p4);
int[][] c22 = summation(subtraction(p1, p2), summation(p3, p6));

return collectMatrix(c11, c12, c21, c22);
}
}


Кульмінація
Вам, напевно, вже не терпиться подивитися на порівняння продуктивності роботи алгоритмів на реальному залозі. Відразу обмовимо, що тестування будемо проводити на квадратних матрицях. Отже, ми маємо:

  1. Традиційний (Кубічний) метод множення матриць
  2. Традиційний із застосуванням транспонування
  3. Алгоритм Штрассена
  4. Распараллеленный алгоритм Штрассена
Розмірність матриць будемо задавати в інтервалі [100..4000] і з кроком 100.

image

Як і очікувалося, наш перший алгоритм одразу випав з трійки лідерів. Але ось з його модернізованим братом(варіант з транспонуванням) не все так просто. Навіть на досить великих розмірностях даний алгоритм не тільки не поступається, але і часто перевершує однопотоковий алгоритм Штрассена. Особливості читання багатовимірних масивів в Java дають про себе знати! І все ж, маючи в рукаві козир у вигляді Fork-Join Framework'а, нам вдалося отримати вагомий приріст продуктивності. Розпаралелювання алгоритму Штрассена дозволило скоротити час перемноження майже в 3 рази, а також очолити наш підсумковий тотал.

» Вихідний код розміщений тут.

Будемо раді відгуків і зауважень до нашої роботи. Спасибі за увагу!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.