Haskell. Тестуємо багатопотокове застосування

    Дана стаття складена викладачем Академічного університету Валерієм Ісаєвим за матеріалами практики по курсу функціонального програмування.
 
Вважаю, ні для кого не секрет, що написання багатопоточних додатків пов'язано з цілим рядом проблем, відсутніх при розробці однопоточних програм.
Одна з проблем полягає в тестуванні програми.
Ми не можемо контролювати порядок, в якому виконуються операції, отже, не піддається контролю і результат виконання програми. Навіть якщо ми отримаємо помилку, наступити на ті ж граблі вдруге буде не так-то просто.
Хочу запропонувати невеликої рецепт того, як можна протестувати багатопотокове застосування.
З інгредієнтів нам знадобляться:
haskell
,
QuickCheck
, трохи монад, сіль / перець за смаком.
 
 

Робочий приклад

Як робочий прикладу візьмемо задачу про обідають філософів.
 
 
MVar a
— це посилання, яка або містить значення типу a, або порожня.
 
putMVar ref x
кладе за посиланням ref значення x.
 
takeMVar ref
зчитує вміст посилання, залишаючи її після цього порожній.
Якщо вона вже була порожня, то потік засинає, поки в неї не запише небудь інший потік.
 
()
— це тип, що має єдине значення, яке позначається так само, як і сам тип —
()
.
Вилки ми моделюємо посиланнями типу
MVar ()
.
Таким чином, у вилки може бути два стани: якщо вилка зайнята небудь філософом — вона порожня; якщо вилка вільна — вона містить значення
()
.
 
 
import System.Random
import Control.Monad
import Control.Concurrent
import Control.Monad.Cont
import Control.Monad.Trans
import Data.IORef
import Test.QuickCheck
import Test.QuickCheck.Gen
import Test.QuickCheck.Monadic

-- sleep останавливает поток на рандомное количество секунд (от 0 до 0.3)
sleep :: IO ()
sleep = randomRIO (0, 300000) >>= threadDelay

phil
    :: Int      -- Номер философа.
    -> MVar ()  -- Ссылка на левую вилку.
    -> MVar ()  -- Ссылка на правую вилку.
    -> IO ()
phil n leftFork rightFork = forever $ do
    putStrLn $ show n ++ " is awaiting"
    sleep
    takeMVar leftFork
    putStrLn $ show n ++ " took left fork"
    -- sleep
    takeMVar rightFork
    putStrLn $ show n ++ " took right fork"
    sleep
    putMVar leftFork ()
    putMVar rightFork ()
    putStrLn $ show n ++ " put forks"
    sleep

runPhil :: Int -> IO ()
runPhil n = do

    -- Создаем ссылки, которые представляют вилки.
    forks <- replicateM n $ newMVar ()

    -- Запускаем 5 потоков, в каждом выполняем функцию phil.
    forM_ [1..n] $ \i -> forkIO $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

main = do
    runPhil 5

    -- Если главный поток завершится, программа остановится, поэтому мы его усыпляем навечно.
    forever (threadDelay 1000000000)

У цій програмі може трапитися Дедлок.
Щоб помилуватися на нього, можна розкоментувати рядок —
sleep 
і трохи почекати.
Наша мета — написати тести, які б виявили цю помилку.
Але перш ніж ми зможемо це зробити, варто зрозуміти, як ми будемо управляти порядком виконання операцій. Для цього, замість IO, використовуємо іншу Монада.
 
Узагальнимо визначення функцій
sleep
,
phil 
і
runPhil
, щоб вони працювали і для інших монад.
 
 
sleep :: MonadIO m => m ()
sleep = do
    r <- liftIO $ randomRIO (0, 100)
    r `times` liftIO (threadDelay 300)
  where
    times :: Monad m => Int -> m () -> m ()
    times r a = mapM_ (\_ -> a) [1..r]

