10 порад по використанню ExecutorService

Пропоную читачам «Хабрахабра» переклад публікації «ExecutorService — 10 tips and tricks».



Абстракція ExecutorService була представлена ще в Java 5. На дворі йшов 2004 рік… На секунду – зараз Java 5 і 6 більше не підтримуються і Java 7 готується поповнити список. А багато Java-програмісти, як і раніше не повною мірою розуміють як працює ExecutorService. У вашому розпорядженні безліч джерел, але зараз я хотів би розповісти про мало відомих тонкощах і практиках по роботі з нею.

1. Именуйте пули потоків

Не можу не згадати про це. При дампинге або під час дебаггінга можна помітити, що стандартна схема іменування потоку наступна: pool-N-thread-M, де N означає послідовний номер пулу (кожен раз, коли ви створюєте новий пул, глобальний лічильник N инкрементится), а M – порядковий номер потоку в пулі. Наприклад, pool-2-thread-3 означає третій потік у другому пулі життєвого циклу JVM. Див.: Executors.defaultThreadFactory(). Не дуже інформативно, чи не правда? JDK трохи ускладнює правильне іменування потоків, т. к. стратегія іменування прихована всередині ThreadFactory. На щастя, Google Guava має вбудований клас для цього:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("Замовлення-%d")
.setDaemon(true)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

За замовчуванням створюються non-daemon пули потоків, вирішуйте самі де які доречніше.

2. Змінюйте імена в залежності від контексту

Про цей трюк я дізнався зі статті «Supercharged jstack: How to Debug Your Servers at 100mph». Раз ми знаємо про імена потоків, ми можемо змінювати їх рантайме, коли захочемо! Це має сенс, оскільки дамп потоку містить імена класів і методів без параметрів і локальних змінних. Включаючи деяку важливу інформацію в ім'я потоку, ми можемо легко простежити які повідомлення/запису/запити і т. п. гальмують систему або викликають взаємне блокування.

private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName("Обробка-" + messageId);
try {
//основна логіка...
} finally {
currentThread.setName(oldName);
}
});
}

Всередині блока try-finally поточний потік називаєтьсяОбробка-ID-поточного-повідомлення. Це може стати в нагоді при відстеженні потоку повідомлень через систему.

3. Явне і безпечне завершення

Між клієнтськими потоками і пулом потоків стоїть черга завдань. Коли програма завершує роботу, ви повинні потурбуватися про дві речі: що станеться з завданнями, які очікували в черзі, і як поведуть себе вже виконуються завдання (про це пізніше). Дивно, але багато розробники не закривають пул потоків належним чином. Є два способи: або дозвольте відпрацювати всіх завдань у черзі (shutdown()), або видаліть їх (shutdownAll()) – залежно від конкретного випадку. Наприклад, якщо ми поставили в чергу набір завдань і хочемо повернути управління як тільки всі вони виконуються, використовуємо shutdown():

private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);
log.debug("Всі листи були відправлені? {}", done);
}


У цьому прикладі ми відправляємо пачку листів, кожне в окремій задачі через пул потоків. Після постановки цих завдань в чергу ми закриваємо потік, щоб він більше не міг прийняти нових завдань. Далі ми чекаємо максимум одну хвилину поки всі завдання не будуть виконані. Однак, якщо якісь завдання ще не завершилися, awaitTermination() просто поверне false. Крім того, залишилися завдання продовжать виконуватися. Знаю, хіпстери готові піти на:
emails.parallelStream().forEach(this::sendEmail);

Кличте мене старомодним, але мені подобається контролювати кількість паралельних потоків. А альтернатива поступового завершення shutdown()shutdownAll():
final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Відхилені завдання: {}", rejected.size());

На цей раз всі стоять у черзі завдання відкидаються і повертаються. Вже запущеним завданням дозволено продовжити роботу.

4. Обробляйте переривання потоку з обережністю

Менш відома особливість інтерфейсу Future – можливість скасування. Далі наводиться одна з моїх попередніх статей: InterruptedException and interrupting threads explained.
Оскільки виняток InterruptedException явно пробрасываемое (checked), ніхто, швидше за все, навіть не замислювався про те, скільки помилок воно придушило за всі ці роки. І так як воно має бути оброблене, багато обробляють його неправильно або необдумано. Давайте розглянемо простий приклад потоку, який періодично робить якусь очищення, а в проміжках велику частину часу спить.

