Версия для печати


Параллельные потоки
http://www.delphikingdom.com/asp/viewitem.asp?catalogID=1125

Сергей Гурин
дата публикации 21-03-2005 10:50

Под словами "параллельный поток" или просто "поток" будем понимать объект, выполняемый параллельно с основным потоком приложения и с другими параллельными потоками. Термин "параллельность" рассматривается в контексте вытесняющей многозадачности операционной системы - то есть, операционная система выделяет потоку некоторый квант времени, а затем переключается на другой поток. Параллельный поток может добровольно отдавать часть своего кванта времени, если он переходит в состоянии ожидания некоторого события. Выполнение потока может быть прервано более приоритетным потоком. Момент переключения параллельных потоков и адрес процессорной инструкции в момент переключения будем считать полностью недетеминированными. Реализация параллельных потоков опирается на возможность, предоставляемую операционной системой - "thread". Для именования параллельных потоков в литературе встречаются и другие термины: активные объекты, нити, параллельные процессы. В статье будет рассмотрен только частный случай использования параллельных потоков - потоки, существующие в пределах одного приложения.

Введение

Не вдаваясь в тонкости реализации, обрисуем вначале одну из проблем, которая возникает при использовании потоков. Представим простейший случай: приложение и один параллельный поток - в приложении создается объект потока и ссылка на него присваивается некоторой переменной приложения. Объект потока может быть уничтожен как со стороны приложения, так и самостоятельно при своем естественном завершении или при завершении по ошибке. Если поток завершился самостоятельно, то ссылка перестанет указывать на объект и попытка последующего уничтожения объекта со стороны приложения приведет в лучшем случае к возникновению исключительной ситуации, а в худшем случае к непредсказуемой ошибке в данных. Для того, чтобы обойти эту ошибку, можно предложить простое решение - объект потока уведомляет приложение о своем завершении. Но, к сожалению, это решение не работает и вот почему. Предположим, что одновременно возникают два события - самостоятельное завершение потока и попытка его завершения со стороны приложения. Приложение еще не получило уведомления и пытается уничтожить поток, который сам находится на этапе уничтожения (например, уведомление посылается из деструктора объекта потока). Другое возможное решение - объекту потока передается указатель на ссылку на самого себя и объект самостоятельно обнуляет эту ссылку при своем завершении. Приложение проверяет ссылку на nil и, если она действительна, то уничтожает поток. Это решение также не работоспособно. Представьте ситуацию, при которой основной поток приложения успешно проверил ссылку, а затем операционная система переключила управление на параллельный поток, который начал самоуничтожение. Приложение считает, что объект потока существует, и при получении управления будет уничтожать только что уничтоженный объект. Вы можете считать такую ситуацию маловероятной, и действительно, в простейшем примере ее вероятность будет исчезающе мала, но в более сложных случаях с многократным созданием-уничтожением потоков и при большом числе потоков этой вероятностью уже никак нельзя пренебреь. Как видно, для решения этой задачи требуется более сложный механизм. Может показаться, что трудности, возникающие при использовании параллельных потоков довольно велики. На самом деле это так и не стоит применять столь мощную технику, как потоки, без особой на то нужды. Но при аккуратном их применении, потоки могут быть чрезвычайно полезными, они могут существенно улучшить внутренний дизайн приложения и сделать приложение более эффективным. При неаккуратном же использовании потоков, ничего, кроме вреда, вы не получите.

Синхронизация потоков

Продолжим наш пример и попытаемся найти корректное решение задачи уничтожения объекта потока. Интуитивно можно предположить, что для решения нам потребуется некоторый третейский судья, так как самостоятельно ни приложение, ни поток с задачей не справятся. Таким арбитром может служить объект, известный как "критическая секция". Смысл этого объекта состоит в том, что операционная система гарантирует, что при любом раскладе только один из множества конкурирующих параллельных потоков может попасть внутрь критической секции. Все остальные потоки будут приостановлены операционной системой вплоть до момента, когда поток, захвативший критическую секцию, выйдет из нее. В критической секции можно проверить действительность ссылки, не опасаясь, что она будет нарушена. Это действительно корректное решение, но какой ценой?

Если мы расширим наш простейший пример на случай множества потоков, то для каждого потока нам потребуется одна ссылка и одна критическая секция. Конечно, можно обойтись одной критической секцией на все потоки, но при большом их числе и при частом создании-уничтожении потоки будут толпиться в одном узком месте, что не есть хорошо. Задача осложняется тем, что мы должны контролировать действительность потока всякий раз, когда мы к нему обращаемся, то есть, при любой операции с объектом потока. Отсюда понятно нежелание делать одну критическую секцию на все потоки.

Наличие большого числа критических секций и ссылок делает программу весьма подверженной ошибкам, так как очень легко забыть о том, что любое обращение к потоку требуется выполнять через критическую секцию. А по закону Мэрфи такое обязательно произойдет и, опять же, по закону Мэрфи, в самый неподходящий момент и в самом неподходящем месте. Нам требуется найти не только корректное, но и удобное, "ошибкоустойчивое", решение задачи. А для стимула можно напустить еще больше страху - объекты потоков взаимодействуют не только с потоком приложения, но и между собой, да еще создаются и уничтожаются не только в потоке приложения, но в других потоках.

Как ни странно, эта задача не так сложна, какой может показаться после всех описанных ужасов. Более того, возможно несколько ее решений. В статье будет дано одно из них, которое лично мне кажется наиболее элегантным.

Потоки и дескрипторы

Когда мы создаем какой-либо объект ядра Windows: файл, сокет, окно, мьютекс, то всегда получаем как результат не сам объект (не его адрес в памяти), а некоторое целое число, дескриптор (handle), которое однозначно характеризует системный ресурс. С одной стороны, дескриптор развязывает виртуальную память приложения и виртуальную память ядра операционной системы, а с другой стороны, выполняет функцию автоматического контроля существования объекта, так как любое обращение к объекту происходит через его дескриптор. Эту же технику мы можем применить в нашем случае для решения задачи корректного уничтожения потоков. Определим в первом приближении интерфейс объекта-арбитра, который будет разрешать все конфликты, связанные с созданием потоков, их уничтожением и получением доступа к потокам:

TMyThreadManager = class
public
  function  Add(aThread: TMyThread): Integer;
  procedure Release(aHandle: Integer);
  function  Lock(aHandle: Integer): TMyThread;
