MapReduce на коліні. Частина III – збираємо всі разом

Mosaic by Joan PollakУ першій (досить капітанською) частини цієї серії ми розповіли про базові концепції MapReduce чому це погано, чому це неминуче, і як з цим жити в інших середовищах розробки (якщо ви не про Сі++ або Java). У другий частини ми таки почали розповідати про базові класи реалізації MapReduce на Caché ObjectScript, ввівши абстрактні інтерфейси і їх первинні реалізації.
Прийшов сьогодні наш день! – ми покажемо перший приклад зібраний у парадигмі MapReduce, так, він буде дивний і не найефективніший, і зовсім не розподілений, але цілком MapReduce.


WordCount – проста, послідовна реалізація
Ви вже, напевно, помітили, що MapReduce – це про паралелізм і масштабування. Але давайте зізнаємося відразу – алгоритм, який-би елегантний і простий він не був би, дуже складно налагоджувати відразу в його в паралельної інкарнації. Зазвичай, для простоти, ми стартуємо з послідовної версії (в нашому випадку це буде алгоритм wordcount) і потім підмішати трохи паралелізму.

Виконання в середовищі MapReduce з статті "MapReduce: Simplified Data Processing on Large Clusters", OSDI-2004
Нагадаємо суть задачки wordcount: у нас є набір текстів (наприклад, всі томи «Війни і мир») і вам потрібно підрахувати кількість слів у всьому масиві. Цей простий приклад використовувався в оригінальній статті Google про MapReduce, бо кожен наступний, що розповідає про MapReduce використовує той же самий приклад. Вважайте це "HelloWorld!" паралельного виконання.
Отже, послідовна реалізація WordCount (але з застосуванням MapReduce інтерфейсів, введених раніше) буде містити всі ті ж самі, що і паралельна. І, наприклад, mapper буде виглядати приблизно так:
Class MR.Sample.WordCount.Mapper Extends (%RegisteredObject, MR.Base.Mapper)
{
/// read strings from MR.Base.Iterator and count words
Method Map(MapInput As MR.Base.Iterator, MapOutput As MR.Base.Emitter)
{
while 'MapInput.IsAtEnd() {
#dim line As %String = MapInput.GetNext()
//#dim pattern As %Regex.Matcher = ##class(%Regex.Matcher).%New("[\w]+")
#dim pattern As %Regex.Matcher = ##class(%Regex.Matcher).%New("[^\s]+")
set pattern.Text = line
while pattern.Locate() {
#dim word As %String = pattern.Group
do MapOutput.Emit(word)
}
}
}
}

Підпрограма Map отримує «вхідний потік» через параметр MapInput, і эмиттит дані в вихідний MapOutput. Алгоритм тут очевидна – якщо у вхідному потоці ще залишилися дані (тобто НЕ MapInput.IsAtEnd() ), то він прочитає таку «рядок» через MapInput.GetNext(), розіб'є рядок на слова за допомогою %Regex.Matcher (дивись хорошу вступну статтю про використання регулярних виразів в Caché на порталі спільноти «Using Regular Explressions in Caché») і кожне виділене слово пересилається у вихідний емітер.
В класичному MapReduce інтерфейсі ми завжди эмиттим «
ключ, значення
», в даному випадку ми зробили спрощення для випадку «
ключ, 1
», використовуючи форму з 1 аргументом. Пояснення дано у попередній частині
Процедура згортки (reducer) ще простіше:
Class MR.Sample.WordCount.Adder Extends (%RegisteredObject, MR.Base.Reducer)
{
Method Reduce(ReduceInput As MR.Base.Iterator, ReduceOutput As MR.Base.Emitter)
{
#dim As result %Numeric = 0
while 'ReduceInput.IsAtEnd() {
#dim value As %String = ReduceInput.GetNext() ; get <key,value> in $listbuild format
#dim word As %String = $li(value,1)
#dim count As %Integer = +$li(value,2)
set result = result + count
}
do ReduceOutput.Emit("Count", result)
}
}

