MapReduce з підручних матеріалів. Частина II – базові інтерфейси реалізації

Take it like a man by John PollakВ попередній частині серії ми (в 100500й раз) спробували розповісти про основні прийоми і стадії підходу Google MapReduce, повинен зізнатися, що перша частина була намір "капітанською", щоб дати знати про MapReduce цільової аудиторії наступних статей. Ми не встигли показати ні строчки того, як все це ми збираємося реалізовувати в Caché ObjectScript. І про це наша розповідь сьогодні (і в наступні дні).
Нагадаємо початковий посил нашого міні-проекту: ви все ще плануємо реалізувати MapReduce алгоритм використовуючи підручні засоби, що є в Caché ObjectScript. При створенні інтерфейсів, ми спробуємо дотримуватися того API, що ми описали в попередній статті про оригінальну реалізацію Google MapReduce, будь девіації будуть озвучені відповідно.

Почнемо з реалізації абстрактних інтерфейсів Mapper і Reducer.
Class MR.Base.Mapper 
{
Method Map(MapInput As MR.Base.Iterator, MapOutput As MR.Base.Emitter) [ Abstract ] { }
}

Class MR.Base.Reducer
{
Method Reduce(ReduceInput As MR.Base.Iterator, ReduceOutput As MR.Base.Emitter) [ Abstract ] { }
}

Спочатку, як і в канонічній реалізації, ми зробили 2 окремих інтерфейсу MapInput і ReduceInput. Але відразу стало очевидним, що вони служать однієї і тієї ж мети, і надають одні й ті ж методи – їх мета пройтися по потоку даних до кінця, т. ч. вони обидва є итераторами. Тому, у підсумку, редуцируем їх у загальний інтерфейс MR.Base.Iterator:
Class MR.Base.Iterator
{

Method GetNext() As %String [Abstract ] { }

Method IsAtEnd() As %Boolean [Abstract ] { }

}

Використання глобалів в якості каналів зв'язку
Оригінальна реалізація Google MapReduce використовувала файлову систему Google GFS як транспорт між вузлами і стадіями алгоритму. У Caché є свій механізм поширення (когерентних) даних між вузлами (якщо не користуватися голим TCP/UDP) – це протокол ECP (Enterprise Сасһе Protocol). Зазвичай він використовується серверами додатків для отримання даних від віддалених серверів баз даних. Але нічого не зупиняє нас від побудови на базі таких peer-to-peer сполук ECP якійсь віртуальній керуючої шини, куди ми будемо складати дані у вигляді пар <ключ, значення> або схожі дані. Ці дані будуть пересилатися між акторами, які беруть участь у конвеєрах алгоритму (тобто emit, посланий об'єктом Mapper, буде писатися в шину ECP і читатися об'єктом Reducer). Якщо гравці будуть працювати в рамках одного вузла, то вони, наприклад, можуть використовувати швидкі глобаль, відображені в CACHETEMP, або звичайні глобаль, якщо реалізований алгоритм багатостадійний і потрібна надійність і журналювання.
У будь-якому випадку, будь то локальні (для конфігурації на одному вузлі) глобаль, або глобаль віддаленого вузла, підключеного через ECP, глобаль є зручним і добре зарекомендували себе транспортом для передачі даних між вузлами кластера Caché, в даному випадку, між залученими в MapReduce функціями і класами.
Тому, природним рішенням, що дозволяє спростити нашу систему буде використання в середовищі Caché для передачі даних між вузлами кластера протоколу ECP замість файлових систем GFS або HDFS. Функціональні характеристики ECP дозволять зробити і інші спрощення (але про це дещо пізніше).
Емітери та чорна магія
Як ми вже розповідали в попередній серії, з моменту коли дані йдуть від об'єкта Mapper, і до моменту як вони поступають на вхід Reducer, в класичній реалізації на майстрі проходить важка операція перемешения і сортування.
В оточенні, що використовує глобаль до якості транспорту, в MUMPS/Caché ObjectScript середовищі, ми можемо повністю уникнути додаткових витрат на таку сортування, т. к. агрегація і сортування будуть зроблені нижележащим btree* сховищем.
Маючи такі вимоги до дизайну, створимо базовий інтерфейс емітера:
Class MR.Base.Emitter Extends MR.Base.Iterator
{

/// emit $listbuild(key,value(s))
Method Emit(EmitList... As %String) [Abstract ] { }

}