end;

Предполагается, что все потоки порождаются от некоторого базового класса TMyThread. После конструирования объекта потока он передается арбитру-менеджеру, который сохраняет его в своих недрах и возвращает целочисленный дескриптор, однозначно характеризующий поток. Приложение (или другой поток-родитель) сохраняет у себя дескриптор созданного потока. Когда поток больше не нужен, то его можно уничтожить, вызывав операцию Release и передав ей дескриптор потока. Если поток уже был уничтожен, то ничего не произойдет, а если поток еще жив, то будет сделана попытка его уничтожения. Здесь важен акцент на слове "попытка", а не на слове "уничтожение", но об этом чуть позже. Когда поток приложения (или любой другой поток) желает взаимодействовать с потоком, дескриптор которого ему известен, он может блокировать поток в памяти, вызвав метод Lock, который возвращает действительный объект потока или nil, если потока больше не существует. Когда поток завершается самостоятельно или другим объектом, которому он больше не нужен, то это делается с помощью уже известного нам метода Release.

Подсчет ссылок

Теперь вернемся к словам "попытка" и "блокировка в памяти". Чтобы понять, как можно управлять временем жизни объекта потока, вспомним еще одну важную технику - подсчет ссылок. Когда объект с управляемым временем жизни создается, его счетчик ссылок устанавливается равным 1 - это означает, что у объекта один "пользователь". Когда объект передается другому объекту, например, помещается в очередь, то его счетчик ссылок увеличивается на 1. Если же "пользователь" больше не нуждается в объекте, то счетчик ссылок объекта уменьшается на 1. Когда счетчик ссылок становится равным 0, то это означает, что объект больше никому не нужен и его можно безопасно уничтожить. Вместе с уничтожением объекта становится недействительным его дескриптор. Таким образом, при создании объекта и его передаче арбитру, метод Add создает новый дескриптор, связывает его с потоком и устанавливает для потока счетчик ссылок, равный единице. Метод Lock увеличивает счетчик ссылок на 1, а метод Release уменьшает счетчик ссылок на 1. То есть, объект потока не будет уничтожен до тех пор, пока у него хотя бы один "пользователь". Сюда включается очень важный момент - даже если поток завершился самостоятельно и вызывал для самого себя метод Release, то объект потока не будет уничтожен до тех пор, пока все пользователи потока явно от него не откажутся с помощью метода Release. Приведем пример кода, предполагая, что ThreadManager - это объект класса TMyThreadManager, Handle - это дескриптор потока, а Thread - это объект потока:

// создание потока
  Handle := ThreadManager.Add(TSomeThread.Create);
  ....
  // работа с потоком
  Thread := ThreadManager.Lock(Handle);
  if Assigned(Thread) then
  begin
    ....
  end;
  ....
  // уничтожение потока
  ThreadManager.Release(Handle);

Реализация параллельного потока

В качестве базового класса для потоков вполне можно использовать стандартный Delphi-класс TThread. Но для полноты картины в этом разделе статьи будет приведена другая реализация базового класса для параллельных потоков. В следующем разделе более подробно будет рассмотрен арбитр-менеджер потоков.

Класс, который предлагается в качестве кандидата для базового класса параллельных потоков, будет иметь только самую важную функциональность, то есть:

Последнее требование нуждается в некотором разъяснении. Дело в том, что библиотека VCL, предоставляющая Delphi-приложениям различные визуальные компоненты, не является потокобезопасной. Это означает, что прямой вызов методов визуальных компонентов из различных потоков почти наверняка разрушит работу приложения. При работе с параллельными потоками требуется обеспечить синхронизацию таким образом, чтобы методы визуальных компонентов вызывались только из основного потока приложения (VCL-потока). Класс TThread решает эту проблему с помощью метода Synchronize, который организует вызов некоторого метода в контексте основного потока и корректно обрабатывает возникающие исключительные ситуации.

Приведем определение класса параллельного потока, полный код вы можете посмотреть в файле GsvThread.pas, который прилагается к статье.

TGsvThread = class
  public
    constructor Create;
    destructor  Destroy; override;

  private
    // эти поля нужны для менеджера потоков
    FManager:          TGsvThreadManager;
    FGsvHandle:        Integer;
    FRefCount:         Integer;
    FCollision:        TGsvThread;

    // это собственные данные объекта потока
    FSysHandle:        THandle;
    FTerminated:       Boolean;
    FFinished:         Boolean;
    FTerminationEvent: THandle;

    procedure ThreadExecute;
    procedure Terminate;
    function  GetPriority: Cardinal;
    procedure SetPriority(const Value: Cardinal);

  protected
    procedure Execute; virtual; abstract;
    procedure OnError(const E: Exception); virtual;
    procedure OnFinished; virtual;

    procedure Pause(aTime: Cardinal);

    property  Terminated: Boolean read FTerminated write FTerminated;

  public
    procedure Resume;
    procedure Suspend;

    property  GsvHandle: Integer read FGsvHandle;
    property  SysHandle: THandle read FSysHandle;
    property  Finished: Boolean read FFinished;
    property  Priority: Cardinal read GetPriority write SetPriority;
    property  TerminationEvent: THandle read FTerminationEvent;
  end;

С помощью конструктора мы можем создавать объект потока, метод Suspend позволяет приостановить поток, метод Resume запустить его после приостановки. Метод Suspend может быть вызван многократно, но каждому Suspend должен соответствовать парный вызов Resume. Свойство Priority позволяет получить и изменить приоритет потока. Это минимальная внешняя функциональность, которую мы предполагаем получить от потока. Внутренняя функциональность более сложна. Алгоритм работы потока определяется в наследуемом методе Execute. Если поток выполняет некоторую циклическую работу, то метод может выглядеть так:

procedure TDerivedThread.Execute;
begin
  // действия до начала цикла
  ....
  // циклическое выполнение
  while not Terminated do begin
    ....
  end;
end;

То есть, поток выполняется циклически до тех пор, пока у потока есть "пользователи". Кроме того, поток может самостоятельно прервать свое выполнения, выйдя из цикла или установив свойство Terminated в True. Если действия потока нециклические, то он просто выполняется от начала до конца. При возникновении ошибок вызывается метод OnError, которому передается объект возникшего исключения. При завершении работы потока (а, точнее, объекта ядра операционной системы) вызывается метод OnFinished. Отметим, что метод OnFinished будет вызван всегда, независимо от вызова метода OnError. Эти методы переопределяются в наследуемых классах. С помощью указанных методов можно определить все аспекты старта, выполнения и завершения параллельного потока.