Поки не зустріли кінця потоку (
'ReduceInput.IsAtEnd()
) той продовжує споживати дані з потоку ReduceInput, і на кожній ітерації з потоку виймається пара «ключ-значення в бінарному форматі списку
$listbuild<>
(тобто у вигляді
$lb(word count)
).
Дана функція агрегує число слів в змінну result і эмитит її підсумкове значення н наступну стадію конвеєра через потік ReduceOutput.
Отже, ми показали mapper і reducer, прийшла черга показати головну, керуючу частину програми. Не ризикуючи відразу впертися складність паралелізму, ми заходимо з послідовної версії алгоритму, хоча і використовує MapReduce ідіому і інтерфейси. Так, у послідовному режимі, всі ці віджимання з конвеєром, не мають великого сенсу, але… спрощення необхідно в педагогічних цілях.
/// Спрощена, одне-потокова версія прикладу "map-reduce".
/Class MR.Sample.WordCount.App Extends %RegisteredObject
{
ClassMethod MapReduce() [ ProcedureBlock = 0 ]
{
new
//kill ^mtemp.Map,^mtemp.Reduce

#dim infraPipe As MR.Sample.GlobalPipe = ##class(MR.Sample.GlobalPipe).%New($name(^mtemp.Map($J)))
for i=1:1 {
#dim fileName As %String = $piece($Text(DATA+i),";",3)
quit:fileName=""
// map
#dim inputFile As MR.Input.FileLines = ##class(MR.Input.FileLines).%New(FileName)
#dim mapper As MR.Sample.WordCount.Mapper = ##class(MR.Sample.WordCount.Mapper).%New()

do mapper.Map(inputFile, infraPipe)

// reduce
#dim outPipe As MR.Base.Emitter = ##class(MR.Emitter.Sorted).%New($name(^mtemp.Reduce($J)))
#dim reducer As MR.Sample.WordCount.Adder = ##class(MR.Sample.WordCount.Adder).%New()
while 'infraPipe.IsAtEnd() {
do reducer.Reduce(infraPipe, outPipe)
}
do outPipe.Dump()
}
quit

DATA
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol1.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol2.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol3.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol4.txt
;;
}
}

Спробуємо пояснити цей код рядок за рядком:
  • У звичайному випадку ми такого не рекомендуємо робити, але в даному випадку це необхідно: ми вимикаємо процедурні блоки ProcedureBlock = 0 і повертаємося до старої семантиці з ручним керуванням вмістом таблиці символів з локальними змінними. Нам це потрібно для вбудовування блоку DATA містить вхідні дані (в даному випадку шляху до вхідних файлів), до яких ми будемо звертатися через функцію
    $TEXT
    . В даному випадку ми використовуємо 4 томи «Війни і миру» Льва Толстого;
  • Ми будемо використовувати для проміжного зберігання даних між стадіями конвеєра глобаль виду
    ^mtemp.Map($J)
    та
    ^mtemp.Reduce($J)
    . По чарівному збігом обставин, глобаль виду
    ^mtemp*
    та
    ^CacheTemp*
    автоматично відображаються в тимчасову базу CACHETEMP не будуть журналироваться і будуть підтримуватися в пам'яті (наскільки це можливо). Будемо розглядати їх як "in-memory" глобаль.
  • Проміжний канал intraPipe є екземпляром
    MR.Sample.GlobalPipe
    , який в нашому випадку – просто синонім класу
    MR.EmitterSorted
    , і як ми описали в попередній частині автоматично очищується в кінці роботи програми.
Class MR.Sample.GlobalPipe Extends (%RegisteredObject, MR.Emitter.Sorted) { }

  • Ми проходимо по рядках
    $TEXT(DATA+i)
    , витягуємо 3ий аргумент рядка, розділеної ";". Якщо результат непорожній ми використовуємо це значення як ім'я вхідного файлу.
  • Вхідний ітератор «маппера» (об'єкта відображення) будемо примірником MR.Input.FileLines, який ми ще не показували...
Class MR.Input.FileLines Extends (%RegisteredObject, MR.Base.Iterator)
{
Property File As %Stream.FileCharacter;
Method %OnNew(FileName As %String) As %Status
{
set ..File = ##class(%Stream.FileCharacter).%New()
#dim sc As %Status = ..File.LinkToFile(FileName)
quit sc
}
Method GetNext() As %String
{
if $isobject(..File) && '..File.AtEnd {
quit ..File.ReadLine()
}
quit ""
}
Method IsAtEnd() As %Boolean
{
quit '$isobject(..File) || ..File.AtEnd
}
}