Емітер повинен бути схожий на інтерфейс вхідного ітератора, показаного вище (тому ми і пронаследовались від MR.Base.Iterator), але на додаток до інтерфейсу проходу за даними, емітер повинен вміти ще й посилати дані свого проміжне сховище (тобто додаємо функцію Emit).
Спочатку, наша функоция Emit була дуже схожа на класичну релизацию і приймала тільки 2 аргументу як пару <ключ, значення>, але потім ми натрапили на рідкісну) необхідність передавати що-то більш багатовимірне, довший ніж пара значень (наприклад, кортеж будь арности), тому, на даний момент, Emit став функцією приймаючої змінне число аргументів.
Зауважимо, що в більшості випадків, на практиці, сюди надходитиме тільки пара аргументів <ключ, значення> як ми і бачили в класичній реалізації.
Це все ще абстрактний інтерфейс, більше м'яса буде додано дуже скоро.
Якщо б нам, при обробці, треба було зберігати порядок надійшли елементів, то ми б використовували реалізацію нижче:
/// Emitter which maintains the order of (key,value(s))
Class MR.Emitter.Ordered Extends (%RegisteredObject, MR.Base.Emitter)
{
/// global name serving as data channel
Property GlobalName As %String;
Method %OnNew(initval As %String) As %Status
{
$$$ThrowOnError($length(initval)>0)
set ..GlobalName = initval 
quit $$$OK
}

Parameter AUTOCLEANUP = 1;
Method %OnClose() As %Status
{
if ..#AUTOCLEANUP {
if $data(@i%GlobalName) {
kill @i%GlobalName
}
}
Quit $$$OK
}
...
}

Зауважимо на полях, що в Caché глобаль – загалом-то, глобальні :), і не будуть очищені автоматично по завершенні процесів їх створили. На відміну, наприклад, від PPG (process-private globals). Але іноді все ж хочеться, щоб наші проміжні канали, створені для взаємодії між стадіями конвеєра MapReduce віддалялися по завершенні підпрограми їх створила. Тому й був доданий режим "автоочищення" (параметр класу #AUTOCLEANUP) при якому глобал, ім'я якого зберігається у властивості GlobalName, буде видалений при закритті об'єкта (у момент виклику %OnClose).
Зверніть увагу, що ми форсуємо один обов'язковий параметр методу %New (у %OnNew генеруємо $$$ThrowOnError якщо ім'я в Initval не визначено). Конструктор класу очікує отримати назву глобал з яким він буде працювати в якості транспорту даних.
Class MR.Emitter.Ordered Extends MR.Base.Emitter
{
/// ... 
Method IsAtEnd() As %Boolean
{
quit ($data(@i%GlobalName)\10)=0
}

/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim list As %String = ""
for i=1:1:$get(EmitList) {
set $li(list,i) = $get(EmitList(i))
}
#dim name As %String = ..GlobalName
set @name@($seq(@name)) = list
}

/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim value As %String
#dim index As %String = $order(@i%GlobalName@(""), 1, value)

if index '= "" {
kill @i%GlobalName@(index)
quit value
} else {
kill @i%GlobalName
quit ""
}
}

Method Dump()
{
zwrite @i%GlobalName
}

}

Сподіваємося, ви ще пам'ятаєте, що наш Emitter є спадкоємцем ітератора Iterator? Тому йому потрібно реалізувати кілька функцій ітератора – IsAtEnd і GetNext.
  • IsAtEnd – простий: якщо наш службовий глобал не містить даних (тобто $data(..GlobalName) не повертає 10 або 11, що означає що там у поддереве є ще вузли з даними), то ми досягли кінця потоку даних;
  • Emit створює сайт з даними в кінці поточного списку. Оформляючи пару (або кортеж, при арности більше 2х) як елемент $(listbuild(...)) [listbuild].
Як відомо, і як добре написав Саша Коблов, $SEQUENCE може бути використана майже у всіх місцях, де використовувався $INCREMENT, забезпечуючи при цьому кращі швидкості при роботі в багатопроцесорному або многосерверном режимі (через ECP). У силу меншої кількості колізій при зверненні до одному вузлу глобала. Тому в коді вище ми використовуємо $sequence для виділення індексу наступного елемента впорядкованого списку.
  • На іншій стороні алгоритму, в одержувача GetNext() витягує елементи з колекції допомогою простого $ORDER(@i%GlobalName("")). Елемент, з отриманим індексом буде видалений зі списку після обробки.
Звертаємо увагу, що даний варіант видалення елемента зі списку/глобала не дуже сумісний з паралельним режимом, і треба було б додати блокування або змінити структуру даних. Але оскільки на найближчі серії у нас буде лише один Reducer, на всі безліч Mapper ів, то ми залишимо розв'язання цієї проблеми на майбутнє, коли приступимо до багато-серверної реалізації.

Зауважимо, що структура даних, реалізована MR.Emitter.Ordered по суті реалізують класичну колекцію FIFO ("FirstIn – FirstOut"). Ми поміщаємо новий елемент в кінець списку і витягуємо з голови списку.
Спеціальний випадок: емітер з автоагрегацией
Якщо ви подивитеся на ті дані, що ми посилаємо в між стадіями конвеєра в прикладі word count (ок, не зараз, а коли ми вам покажемо таку реалізацію) то ви швидко зрозумієте, що:
  • насправді нам не цікавий порядок, в якому ми "эмиттим" пари <ключ, значення>. Більш того, нижче лежачі сховище btree* завжди тримає список ключів відсортованим для швидкого пошуку, позбавляючи нас від необхідності сортування на майстра, як сталося б у класичній реалізації;
  • І в наших випадках, коли ми пишемо пару <key,1> на стороні Mapper, ми припускаємо в Reducer їх просту агрегацію на суму одиниць. Тобто у разі Caché ObjectScript ми покладалися б на використання $INCREMENT.