Поток может самостоятельно приостановиться на некоторое время с помощью метода Pause, аргумент которого - время в миллисекундах. Время может быть равно 0, это означает, что поток отказывается от кванта времени, который выделен ему операционной системой и разрешает операционной системе переключиться на другой параллельный поток с тем же приоритетом. Если таких потоков нет, то Pause(0) не выполняет каких-либо действий. Рассмотрим реализацию метода Pause подробнее.

procedure TGsvThread.Pause(aTime: Cardinal);
begin
  if not FTerminated then begin
    if aTime = 0 then
      Sleep(0)
    else
      WaitForSingleObject(FTerminationEvent, aTime);
  end;
end;

Приостановка выполняется только в том случае, если поток еще не завершен и менеджер потоков не вызвал завершающий метод Terminate. Если время равно 0, то вызывается Windows-функция Sleep, а иначе выполняется ожидание события FTerminationEvent с таймаутом, равным времени паузы. Событие FTerminationEvent создается в конструкторе объекта потока, его назначение - быстрое завершение потока, если он находится состоянии ожидания некоторого события. Это довольно важный момент. Если мы хотим завершить программу, а поток ожидает некоторого события (которое, возможно уже никогда не произойдет или произойдет через неопределенное время), то у нас должен быть способ форсированного вывода потока из состояния ожидания. Более подробно событие TerminationEvent будет рассмотрено при обсуждении альтернативного ожидания. Событие устанавливается в методе Terminate:

procedure TGsvThread.Terminate;
begin
  if not FTerminated then begin
    FTerminated := True;
    SetEvent(FTerminationEvent);
    Resume;
  end;
end;

Напомню, что предлагаемая техника основана на том, что временем жизни объекта потока управляет только менеджер потоков, поэтому другие потоки не могут вызывать напрямую метод Terminate, а тем паче Free - вместо этого они должны вызывать метод менеджера Release, чтобы сообщить о своем отказе от того или иного потока.

Реализация менеджера потоков

Интерфейс менеджера потоков:

TGsvThreadManager = class
  private
    FLatch:     TRTLCriticalSection;
    FHashTable: array of TGsvThread;
    FCurHandle: Integer;
    FCount:     Integer;
    FOnEmpty:   TNotifyEvent;

  public
    constructor Create(aCapacity: Integer = 64);
    destructor  Destroy; override;

    function  Add(aThread: TGsvThread; aStart: Boolean = True): Integer;
    procedure Release(aHandle: Integer);
    function  Lock(aHandle: Integer): TGsvThread;
    function  TerminateAll: Integer;
    function  ActiveThreadList: IGsvThreadList;

    property  OnEmpty: TNotifyEvent read FOnEmpty write FOnEmpty;
  end;

Критическая секция FLatch используется для предоставления исключительного доступа к внутренним данным менеджера для того, чтобы избежать ситуацию наложения конкурирующих потоков, которые одновременно могут вызывать методы менеджера. Хэш-таблица организует быстрое преобразование дескриптора потока в его адрес, по сравнению с другими способами поиска хеш-таблица обладает прозводительностью O(1), то есть, время поиска не зависит от числа элементов. В менеджере реализована не совсем обычная хеш-таблица - вместо хеш-функции используется само значение дескриптора потока. Размер таблицы задается в конструкторе и обычно выбирается в полтора-два раза большим, чем предполагаемое число потоков. Дескриптор потока назначается с помощью целочисленной переменной, которая каждый раз увеличивается на 1. Это дает очень хорошую расстановку хеш-ключей. При многократном создании-уничтожении потоков может произойти ситуация, при которой два и более различных дескриптора будут иметь один и тот же хеш-индекс (который вычисляется как остаток от деления дескриптора на размер хеш-таблицы). В этом случае для хеш-индекса создается цепочка коллизий, которая выглядит как односвязный список.

Метод Add связывает объект потока с дескриптором, устанавливает его счетчик ссылок в 1 и добавляет объект потока в хеш-таблицу:

function TGsvThreadManager.Add(aThread: TGsvThread; aStart: Boolean): Integer;
var
  hash: Integer; // хеш-код дескриптора: индекс в хеш-таблице
begin
  // делаем все операции внутри критической секции чтобы исключить
  // наложение параллельных потоков
  EnterCriticalSection(FLatch);
  try
    Inc(FCurHandle);   // создаем следующий дескриптор
    hash               := FCurHandle mod Length(FHashTable);
    aThread.FManager   := Self;
    aThread.FGsvHandle := FCurHandle;
    aThread.FRefCount  := 1;
    // включаем объект в начало цепочки коллизий (если она есть) и
    // вносим объект в хеш-таблицу
    aThread.FCollision := FHashTable[hash];
    FHashTable[hash]   := aThread;
    Inc(FCount);
    Result := FCurHandle;
  finally
    LeaveCriticalSection(FLatch);
  end;
  // активизируем параллельное выполнение потока
  if aStart then
    aThread.Resume;
end;

Поток создается в приостановленном состоянии - можно стартовать его сразу при передаче менеджеру потоков, а можно потом, когда будут созданы все взаимодействующие потоки. Это достаточно важный элемент проектирования потоков - все взаимодействующие потоки должны быть созданы до момента их запуска чтобы избежать ситуации взаимодействия с еще не созданным потоком.

Метод Release находит объект потока по его дескриптору, уменьшает счетчик ссылок и уничтожает объект, если он больше никому не нужен:

procedure TGsvThreadManager.Release(aHandle: Integer);
var
  hash: Integer;
  th:   TGsvThread; // поток, связанный с дескриптором
  prev: TGsvThread; // предыдущий поток в цепочке коллизий