class Cleaner implements Runnable {

Cleaner() {
final Thread cleanerThread = new Thread(this, "Чистильник");
cleanerThread.start();
}

@Override
public void run() {
while(true) {
cleanUp();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

private void cleanUp() {
//...
}
}


Цей код жахливий зі всіх сторін!
  1. Запуск потоку з конструктора — часто погана ідея. Наприклад, деякі фреймворки, такі як Spring, люблять створювати динамічні підкласи для підтримки перехоплення методів. У кінцевому рахунку, ми отримаємо два потоку, запущені з двох примірників.
  2. Виняток InterruptedException проковтну, а не опрацьовано як слід.
  3. Цей клас запускає новий потік в кожному примірнику. Замість цього, він повинен використовувати ScheduledThreadPoolExecutor, який буде видавати одні і ті ж потоки для багатьох об'єктів, що більш надійно і ефективно.
  4. Крім того, за допомогою ScheduledThreadPoolExecutor ми можемо уникнути написання циклів засипання/роботи і перейти до роботи дійсно за розкладом.
  5. Останнє, але не менш важливе. Немає ніякого способу позбавитися від цього потоку, навіть якщо на примірник Чистильника більше ніхто не посилається.
Всі перераховані проблеми важливі, але придушення InterruptedException – найбільший гріх. Перш ніж ми зрозуміємо чому, давайте подумаємо, для чого це виняток потрібно і як ми можемо використовувати його переваги, щоб витончено переривати потоки. Багато блокуючі операції в JDK зобов'язують обробляти InterruptedException, наприклад:
  • Object.wait()
  • Thread.sleep()
  • Process.waitFor()
  • Process.waitFor()
  • Безліч блокуючих методів вjava.util.concurrent.*, наприклад ExecutorService.awaitTermination(), Future.get(), BlockingQueue.take(), Semaphore.acquire() Condition.await() і багато, багато інших
  • SwingUtilities.invokeAndWait()
Зверніть увагу, що блокує введення/виведення не прокидає InterruptedException (що сумно). Якщо всі ці класи декларують InterruptedException, ви можете бути здивовані, коли ці винятки будуть кинуті:
  • Коли потік блокується на якому-небудь метод, який декларує InterruptedException, і ви викликаєте Thread.interrupt() на такому потоці, швидше за все блокуючий метод негайно кине InterruptedException.
  • Якщо ви поставили завдання в пул потоків (ExecutorService.submit()) і викликали Future.cancel(true) поки вона ще виконується. У цьому випадку пул потоків буде намагатися перервати потік, що виконує цю задачу для вас, ефективно перервавши її.
Знаючи, що насправді являє собою InterruptedException, ми добре оснащені, щоб обробити його правильно. Якщо хтось намагається перервати наш потік, і ми виявили його, обробляючи InterruptedException, найбільш розумним буде дозволити завершити його, наприклад:
class Cleaner implements Runnable, AutoCloseable {

private final Thread cleanerThread;

Cleaner() {
cleanerThread = new Thread(this, "Cleaner");
cleanerThread.start();
}

@Override
public void run() {
try {
while (true) {
cleanUp();
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException ignored) {
log.debug("Interrupted, closing");
}
}

//... 

@Override
public void close() {
cleanerThread.interrupt();
}
}

Зверніть увагу, що блок try-finally в даному прикладі оточує цикл while. Таким чином, якщо sleep() викине InterruptedException, ми перервемо цей цикл. Ви можете заперечити, що ми повинні логировать стек виключення InterruptedException. Це залежить від ситуації. В даному випадку переривання потоку є очікуваною поведінкою, а не падінням. Загалом, на ваш розсуд. У більшості випадків потік перерветься під час sleep() і ми швиденько завершимо метод run() в цей же час. Якщо ви дуже обережні, то напевно запитаєте – а що буде, якщо потік перерветься під час виконання чищення cleanup()? Часто ви зіткнетеся з рішенням вручну виставити прапор, на зразок цього:
private volatile boolean stop = false;

@Override
public void run() {
while (!stop) {
cleanUp();
TimeUnit.SECONDS.sleep(1);
}
}

@Override
public void close() {
stop = true;
}

Пам'ятайте, що стоп-прапор (він повинен бути волатильним!) не буде переривати блокуючі операції, ми повинні дочекатися поки відробить метод sleep(). З іншого боку, цей явний прапор дає нам найкращий контроль, оскільки ми можемо спостерігати його в будь-який час. Виявляється, переривання потоків працює точно так само. Якщо хтось перервав потік, поки він виконував неблокирующие обчислення (наприклад, cleanUp()), такі обчислення не будуть перервані негайно. Однак потік вже позначений як перерваний, тому будь-яка наступна блокуюча операція, така як sleep() негайно перерветься і викине InterruptedException, тому ми не втратимо цей сигнал.
Ми також можемо скористатися цим фактом, якщо ми пишемо неблокуючий потік, який як і раніше хоче використовувати переваги механізму переривання потоків. Замість того, щоб покладатися на InterruptedException, ми просто повинні періодично перевіряти Thread.isInterrupted():
public void run() {
while (Thread.currentThread().isInterrupted()) {
someHeavyComputations();
}
}

Як видно, якщо хто-то перерве наш потік, ми скасуємо обчислення так скоро, наскільки дозволять попередня ітерація someHeavyComputations(). Якщо вона виконується так довго або нескінченно, ми ніколи не досягнемо прапора переривання. Примітно, що цей прапор не одноразовий. Ми можемо викликати Thread.interrupted() замістьisInterrupted(), що скине значення прапора і ми зможемо продовжити. Іноді ви можете захотіти проігнорувати прапор переривання і продовжити виконання. У цьому випадку interrupted() може стати в нагоді.

Якщо ви олдскульний програміст, ви можете згадати про метод Thread.stop(), який застарів 10 років тому. В Java 8 були плани з його «деимплементации», але в 1.8u5 він як і раніше з нами. Тим не менш, не використовуйте його і рефакторите будь-код, в якому він зустрінеться, використовуючи Thread.interrupt().

Іноді ви, можливо, захочете повністю ігнорувати InterruptedException. В цьому випадку зверніть увагу на клас Uninterruptibles з Guava. Він містить багато методів таких як sleepUninterruptibly() або awaitUninterruptibly(CountDownLatch). Просто будьте обережні з ними. Вони не декларують InterruptedException, але також повністю позбавляють потік від переривання, що досить незвично.

Отже, тепер у вас є розуміння того, чому деякі методи кидають InterruptedException:
  • Викинуті InterruptedException повинні бути адекватно оброблені в більшості випадків.
  • Придушення InterruptedException – часто погана ідея.
  • Якщо потік був перерваний під час неблокирующих обчислень. Використовуйте isInterrupted().


5. Слідкуйте за довжиною черги і встановіть кордон

Пули потоків неправильної розмірності можуть призвести до гальм, нестабільності і витокам пам'яті. Якщо ви налаштуєте занадто мало потоків, черга буде рости, споживаючи багато пам'яті. З іншого боку, занадто багато потоків будуть сповільнювати всю систему із-за частих перемикань контексту – і призведе до тих же симптомів. Важливо зберігати глибину черги і визначати її межі. А перевантажений пул може просто тимчасово відмовлятися від нових завдань.
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);

Вищенаведений код еквівалентний Executors.newFixedThreadPool(n), однак замість того, щоб використовувати за умовчанням необмежений LinkedBlockingQueue, ми використовуємо ArrayBlockingQueue з фіксованою ємністю в 100. Це означає, що якщо 100 завдань вже набрані, наступне завдання буде відхилена з виключенням RejectedExecutionException. Крім того, оскільки черга тепер доступна ззовні, ми можемо періодично справлятися про її розмір, щоб записати в лог, відправити в JMX і т. д.

6. Пам'ятайте про обробку винятків

Який результат виконання наступного коду?
executorService.submit(() -> {
System.out.println(1 / 0);
});

Я був засмучений тим, що як багато разів він не друкував нічого. Ніяких ознак java.lang.ArithmeticException: ділення на нуль, нічого. Пул потоків просто проковтує це виняток, неначе воно ніколи не викидалося. Якщо б це був потік, створений «з нуля», без обгортки у вигляді пулу, міг би спрацювати UncaughtExceptionHandler. Але з пулом потоків ви повинні бути більш обережні. Якщо ви відправили на виконання Runnable (без якого-небудь результату, як вище), ви повинні помістити все тіло методу всередину try-catch. Якщо ви ставите в чергу Callable, упевніться, що ви завжди дістаєте його результат з допомогою блокуючого get(), щоб знову кинути виняток:
final Future<Integer> division = executorService.submit(() -> 1 / 0);
//нижче буде викинуто ExecutionException, викликане ArithmeticException
division.get();

Примітно, що навіть Spring framework допустив цю помилку з @Async див.: SPR-8995 і SPR-12090.

7. Стежте за часом очікування в черзі

Моніторинг глибини робочої черзі односторонній. При вирішенні проблем з одиночної транзакцією/завданням, має сенс подивитися скільки часу пройшло між постановкою завдання і початком її виконання. Це час в ідеалі повинно прагнути до нуля (коли у пулі є простоює потік), однак воно буде збільшуватися по мірі постановки завдань в чергу. Крім того, якщо пул не має фіксоване число потоків, запуск нової задачі може вимагати народження нового потоку, що теж займе якийсь час. Щоб чітко виміряти цей показник, оберніть оригінальний ExecutorService у щось схоже:
public class WaitTimeMonitoringExecutorService implements ExecutorService {

private final ExecutorService target;

public WaitTimeMonitoringExecutorService(ExecutorService target) {
this.target = target;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug("Завдання {} провело в черзі {} мс", task, queueDuration);
return task.call();
}
);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}

@Override
public Future<?> submit(Runnable task) {
return submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run();
return null;
}
});
}