Так навіщо посилати такий великий трафік непотрібних даних, якщо ми можемо їх агрегувати ще в момент посилки?
Саме так і працює MR.Emitter.Sorted, який є спадкоємцем MR.Emitter.Ordered (показаного вище):
/// Emitter which sorts by all keys emitted pairs or tuples (key, value(s))
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered
{
Property AutoIncrement As %Boolean [ InitialExpression = 1 ];
/// emit $listbuild(key,value)
Method Emit(EmitList... As %String)
{
#dim name As %String = ..GlobalName
#dim key As %String
#dim value As %String 

if $get(EmitList)=1 {
// special case - only key name given, no value
set key = $get(EmitList(1))
quit:key=""
if ..AutoIncrement {
#dim dummyX As %Integer = $increment(@name@(key)) ; $seq is non-deterministic
} else {
set @name@(key) = 1
}
} else {
set value = $get(EmitList(EmitList))
set EmitList = EmitList - 1
for i=1:1:$get(EmitList) {
#dim index As %String = $get(EmitList(i))
quit:index=""
set name = $name(@name@(index))
}
if ..AutoIncrement {
#dim dummyY As %Integer = $increment(@name,value)
} else {
set @name = value
}
}
}
/// ...
}

Для найпростішого випадку, видачі пари <key,1> або, коли значення опущено, і має один ключ <key> ми реалізували локальну оптимізацію, коли в режимі автоинкремента (AutoIncrement = 1) ми при виклику відразу інкрементуємо відповідний лічильник для ключа. Якщо ж не включений автоинкремент, то ми просто (пере)визначаємо вузол ключа в 1, фіксуючи факт передачі ключа.
Для більш загального випадку, з двома елементами, пари ключ-значення <key,value> або навіть з великою кількістю елементів <key,key2,key3,...keyn,value> (кортеж будь арности) у нас знову ж реалізовано 2 режими роботи:
  • автоинкременте ми відразу підсумовуємо значення відповідного вузла, адресуемого ключем(ключами) з переданим значенням;
  • без автоинкремента – ми присвоюємо відповідного вузла, адресуемому списком ключів, передане значення value.
Звертаємо увагу, що кортеж ми передаємо за допомогою масиву, акумулюючого змінну кількість аргументів. Всі елементи цього масиву крім останнього, підуть як адреси подындексов. Останній елемент кортежу буде вважатися значенням.
Таке незвичайне розширення пари ключ-значення в кортежі будь-якої потужності, за нашими даними, є нетиповим або може бути унікальним. Нам не треба працювати з суворим key-value сховищем або bigtable сховищем, і ми з легкістю можемо працювати з багатовимірними ключами в переданих елементах ("тому що можемо"), що може сильно полегшити деякі реалізації алгоритмів, що потребують додаткової розмірності даних, що сильно покращує читабельність коду і спрощує розуміння. В теорії...
Зауважимо, що ми не переопределили IsAtEnd і він пронаследовал реалізацію з MR.Emitter.Ordered, таким чином він буде повертати ненульове значення по закінченні даних в подузлах проміжного сховища.
Але GetNext нам треба перевизначити, т. до. ми більше не намагаємося запам'ятати порядок надісланих даних і формат його внутрішнього сховища змінився:
Class MR.Emitter.Sorted Extends MR.Emitter.Ordered 
{
/// ...

/// returns emitted $lb(key,value)
Method GetNext() As %String
{
#dim name As %String = ..GlobalName
#dim value As %String
#dim ref As %String = $query(@name,1,value)
if ref'="" {
zkill @ref
#dim i As %Integer
#dim refLen As %Integer = $qlength(ref)
#dim baseLen As %Integer = $qlength(name)
#dim listbuild = ""
for i=baseLen+1:1:refLen {
set $li(listbuild,i-baseLen)=$qs(ref,i)
}
set $li(listbuild,*+1)=value

quit listbuild
}

quit ""
}

}

На виході з GetNext() ми очікуємо $LISTBUILD<> список, але всередині сховища дані пар/кортежів розкидані по вузлам ієрархічного сховища. Функція $QUERY дозволяє обійти вузли з даними значеннями пар/кортежів) в масиві для подальшої їх перепакування $LISTBUILD формат, індекси з масиву послідовно додаються наступним елементом списку (за допомогою присвоювання елементу через функцію $LIST. Саме ж значення вузла сховища (значення в парі «ключ-значення» або останній елемент кортежу) буде додано в кінець сформованого списку через ту ж функцію $LIST(listbuild,*+1). В даному випадку *+1 якраз і позначать номер елемента списку, наступний за поточним кінцем.
На цьому несподіваному місці ми перервемо наша розповідь про MapReduce в Caché. У другій частині цієї розповіді ми показали базові інтерфейси інфраструктури, які будуть використані в подальшому при реалізації конкретних прикладів. Вже в наступній серії ми зберемо це все докупи і реалізуємо класичний приклад WordCount, але вже на ObjectScript. Не йдіть далеко!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.