begin
  EnterCriticalSection(FLatch);
  try
    // поиск объекта, связанного с дескриптором и предыдущего объекта в
    // цепочке коллизий. Предыдущий объект нужен из-за того, что
    // цепочка коллизий представляет собой односвязный список, а нам может
    // потребоваться удаление объекта из середины или конца списка
    hash := aHandle mod Length(FHashTable);
    prev := nil;
    th   := FHashTable[hash];
    while Assigned(th) do begin
      if th.FGsvHandle = aHandle then
        Break;
      // проходим по цепочке коллизий
      prev := th;
      th   := th.FCollision;
    end;
    if Assigned(th) then begin
      // объект потока еще существует, уменьшаем его счетчик ссылок
      Dec(th.FRefCount);
      // используем сравнение ( <= 0) для защиты от ошибки повторного уничтожения
      if th.FRefCount <= 0 then begin
        // объект потока больше никому не нужен
        if not th.FFinished then begin
          // объект потока еще не завершен. Завершаем его
          th.Terminate;
          // после своего завершения поток самостоятельно вызовет Release,
          // поэтому временно увеличиваем его счетчик ссылок без уничтожения
          // объекта (не допускаем отрицательного значения счетчика ссылок)
          Inc(th.FRefCount);
        end
        else begin
          // объект потока завершен,
          // удаляем объект из хеш-таблицы и из цепочки коллизий
          if Assigned(prev) then
            prev.FCollision := th.FCollision   // объект в середине или в конце
          else
            FHashTable[hash] := th.FCollision; // объект в начале цепочки
          Dec(FCount);
          // уничтожаем объект потока
          th.Free;
        end;
      end;
    end;
    // else - объекта с таким дескриптором не существует, ничего не делаем
    // Если список потоков пуст, то вызываем событие OnEmpty
    if (FCount = 0) and Assigned(FOnEmpty) then
      FOnEmpty(Self);
  finally
    LeaveCriticalSection(FLatch);
  end;
end;

Особо отметим два момента. Если поток еще не завершен, но его счетчик ссылок уже равен 0, то вместо грубого уничтожения потока мы делаем мягкое действие - предлагаем потоку завершиться самостоятельно, вызывая его метод Terminate. Это достаточно важный фактор, так как поток при своем завершении может требовать некоторых действий, которые обязательно нужно сделать для корректного завершения работы потока. Второй момент - когда программа закрывается, то мы должны сначала инициировать завершение работы всех параллельных потоков и дождаться их действительного завершения. Обычно эта работа делается в методе формы CloseQuery, но для завершения параллельных потоков эта задача несколько усложняется, так как мы не можем перевести основной поток в состояние ожидания - при своем завершении параллельный поток может требовать взаимодействия с основным потоком и перевод основного потока в состояние ожидания приведет к дедлоку и мертвому зависанию программы. Для решения этой задачи и предназначено событие OnEmpty. Приведем пример кода, который решает задачу ожидания завершения параллельных потоков без риска дедлока:

procedure TForm1.FormCloseQuery(Sender: TObject; var CanClose: Boolean);
begin
  FThreadManager.OnEmpty := OnEmpty;
  CanClose := FThreadManager.TerminateAll = 0;
end;

procedure TForm1.OnEmpty(Sender: TObject);
begin
  PostMessage(Handle, WM_CLOSE_APP, 0, 0);
end;

procedure TForm1.OnCloseApp(var aMessage: TMessage);
begin
  Close;
end;

При завершении программы мы устанавливаем обработчик OnEmpty, который будет вызван менеджером потоков при завершении всех потоков, а затем вызываем метод TerminateAll, который инициирует завершение всех имеющихся параллельных потоков и возвращает количество еще не завершенных потоков. Если незавершенные потоки есть, то закрытие формы запрещается. Когда менеджер потоков обнаружит завершение последнего потока, он вызовет обработчик события OnEmpty. Мы не можем сразу в этом обработчике закрыть форму, так как завершение последнего потока выполняется еще в контексте его параллельного потока, а не основного VCL-потока. Поэтому посылаем форме асинхронное сообщение (PostMessage) - оно будет обрабатываться в основном потоке.

function TGsvThreadManager.TerminateAll: Integer;
var
  hash: Integer;
  th:   TGsvThread;
begin
  Result := 0;
  EnterCriticalSection(FLatch);
  try
    // обходим всю хеш-таблицу
    for hash := Low(FHashTable) to High(FHashTable) do begin
      th := FHashTable[hash];
      while Assigned(th) do begin
        // завершаем поток. Если он уже завершен, то Terminate не вызывает
        // побочных действий
        th.Terminate;
        if not th.FFinished then
          Inc(Result);
        // проходим по цепочке коллизий
        th := th.FCollision;
      end;
    end;
  finally
    LeaveCriticalSection(FLatch);
  end;
end;

Обсудив создание и завершение потоков, перейдем к их жизни и более подробно рассмотрим метод Lock:

function TGsvThreadManager.Lock(aHandle: Integer): TGsvThread;
var
  hash: Integer;
begin
  EnterCriticalSection(FLatch);
  try
    hash := aHandle mod Length(FHashTable);
    // поиск объекта потока по его дескриптору
    Result := FHashTable[hash];
    while Assigned(Result) do begin
      if Result.FGsvHandle = aHandle then
        Break;
      Result := Result.FCollision;
    end;
    // объект существует, увеличиваем его счетчик ссылок, так как у объекта
    // появился еще один "пользователь"
    if Assigned(Result) then
      Inc(Result.FRefCount);
  finally
    LeaveCriticalSection(FLatch);
  end;
end;

Действия этого метода весьма просты - находим объект потока в хеш-таблице по его дескриптору и увеличиваем счетчик ссылок потока. Другой метод менеджера, который может быть весьма полезным - ActiveThreadList, позволяет нам получить мгновенный снимок всех активных параллельных потоков. Список создается менеджером внутри критической секции, поэтому состояние всех параллельных потоков во время создания списка остается неизменным.

IGsvThreadList = interface
  ['{2B09399A-07E9-47F5-9CB7-3E34230D37D1}']
    function Count: Integer;
    function GetItem(aIndex: Integer): TGsvThread;

    property Items[aIndex: Integer]: TGsvThread read GetItem; default;
  end;

Все потоки, которые внесены в список, получают приращение счетчика ссылок, поэтому даже если сразу после создания списка они будут завершены, их объекты останутся действительными вплоть до уничтожения списка, а при уничтожении списка для каждого из потоков будет вызван метод Release. Может вызвать некоторое удивление, что список доступен через интерфейс, причем реализация списка скрыта в секции implementation. Этому есть очень простое объяснение. Использование интерфейса позволяет автоматически управлять временем жизни списка, так как компилятор Delphi автоматически уничтожит объект списка, когда ссылка на его интерфейс выйдет из области видимости. Приведем фрагмент кода:

var
  list: IGsvThreadList;
  i:    Integer;
begin
  list := FThreadManager.ActiveThreadList;
  for i := 0 to Pred(list.Count) do begin
    .... // некоторое действие с list[i]
  end;

Нам не требуется явно уничтожать список. Независимо от исключительных ситуаций, которые могут возникнуть внутри цикла for, объект списка будет уничтожен автоматически с помощью механизма подсчета ссылок, который компилятор Delphi применяет к интерфейсам. Использование интерфейса упрощает код и делает его менее подверженным ошибкам.

И, наконец, последнее замечание, касающееся менеджера потоков. В большинстве случаев одного менеджера оказывается достаточно, но в моей практике встречались ситуации, когда потоки естественно образуют группы, причем потоки в группе взаимодействуют только между собой и не взаимодействуют с потоками других групп. В этом случае для каждой группы удобно использовать отдельный менеджер.

Взаимодействие с VCL-потоком

Сначала рассмотрим взаимодействие параллельных потоков с основным VCL-потоком, а затем взаимодействие параллельных потоков между собой. Как уже было отмечено, библиотека VCL не является потокобезопасной, поэтому недопустимо вызывать методы и изменять свойства визуальных компонентов из параллельных потоков. Для решения задачи синхронизации Delphi-класс TThread имеет метод Synchronize. В классе TGsvThread будет использован другой способ взаимодействия, который мне кажется более удобным. Взаимодействия с основным потоком может быть синхронным и асинхронным. Первый тип взаимодействия реализуется с помощью посылки синхронного оконного сообщения - SendMessage, а второй - с помощью асинхронного оконного сообщения - PostMessage. В первом случае выполнение параллельного потока, вызвавшего SendMessage, приостанавливается и управление передается средствами операционной системы основному потоку приложения, вызывается процедура окна, дескриптор которого был передан в функцию SendMessage и диспетчер сообщений окна вызывает тот метод формы, который связан с идентификатором сообщения. Рассмотрим простейший пример, в котором форма отображает с помощью компонента TLabel целое число, которое инкрементируется с некоторым интервалом в параллельном потоке.

procedure TThreadCounter.Execute;
var
  i: Integer;
begin
  i := 0;
  while not Terminated do begin
    Pause(500);
    Inc(i);
    SendMessage(FFormHandle, WM_THREAD_COUNTER, i, LPARAM(Self));
  end;
end;

Дескриптор окна передается конструктору объекта TThreadCounter и используется для посылки синхронного сообщения, причем значение счетчика передается как WParam, а в LParam передается сам объект параллельного потока. Для приема этого сообщения в форме должна быть определена такая процедура:

const
  WM_THREAD_COUNTER = WM_USER + 1000;
....
  procedure OnThreadCounter(var aMessage: TMessage);
            message WM_THREAD_COUNTER;
....
procedure TForm1.OnThreadCounter(var aMessage: TMessage);
begin
  lblCounter.Caption := IntToStr(aMessage.WParam);
end;

В процедуре OnThreadCounter используется только значение счетчика, хотя можно безопасно использовать и сам объект потока, так как при синхронном сообщении он приостановлен и его состояние не может измениться.

Асинхронное взаимодействие с VCL-потоком выполняется с помощью функции PostMessage. В отличие от SendMessage, вызывающий поток не приостанавливается, а сообщение просто заносится в очередь сообщений основного потока, в контексте которого выполняются оконные процедуры всех VCL-форм. В PostMessage мы уже не имеем права передавать ссылку на объект потока, так как поток может быть уничтожен до того момента, как будет обработано оконное сообщение, но вот дескриптор параллельного потока можно передать без всякого опасения.

Двух указанных способов вполне достаточно для организации взаимодействия параллельных потоков с основным VCL-потоком, причем с минимальными трудозатратами - так же, как и при использовании Synchronize, нам нужно определить всего один метод. В отличие от Synchronize, метод определяется не в классе потока, а в классе формы - это кажется мне более удобным решением. Хотя есть и не очень эстетичный момент - необходимость явного приведения типов WParam и LParam к действительным типам передаваемых данных, которого в общем случае желательно избегать.

Взаимодействие параллельных потоков

В литературе описано множество различных видов взаимодействия параллельных потоков. Мы рассмотрим только некоторые, наиболее важные виды, начиная с самого простого - совместное использование разделяемых данных.

Разделяемые данные

Задача, которую нужно решить при совместном использовании данных заключается в обеспечении атомарности (неделимости) изменения данных, что иногда обозначается термином "транзакция". Иными словами, необходим взаимно-исключающий доступ к данным - недопустимо, чтобы один поток читал данные, в то время как другой поток их изменял. Наиболее простое и эффективное средство - это использование критической секции. Delphi определяет оболочку для критической секции - класс TCriticalSection в модуле SyncObjs. Модуль GsvThread имеет идентичный класс TGsvLatch, отличающийся от TCriticalSection только именами методов (вместо имени Enter используется Lock, а вместо Leave - Unlock).

Использовать критическую секцию (защелку) очень просто:

Latch.Lock;
  try
    // использование разделяемых данных
    ....
  finally
    Latch.Unlock;
  end;

Блок try-finally позволяет разблокировать критическую секцию даже при возникновении исключительной ситуации. Опускать try-finally допустимо только при очень простом коде, в котором ошибка исключена. У критической секции есть одно очень важное достоинство - высокая эффективность, поскольку критическая секция - это не объект ядра операционной системы, а запись (record) в адресном пространстве приложения, именно поэтому критическая секция решает задачу синхронизации потоков только в рамках одного приложения. Альтернативное и более универсальное средство - мьютекс, более чем на порядок уступает критической секции по эффективности. В качестве недостатков критической секции можно отметить следующие:

Последний недостаток можно исправить, если применить более сложную технику, основанную не на критических секциях, а на событиях - это делает стандартный Delphi-класс TMultiReadExclusiveWriteSynchronizer.

Задача реализации взаимоисключающего доступа всегда сталкивается с принципиальной проблемой взаимной блокировки при одновременном захвате более чем одного ресурса. Предположим, что существуют два потока A и B, а также два совместно используемых ресурса R1 и R2. Далее предположим такой сценарий: поток A захватывает ресурс R1, а затем ресурс R2 (не освобождая при этом ресурса R1). Поток B захватывает ресурc R2, а затем ресурс R1. Если после того как поток A захватил R1, поток B захватит ресурс R2, то оба потока попадут в состояние дедлока (взаимоблокировки) из которого никогда не выйдут. Обнаружение и предотвращение дедлока в общем случае весьма сложная задача, поэтому требуется очень аккуратно программировать ситуацию захвата нескольких ресурсов: либо отказаться от множественного захвата, введя поток-посредник, либо захватывать ресурсы всегда только в одном и том же порядке.