Повернемося назад до додатка
MR.Sample.WordCount.App
:
  • Об'єкт «маппер» буде примірником вже відомого
    MR.Sample.WordCount.Mapper
    (див. вище). Примірник створюється окремо для кожного оброблюваного файлу.
  • В циклі ми послідовно викликаємо функцію Map маппера, передаючи примірник вхідного потоку, що працює з відкритим файлом. В цьому конкретному випадку стадія відображення линеаризуется в послідовному циклі. Що не дуже типово для MapReduce але потрібно як спрощеного вправи.
  • На стадії згортки ми отримуємо: вихідний об'єкт емітера (
    outPipe
    ), екземпляр
    MR.Emitter.Sorted
    , який вказує на
    ^mtemp.Reduce($J)
    . Нагадую, що специфікою
    MR.Emitter.Sorted
    буде використання реалізації B*-Tree в движку Caché для різних оптимізацій. Ключі-значення зберігаються в персистентном сховище природним чином відсортованими, і тому стають можливими реалізації згортки з автоинкрементом вихідних значень.
  • Об'єкт згортки є екземпляром
    MR.Sample.WordCount.Adder
    описаного вище.
  • Для кожного відкритого файлу, і на тій же ітерації циклу, ми викликаємо
    reducer.Reduce
    , передаючи туди як проміжний потік
    infraPipe
    , так і вихідний потік.
Начебто всі частини в зборі – давайте подивимося, як це все працює.
DEVLATEST:MAPREDUCE:23:53:27:.000203>do ##class(MR.Sample.WordCount.App).MapReduce()
^mtemp.Reduce(3276,"Count")=114830
^mtemp.Reduce(3276,"Count")=123232
^mtemp.Reduce(3276,"Count")=130276
^mtemp.Reduce(3276,"Count")=109344

Тут ми бачимо обчислена кількість слів у кожному томі книги, яке виводиться в кінці кожної ітерації циклу. Це все добре, але залишаються 2 запитання, на які ми не отримали відповідь:
  • Яке загальне число слів у всіх томах?
  • І чи впевнені ми, що видані числа коректні? Що, до речі, не початковому етапі написання програм є більш важливим.
Почнемо з відповіді на друге питання, з верифікації результату – перевірити це просто, запустивши Linux/Unix/Cygwin утиліту
wc
на тих же самих даних:
Timur@TimurYoga2P /cygdrive/c/Users/Timur/Documents/mapreduce/data
$ wc -w war*.txt
114830 war_and_peace_vol1.txt
123232 war_and_peace_vol2.txt
130276 war_and_peace_vol3.txt
109344 war_and_peace_vol4.txt
477682 total

Бачимо, що обчислена кількість слів для кожного тома було правильним, т. ч. перейдемо до обчислення фінального, агрегатного значення.
Змінений варіант – з підрахунком загальної суми
Для підрахунку фінальної суми нам треба внести 2 простих змін в код програми показаний вище:
  • Треба застосувати метод рефакторінгу "Extract Method" на частини коду маппера. Надалі нам ця частина коду знадобиться окремо, у вигляді методу класу,, що, в підсумку, спростить подальші модифікації з параллелизацией або навіть віддаленим виконанням коду.
  • Також, нам потрібно винести инстанцирование об'єктів reducer і виклик його функції Reduce з циклу зовні. Мета такої модифікації – не видаляти проміжний канал з даними в кінці кожної ітерації, і продовжувати акумулювати дані між ітераціями, для показу загальної суми після циклу. Агрегатна сума буде підраховуватися автоматично, т. до. ми застосуємо автоінкрементний варіант.