Тепер функція
sleep 
може працювати з будь-монадою, яка підтримує IO операції. У класі
MonadIO 
визначена всього одна функція
liftIO
, яка дозволяє це робити.
Зауважимо, що замість того, щоб один раз засипати на рандомноє число секунд, ми засипаємо рандомноє число раз на 0.3 мілісекунди. Причина в тому, що в нашій монаді дії всередині
liftIO 
виконуються атомарно. Відповідно, час, на який ми засинаємо, ні на що не впливає, важливо тільки, скільки разів ми це робимо.
 
Оскільки наша монада буде працювати в одному потоці,
MVar 
для нас даремні, і ми пізніше визначимо свій тип посилань, виходячи з того, щоб функція
phil 
могла працювати і з
MVar
, і з іншими типами посилань.
Для цього визначимо клас монад
MonadConcurrent
, в якому будуть операції для створення, читання і запису за посиланням, а також для створення потоків.
 
 
class Monad m => MonadConcurrent m where
    type CVar m :: * -> *
    newCVar :: a -> m (CVar m a)
    takeCVar :: CVar m a -> m a
    putCVar :: CVar m a -> a -> m ()
    fork :: m () -> m ()

 
Тут ми використовували сімейства типів, які є розширенням мови.
В даному випадку нам потрібно це розширення, щоб ми могли визначати для різних монад різні типи посилань.
Для використання розширення потрібно додати наступний рядок в початок файлу (і заодно підключити розширення, які знадобляться пізніше):
 
 
{-# LANGUAGE TypeFamilies, ExistentialQuantification, GeneralizedNewtypeDeriving #-}

Визначимо
instance 
цього класу для монади IO.
Тут все легко: ми просто використовуємо відповідні операції для
MVar
.
 
 
instance MonadConcurrent IO where
    type CVar IO = MVar
    newCVar = newMVar
    takeCVar = takeMVar
    putCVar = putMVar
    fork m = forkIO m >> return ()

Узагальнимо функції
phil 
і
runPhil
.
 
 
phil :: (MonadIO m, MonadConcurrent m) => Int -> CVar m () -> CVar m () -> m ()
phil n leftFork rightFork = forever $ do
    liftIO $ putStrLn $ show n ++ " is awaiting"
    sleep
    takeCVar leftFork
    liftIO $ putStrLn $ show n ++ " took left fork"
    takeCVar rightFork
    liftIO $ putStrLn $ show n ++ " took right fork"
    sleep
    putCVar leftFork ()
    putCVar rightFork ()
    liftIO $ putStrLn $ show n ++ " put forks"
    sleep

runPhil :: (MonadIO m, MonadConcurrent m) => Int -> m ()
runPhil n = do
    forks <- replicateM n $ newCVar ()
    forM_ [1..n] $ \i -> fork $ phil i (forks !! (i - 1)) (forks !! (i `mod` n))

Запустимо програму і переконаємося, що вона працює як раніше.
 
 

Монада Concurrent

А тепер починається найцікавіше.
 
Визначимо монаду, в якій будемо працювати (забігаючи вперед, скажу, що називається вона
Cont
). Також ризикну припустити, що
Cont 
— одна з найскладніших і найбільш потужних монад одночасно.
Використовуючи цю монаду, з потоком управління можна робити все що завгодно: наприклад, замість того, щоб виконувати дії, можна їх зберегти в структурі (з цією метою оголосимо тип
Action
) і виконати їх пізніше, можливо, в іншому порядку.
 
 
data Action = Atom (IO Action)
            | forall a. ReadRef (MaybeRef a) (a -> Action)
            | forall a. WriteRef (MaybeRef a) a Action
            | Fork Action Action
            | Stop

Давайте розберемося окремо з кожним конструктором.
Дія
Stop 
означає, що обчислення завершилися.
Дія
Fork 
означає, що обчислення гілкуються, тобто тепер у нас є два потоки, які можуть виконуватися одночасно.
Дія
Atom 
виконує атомарне IO операцію, що повертає нам новий
Action
, в якому знаходиться дію, що слід виконати на наступному кроці.
 
Наприклад:
Функція
getSum 
задає дію, яке зчитує два числа з клавіатури, друкує їх суму і завершується.
 
 
getSum :: Action
getSum = Atom $ do
    x <- readLn             -- считываем первое число
    return $ Atom $ do      -- возвращаем продолжение
        y <- readLn         -- считываем второе число
        return $ Atom $ do  -- возвращаем продолжение
            print (x + y)   -- печатаем сумму
            return Stop     -- возвращаем продолжение

Далі:
Дія
WriteRef ref val act
записує значення
val 
за посиланням
ref
, в
act 
знаходиться продовження.
Дія
ReadRef ref act
зчитує значення за посиланням
ref
,
act 
приймає це значення і повертає продовження.
Щоб у
Action 
можна було зберігати посилання довільних типів, ми використовуємо ще одне розширення мови — екзистенціальну квантифікацію.
 
Тип
MaybeRef 
представляє тип посилань, які ми будемо використовувати замість
MVar
, і визначається він як посилання на
Maybe
.
 
 
newtype MaybeRef a = MaybeRef (IORef (Maybe a))

Тепер ми можемо визначити нашу Монада.
Як я і обіцяв, ми просто обертаємо монаду
Cont
.
 
 
newtype Concurrent a = Concurrent (Cont Action a) deriving Monad

Монада
Cont Action
влаштована таким чином.
Замість того щоб повертати значення типу
a
, вона приймає продовження типу
(a -> Action)
, передає в цю функцію значення і повертає результат.
Тобто можна вважати, що
Cont Action a = (a -> Action) -> Action
.
Якщо точніше, у нас є наступна пара функцій, які переводять
(a -> Action) -> Action
в
Cont Action a
і назад.
 
 
cont :: ((a -> Action) -> Action) -> Cont Action a.
runCont :: Cont Action a -> (a -> Action) -> Action

Тепер ми можемо визначити instance класів
MonadIO 
і
MonadConcurrent
.
 
 
instance MonadIO Concurrent where
    liftIO m = Concurrent $ cont $ \c -> Atom $ do
        a <- m
        return (c a)

Давайте подивимося, що тут відбувається.
 
liftIO 
приймає IO операцію і обертає її в атомарний дію. А саме: ми в
Cont 
передаємо функцію, яка приймає продовження (тобто c має тип
a -> Action
) і повертає атомарну дію, яке виконує IO операцію
m
.
Ми визначили
Atom
так, що атомарна операція повинна повертати
Action
, що є продовженням.
Власне це ми і робимо: після того як ми виконали
m
, ми викликаємо
c
, яке і повертає необхідне продовження.
 
Тепер визначимо
instance MonadConcurrent
.
Створюємо в
newCVar 
посилання, використовуючи тільки що певну функцію
liftIO
.
У
takeCVar 
і
putCVar 
повертаємо відповідну дію, а продовження зберігаємо всередині цієї структури.
У fork повертаємо дію, в якому збережені обидва потоки: один передається в аргументи функції
fork
, другий приходить з продовження.
 
 
instance MonadConcurrent Concurrent where
    type CVar Concurrent = MaybeRef 
    newCVar a = liftIO $ liftM MaybeRef $ newIORef (Just a)
    takeCVar v = Concurrent $ cont (ReadRef v)
    putCVar v a = Concurrent $ cont $ \c -> WriteRef v a $ c ()
    fork (Concurrent m) = Concurrent $ cont $ \c -> Fork (runCont m $ \_ -> Stop) $ c ()

Наша монада практично готова, залишилося тільки навчитися її запускати.
Для початку напишемо функцію, яка запускає
Action
. Вона приймає список дій, кожен елемент в якому — окремий потік.
Стратегії по запуску дій можуть бути різними. Визначимося з двома моментами: в якому порядку виконувати потоки, і що робити, якщо ми намагаємося вважати значення з змінної, яка порожня. Нагадаю, що у змінній може нічого не лежати, і тоді нам потрібно дочекатися, коли інший потік в неї що-небудь покладе.
Давайте спочатку напишемо просту версію, де будемо виконувати потоки по черзі; а потік, який намагається вважати з порожньої змінної, будемо переміщати в кінець черги.
 
 
runAction :: [Action] -> IO ()
    -- Если потоков не осталось, завершаемся.
runAction [] = return ()

    -- Выполняем атомарное действие, а продолжение, которое оно возвращает, кладем в конец очереди.
runAction (Atom m : as) = do
    a' <- m
    runAction $ as ++ [a']

    -- Кладем два новых потока в конец очереди.
runAction (Fork a1 a2 : as) = runAction $ as ++ [a1,a2]

    -- Продолжаем запускать остальные потоки.
runAction (Stop : as) = runAction as

runAction (ReadRef (MaybeRef ref) c : as) = do

    -- Считываем содержимое ссылки.
    ma <- readIORef ref
    case ma of

        -- Если там было что-то, то 
        Just a -> do

            -- Опустошаем содержимое ссылки.
            writeIORef ref Nothing

            -- Кладем в конец очереди продолжение.
            runAction (as ++ [c a])

        -- Если там ничего не было, то нужно попробовать считать эту ссылку позже, поэтому добавляем в конец очереди то же самое действие.
        Nothing -> runAction (as ++ [ReadRef (MaybeRef ref) c])

-- Записываем по ссылке значение, продолжением кладем в конец очереди.
runAction (WriteRef (MaybeRef ref) val a : as) = do
    writeIORef ref (Just val)
    runAction (as ++ [a])

Зауважте, що
putMVar
працює трохи інакше, ніж наша реалізація
WriteRef
.
Якщо за посиланням вже було якесь значення, то
putMVar
заморозить потік, поки змінна не звільниться. У цьому випадку перезапишіть значення.
Версію, що працює як
putMVar
, створювати в даній ситуації не варто, щоб не переусложнять код.
 
Далі пишемо функцію, яка запускає
Concurrent
, і перевизначають
main
.
 
 
runConcurrent :: Concurrent () -> IO ()
runConcurrent (Concurrent c) = runAction [runCont c $ \_ -> Stop]

main = runConcurrent (runPhil 5)

Так як тепер ми працюємо в одному потоці, і
threadDelay
зупиняє всю роботу, швидкість трохи знизилася.
 
 

Пишемо тести

Настав час «додати в блюдо приправу» — написати тести для нашого прикладу.
Для цього використовуємо бібліотеку
QuickCheck
, генеруючу випадкові вхідні дані для тестів. Оскільки ми хочемо запускати наші потоки в різних порядках, то вхідні дані для наших тестів — це порядок, в якому ми вибираємо черговий потік зі списку.
Можна закодувати вхідні дані списком чисел, але проблема в тому, що ми не знаємо заздалегідь, з якого діапазону слід вибирати ці числа, так як число потоків може змінюватися.
Тому кодувати вхідні дані ми будемо списком функцій типу
Int -> Int
, які беруть число
n
і повертають число з інтервалу
[0,n-1]
.
 
 
newtype Route = Route [Int -> Int]

Клас
Arbitrary
, що надається бібліотекою
QuickCheck
, призначений для опису типів, що дозволяють генерувати елементи випадковим чином.
У цьому класі оголошено дві функції —
shrink
і
Arbitrary
.
У
shrink 
є реалізація за замовчуванням, так що перевизначати її не будемо.
У функції
Arbitrary
згенеруємо список випадкових функцій, де кожна функція повертає число з інтервалу
[0,n-1]
.
 
 
instance Arbitrary Route where
    arbitrary = fmap Route (listOf arbitraryFun)
      where
        arbitraryFun = MkGen $ \q s n -> unGen (choose (0, n - 1)) q s

Визначаємо також
instance Show 
для
Route
, оскільки цього вимагає
QuickCheck
.
На жаль, занадто корисний
show
ми написати не можемо. Більше того, ця функція використовуватися не буде, тому ми залишаємо її невизначеною.
 
 
instance Show Route where
    show = undefined

Тепер можна приступити до написання більш розумною версії
runAction
.
Перша відмінність полягає в тому, що ми розділимо виконання атомарних дій і роботу з посиланнями.
Для початку напишемо допоміжну функцію
skipAtoms
, що виконує атомарні дії: функція приймає список дій, виконує
Atom
,
fork
і
Stop
, решта повертає в якості результату.
 
 
skipAtoms :: [Action] -> IO [Action]
skipAtoms [] = return []
skipAtoms (Atom m : as) = do
    a <- m
    skipAtoms (as ++ [a])
skipAtoms (Fork a1 a2 : as) = skipAtoms (as ++ [a1,a2])
skipAtoms (Stop : as) = skipAtoms as
skipAtoms (a : as) = fmap (a:) (skipAtoms as)

Друга відмінність нової версії
runAction 
від колишньої полягає в тому, що ми відстежуємо отримання Дедлок.
Для цього заводимо два списки дій. У першому зберігаються активні (виконувані нами) потоки. У другому — потоки, що чекають поновлення небудь посилання.
Якщо список активних потоків порожній, а списку чекають немає, значить, ми отримали Дедлок, і в цьому випадку кидаємо виняток.
 
Третє нововведення — аргумент типу
Route
, використовуваний для вибору номера потоку, який слід виконати на поточному кроці.
 
 
runAction :: Route -> [Action] -> [Action] -> IO ()
runAction _ [] [] = return ()
runAction _ [] _ = fail "Deadlock"
runAction (Route []) _ _ = return ()
runAction (Route (r:rs)) as bs = do
    as <- skipAtoms as
    let n = length as
    case splitAt (r n) as of
        (as1, ReadRef (MaybeRef ref) c : as2) -> do
            ma <- readIORef ref
            case ma of
                Just a -> do
                    writeIORef ref Nothing
                    runAction (Route rs) (as1 ++ [c a] ++ as2) bs
                Nothing -> runAction (Route rs) (as1 ++ as2) (bs ++ [ReadRef (MaybeRef ref) c])
        (as1, WriteRef (MaybeRef ref) x c : as2) -> do
            writeIORef ref (Just x)
            runAction (Route rs) (as1 ++ [c] ++ as2 ++ bs) []

Функція
runConcurrent 
практично не змінилася.
 
 
runConcurrent :: Route -> Concurrent () -> IO ()
runConcurrent r (Concurrent c) = runAction r [runCont c $ \_ -> Stop] []

Можна перевірити, як працює нова версія, передавши як перший аргумент
round_robin
. Це проста стратегія виконання, аналогічна тому, як функція
runAction 
працювала раніше. Тут ми просто генеруємо нескінченний список і для кожного елемента беремо залишок по модулю числа потоків.
 
 
round_robin :: Route
round_robin = Route $ map rem [0..]

Запустивши обчислення на цих вхідних даних, ми, швидше за все, швидко отримаємо Дедлок — унаслідок того, що робота нашого прикладу побудована на основі генератора випадкових чисел — отже, не дивлячись на те, що вхідні дані завжди одні й ті ж, порядок виконання виявляється випадковим.
Якби наш приклад був більш детермінований, нам довелося б варіювати вхідні дані випадковим чином, що ми зараз і зробимо.
 
 
main = quickCheck $ monadicIO $ do
    r <- pick arbitrary
    run $ runConcurrent r (runPhil 5)

Ми вибираємо довільний елемент типу
Route
, використовуючи реалізовану нами раніше функцію
Arbitrary
. Після чого запускаємо наше обчислення на цьому вході.
Про інше подбає
QuickCheck
, а саме: запустить наш тест 100 раз, з кожним разом збільшуючи розмір вхідних даних.
 
Запустивши програму, ми побачимо наступне:
 
 
...
3 took left fork
4 put forks
4 is awaiting
5 took left fork
4 took left fork
1 took right fork
1 put forks
1 is awaiting
1 took left fork
2 took left fork
*** Failed! Exception: 'user error (Deadlock)' (after 36 tests):

Що і потрібно отримати!
 
 

Висновок

Ми навчилися писати тести, які можуть виявляти стан Дедлок в багатопотоковому додатку.
У процесі ми бачили приклади використання монади
Cont
, сімейств типів, екзистенціальної квантифікації і бібліотеки
QuickCheck
.
Крім того, ми дізналися, як можна зібрати модель багатопотокового виконання програми з підручних матеріалів.
    
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.