//...
}

Це не повна реалізація, але ви отримали загальне уявлення. У момент, коли ми поставили завдання в пул потоків, ми негайно засікли час. Ми зупинили секундомір, як тільки завдання була вилучена і відправлена на виконання. Не обманюйтеся близькістю startTime queueDuration у вихідному коді. Насправді ці два рядки виконуються в різних потоках, в мілісекундах або навіть секундах один від одного.

8. Зберігайте трасування стека клієнта

Реактивному програмування приділяється підвищена увага в наші дні: Reactive manifesto, reactive streams, RxJava (вже 1.0!), Clojure agents, scala.rx… Все це виглядає здорово, але стектрейс – більше не твій друг, він за великим рахунком марний. Розглянемо, наприклад, таке виключення, що виникає під час виконання завдання в пулі потоків:
java.lang.NullPointerException: null
at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
Ми можемо легко зауважити, що MyTask викинуло NPE у рядку 76. Але ми не маємо жодного уявлення, хто затвердив цю задачу, оскільки стек відноситься тільки до Thread ThreadPoolExecutor. Технічно, ми можемо просто переміщатися по коду в надії знайти тільки одну ділянку, де виконується постановка MyTask в чергу. Але без потоків (не кажучи вже про подійно-орієнтованою, реактивному і т. п. програмуванні), ми завжди можемо відразу всю картину цілком. Що якщо ми могли б зберегти стектрейс клієнтського коду (того, що ініціює завдання) і показати його, наприклад, при виникненні помилки? Ідея не нова, наприклад, Hazelcast поширює виключення з сайту-власника в клієнтський код. Нижче наведено простий приклад як зробити подібне:
public class ExecutorServiceWithClientTrace implements ExecutorService {

protected final ExecutorService target;

public ExecutorServiceWithClientTrace(ExecutorService target) {
this.target = target;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}

private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {
return () -> {
try {
return task.call();
} catch (Exception e) {
log.error("Виключення {} у завданні з потоку {}:", e, clientThreadName, clientStack);
throw e;
}
};
}

private Exception clientTrace() {
return new Exception("Клієнтський стектрейс");
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return tasks.stream().map(this::submit).collect(toList());
}

//...
}

