В Java версии 1.5 был добавлен новый пакет, содержащий много полезных
возможностей, касающихся синхронизации и параллелизма:
java.util.concurrent. К сожалению, ему практически не было уделено
внимания в литературе по языку Java, возможно, поэтому предоставляемые
возможности не сильно популярны среди разработчиков. В данной статье мы
постараемся устранить это упущение и познакомить читателей с основными
элементами этого пакета.
Начнем же с некоторых базовых понятий и обзора пакета java.util.concurrent.
Синхронизация – это обмен данными между некоторым числом потоков и процессов.
Параллелизм – это искусство эффективно выполнять приложением несколько задач одновременно, избегая всяческих конфликтов данных.
Перейдем
непосредственно к главной теме, т.е. новым возможностям в реализации
синхронизации и параллелизма в Java приложениях. Их можно разделить на
следующие группы:
-
Executor Framework
-
Synchronization
-
Вспомогательные классы
-
Lock объекты
-
Синхронизаторы
-
Параллельные коллекции
Executors
Пакет java.util.concurrent содержит три Executor-интерфейса:
- Executor
- ExecutorService
- ScheduledExecutorService
Executor
Данный интерфейс был введен для отделения самого процесса отправки
задачи на выполнения от механизма выполнения каждой конкретной задачи.
Классы, реализующие данный интерфейс, представляют объекты, которые и
определяют, как выполнять каждую конкретную задачу.
ExecutorService
Данный интерфейс является расширением интерфейса Executor и добавляет следующие полезные возможности:
- Возможность остановить выполняемый процесс
- Возможность выполнения не только Runnable объектов, но и
java.util.concurrent.Callable. Основное их отличие от Runnable объектов –
возможность возвращать значение потоку, из которого делался вызов. - Возможность возвращать вызывавшему потоку объект
java.util.concurrent.Future, который содержит среди прочего и
возвращаемое значение.
ScheduledExecutorService
Данный интерфейс по сути дела является все тем же ExecutorService, но
с возможностью откладывать начало выполнения задач на определенный
промежуток времени, либо планировать выполнение задач через заданный
временной интервал.
Пример использования
public class CallableImpl implements Callable<Integer> {
public Integer call() {
//…
return new Integer(someValue);
}
}
//…
Callable<Integer> callable = new CallableImpl();
ExecutorService executor = Executors.newFixedThreadPool(5);
Future<Integer> future = executor.submit(callable);
try {
System.out.println( "Future value: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
Путем вызова Executors.newFixedThreadPool был создан пул на 5
поток. Тем самым, в случае необходимости создания большого числа потоков
будет экономиться время на создание нового потока путем использования
существующих потоков из пула. Вызов метода get у объекта типа Future
привел к ожиданию текущим потоком возвращаемого значения.
Атомарные переменные
Атомарные классы предоставляют возможность атомарного выполнения операций основных примитивных и ссылочных типов.
В
качестве примера рассмотрим класс AtomicInteger и его основные методы.
Как понятно из названия, данный класс является оберткой вокруг
примитивного типа int, предоставляющей возможность атомарно обновлять
его значения.
Рассмотрим основные методы данного класса:
-
public final int get() – получение текущего значения
-
public final void set(int newValue) – установка нового значения
-
public final int incrementAndGet() – атомарный аналог операции "++x”.
-
public final long getAndIncrement() – атомарный аналог операции "x++”.
-
public final int addAndGet(int delta) – атомарно увеличивает
текущее значение на delta и возвращает новое значение. public final int
getAndAdd(int delta) – атомарно увеличивает текущее значение на delta и
возвращает предыдущее значение.
-
public final long getAndSet(long newValue) – атомарная установка
нового значения и возвращение предыдущего. public final boolean
compareAndSet(int expect, int update) – атомарная установка нового
значения в случае, если текущее равно значению параметра expect. В
случае успешной операции метод возвращаем булево значение true.
Пример использования 1
private AtomicInteger someValue;
//…
int previousBits, newBits;
do {
previousValue = someValue.get();
newValue = changeValue(previousValue);
} while (!someValue.compareAndSet(previousValue, newValue));
В приведенном выше примере совершается попытка обновления
некоторого значения до тех пор, пока обновление не будет выполнено
успешно. Благодаря использованию атомарной переменной, мы избежали
необходимости использовать синхронизацию.
AtomicReferenceFieldUpdater
Довольно часто возникают ситуации, когда атомарное обновление
необходимо только в одном из многих случаев использования объекта.
Безусловно, в такой ситуации не хочется обременять себя работой с
атомарными типами и не иметь возможности работать с объектом напрямую.
Для этих целей прекрасно подойдет AtomicReferenceFieldUpdater. Само поле
класса, которое будет использовано совместно с
AtomicReferenceFieldUpdater, должно быть объявлено с ключевым словом
volatile. Из рассмотрено ниже примера станет ясно, каким образом это
можно сделать.
Пример использования 2
public class Node {
private volatile InnerNode next;
//...
}
//…
private
static final AtomicReferenceFieldUpdater<Node, InnerNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater(Node.class,
InnerNode.class, "next");
//…
nextUpdater.compareAndSet(currentNode, expectedInnerNode, newInnerNode);
Синхронизаторы
К синхронизаторам можно отнести различного рода структуры, которые
отвечают за координацию поток. Рассмотрим некоторые такие структуры,
которые были добавлены в пакете java.util.concurrency:
-
Семафоры
-
Барьеры
-
Обменники
-
Щеколда
Семафоры
В 1968 г. Э. Дейкстра предложил удобную форму механизма
захвата/освобождения ресурсов, которую он назвал операциями P и V над
считающими семафорами. Считающим семафором называют целочисленную
переменную, выполняющую те же функции, что и байт блокировки. Однако в
отличие от последнего она может принимать кроме "0" и "1" и другие целые
положительные значения.
Как можно понять из написанного выше, семафоры используются для ограничения числа потоков, которые используют некий ресурс.
Для выполнения операций P и V в Java классе java.util.concurrent.Semaphore существуют специальные методы.
acquire – пытается получить доступ к ресурсу и блокирует текущий поток до тех пор, пока ресурс не будет доступен.
tryAcquire - пытается получить доступ к ресурсу в момент вызова без блокировки текущего потока.
release – освобождение ресурса.
Следует отметить, что у Семафора может быть различной сама стратегия получения освободившегося ресурса.
Пример использования
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
Барьеры
Барьер это средство синхронизации, которое используется для того,
чтобы некоторое множество потоков ожидало окончания друг друга в
некотором месте, являющемся барьером или точкой синхронизации. После
того, как все потоки достигли точки синхронизации, они разблокируются и
могут продолжать выполнение.
На практике барьеры используются для
сбора результатов выполнения некоторой распараллеленной задачи. В
качестве примера можно рассмотреть задачу умножения матриц. При
распараллеливании данной задачи каждому потоку будет поручено умножения
определенных строк на определенные столбцы. В точке синхронизации же
полученные результаты собираются из всех потоков, и строится
результирующая матрица.
В пакете java.util.concurrent класс CyclicBarrier является реализацией барьера. Рассмотрим пример его использования ниже.
Пример использования
class Worker extends Thread {
//…
@Override
public void run() {
// Некоторое действие
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
//…
barrier = new CyclicBarrier(N, new Runnable() {
public void run() {
// Действия, которые выполняются при достижении барьера всеми потоками
}
});
for (int i = 0; i < N; i++) {
new Worker(barrier).start();
}
Обменники
Обменники – это средства синхронизации, которые используются для
обмена информацией между двумя потоками. Это делается посредством вызова
метода exchange потоками, желающими обменяться информацией. В качестве
параметра выступает значение, которое должно быть отдано другому потоку.
Пример использования
Exchanger<MyClass> exchanger = new Exchanger<MyClass>();
class Loop1 implements Runnable {
public void run() {
MyClass loop1Value = …
loop1Value = exchanger.exchange(loop1Value );
//…
}
}
class Loop2 implements Runnable {
public void run() {
MyClass loop2Value = …
loop2Value = exchanger.exchange(loop2Value );
//…
}
}
Щеколда
Щеколда – средство синхронизации, которое используются для того,
чтобы один или несколько потоков могли дождаться выполнения
определенного числа операции в других потоках.
Класс CountDownLatch
пакета java.util.concurrent является средством синхронизации, обладающим
вышеперечисленными свойствами. Данный класс работает по принципу
таймера. Происходит инициализация его некоторым начальным значением и
обратный отсчет. При вызове метода await данного класса каким-либо
потоком, он переходит в состояние ожидания момента достижения счетчиком
таймера значения 0.
На практике данный класс удобно использовать для
координации момента начала и окончания определенного числа потоков. Это
означает следующее:
- можно сделать так, чтобы определенное число потоков начиналось в один и тот же момент времени;
- можно отследить момент окончания нескольких потоков.
Пример использования 1
В первом примере рассмотрим первый случай из перечисленных выше,
т.е. начало выполнения нескольких потоков в один момент времени.
public class LatchedThread extends Thread {
private final CountDownLatch startLatch;
public LatchedThread(CountDownLatch startLatch) {
this.startLatch = startLatch;
}
public void run() {
System.out.println( "PreRun");
try {
startLatch.await();
System.out.println( "Run");
} catch (InterruptedException iex) {
}
}
}
//…
CountDownLatch startLatch = new CountDownLatch(1);
for (int threadNo = 0; threadNo < 4; threadNo++) {
Thread t = new LatchedThread(startLatch);
t.start();
}
Thread.sleep(200);
startLatch.countDown();
В данном примере счетчик инициализируется значением «1», и все потоки
ожидают момента перехода счетчика в состояние «0», прежде чем начать
выполнение.
Пример использования 2
Во втором примере рассмотрим второй случай из перечисленных выше, т.е. отслеживания момента окончания работы нескольких потоков.
public class StopLatchedThread extends Thread {
private final CountDownLatch stopLatch;
public StopLatchedThread(CountDownLatch stopLatch) {
this.stopLatch = stopLatch;
}
public void run() {
try {
// Некоторые действия
} finally {
stopLatch.countDown();
}
}
}
CountDownLatch cdl = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
Thread t = new StopLatchedThread(cdl);
t.start();
}
cdl.await();
System.out.println("Stopped");
В данном примере каждый поток уменьшает значение счетчика на «1». При его достижении «0» работу продолжает основной поток.
Lock объекты
Наверняка если вас спросить о средствах синхронизации потоков в
Java 1.4, каждый сразу же вспомнит synchronized блоки или целые методы.
Безусловно, они достаточно просты в использовании, но тем не менее
обладают рядом недостатков, которые мы рассмотрим чуть ниже.
synchronized (object) {
//действия внутри синхронизованного блока
}
Среди основных недостатков synchronized блоков можно выделить следующие:
-
Не существует способа отказаться от попытки захватить какой-либо
объект, если он занят, отсутствует возможность отказаться от попытки
захвата объекта через какой-то интервал времени. Имея все эти
возможности, проблема появления deadlock при синхронизации потоков была
бы не так актуальна.
-
Не существует способа осуществлять отдельные блокировки для чтения
или записи, что местами бывает весьма полезно. При освобождении
некоторого захваченного ресурса (того, который выступает параметром у
вызова synchronized блока) нет возможности специально дать доступ к
этому блоку самому первому потоку, который раньше других начал пытаться
его захватить.
-
Если существует несколько вложенных synchronized блоков, то
освобождены ресурсы должны быть строго в обратном порядке по сравнению с
тем, в котором они были захвачены.
Базовые Lock объекты
В Java 1.5 были введены так называемые внешние блокировки. Т.е.
программист сам может выбирать моменты начала и окончания каждой
блокировки. Базовым интерфейсом для таких блокировок является
java.util.concurrent.locks.Lock.
Lock l = ...;
l.lock();
try {
// доступ к защищенным ресурсам
} finally {
l.unlock();
}
Среди основных методов, использующихся для начала защищенного блока, можно выделить следующие:
-
void lock() - начало защищенного блока. В случае занятости ресурса происходит ожидание его освобождения.
-
boolean tryLock() - попытка получить блокировку, если она свободна
на момент вызова метода. Если нет, то возвращается значение false и
программа продолжает свое выполнение.
-
boolean tryLock(long time, TimeUnit unit) - попытка получить
блокировку в течение некоторого интервала времени. Если она не удачна,
то возвращается значение false и программа продолжает свое выполнение.
Базовой реализацией интерфейса Lock является класс ReentrantLock.
Конструктор данного класса может принимать так называемое значение
честности, т.е. индикатор того, должен ли первый совершивший попытку
поток получить доступ к освобожденному ресурсу. Следует отметить, что
использование этой возможности может негативно сказаться на
производительности приложения. Рекомендуется создавать «честный»
ReentrantLock только в том случае, если это действительно необходимо.
Lock объекты для чтения и записи
В Java 1.5 стало возможным отдельно управляться блокировкой для
чтения и записи данных, что безусловно является весьма удобным.
Блокировка для чтения может одновременно быть захвачена несколькими
потока, в то время как блокировка на запись является эксклюзивной только
для одного конкретного потока. Такое разделение положительным образом
влияет на производительность приложения, т.к. чтением информации могут
заниматься несколько потоков одновременно. В частности данный вид
блокировок можно применить для работы с коллекциями, к которым основной
процент обращений связан с чтением информации, и только изредка
присутствуют её обновления.
Базовой реализацией интерфейса ReadWriteLock является ReentrantReadWriteLock. Выделим основные его особенности:
-
Как и ReentrantLock поддерживает «честный» и «нечестный» режим работы.
-
Не отдает какого-либо предпочтения потокам для чтения или записи при получении блокировки.
-
Поддерживается прерывание процесса получения блокировки.
-
Если для текущего потока блокировка захвачена читателем, то писатель не может её получить. Обратное же верно.
Пример использования
public void writeData(String str) {
lock.writeLock().lock();
//Write data
lock.writeLock().unlock();
}
public void readData() {
lock.readLock().lock();
//Read data
lock.readLock().unlock();
}
Условия
Аналоги методов wait, notify, notifyAll, используемые совместно с
Lock объектами были вынесены в отдельный интерфейс –
java.util.concurrent.locks.Condition. Определение интерфейса Condition
приведено ниже:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Удобство их использования также заключается в том, что возможно
использовать несколько независимых «ожиданий» для одного объекта:
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
Следует заметить, что по аналогии с synchronized блоками, с
условиями можно оперировать только из блоков кода, заблокированных
данным Lock объектом.
Параллельные коллекции
В дополнение к уже вышеперечисленным возможностям пакет
java.util.concurrent содержит высокопроизводительные thread-safe
реализации интерфейсов List и Map.
ConcurrentHashMap
ConcurrentHashMap является thread-safe реализацией интерфейса
Map, которая предоставляет значительно лучшую поддержку параллелизма,
нежели synchronizedMap. Рассмотрим, в чем же преимущество. Множество
чтений из объекта ConcurrentHashMap могут выполняться параллельно,
одновременные чтения и записи могут выполняться параллельно,
одновременные записи часто могут выполняться параллельно. В первую
очередь это достигается благодаря тому, что при выполнении какой-либо
операции блокируется не вся таблица, а только определенная порция данных
(ввиду соответствующей реализации хэш-таблиц в языке Java). При
создании объекта типа ConcurrentHashMap в качестве параметра
конструктора может быть передано число одновременно работающих потоков,
которые записывают данные. Таблица данных, лежащая в основе Map, будет
разделена на соответствующее число сегментов.
CopyOnWriteArrayList
Поведение CopyOnWriteArrayList аналогично List за тем
исключением, что параллельно может выполняться несколько операций чтения
и записи (операция записи может быть только одна в один момент
времени). Это достигается за счет того, что при каждом изменении списка
создается его новая копия, с которой и идет работа. Более того, важным
отличием от ConcurrentHashMap является то, что операции, работающие с
множеством объектом (addAll, retailAll) , являются атомарными.
CopyOnWriteArrayList предназначен для тех случаев, когда:
• Число чтений во много раз превосходит число записей.
• Список небольшого размера.
• Использование обычного массива по какой-то причине не подходит, и нужны возможности List.
Заключение
В данной статье были рассмотрены основные возможности,
появившиеся в пакете java.util.concurrent в Java 1.5. Несмотря на
достаточно большой промежуток времени с момента её выхода, данному
пакету было неоправданно уделено слишком мало внимания. При создании
многопоточных приложений, некоторые классы и возможности будут крайне
полезны и удобны в использовании.