У всіх інших випадках ці два наведених прикладу ведуть себе ідентично – обидва використовують тимчасові глобаль
^mtemp.Map($J)
та
^mtemp.Reduce($J)
в якості проміжного та фінального сховища на стадіях відображення і згортки.
Class MR.Sample.WordCount.AppSum Extends %RegisteredObject
{
ClassMethod Map(FileName As %String, infraPipe As MR.Sample.GlobalPipe)
{
#dim inputFile As MR.Input.FileLines = ##class(MR.Input.FileLines).%New(FileName)
#dim mapper As MR.Sample.WordCount.Mapper = ##class(MR.Sample.WordCount.Mapper).%New()

do mapper.Map(inputFile, infraPipe)
}
ClassMethod MapReduce() [ ProcedureBlock = 0 ]
{
new

#dim infraPipe As MR.Sample.GlobalPipe = ##class(MR.Sample.GlobalPipe).%New($name(^mtemp.Map($J)))
#dim outPipe As MR.Base.Emitter = ##class(MR.Emitter.Sorted).%New($name(^mtemp.Reduce($J)))
#dim reducer As MR.Sample.WordCount.Adder = ##class(MR.Sample.WordCount.Adder).%New()

for i=1:1 {
#dim fileName As %String = $piece($Text(DATA+i),";",3)
quit:fileName=""

do ..Map(fileName, infraPipe)
//do infraPipe.Dump()

}
while 'infraPipe.IsAtEnd() {
do reducer.Reduce(infraPipe, outPipe)
}
do outPipe.Dump()

quit
DATA
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol1.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol2.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol3.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol4.txt
;;
}
}

Паралельна реалізація
Давайте відразу зізнаємося собі – такі віджимання з MapReduce інтерфейсами при створенні простого алгоритму для підрахунку слів не були найпростішим, очевидним і природним підходом при розробці такої тривіальної програми. Але потенційні «плюшки», які ми можемо отримати тут все ж таки переважують всі початкові проблеми та додаткову біль. При розумному плануванні паралелізму і з застосуванням відповідних алгоритмів, ми можемо отримати масштабування, яке складно отримати на послідовних алгоритмах. Наприклад, в даному випадку, на простому low-power Haswell ULT ноутбуці, на якому пишеться ця стаття, послідовний алгоритм відпрацьовував за 4.5 секунди, тоді як паралельна версія завершувалася за 2.6 секунди.
Різниця не така драматична, але досить суттєва, особливо беручи до уваги малий вхідний набір і всього два ядра на ноутбуці.
Повернемося до коду – на попередньому етапі ми, на стадії відображення, виділили функцію в окремий метод класу, отримує два аргументи (ім'я вхідного файлу і ім'я вихідного глобал). Ми виділили даний код в окрему функцію з однією простою метою – полегшити створення паралельної версії. Така паралельна версія буде використовувати механізм worker в Caché ObjectScript ($syste.WorkMgr) Нижче ми перетворимо послідовну версію, створену на попередньому кроці, в паралельну допомогою виклику програм обробників (worker), що запускаються з вибраним методом класу.
/// Версія #2 Більш просунута, що використовує кілька воркеров
Class MR.Sample.WordCount.AppWorkers Extends %RegisteredObject
{
ClassMethod Map(FileName As %String, InfraPipeName As %String) As %Status
{
#dim inputFile As MR.Input.FileLines = ##class(MR.Input.FileLines).%New(FileName)
#dim mapper As MR.Sample.WordCount.Mapper = ##class(MR.Sample.WordCount.Mapper).%New()
#dim infraPipe As MR.Sample.GlobalPipeClone = ##class(MR.Sample.GlobalPipeClone).%New(InfraPipeName)

do mapper.Map(inputFile, infraPipe)

quit $$$OK
}
ClassMethod MapReduce() [ ProcedureBlock = 0 ]
{
new
#dim infraPipe As MR.Sample.GlobalPipe = ##class(MR.Sample.GlobalPipe).%New($name(^mtemp.Map($J)))
#dim outPipe As MR.Base.Emitter = ##class(MR.Emitter.Sorted).%New($name(^mtemp.Reduce($J)))
#dim reducer As MR.Sample.WordCount.Adder = ##class(MR.Sample.WordCount.Adder).%New()

#dim sc As %Status = $$$OK
// do $system.WorkMgr.StopWorkers()
#dim queue As %SYSTEM.WorkMgr = $system.WorkMgr.Initialize("/multicompile=1", .sc) 
quit:$$$ISERR(sc)

for i=1:1 {
#dim fileName As %String = $piece($Text(DATA+i),";",3)
quit:fileName=""

//do ..Map(fileName, infraPipe)
set sc = queue.Queue("##class(MR.Sample.WordCount.AppWorkers).Map", fileName, infraPipe.GlobalName)
quit:$$$ISERR(sc)
}
set sc = queue.WaitForComplete() quit:$$$ISERR(sc)

while 'infraPipe.IsAtEnd() {
do reducer.Reduce(infraPipe, outPipe)
}
do outPipe.Dump()

quit
DATA
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol1.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol2.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol3.txt
;;C:\Users\Timur\Documents\mapreduce\data\war_and_peace_vol4.txt
;;
}