Несколько уменьшить трудности, связанные с критическими секциями, можно, если инкапсулировать разделяемые данные в специально разработанных классах. Например, можно представить себе такую реализацию разделяемого списка - отдельно список TList и отдельно критическая секция для его защиты. Поскольку каждую операцию со списком требуется выполнять внутри критической секции, то программировать такие разделяемые данные очень неудобно и опасно - легко пропустить ограничение операции критической секцией. Если же объединить TList и критическую секцию в одном классе и защитить каждую операцию критической секцией, то подобных ошибок можно избежать. Именно такая цель была поставлена при реализации потокозащищенного списка в стандартном Delphi-классе TThreadList. К сожалению, таких потокозащищенных классов в Delphi совсем немного и в большинстве случаев их приходится кодировать самостоятельно.

Асинхронное взаимодействие

Это очень универсальный и весьма эффективный способ взаимодействия параллельных потоков. Суть его состоит во введении посредника-буфера между параллельными потоками. Иногда этот способ называют взаимодействием с помощью обмена сообщениями. Поток-писатель, желающий послать другому потоку некоторые данные, записывает их в очередь. Поток-читатель ожидает появления данных в очереди, получает их, обрабатывает и, если нужно, посылает результат в очередь потока-писателя. Принципиальный недостаток асинхронного взаимодействия - необходимость буфера неопределенного размера. Этот недостаток можно сгладить, если ввести ограничение на размер очереди - в этом случае поток-писатель должен приостанавливаться, если очередь достигла своего максимального размера.

Важная проблема эффективности многопоточной обработки возникает в том случае, если поток должен обрабатывать несколько очередей входящих сообщений или несколько событий. Приведем цитату из [1]: "пассажиру, ожидающему автобуса номер 127 в общем случае придется ждать дольше, чем тому, кто может воспользоваться как 19-м, так и 127-м маршрутом автобуса, в зависимости от того, который из них раньше придет к остановке. В предположении что автобусы приходят в случайном порядке, у пассажира, имеющего выбор, время ожидания оказывается вдвое короче - парадоксально, но получается, что он ждет вдвое "быстрее"! Единственный способ достигнуть этого - ожидать именно первого из многих возможных событий; приобретение же более быстрого компьютера здесь не поможет".

Учитывая важность выбора первого из многих возможных событий, введем понятие альтернативного ожидания и дадим его практическое воплощение.

TGsvSelectMethod = procedure of object;

  TGsvSelect = class
  private
    FEvents:  array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle;
    FMethods: array[0..MAXIMUM_WAIT_OBJECTS - 1] of TGsvSelectMethod;
    FCount:   Cardinal;

  public
    constructor Create(aThread: TGsvThread);

    procedure Init;
    procedure Add(aEvent: THandle; aMethod: TGsvSelectMethod);
    function  Wait(aTimeout: Cardinal = INFINITE): Boolean;
  end;

Альтернатива могут быть добавлена к списку возможных альтернатив с помощью метода Add, который принимает 2 аргумента - дескриптор события и метод-обработчик. Альтернативы могут добавляться статически, на этапе создания объекта, а могут динамически, причем список альтернатив может быть различным в зависимости от различных условий. В языке Ada это делает оператор отбора. В Delphi мы можем воспользоваться оператором if или case. Список альтернатив инициализируется вызовом Init, а альтернативное ожидание - вызовом Wait. Если в процессе ожидания возникло событие, то будет вызван метод, связанный с данным событием и Wait вернет True, в противном случае Wait вернет False. Обработчик события - это обычный метод класса параллельного потока. Функция Wait принимает в качестве аргумента значение таймаута (в миллисекундах), если за время таймаута никаких событий не произошло, то Wait вернет False. Отметим важную деталь - в список ожидаемых событий неявно вносится событие FTerminationEvent, которое делает возможным форсированное завершение ожидания при завершении потока - в этом случае Wait также вернет False. Приведем пример кода, в котором параллельный поток создает набор альтернатив и выполняет ожидание. Предполагается, что в конструкторе объекта потока создан объект FSelect класса TGsvSelect:

FSelect.Init;
  if условие1 then
    FSelect.Add(событие1, метод1);
  if условие2 then
    FSelect.Add(событие2, метод2);
  FSelect.Add(безусловное_событие3, метод3);
  if not FSelect.Wait(100) then begin
    if Terminated then
      // обработка завершения потока
    else
      // обработка таймаута
  end;

Теперь рассмотрим более подробно реализацию класса TGsvSelect:

constructor TGsvSelect.Create(aThread: TGsvThread);
begin
  inherited Create;
  FEvents[0]  := aThread.FTerminationEvent;
  FMethods[0] := nil;
  FCount      := 1;
end;

procedure TGsvSelect.Init;
begin
  FCount := 1;
end;

procedure TGsvSelect.Add(aEvent: THandle; aMethod: TGsvSelectMethod);
begin
  Assert(FCount <= High(FEvents));
  FEvents[FCount]  := aEvent;
  FMethods[FCount] := aMethod;
  Inc(FCount);
end;

function TGsvSelect.Wait(aTimeout: Cardinal): Boolean;
var
  res, i: Cardinal;
begin
  Result := False;
  res    := WaitForMultipleObjects(FCount, @FEvents[0], False, aTimeout);
  if res < (WAIT_OBJECT_0 + FCount) then begin
    Result := res > WAIT_OBJECT_0;
    if Result then begin
      i := res - WAIT_OBJECT_0;
      if Assigned(FMethods[i]) then
        FMethods[i]();
    end;
  end;
end;

При создании объекта первым событием в списке становится завершающее событие. Процедура Add регистрирует событие - добавляет его в список (в массив) событий и добавляет в список методов связанный с этим событием метод. Функция Wait выполняет альтернативное ожидание по всем зарегистрированным событиям, включая завершающее событие. Следует отметить, что функция Wait может вернуть False не только при таймауте или при завершающем событии. Это может случиться также в том случае, когда в качестве события используется мьютекс, от которого отказался создавший его процесс. Но поскольку мы ограничились случаем взаимодействия потоков в рамках одного приложения (одного процесса операционной системы), то это событие у нас не возникнет никогда.

