Ще раз про багатопоточності в один рядок

Нещодавно знадобилося мені в моєму проекті на Flask прискорити відповідь сервера. З-за того, що під view послідовно викликається запит до трьох віддалених веб-сервісів, час завантаження сторінки з даними не з кеша доходило до 10 сек. Так, можливо, Flask не той фреймворк, який варто використовувати, але що маємо, то маємо.
Отже, приступимо. Оскільки реальний код публікувати я не можу, розгляну на академічних прикладах.

Завдання 1 Є три функції a, b ,c, які необхідно викликати в окремих потоках, дочекатися результату їх виконання і видати відповідь.
Для вирішення завдання 1 я скористався цим перекладом, бо був зачарований простотою використання бібліотеки.
import multiprocessing.dummy as multiprocessing
import time

def a():
time.sleep(2)
return 'a'
def b():
time.sleep(2)
return 'b'
def c():
time.sleep(1)
return 'c'

p = multiprocessing.Pool()

results = p.map(lambda f: f(),[a,b,c])
print(results)
p.close()
p.join()



Результат виконання коду:
['a', 'b', 'c']

Чудово, але є суттєвий мінус. Час виконання коду не обмежена, він буде чекати результату виконання всіх процедур. Змінюємо формулювання задачі.

Завдання 2 Є три функції a, b ,c, які необхідно викликати в окремих потоках, і через інтервал часу перевірити, завершилися вони чи ні, видати результат.

Для вирішення використовуємо ту ж бібліотеку, але вже функцію map_async. Її відмінність в тому, що вона повертає об'єкт AsyncResult.

import multiprocessing.dummy as multiprocessing
import time

def a():
time.sleep(2)
return 'a'

def b():
time.sleep(2)
return 'b'

def c():
time.sleep(1)
return 'c'

p = multiprocessing.Pool()

result = p.map_async(lambda f: f(),[a,b,c])

TIMEOUT =3
print(results.get(TIMEOUT))

p.close()
p.join()



Результат виконання при TIMEOUT>=3 такою ж, як і в попередньому випадку, але якщо хоч одна з процедур не встигає завершиться, видається виняток TimeoutError. Однак і цей результат мене не влаштував цілком. Справа в тому, що в моєму випадку мені істотно було, щоб встигала відпрацювати одна функція, інші могли бути відсутньою при видачі.

Завдання 3 Є три функції a, b ,c, які необхідно викликати в окремих потоках, дочекатися результату функції a.

import multiprocessing.dummy as multiprocessing
import time


def a():
time.sleep(2)
print(1)
return 'a'


def b():
time.sleep(3)
print(2)
return 'b'

def c():
time.sleep(1)
print(3)
return 'c'

p = multiprocessing.Pool()

results=[]
for r in p.imap(lambda f: f(),[a,b,c]):
results.append®
break

print(results)
p.close()
p.join()




Результат виконання:
3
1
['a']
2


Як видно, хоча відпрацювали 2 функції з 3, результат ми отримали тільки для пріоритетною. Щоб отримати результат другої, слід використовувати imap_unordered:

results=[]
for r in p.imap_unordered(lambda f: f(),[a,b,c]):
results.append®
if r =='a':
break

Результат:
3
1
['c', 'a']
2


Що, якщо нам в основному потоці потрібен результат тільки одного потоку, найбільш швидкого? Досить прибрати виклик p.join() з попереднього прикладу і вийти з циклу з першого результату.

Тепер ще такий момент. При спробі використовувати модуль multiprocessing, який працює з процесами, замість multiprocessing.dummy, що працює з тредами буде видана помилка серіалізації cPickle.PicklingError, оскільки при межпроцессном взаємодії не вдається сериализации функцію. Для того, щоб код працював, потрібно ввести функцію-псевдонім, код буде не настільки гарний, але:
import multiprocessing
import time

def a():
time.sleep(2)
return 'a'
def b():
time.sleep(2)
return 'b'
def c():
time.sleep(1)
return 'c'

def func(param):
if param == 'a':
return a()
elif param == 'b':
return b()
elif param == 'c':
return c()

p = multiprocessing.Pool()

results = p.map(func,['a','b','c'])
print(results)
p.close()
p.join()


Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.