Раніше приклад називався AppSum, Новий приклад нзивается AppWorkers, і різниця між ними дуже маленька, але важлива – ми викликаємо процедуру відображення в окремому тред (процесі) оброблювачі допомогою
$system.WorkMgr.Queue
API. Це API може викликати просту підпрограму, або метод класу, але (за природним причнима) не може викликати методи об'єкта, т. к. не передбачений механізм передачі об'єкта у зовнішній процес.
При виклику паралельного обробника через це API накладаються додаткові обмеження на типи переданих значень:
  • Ми не можемо передавати значення за посиланням і, як наслідок, не можемо повертати змінені значення таких аргументів;
  • Більш того, ми можемо передавати тільки прості скалярні значення (числа і рядки), але не об'єкти.
Але тут, дорогий Х'юстон, у нас проблема. У попередньому прикладі метод
MR.Sample.WordCount.AppSum::Map
отримував в якості 2го аргументу екземпляр класу
MR.Sample.GlobalPipe
. Але ми не можемо передавати об'єкти між процесами (а worker – це окремий процес з пулу процесів). І в даному випадку, нам потрібно придумати просту схему «серіалізації»/«десеріалізації» об'єкта в литеральные значення, для того щоб це можна було передати в паралельний обробник через
$system.WorkMgr.Queue
API.
У випадку з
GlobalPipe
"простий метод серіалізації" – дійсно виходить простим. Якщо передати ім'я проміжного глобал то цього досить для адекватної передачі стану нашого об'єкта. Ось чому другим аргументом методу
MR.SampleWordCount.AppWorkers::Map
стає рядок з ім'ям глобал, а не об'єкт.
Рекомендуємо прочитати документацію по паралельним обробників здесь, але на майбутнє запам'ятайте, що якщо ви хочете використовувати паралельні обробники (в максимальній кількості, яке дозволено при вашому залозі і ліцензії) то при ініціалізації обробників вам варто передати параметр з дивним ім'ям "
/multicompile=1
". [Дивне ім'я пояснюється тим, що ця функціональність була додана для паралельної компіляції в транслятору класів Caché ObjectScript. З тих пір цей модифікатор став використовуватися і поза коду транслятора.]
Як тільки ми запланували виконання методу через $system.WorkMgr.Queueми можемо запустити всі заплановані підпрограми і дочекатися їх завершення через $system.WorkMgr.WaitForComplete.
Всі паралельні обробники будуть використовувати один і той же проміжний глобал
infraPipe
для передачі даних між стадіями конвеєра, але колізій з даними не варто очікувати, оскільки нижележащий рушій даних відпрацює їх коректно. Нагадаємо, що архітектура Caché спочатку многопроцессная, з безліччю масштабованих механізмів синхронізації між процесами, які працюють з одними і тими ж даними. Додатково зауважимо, що наш спрощений приклад з обчисленням загальної кількості слів у всіх томах виконує згортку (reducer) в одному потоці, що також спрощує код і позбавляє нас від деякої головного болю.
Таким чином, на поточний момент ми встигли розповісти про загальних термінах алгоритмів MapReduce, створили базові інтерфейси MapReduce при реалізації їх у контексті середовища Caché ObjectScript, і створили в цьому ж середовищі простий приклад з підрахунком слів. У наступній статті ми покажемо інші використовувані в нашій реалізації ідіоми, використовуючи другий класичний приклад з WikiPedia – AgeAverage. Все тільки починається!
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.