Теперь пора уточнить термин "событие", который мы использовали до этого момента без особого разъяснения. Под событием мы будем понимать изменение состояния объекта ядра операционной системы. В качестве событий можно использовать:

  1. Event - объект ядра, который так и называется - событие. Ввиду его важности мы подробно рассмотрим этот вид события немного ниже.
  2. Mutex (мьютекс) - напоминает критическую секцию, то есть, только один из конкурирующий потоков может захвать мьютекс, а остальные потоки будут приостановлены. Мьютекс имеет перед критической секцией ряд преимуществ, в частности, время ожидания мьютекса можно ограничивать таймаутом, мьютекс можно использовать для синхронизации параллельных потоков из разных приложений. См. в MSDN функции CreateMutex, OpenMutex, ReleaseMutex.
  3. Semaphore (семафор) - напоминает мьютекс, но разрешает захват ресурса не одним, а несколькими потоками, причем число потоков ограничено доступным количеством ресурса. См. функции CreateSemaphore, OpenSemaphore, ReleaseSemaphore.
  4. Change notification (уведомление файловой системы) - событие возникает при различных изменениях в файловой системе. См. FindFirstChangeNotification.
  5. Process - событие возбуждается при завершении процесса. См. CreateProcess.
  6. Thread - событие возбуждается при завершении потока. Ожидание этого события позволяет родительскому потоку дождаться завершения дочерних потоков. В классе TGsvThread дескриптор этого события называется SysHandle.

Кроме перечисленных событий операционная система Windows предоставляет еще несколько событий, но они доступны только в Windows NT. Особо можно еще отметить сокетные события, которые создаются функцией WSACreateEvent - эти события также можно использовать при альтернативных ожиданиях на сокетах.

Рассмотрим более подробно событие "Event":

TGsvEvent = class
  private
    FHandle: THandle;

    procedure SetState(aState: Boolean);

  public
    constructor Create;
    destructor  Destroy; override;

    function Wait(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): Boolean;

    property Handle: THandle read FHandle;
    property State: Boolean write SetState;
  end;

Событие может находиться в двух состояниях - активном (сигнализирущее состояние) и сброшенном (несигнализирующее состояние). В активное состояние событие переводится установкой свойства State в True, а сбрасывается событие установкой State в False. Эти редкий тип свойства, которое доступно только для установки, но не доступно для получения. Событие автоматически сбрасывается при каком-либо успешном ожидании, после чего его нужно явно перевести в активное состояние. Событие может быть внутренним объектом потока, а может не принадлежать никакому потоку конкретно. Поток может ожидать только данного события с помощью метода Wait, либо использовать его дескриптор Handle для альтернативного ожидания. Приведем реализацию метода Wait:

function TGsvEvent.Wait(aThread: TGsvThread; aTimeout: Cardinal): Boolean;
var
  objs: array[0..1] of THandle;
  cnt:  Integer;
begin
  objs[0] := FHandle;
  cnt     := 1;
  if Assigned(aThread) then begin
    objs[1] := aThread.FTerminationEvent;
    cnt     := 2;
  end;
  Result := WaitForMultipleObjects(cnt,@objs[0],False,aTimeout) = WAIT_OBJECT_0;
end;

Если событие ожидает поток aThread, то кроме собственно события также будет ожидаться завершающее событие потока, которое необходимо для форсированного прекращения ожидания. Если поток не задан (aThread = nil), то метод Wait будет ожидать только своего события - это может пригодиться основному VCL-потоку.

Чтобы суммировать материал этого раздела, приведем реализацию очереди ограниченной длины, которую можно использовать для взаимодействия обменом сообщениями. Будем предполагать, что все сообщения являются объектами. Сообщения добавляются в конец очереди, а извлекаются из начала очереди. Исключение из правила - высокоприоритетное сообщение, которое помещается в начало очереди без учета ограничения на длину очереди.

Интерфейс очереди:

TGsvQueue = class
  private
    FGetEvent: TGsvEvent;
    FPutEvent: TGsvEvent;
    FLatch:    TGsvLatch;
    FList:     TList;
    FMaxCount: Integer;

    function  GetCount: Integer;
    procedure SetEvents;

  public
    constructor Create(aMaxCount: Integer);
    destructor  Destroy; override;

    function  Get(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): TObject;
    function  Put(aThread: TGsvThread; aMessage: TObject;
              aTimeout: Cardinal = INFINITE): Boolean;
    procedure PutOutOfTurn(aMessage: TObject);

    property  GetEvent: TGsvEvent read FGetEvent;
    property  PutEvent: TGsvEvent read FPutEvent;
    property  Count: Integer read GetCount;
    property  MaxCount: Integer read FMaxCount;
  end;

Рассмотрим подробнее метод Get:

function TGsvQueue.Get(aThread: TGsvThread; aTimeout: Cardinal): TObject;
begin
  Result := nil;
  if not FGetEvent.Wait(aThread, aTimeout) then
    Exit;
  FLatch.Lock;
  try
    if FList.Count <> 0 then begin
      Result := TObject(FList.Items[0]);
      FList.Delete(0);
      SetEvents;
    end;
  finally
    FLatch.Unlock;
  end;
end;

Поток, вызвавший метод Get, сразу переходит в состояние ожидания непустой очереди. Если очередь содержит хоть одно сообщение, то поток продолжит свое выполнение без ожидания. Первое сообщение извлекается из списка внутри критической секции, а затем производится переустановка событий:

procedure TGsvQueue.SetEvents;
begin
  FGetEvent.State := FList.Count <> 0;
  FPutEvent.State := FList.Count < FMaxCount;
end;

Обычно только один поток пишет в очередь и только один поток читает из нее. Если при асинхронном взаимодействии требуется обмен сообщениями, то в структуре данных записываемого сообщения указывается дескриптор потока-отправителя. Кроме взаимодействия один-к-одному, очередь поддерживает достаточно важный случай взаимодействия многие-к-одному, при котором многие потоки посылают данные одному потоку. Можно себе представить также ситуацию множественного чтения-записи, когда многие потоки посылают сообщения в одну очередь, и несколько потоков выбирают сообщения из очереди. Наиболее вероятна такая ситуация при наличии массива идентичных потоков-получателей.