В цей раз в разі невдачі, ми витягуємо повний стектрейс і назва потоку, де завдання було поставлено в чергу. Набагато більш цінна інформація по порівнянні зі стандартним винятком, розглянутим раніше:
Виняток java.lang.NullPointerException у завданні з потоку main:
java.lang.Exception: Клієнтський стектрейс
at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

9. Віддавайте перевагу CompletableFuture

В Java 8 був представлений більш потужний CompletableFuture. Будь ласка, використовуйте його там, де це можливо. ExecutorService не було розширено, щоб підтримувати цю абстракцію, так що ви повинні дбати про це самостійно. Замість:
final Future<BigDecimal> future = executorService.submit(this::calculate);


Використовуйте:
final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);


CompletableFuture розширює Future, так що все працює як раніше. Але більш просунуті користувачі вашого API по-справжньому оцінять розширену функціональність, що надається CompletableFuture.

10. Синхронні черзі

SynchronousQueue – цікава різновид BlockingQueue, яка насправді не черга. Це навіть не структура даних як така. Найкраще її можна позначити як черга з нульовою ємністю.
Ось що говорить JavaDoc:
Кожна додається операція повинна очікувати відповідної операції видалення в іншому потоці, і навпаки. Синхронна чергу не має ніякої внутрішньої ємності, навіть одиничним. Ви не можете заглянути в синхронну чергу, тому що елемент представлений, тільки при спробі його видалення; ви не можете вставити елемент (використовуючи будь-який метод), поки інший потік не видалить його: ви не можете обійти чергу тому як обходити нічого.
Синхронні черзі схожі на «rendezvous channels», використовувані в CSP і Ada.
Як все це відноситься до пулам потоків? Спробуємо використовувати SynchronousQueue разом з ThreadPoolExecutor:
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
0L, TimeUnit.MILLISECONDS,
queue);

Ми створили пул потоків з двома потоками і SynchronousQueue перед ним. Тому що, по суті SynchronousQueue — черзі з ємністю 0, такі ExecutorService буде тільки приймати нові завдання, якщо доступний простоює потік. Якщо всі потоки зайняті, нова задача буде негайно відхилена і ніколи не буде чекати черги. Такий режим може бути корисний для негайної обробки у фоновому режимі, якщо це можливо.

От і все, сподіваюся, ви відкрили для себе як мінімум одну цікаву фічу!

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.