Синхронное взаимодействие

Синхронное взаимодействие будем рассматривать только применительно к двум параллельным потокам. В конце раздела будет особо отмечен случай множественной синхронизации. Ограничимся только простым случаем парного синхронного взаимодействия - асимметричное рандеву. Суть этого способа состоит в том, что оба взаимодействующих потока подходят к точке синхронизации, обмениваются данными и затем продолжают работать параллельно и независимо. Если один из потоков подошел к точке синхронизации раньше, то он дожидается партнера. Асимметричность выражается в том, что потоки играют при встрече разные роли. Поток-отправитель несет отновную, активную, нагрузку, а поток-получатель более пассивен. Рандеву фактически означает совмещение синхронизации и обмена данными [2,3]. Предлагаемая реализация рандеву представляет собой объект-канал, по которому происходит взаимодействие. Обмен данными в этой реализации состоит в том, что поток-отправитель вызывает у потока-получателя некоторый метод, передавая данные как аргумент и принимая реакцию как результат:

TGsvChannelMethod = procedure(aThread: TGsvThread) of object;

 TGsvChannel = class
 private
   FSendEvent:     TGsvEvent;
   FReceiveEvent:  TGsvEvent;
   FReceiveThread: TGsvThread;
   FLatch:         TGsvLatch;
   FResult:        Boolean;

 public
   constructor Create;
   destructor  Destroy; override;

   function Send(aThread: TGsvThread; aMethod: TGsvChannelMethod;
             aTimeout: Cardinal = INFINITE): Boolean;
   function Receive(aThread: TGsvThread; aTimeout: Cardinal = INFINITE): Boolean;
 end;

Опуская достаточно тривиальную реализацю конструктора и деструктора, рассмотрим реализацию канала:

function TGsvChannel.Send(aThread: TGsvThread; aMethod: TGsvChannelMethod;
  aTimeout: Cardinal): Boolean;
begin
  Result  := False;
  FResult := False;
  if not FSendEvent.Wait(aThread, aTimeout) then
    Exit;
  FLatch.Lock;
  try
    if Assigned(FReceiveThread) then begin
      aMethod(FReceiveThread);     // обмен данными
      FReceiveEvent.State := True; // активизация получателя
      FResult             := True; // успешное рандеву
      Result              := FResult;
    end;
  finally
    FLatch.Unlock;
  end;
end;

function TGsvChannel.Receive(aThread: TGsvThread;
  aTimeout: Cardinal): Boolean;
begin
  FLatch.Lock;
  try
    FReceiveThread      := aThread;
    FReceiveEvent.State := False;   // сброс ожидающего события
    FSendEvent.State    := True;    // активизация отправителя
  finally
    FLatch.Unlock;
  end;
  FReceiveEvent.Wait(aThread, aTimeout);
  FLatch.Lock;
  try
    Result           := FResult;
    FResult          := False;      // приведение канала в исходное состояние
    FSendEvent.State := False;
    FReceiveThread   := nil;
  finally
    FLatch.Unlock;
  end;
end;

Обычно канал принадлежит потоку-получателю; поток-отправитель начинает взаимодействие, вызывая для канала метод Send и передавая себя как аргумент. Кроме того, функции Send передается еще один аргумент - адрес метода потока-отправителя, который будет вызван при успешном рандеву. При готовности к рандеву поток-получатель вызывает метод Receive, передавая себя как аргумент. Каждый из взаимодействующих потоков может ограничить время своего ожидания, указав таймаут. Обсудим возможные варианты взаимного поведения потоков:

  1. Отправитель вызвал функцию Send раньше, чем получатель вызвал Receive - в этом случае отправитель приостановится до момента, когда получатель вызовет Receive. В функции Receive получатель сохраняет в канале ссылку на себя, сбрасывает свое ожидающее сообщения, устанавливает ожидающее сообщение отправителя и переходит в состояние ожидания. Отправитель выходит из состояния ожидания и вызывает свой метод-обработчик, передавая ему ссылку на получателя. В этом методе происходит собственно обмен данными. После этого отправитель устанавливает флаг успешного рандеву и активизирует событие получателя. Получатель приводит события к начальному состоянию и завершает взаимодействие. Обе функции вернут True при успешном рандеву и False при неуспешном. Все действия по изменению состояния канала защищены критической секцией.
  2. Противоположная ситуация: получатель вызвал функцию Receive раньше, чем отправитель вызвал Send - в этом случае получатель подготавливает все события и переходит в состояние ожидания. Когда отправитель вызывает свой метод Send, то ожидающее событие уже установлено и отправитель без реального ожидания сразу начинает обмен данными. Далее все действия происходят как и в первом варианте.
  3. Отправитель начинает взаимодействие первым, но завершается по таймауту или завершающему событию потока. В этом случае состояние канала не изменяется.
  4. Получатель начинает взаимодействие первым, но завершается по таймауту или завершающему событию потока. В этом случае ожидающее событие отправителя сначала устанавливается, а затем сбрасывается, то есть, состояние канала остается неизменным.
  5. Ситуация аналогичная предыдущей, но отправитель начинает взаимодействие после того, как получатель вышел из состояния ожидания с ошибкой таймаута или завершения, но еще не начал завершающих действий. В этом случае рандеву будет успешным, так как отправитель сумеет выполнить всю обычную работу по взаимодействию. Вот почему результат функции ожидания получателя не учитывается и почему введен промежуточный флаг FResult.

В заключение отметим случай множественной синхронизации параллельных потоков. Иногда множественная синхронизация обозначается термином "барьер". Суть синхронизации состоит в том, что параллельные потоки приходят к точке синхронизации в разное время, а выходят из нее в одно и тоже время. Реализуется множественная синхронизация Windows-функцией WaitForMultipleObjects. Третий аргумент этой функции в данном случае должен быть True, то есть, функция будет ожидать прихода всех указанных событий. Обычно множественная синхронизация не связана с взаимодействием параллельных потоков, а используется для их привязки по времени.

Литература
  1. Ч. Хоар. Взаимодействующие последовательные процессы. Мир, 1989
  2. Р. Бар. Язык Ada в проектировании систем. Мир, 1988
  3. Г. Джоунз. Программирование на языке Оккам. Мир, 1989
Специально для Королевства Delphi


К материалу прилагаются файлы: