AMQP

Материал из Википедии — свободной энциклопедии
Перейти к навигации Перейти к поиску

AMQP (Advanced Message Queuing Protocol) — открытый протокол для передачи сообщений между компонентами системы. Основная идея состоит в том, что отдельные подсистемы (или независимые приложения) могут обмениваться произвольным образом сообщениями через AMQP-брокер, который осуществляет маршрутизацию, возможно гарантирует доставку, распределение потоков данных, подписку на нужные типы сообщений.

Архитектуру протокола разработал John O’Hara из банка JP Morgan Chase & Co[1].

Протокол[править | править код]

AMQP основан на трёх понятиях:

  1. Сообщение (message) — единица передаваемых данных, основная его часть (содержание) никак не интерпретируется сервером, к сообщению могут быть присоединены структурированные заголовки.
  2. Точка обмена (exchange) — в неё отправляются сообщения. Точка обмена распределяет сообщения в одну или несколько очередей. При этом в точке обмена сообщения не хранятся. Точки обмена бывают трёх типов:
    • fanout — сообщение передаётся во все прицепленные к ней очереди;
    • direct — сообщение передаётся в очередь с именем, совпадающим с ключом маршрутизации (routing key) (ключ маршрутизации указывается при отправке сообщения);
    • topic — нечто среднее между fanout и direct, сообщение передаётся в очереди, для которых совпадает маска на ключ маршрутизации, например, app.notification.sms.# — в очередь будут доставлены все сообщения, отправленные с ключами, начинающимися с app.notification.sms.
  3. Очередь (queue) — здесь хранятся сообщения до тех пор, пока не будут забраны клиентом. Клиент всегда забирает сообщения из одной или нескольких очередей.


Протокол можно разделить на два слоя:

  1. Functional Layer - определяет набор команд которые выполняют работу от имени приложения.
  2. Transport Layer - обслуживает запросы приложения к серверу и сервера к приложению, управляет мультиплексированием каналов, фреймингом, кодировкой, heart-beating, представлением данныx, работой с ошибками.


Примеры очередей:

  • store-and-forward-queue - хранит и распространяет сообщения для многих потребителей (consumers) по принципу round robin. Постоянные, публичные
  • private reply queue - хранит и распространяет сообщения для одного потребителя (consumer). Временные, приватные
  • private subscription queue - хранит сообщения от многих источников и отправляет одному потребителю (consumer). Временные, приватные

Протокол не ограничивается этими тремя видами. Они указаны как пример реализации.

Терминология[править | править код]

  • Exchange - сущность которая получает сообщения от приложений и при необходимости перенаправляет их в очереди сообщений.
  • Binding: отношение между очередью сообщений и точками обмена.
  • Routing key: виртуальный адрес который точка обмена использует для принятия решения о дальнейшей маршрутизации.

Exchange[править | править код]

Принимает сообщения от поставщика и направляет их в message queue в соответствии с предопределёнными критериями. Такие критерии называют bindings. Exchange - механизм согласования и маршрутизации сообщений. На основе сообщений и их параметров (bindings) принимают решение о перенаправлении в очередь или другой exchange. Не хранят сообщения.

Термин exchange означает алгоритм и экземпляр алгоритма. Также говорят exchange type и exchange instance.

AMQP определяет набор стандартных типов exchange. Приложения могут создавать свои exchange instance.


Каждый exchange реализует свой алгоритм маршрутизации. Существует несколько стандартных типов exchange, описанных в Functional Specification стандарта. Из них два являются важными:

  • Direct exchange - маршрутизирует на основе routing key. Базовый exchange - это direct exchange
  • topic exchange - маршрутизирует на основе шаблона маршрутизации.

Сервер создаст несколько exchange, включая direct и topic. Они будут иметь wellknown имена и клиентские приложения смогут работать с ними.

Жизненный цикл Exchange[править | править код]

Каждый AMQP-сервер предсоздаёт несколько экземпляров exchange. Эти экземпляры существуют когда сервер запущен и не могут быть уничтожены. AMQP-приложения могут также создавать свои собственные exchange. AMQP не использует для этого метод create, вместо этого экземпляр декларируется, который исходит из логики: "создать, если не создано, иначе продолжить". Можно сказать, что создание exchange идемпотентно. Вероятно приложения будут создавать exchange по мере необходимости, а затем уничтожать их за ненадобностью. AMQP предоставляет метод для уничтожения exchange.

Routing Key[править | править код]

В общем случае exchange проверяет свойства сообщения, поля заголовка и содержимое его тела и, используя эти и, возможно, данные из других источников, решает, как направить сообщение. В большинстве простых случаев exchange рассматривает одно ключевое поле, которое мы называем Routing Key. Routing Key - это виртуальный адрес, который сервер exchange может использовать для принятия решения о направлении сообщения. Для маршрутизации типа point-to-point ключом маршрутизации обычно является имя очереди сообщений. Для маршрутизации pub-sub ключ маршрутизации обычно является значением иерархии топика (topic - смотри publication/subscruber). В более сложных случаях ключ маршрутизации может быть объединен с маршрутизацией по полям заголовка сообщения и/или его содержанием

Message Queue (очередь сообщений)[править | править код]

Когда клиентское приложение создаёт очередь сообщений - оно может указать следующие свойства:

  • name - если не указано, сервер сам выбирает имя и отправляет его клиенту. Как правило, когда приложения совместно используют очередь сообщений они заранее договариваются об имени очереди сообщений, и когда приложение нуждается в очереди сообщений для своих собственных целей, оно позволяет серверу предоставлять имя.
  • exclusive - если этот параметр установлен, то очередь существует пока существует текущее соединение. Очередь удаляется при разрыве подключения.
  • durable - если установлен - очередь существует и активна при перезагрузке сервера. Очередь может потерять сообщения посланные во время перезагрузки сервера

Жизненный цикл сообщения[править | править код]

Сообщение AMQP состоит из набора свойств и непубличного содержимого. Новое сообщение создается producer с использованием клиентского API AMQP. producer добавляет контент в сообщение и, возможно, устанавливает некоторые свойства сообщения. producer маркирует сообщение с помощью маршрутной информации, которая внешне похожа на адрес, но может быть любой. Затем producer отправляет сообщение в exchange. Когда сообщение поступает на сервер, exchange (обычно) направляет его в набор очередей, которые также существуют на сервере. Если сообщение немаршрутизируемо, exchange может удалить его или вернуть приложению. producer сам решает как поступать с немаршрутизируемыми сообщениями.

Одно сообщение может существовать во многих очередях сообщений. Сервер может справиться с этим по-разному, например: копирование сообщения с помощью подсчета ссылок и т. д. Это не влияет на интероперабельность. Однако, когда сообщение направляется в несколько очередей сообщений, оно идентично в каждой очереди сообщений. Здесь нет уникального идентификатора, отличающего различные копии.

Когда сообщение поступает в очередь сообщений, она немедленно пытается передать его потребителю через AMQP. Если это невозможно, то сообщение хранится в очереди сообщений (в памяти или на диске по просьбе producer ) и ждет, пока consumer будет готов. Если отсутствует consumer, то очередь может вернуть сообщение producer через AMQP (опять же, если producer попросил об этом).

Когда очередь сообщений может доставить сообщение consumer, она удаляет сообщение из своего внутреннего хранилища. Это может произойти сразу же или после того, как consumer признает, что он успешно выполнил свою работу, обработал сообщение. consumer сам выбирает, как и когда сообщения будут "подтверждены". consumer также может отклонить сообщение (отрицательное подтверждение).

Сообщения producer и подтверждения consumer сгруппированы в транзакции. Когда приложение играет обе роли, что часто бывает, он выполняет смешанную работу: отправляет сообщения и отправляет подтверждения,а затем фиксация или откат транзакции.

Доставка сообщений от сервера к consumer не является транзакционной.

Producer[править | править код]

Producer - клиентское приложение, которое публикует сообщения в exchange.

По аналогии с устройством электронной почты, можно заметить, что producer не отправляет сообщения непосредственно в очередь (message queue). Иное поведение нарушило бы абстракцию в модели AMQ. Это было бы похоже на жизненный цикл сообщения электронной почты: разрешение электронной почты, обход таблиц маршрутизации MTA и попадание непосредственно в почтовый ящик. Это сделало бы невозможной вставку промежуточной фильтрации и обработки, например, обнаружение спама.

Модель AMQ использует тот же принцип, что и система электронной почты: все сообщения отправляются в одну точку exchange или MTA, который проверяет сообщения на основе правил и информации, которая скрыта от отправителя, и направляет их к точкам распространения, которые также скрыты от отправителя. (and routes them to drop-off points that are also hidden from the sender - здесь точки распространения это drop-off points из документации).

Consumer[править | править код]

Consumer - клиентское приложение которое получает сообщения из очереди сообщений.

Наша аналогия с электронной почтой начинает разрушаться, когда мы смотрим на consumer (получателей). Почтовые клиенты пассивны - они могут читать почтовые ящики, но они не оказывают никакого влияния на то, как эти почтовые ящики заполняются. С помощью AMQP consumer также может быть пассивным, как и почтовые клиенты. То есть мы можем написать приложение, которое прослушивает определённую очередь сообщений и просто обрабатывает поступающую информацию. При этом очередь сообщений должна быть готова до старта приложения и должна быть "привязана" к нему.

Также consumer имеет следующие возможности:

  • создавать/удалять очереди сообщений
  • определять способ заполнений очереди используя bindings
  • выбирать разные exchanges, что может полностью изменить семантику маршрутизации

Это как обладать почтовой системой, которая на уровне протокола может:

  • создать новый почтовый ящик
  • сказать MTA, что все сообщения с определённым заголовком должны быть скопированы в этот ящик
  • полностью менять то, как именно почтовая система интерпретирует адреса и другие заголовки сообщения.

Автоматический режим[править | править код]

Большинство интеграционных архитектур не нуждаются в таком уровне сложности. Большинство пользователей AMQP нуждаются в базовой функциональности из коробки. AMQP обеспечивает это следующим образом:

  • базовый (default) exchange для отправителей (producers) сообщений;
  • базовый binding для очередей сообщений, который сортирует сообщения на основе совпадения routing key и имени очереди сообщений

В результате базовый binding позволяет producer отправлять сообщения напрямую в очередь сообщений, таким образом он эмулирует простейшую схему отправки сообщения получателю, которую люди ожидают от традиционного middleware.

Базовый binding не препятствует использованию очереди сообщений в более сложных конструкциях. Он позволяет использовать AMQP без конкретного понимания работы механизмов binding и exchange.

AMQP Command Architecture[править | править код]

Секция описывает процесс взаимодействия приложения и сервера

Команды протокола (классы и методы)[править | править код]

Промежуточное программное обеспечение является сложным, и при разработке структуры протокола его создатели попытались укротить эту сложность. Их подход состоял в том, чтобы смоделировать традиционный API, основанный на классах, которые содержат методы, при этом каждый метод должен делать ровно одну вещь и делать это хорошо. Это приводит к большому набору команд, но тот, который есть относительно легко понять.

Команды AMQP сгруппированы в классы. Каждый класс охватывает определенную функциональную область. Некоторые классы являются необязательными - каждый одноранговый узел реализует классы, которые он должен поддерживать.

Существует два различных метода диалога:

  • Synchronous request-response, в котором один одноранговый узел отправляет запрос, а другой одноранговый узел отправляет ответ. Синхронные методы запроса и ответа используются для функциональных возможностей, которые не являются критичными для производительности.
  • Asynchronous notifications, в котором один одноранговый узел отправляет данные, но не ожидает ответа. Асинхронные методы используются там, где производительность имеет решающее значение.

Чтобы упростить обработку метода, мы определяем отдельные ответы для каждого синхронного запроса. То есть, один метод не используется для ответа на два разных запроса. Это означает, что одноранговый узел, отправляя синхронный запрос, может принимать и обрабатывать входящие методы до получения одного из допустимых синхронных ответов. Это отличает AMQP от более традиционных протоколов RPC.

Метод формально определяется как синхронный запрос, синхронный ответ (на конкретный запрос) или асинхронный. Наконец, каждый метод формально определяется как клиентская сторона (т. е. сервер-клиент) или серверная сторона (клиент-сервер).

Сопоставление AMQP с API промежуточного программного обеспечения[править | править код]

AMQP разработан так, чтобы он был сопоставим с API промежуточного программного обеспечения. Процесс сопоставления в какой-то степени интеллектуальный т.е. понимает, что не все методы, и не все аргументы имеют смысл для приложения, но он также является механическим, т.е. установив определённые правила, все методы могут быть сопоставлены без ручного вмешательства.

Преимущества этого заключаются в том, что, изучив семантику AMQP, разработчики найдут ту же самую семантику, предоставленную в любой среде, которую они используют.

Пример метода Queue.Declare:

Queue.Declare
    queue=my.queue
    auto-delete=TRUE
    exclusive=FALSE

Он может быть преобразован в сетевой кадр:

+--------+---------+----------+-----------+-----------+
 | Queue | Declare | my.queue |         1 |         0 |
+--------+---------+----------+-----------+-----------+
   class    method    name      auto-delete  exclusive

Или в метод API высокого уровня

Queue.Declare ("my.queue", TRUE, FALSE);

Логика сопоставления асинхронного метода на псевдокоде:

send method to server

Логика сопоставления синхронного метода на псевдокоде:

send request method to server
repeat
 wait for response from server
 if response is an asynchronous method
 process method (usually, delivered or returned content)
 else
 assert that method is a valid response for request
 exit repeat
 end-if
end-repeat

Стоит отметить, что для большинства приложений промежуточное программное обеспечение может быть полностью скрыто в технических слоях системы, и что фактически используемый API имеет меньшее значение, чем тот факт, что промежуточное программное обеспечение является надежным и функциональным.

Отсутствие уведомлений[править | править код]

Болтливый протокол работает медленно. Мы активно используем асинхронность в тех случаях, когда возникает проблема производительности. Это обычно то место, где мы отправляем контент от одного однорангового узла к другому. Мы отправляем методы как можно быстрее, не дожидаясь подтверждений. Там, где это необходимо, мы реализуем windowing и throttling на более высоком уровне например, на уровне потребителя.

Протокол обходится без уведомлений, т.к. реализует assertion model для всех событий. Либо оно успешно, либо выбрасывается исключение? которое закрывает канал или соединение.

В AMQP нет уведомлений. Успешное событие - молчаливо, провальное - заявляет о себе. Когда приложению нужно явное отслеживание успехов и провалов, оно должно использовать транзакции.

Класс Connection[править | править код]

Соединение реализовано, чтобы быть долговечным и обрабатывать множество каналов.

Жизненный цикл соединения[править | править код]

  • Клиент открывает соединение TCP/IP к серверу и отправляет заголовок протокола. Это единственная доступная для отправки клиентом информация, которая не форматирована как метод.
  • Сервер отвечает версией протокола и другими свойствами, включая список механизмов безопасности которые он поддерживает (the Start method)
  • Клиент выбирает механизм обеспечения безопасности (Start-Ok).
  • Сервер инициирует процесс аутентификации который использует модель SASL, он отправляет клиенту challenge (Secure).
  • Клиент отправляет authentication response (Secure-Ok). Например, используя механизм аутентификации 'plain' response, содержит имя и пароль.
  • Сервер повторяет challenge (Secure) или переходит к переговорам, отправляя набор параметров, в числе которых maximum frame size (Tune).
  • Клиент принимает или занижает эти параметры (Tune-Ok).
  • Клиент формально открывает соединение и выбирает виртуальный хост (Open).
  • Сервер подтверждает выбор виртуального хоста (Open-Ok).
  • Теперь клиент использует соединение по своему усмотрению.
  • Один узел (клиент или сервер) закрывает соединение (Close).
  • Другой узел отправляет данные о закрытии подключения (Close-Ok).
  • Сервер и клиент закрывают соответствующие соединению сокеты.

Обмен информацией для ошибок неполностью открытых соединений не производится. Узел, который обнаружил ошибку, должен закрыть сокет без дополнительных уведомлений.

Класс Channel[править | править код]

AMQP-это многоканальный протокол. Каналы обеспечивают возможность мультиплексирования тяжелого TCP/IP-соединения в несколько легких соединений. Это делает протокол более "дружественным к брандмауэру", поскольку использование портов предсказуемо. Это также означает, что формирование трафика и другие функции QoS сети могут быть легко использованы.

Каналы независимы друг от друга и могут выполнять различные функции одновременно с другими каналами, при этом доступная полоса пропускания разделяется между конкурентными задачами.

Ожидается и поощряется, что многопоточные клиентские приложения могут часто использовать модель “channel-per-thread” для удобства разработки. Однако открытие нескольких соединений с одним или несколькими серверами AMQP от одного клиента также вполне приемлемо. Жизненный цикл канала таков:

  • Клиент открывает новый канал (Open)
  • Сервер подтверждает открытие канала (Open-Ok)
  • Клиент и сервер используют канал по своему усмотрению
  • Один из узлов (клиент или сервер) закрывает канал (Close)
  • Другой узел подтверждает закрытие канала (Close-Ok)

Класс Exchange[править | править код]

Позволяет приложению управлять экземплярами exchange на сервере. Этот класс позволяет приложению писать свой собственный сценарий обработки сообщений, не полагаясь на какую-либо конфигурацию.

Примечание: большинство приложений не нуждаются в таком уровне сложности, и устаревшее промежуточное программное обеспечение вряд ли сможет поддерживать эту семантику.

Жизненный цикл Exchange[править | править код]

  • Клиент просит сервер убедиться, что exchange существует (Declare). Клиент может уточнить это следующим образом: " создайте exchange, если он не существует "или" предупредите меня, но не создавайте его, если он не существует".
  • Клиент публикует сообщения в exchange
  • Клиент может решить удалить exchange (Delete)

Класс Queue[править | править код]

Класс queue позволяет приложению управлять очередями сообщений на сервере. Это основной шаг почти во всех приложениях, которые получают сообщения, по крайней мере для проверки того, что ожидаемая очередь сообщений действительно существует.


Жизненный цикл очереди[править | править код]

Протокол предусматривает два жизненных цикла очереди:

  • Durable message queues - используются несколькими потребителями и существуют независимо от наличия потребителей которые могли бы принимать сообщения
  • Temporary message queues - приватные очереди для конкретного потребителя. Очередь удаляется при отсутствии потребителей.


Жизненный цикл для durable message queue[править | править код]

  • Клиент объявляет очередь сообщений (Declare с аргументом "passive")
  • Сервер подтверждает существование очереди (Declare-Ok)
  • Клиент читает сообщения из очереди

Жизненный цикл для temporary message queues[править | править код]

  • Клиент создаёт очередь сообщений (Declare часто без имени очереди, так что сервер сам даст ей имя). Сервер подтверждает создание (Declare-Ok)
  • Клиент инициализирует consumer для созданной очереди.
  • Клиент останавливает consumer либо явно, либо путем закрытия канала и / или соединения
  • Когда последний consumer исчезает из очереди сообщений и после вежливого тайм-аута сервер удаляет очередь сообщений

AMQP реализует механизм подписок на темы в виде очередей сообщений. Это позволяет создавать интересные структуры, в которых подписка может быть сбалансирована по нагрузке между пулом совместно работающих абонентских приложений.

Жизненный цикл подписки[править | править код]

  • Клиент создаёт очередь сообщений (Declare), сервер подтверждает (Declare-Ok)
  • Клиент сопоставляет очередь сообщений с exchange темы (Bind) и сервер подтверждает сопоставление (Bind-Ok)
  • Клиент использует очередь сообщений так как описано выше

Класс Basic[править | править код]

Базовый класс реализует возможности обмена сообщениями, описанные в этой спецификации. Он поддерживает следующую семантику:

  • Отправка сообщений от клиента - серверу которая происходит асинхронно (Publish)
  • Запуск и остановка consumers (Consume, Cancel)
  • Отправка сообщений с сервера на клиент, которая происходит асинхронно (Deliver, Return)
  • Подтверждение сообщений (Ack, Reject)
  • Получение сообщений из очереди синхронным способом (Get)

Класс Transaction[править | править код]

AMQP поддерживает два вида транзакций:

  1. Автоматические транзакции, в которых каждое опубликованное сообщение и подтверждение обрабатывается как автономная транзакция.
  2. Локальные серверные транзакции, в которых сервер заносит в буфер опубликованные сообщения и подтверждения и фиксирует (commit) их по требованию клиента.

Класс Transaction (“tx”) даёт приложениям доступ ко второму типу транзакций, локальным транзакциям сервера. Семантика класса следующая:

  1. Приложение запрашивает серверные транзакции в каждом канале в котором оно хочет получить такие транзакции (Select)
  2. Приложение выполняет работу (Publish, Ack)
  3. Приложение выполняет commit или rollback работы (Commit, Roll-back)
  4. Приложение продолжает работу

Транзакции занимаются публикацией контента и подтвеждениями, не доставкой. Таким образом rollback не помещает в очередь повторно и не инициирует повторную доставку. Клиент может подтвердить эти сообщения в следующей транзакции.

Транспортная архитектура AMQP[править | править код]

В этом разделе объясняется, как команды сопоставляются с wire-level протоколом.

Описание[править | править код]

AMQP - это двоичный протокол. Информация организуется в фреймы различных типов. Фреймы содержат методы протокола и другую информацию. Все кадры имеют один и тот же общий формат: заголовок кадра, полезная нагрузка и конец кадра. Формат полезной нагрузки кадра зависит от типа кадра.

На транспортном уровне предполагается использование стека TCP/IP или аналогов.

В пределах одного сокетного соединения может существовать несколько независимых потоков управления, называемых каналами. Каждый кадр пронумерован номером канала. Чередуя свои кадры, различные каналы совместно используют это соединение. Для любого данного канала кадры выполняются в строгой последовательности, которая может использоваться для управления анализатором протоколов (обычно это state machine).

Мы строим фреймы, используя небольшой набор типов данных, таких как биты, целые числа, строки и таблицы полей. Поля фрейма плотно упакованы, не делая их медленными или сложными для разбора. Относительно просто создать слой кадрирования механически из спецификаций протокола.

Wire-level форматирование разработано таким образом, чтобы быть масштабируемым и достаточно универсальным для использования в произвольных протоколах высокого уровня (а не только в AMQP). Мы предполагаем, что AMQP будет расширяться, улучшаться и иным образом изменяться с течением времени, и wire-level формат будет поддерживать это.

Типы данных[править | править код]

Типы данных AMQP используемые в фреймах:

  • Целые числа Integers (от 1 до 8 октетов), используются для представления размеров, величины, лимиты и т.д. Целые числа всегда беззнаковые и могут быть невыровнены в кадре.
  • Биты
  • Короткие строки, используемые для хранения свойств короткого текста. Короткие строки ограничены 255 октетами и могут быть проанализированы без риска переполнения буфера. (Я подозреваю, что речь об одном октете в 255 состояний а не о 255 октетах)
  • Длинные строки, используемые для хранения частей двоичных данных
  • Поля таблиц, содержащие пары имя-значение. Значения полей вводятся в виде строк, целых чисел и т. д

Согласование протокола[править | править код]

Клиент и сервер согласовывают протокол. Это означает, что при подключении клиента сервер предлагает определенные параметры, которые клиент может принять или изменить. Когда оба соглашаются с результатом, соединение считается установленным. Согласование - это полезно, потому что оно позволяет устанавливать предварительные настройки соединения.

Согласование происходит по ряду аспектов:

  • Актуальный протокол и его версия. Сервер МОЖЕТ обрабатывать множество протоколов на одном порту.
  • Аргументы шифрования и аутентификация обеих сторон. Это часть функционального слоя протокола.
  • Максимальный размер кадра, количество каналов и другие эксплуатационные ограничения

Согласованные лимиты могут позволить обеим сторонам предварительно распределить ключевые буферы, избегая тупиковых ситуаций. Каждый входящий кадр либо подчиняется согласованным ограничениям и поэтому является безопасным, либо превышает их, в этом случае другая сторона неисправна и должна быть отключена. Это очень хорошо согласуется с философией AMQP "это либо работает должным образом, либо не работает вообще".

Оба узла согласовывают ограничения до самого низкого согласованного значения следующим образом:

  • Сервер ДОЛЖЕН сказать клиенту какие лимиты он предлагает
  • Клиент отвечает и МОЖЕТ уменьшить лимиты для соединения

Разграничение кадров[править | править код]

Стек TCP/IP - работает с потоками, в нём нет встроенного механизма разграничения кадров. Существующие протоколы решают эту проблему несколькими различными способами:

  • Отправка одного кадра на соединение. Это просто но медленно
  • Добавление разграничения кадров в поток. Это просто но делает парсинг медленным
  • Подсчет размера кадров и отправка размера перед каждым кадром. Это просто и быстро, и этот подход реализован в AMQP.


Кадр в деталях[править | править код]

Все кадры состоят из заголовка (7 октетов), полезной нагрузки произвольного размера и октета "конец кадра", который обнаруживает искаженные кадры:

 0      1         3             7                size+7 size+8
 +------+---------+-------------+ +------------+ +-----------+
 | type | channel |        size | |    payload | | frame-end |
 +------+---------+-------------+ +------------+ +-----------+
   octet   short    long            size octets     octet

Чтение кадра просходит следующим образом:

  1. Чтение заголовка и проверка типа кадра и канала
  2. В зависимости от типа кадра, происходит чтение данных из payload и их обработка
  3. Чтение frame-end.

В реалистичных реализациях, где речь идет о производительности, мы будем использовать“read-ahead buffering” или “gathering reads”, чтобы избежать выполнения трех отдельных системных вызовов для чтения кадра.

Method Frames[править | править код]

Фреймы методов несут команды протокола высокого уровня (которые мы называем "методами"). Один кадр метода несет одну команду. Полезная нагрузка фрейма метода имеет такой формат:

0          2           4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
   short       short       ...

Фрейм метода обрабатывается следующим образом:

1. Чтение payload фрейма метода.

2. Его распаковка в структуру. Данный метод всегда имеет одну и ту же структуру, поэтому можно быстро распаковать его

3. Проверка того, что этот метод разрешен в текущем контексте.

4. Проверка того, что аргументы метода являются допустимыми.

5. Выполнение этого метода.

Тело фрейма метода строится как список полей данных AMQP (биты, целые числа, строки и таблицы строк). Код маршалинга тривиально генерируется непосредственно из спецификаций протокола и может быть очень быстрым

Content Frames[править | править код]

Контент - это данные приложения, которые мы переносим от клиента к клиенту через сервер AMQP. Содержание - это, грубо говоря, набор свойств плюс двоичная часть данных. Набор разрешенных свойств определяется базовым классом, и они образуют "фрейм заголовка содержимого". Данные могут быть любого размера и разбиты на несколько (или много) блоков, каждый из которых образует "каркас тела содержимого".

Смотря на кадры для определенного канала, в момент передачи по проводам, мы можем увидеть что-то вроде этого:

[method]
[method] [header] [body] [body
[method]
...

Некоторые методы (например, Basic.Publish, Basic.Deliver и т. д.) формально определяются как несущие содержание. Когда одноранговый узел отправляет такой method frame, он всегда следует за ним с заголовком содержимого и с несколькими фреймами тела содержимого или без таковых. Заголовок content frame имеет следующий формат:

0          2        4           12               14
+----------+--------+-----------+----------------+------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------- - -
    short     short    long long     short           remainder...

Мы помещаем тело контента в отдельные фреймы (а не включаем его в метод), чтобы AMQP мог поддерживать методы "нулевого копирования", в которых контент никогда не маршалируется или не кодируется. Мы помещаем свойства содержимого в их собственный фрейм, чтобы получатели могли выборочно отбрасывать содержимое, которое они не хотят обрабатывать.

Heartbeat Frames[править | править код]

Heartbeat - это метод, предназначенный для отмены одной из особенностей TCP/IP, а именно его способности восстанавливаться после разорванного физического соединения, закрываясь только после довольно длительного тайм-аута. В некоторых сценариях нам нужно очень быстро узнать, отключен ли одноранговый узел или он не отвечает по другим причинам (например, он зацикливается). Поскольку сердцебиение может быть сделано на низком уровне, мы реализуем это как особый тип фрейма, которым обмениваются узлы на транспортном уровне, а не как метод класса.

Обработка ошибок[править | править код]

AMQP использует исключения для обработки ошибок. Любая операционная ошибка (очередь сообщений не найдена, недостаточные права доступа и т. д.) приводит к срабатыванию исключения канала. Любая структурная ошибка (неверный аргумент, плохая последовательность методов и т. д.) приводит к исключению соединения. Исключение закрывает канал или соединение и возвращает код ответа и текст ответа клиентскому приложению. Мы используем 3-значный код ответа плюс текстовую схему текста ответа, которая используется в HTTP и многих других протоколах.


Закрытие каналов и соединений[править | править код]

Соединение или канал считается "открытым" для клиента, когда он отправил Open, и для сервера, когда он отправил Open-Ok. С этого момента одноранговый узел, который хочет закрыть канал или соединение, должен сделать это с помощью протокола рукопожатия, который описан здесь.

Закрытие канала или соединения по любой причине - нормальной или исключительной - должно быть сделано осторожно. Резкое закрытие не всегда обнаруживается быстро, и после исключения мы можем потерять коды ответа на ошибку. Правильная конструкция состоит в том, чтобы вручную согласовать закрытие таким образом, чтобы закрыть канал/соединение только после того, как мы будем уверены в том, что другая сторона знает о ситуации.

Когда одноранговый узел решает закрыть канал или соединение,он посылает метод Close. Принимающий узел должен ответить на закрытие с помощью Close-Ok, а затем обе стороны могут закрыть свой канал или соединение. Обратите внимание, что если одноранговые узлы игнорируют Close, может произойти взаимоблокировка в случае если оба одноранговых узла отправляют Close одновременно.


AMQP Client Architecture[править | править код]

Можно читать и записывать кадры AMQP непосредственно из приложения, но это было бы плохим дизайном. Даже самый простой диалог AMQP гораздо сложнее, чем, скажем, HTTP, и разработчикам приложений не нужно понимать такие вещи, как двоичные форматы фрейминга, чтобы отправить сообщение в очередь сообщений. Рекомендуемая архитектура клиента AMQP состоит из нескольких уровней абстракции:

  1. Fraiming Layer - принимает методы протокола AMQP в некотором языковом формате (структуры, классы и т. д.) и сериализует их как wire-level кадры. Слой кадрирования может быть механически сгенерирован из спецификаций AMQP (которые определены на языке моделирования протокола, реализованы в XML и специально разработаны для AMQP).
  2. connection manager layer - считывает и записывает кадры AMQP и управляет общей логикой соединения и сеанса. В этом слое мы можем инкапсулировать полную логику открытия соединения и сессии, обработку ошибок, передачу и прием контента и так далее. Большие части этого слоя могут быть изготовлены автоматически из спецификаций AMQP. Например, спецификации определяют, какие методы несут контент, поэтому логика "отправить метод, а затем необязательно отправить контент" может быть создана механически.
  3. API Layer - предоставляет определенный API для работы с приложениями. Уровень API может отражать некоторый существующий стандарт или может предоставлять высокоуровневые методы AMQP, создавая сопоставление, как описано ранее. Методы AMQP предназначены для того, чтобы сделать это сопоставление простым и полезным. Уровень API сам по себе может состоять из нескольких слоев, например, API более высокого уровня, построенного поверх API метода AMQP.

Кроме того, обычно существует какой-то уровень ввода-вывода, который может быть очень простым (Синхронное чтение и запись сокета) или сложным (полностью асинхронный многопоточный ввод-вывод). На этой диаграмме показана общая рекомендуемая архитектура:

     +------------------------+
     |      Application       |
     +-----------+------------+
                 |
     +------------------------+
 +---|      API Layer         |----Client API Layer-----+
 |   +-----------+------------+                         |
 |               |                                      |
 |   +------------------------+    +---------------+    |
 |   | Connection Manager     +----+  Framing Layer|    |
 |   +-----------+------------+    +---------------+    |
 |               |                                      |
 |   +------------------------+                         |
 +---| Asynchronous I/O Layer |-------------------------+
     +-----------+------------+
                 |
              -------
      - - - - Network - - - -
              -------

В этом документе, когда мы говорим о "клиентском API", мы имеем в виду все слои ниже приложения (i/o, фрейминг, диспетчер соединений и API-уровни. Обычно мы говорим о "клиентском API" и "приложении" как о двух отдельных вещах, где приложение использует клиентский API для общения с сервером промежуточного программного обеспечения.

Functional Specification[править | править код]

Server Functional Specification[править | править код]

Messages and Content[править | править код]

Сообщение - это атомарная единица обработки системы маршрутизации и массового обслуживания промежуточного программного обеспечения. Сообщения содержат содержимое, которое состоит из заголовка содержимого, содержащего набор свойств, и тела содержимого, содержащего непрозрачный блок двоичных данных.

Сообщение может соответствовать многим различным сущностям приложения:

  • Сообщение уровня приложения
  • Отправляемый файл
  • Кадр потока данных

Сообщения могут быть постоянными. Постоянное сообщение надежно хранится на диске и гарантированно доставляется даже в случае серьезного сбоя сети, сбоя сервера, переполнения и т. д.

Сообщения могут иметь приоритет. Сообщение с высоким приоритетом отправляется раньше, чем сообщения с более низким приоритетом, ожидающие в той же очереди сообщений. Когда сообщения должны быть отброшены для поддержания определенного уровня качества обслуживания, сервер сначала отбрасывает сообщения с низким приоритетом.

Сервер не должен изменять тела содержимого сообщений, которые он получает и передает потребительским приложениям. Сервер может добавлять информацию в заголовки контента, но он не должен удалять или изменять существующую информацию.

Виртуальные хосты[править | править код]

Виртуальный хост - это раздел данных внутри сервера, это административное удобство, которое окажется полезным для тех, кто хочет предоставить AMQP в качестве сервиса на общей инфраструктуре.

Виртуальный хост содержит собственное пространство имен, набор обменов, очереди сообщений и все связанные с ними объекты. Каждое соединение должно быть связано с одним виртуальным хостом.

Клиент выбирает виртуальный хост в методе Connection.Open, после проверки подлинности. Это означает, что схема аутентификации сервера является общей для всех виртуальных узлов на этом сервере. Однако используемая схема авторизации может быть уникальной для каждого виртуального хоста. Это должно быть полезно для инфраструктуры общего хостинга. Администраторы, которым требуются разные схемы проверки подлинности для каждого виртуального хоста, должны использовать отдельные серверы

Все каналы внутри соединения работают с одним и тем же виртуальным хостом. Нет никакого способа связаться с другим виртуальным хостом на том же самом соединении, и нет никакого способа переключиться на другой виртуальный хост, не разрывая соединение и не начиная заново.

Протокол не предлагает никаких механизмов для создания или настройки виртуальных хостов - это делается неопределенным образом внутри сервера и полностью зависит от реализации.

Exchanges[править | править код]

Exchange - это агент маршрутизации сообщений внутри виртуального хоста. Экземпляр exchange (который мы обычно называем "обменом") принимает сообщения и информацию о маршрутизации - главным образом ключ маршрутизации - и либо передает сообщения в очереди сообщений, либо во внутренние службы. Обмены именуются на основе каждого виртуального хоста.

Приложения могут свободно создавать, совместно использовать и уничтожать экземпляры exchange в пределах своих полномочий.

Обмены могут быть постоянными, временными или автоматически удаляемыми. Постоянные обмены существуют до тех пор, пока они не будут удалены. Временные обмены существуют до тех пор, пока сервер не выключится. Автоматически удаляемые обмены существуют до тех пор, пока их не перестанут использовать.

Сервер предоставляет определенный набор типов exchange. Каждый тип обмена реализует определенное соответствие и алгоритм, как определено в следующем разделе. AMQP предписывает небольшое количество типов обмена и рекомендует еще несколько. Кроме того, каждая серверная реализация может добавлять свои собственные типы exchange.

Обмен может направлять одно сообщение во множество очередей сообщений параллельно. Это создает несколько экземпляров сообщения, которые потребляются независимо друг от друга.

The Direct Exchange Type[править | править код]

direct exchange type работает следующим образом:

  1. Очередь сообщений сопоставляется с сервером exchange с помощью ключа маршрутизации K.
  2. Издатель отправляет exchange сообщение с ключом маршрутизации R.
  3. Сообщение передается в очередь сообщений, если K = R.

Сервер должен реализовать echange типа direct и ДОЛЖЕН предопределить в каждом виртуальном хосте, как минимум два direct exchange: одну с именем amqp.direct и вторую без публичного имени которая служит как exchange по умолчанию для обработки публичных методов.

Обратите внимание, что очереди сообщений могут связываться с использованием любого допустимого значения ключа маршрутизации, но чаще всего очереди сообщений будут связываться с использованием собственного имени в качестве ключа маршрутизации.

В частности, все очереди сообщений ДОЛЖНЫ быть автоматически привязаны к exchange без публичного имени, используя имя очереди сообщений в качестве ключа маршрутизации.

The Fanout Exchange Type[править | править код]

Fanout Exchange работает следующим образом:

  1. Очередь сообщений привязывается к серверу exchange без каких-либо аргументов.
  2. publisher отправляет сообщение в exchange.
  3. Сообщение передается в очередь сообщений безоговорочно.

Fanout Exchange является тривиальным для проектирования и реализации. Этот тип обмена и предварительно объявленное имя amq.fanout , являются обязательными.

The Topic Exchange Type[править | править код]

Topic Exchange работает следующим образом:

  1. Очередь сообщений привязывается к серверу exchange с помощью шаблона маршрутизации P.
  2. publisher отправляет exchange сообщение с ключом маршрутизации R.
  3. Сообщение передается в очередь сообщений, если R совпадает с P.

Ключ маршрутизации, используемый для Topic Exchange, должен состоять из слов, разделенных точками. Минимальный размер слова - 0 символов. Каждое слово может содержать буквы A-Z и a-z, а также цифры 0-9.

Шаблон маршрутизации следует тем же правилам, что и ключ маршрутизации, с добавлением, что знак * соответствует одному слову, а знак # соответствует нулю или более слов. Таким образом, схема маршрутизации *.stock.# соответствует ключам маршрутизации usd.stock и eur.stock.db, но не stock.nasdaq.

Одна из предложенных схем для Topic Exchange состоит в том, чтобы хранить набор всех известных ключей маршрутизации и обновлять его, когда publishers используют новые ключи маршрутизации. Можно определить все привязки для данного ключа маршрутизации и таким образом быстро найти очереди сообщений для сообщения. Этот тип обмена является необязательным.

Сервер должен реализовать тип topic exchange, и в этом случае сервер должен предварительно объявить в каждом виртуальном хосте по крайней мере одну тему exchange с именем amq.topic.

The Headers Exchange Type[править | править код]

headers exchange type работает следующим образом:

  1. Очередь сообщений привязывается к exchange с таблицей аргументов, содержащих заголовки, которые должны быть сопоставлены для этой привязки, и, возможно, значения, которые они должны содержать. Ключ маршрутизации не используется.
  2. publisher отправляет сообщение в exchange, где свойство headers содержит таблицу имен и значений.
  3. Сообщение передается в очередь, если свойство headers соответствует аргументам, с которыми была связана очередь.

Алгоритм сопоставления управляется специальным аргументом привязки, передаваемым в качестве пары имя-значение в таблице аргументов. Этот аргумент называется "X-match". Он может принимать одно из двух значений, диктуя, как обрабатываются остальные пары значений имен в таблице во время сопоставления:

  • 'all' подразумевает, что все остальные пары должны соответствовать свойству headers сообщения, чтобы это сообщение было перенаправлено (AND)
  • 'any' подразумевает, что сообщение должно быть перенаправлено, если какое-либо из полей в свойстве headers совпадает с одним из полей в таблице аргументов (OR)

Поле в аргументах привязки соответствует полю в сообщении, если выполняется следующее условие: либо поле в аргументах привязки не имеет значения и поле с таким же именем присутствует в заголовках сообщений, либо если поле в аргументах привязки имеет значение и поле с таким же именем существует в заголовках сообщений и имеет такое же значение.

Любое поле, начинающееся с 'x -', кроме 'X-match', зарезервировано для дальнейшего использования и будет проигнорировано

Сервер должен реализовать Headers Exchange Type, при этом сервер должен предварительно объявить в каждом виртуальном хосте по крайней мере один Headers Exchange Type с именем amq.match.

The System Exchange Type[править | править код]

The System Exchange Type работает следующим образом:

  1. publisher отправляет в exchange сообщение с ключом маршрутизации S.
  2. The system exchange отправляет его в системный сервис S.

Системные сервисы начинающиеся с "amq." зарезервированны для AMQP. Все остальные имена могут быть использованы. Данный тип exchange необязательный.

Пользовательские типы Exchange[править | править код]

Имена всех пользовательских типов exchange должны начинаться с "x -". Типы Exchange, которые не начинаются с "x -", зарезервированы для дальнейшего использования в стандарте AMQP.

Очереди сообщений[править | править код]

Очередь сообщений - это именованный буфер FIFO, который содержит сообщения от приложений. Приложения могут свободно создавать, обмениваться, использовать и уничтожать очереди сообщений в пределах своих полномочий.

Обратите внимание, что при наличии нескольких считывателей из очереди, или клиентских транзакций, или использовании полей приоритета, или использовании селекторов сообщений, или оптимизации доставки для конкретной реализации очередь может не иметь истинных характеристик FIFO. Единственный способ гарантировать FIFO - это иметь только одного потребителя, подключенного к очереди. В этих случаях очередь может быть описана как “слабая-FIFO".

Очереди сообщений могут быть постоянными, временными или автоматически удаляемыми. Постоянные очереди сообщений существуют до тех пор, пока не будут удалены. Временные очереди сообщений существуют до тех пор, пока сервер не завершит работу. Очереди автоматически удаляемых сообщений длятся до тех пор, пока они больше не будут использоваться.

Очереди сообщений хранят свои сообщения в памяти, на диске или в какой-то их комбинации. Очереди сообщений именуются на основе виртуального хоста.

Очереди сообщений содержат сообщения и распределяют их между одним или несколькими клиентами-потребителями. Сообщение, направленное в очередь сообщений, никогда не отправляется более чем одному клиенту, если только оно не было отклонено

Одна очередь сообщений может содержать различные типы содержимого одновременно и независимо друг от друга. То есть, если основное и файловое содержимое отправляются в одну и ту же очередь сообщений, они будут доставляться потребляющим приложениям независимо по запросу.

Bindings[править | править код]

Binding - это связь между очередью сообщений и обменом данными. Привязка определяет аргументы маршрутизации, которые сообщают exchange, какие сообщения должна получать очередь. Приложения создают и уничтожают bindings по мере необходимости, чтобы направить поток сообщений в свои очереди сообщений. Продолжительность жизни binding зависит от очередей сообщений, для которых они определены - когда очередь сообщений уничтожается, ее binding также уничтожается. Специфическая семантика метода Queue.Bind зависит от типа обмена.

Consumers - потребители[править | править код]

Мы используем термин consumer для обозначения как клиентского приложения, так и сущности, управляющей тем, как конкретное клиентское приложение получает сообщения из очереди сообщений. Когда клиент "запускает потребителя", он создает сущность потребителя на сервере. Когда клиент "отменяет потребителя", он уничтожает сущность потребителя на сервере. Потребители принадлежат к одному клиентскому каналу и заставляют очередь сообщений асинхронно отправлять сообщения клиенту.

Quality of Service[править | править код]

Качество обслуживания определяет скорость отправки сообщений. Качество обслуживания зависит от типа распространяемого контента. В общем случае качество обслуживания использует концепцию "предварительной выборки", чтобы указать, сколько сообщений или сколько октетов данных будет отправлено до того, как клиент подтвердит сообщение. Цель состоит в том, чтобы отправить данные сообщения заранее, чтобы уменьшить задержку.

Acknowledgements[править | править код]

Acknowledgment - это формальный сигнал от клиентского приложения к очереди сообщений о том, что оно успешно обработало сообщение. Существует две возможные модели подтверждения:

  1. Automatic - в котором сервер удаляет содержимое из очереди сообщений сразу же после его доставки в приложение (с помощью методов Deliver или Get-Ok).
  2. Explicit - в котором клиентское приложение должно отправлять метод Ack для каждого сообщения или пакета сообщений, которые оно обработало

Клиентские уровни могут сами реализовывать явные подтверждения различными способами, например, сразу после получения сообщения или когда приложение указывает, что оно его обработало. Эти различия не влияют на AMQP или интероперабельность.

Flow Control[править | править код]

Управление потоком - это аварийная процедура, используемая для остановки потока сообщений от однорангового узла. Он работает одинаковым образом между клиентом и сервером и реализуется коммандой Channel.Flow. Flow Control - это единственный механизм, который может остановить чрезмерно производящего publisher. Consumer может использовать более элегантный механизм предварительной выборки окон, если он использует подтверждения сообщений (Acknowledgements) (что обычно означает использование транзакций).

Соглашение о наименовании[править | править код]

Эти соглашения регулируют именование сущностей AMQP. Сервер и клиент должны соблюдать эти соглашения:

  • Пользовательские exchange должны начинаться с префикса "x-"
  • Стандартные экземпляры exchange должны начинаться с префикса "amq."
  • Стандартные системные сервисы должны начинаться с префикса "amq."
  • Стандартные очереди сообщений должны начинаться с префикса "amq."

Спецификация команд AMQP (Classes & Methods)[править | править код]

Примечания[править | править код]

Методы AMQP могут определять минимальные значения (например, количество потребителей в очереди сообщений) по соображениям совместимости. Эти минимумы определяются в описании каждого класса.

Соответствующие реализации AMQP должны реализовывать достаточно щедрые значения для таких полей, минимумы предназначены только для использования на наименее способных платформах.

Грамматики используют эту нотацию:

  • 'S:' указывает на данные или метод, отправленные с сервера клиенту;
  • 'C:' указывает на данные или метод, отправленные с клиента на сервер;
  • +условие или +(...) выражение означает "1 или более экземпляров";
  • *условие или *(...) выражение означает "ноль или более экземпляров".

Мы определяем методы как:

  • синхронный запрос ("syn request"). Отправляющий узел должен ждать определенного метода ответа, но может реализовать это асинхронно;
  • синхронный ответ ("syn reply for XYZ");
  • асинхронный запрос или ответ ("async")

Техническая спецификация[править | править код]

Номера портов определённые IANA[править | править код]

Стандартный номер порта AMQP был назначен IANA как 5672 как для TCP, так и для UDP. Порт UDP зарезервирован для использования в будущих мультикастовых реализациях

Wire-level формат для AMQP[править | править код]

Официальная грамматика протокола[править | править код]

Мы предоставляем полную грамматику для AMQP (это предоставляется для справки, и вам может быть интереснее перейти к следующим разделам, которые подробно описывают различные типы фреймов и их форматы):

amqp = protocol-header *amqp-unit

protocol-header = literal-AMQP protocol-id protocol-version
literal-AMQP = %d65.77.81.80 ; "AMQP"
protocol-id = %d0 ; Must be 0
protocol-version = %d0.9.1 ; 0-9-1

method = method-frame [ content ]
method-frame = %d1 frame-properties method-payload frame-end
frame-properties = channel payload-size
channel = short-uint ; Non-zero
payload-size = long-uint
method-payload = class-id method-id *amqp-field
class-id = %x00.01-%xFF.FF
method-id = %x00.01-%xFF.FF
amqp-field = BIT / OCTET
            / short-uint / long-uint / long-long-uint
            / short-string / long-string
            / timestamp
            / field-table
short-uint = 2*OCTET
long-uint = 4*OCTET
long-long-uint = 8*OCTET
short-string = OCTET *string-char ; length + content
string-char = %x01 .. %xFF
long-string = long-uint *OCTET ; length + content
timestamp = long-long-uint ; 64-bit POSIX
field-table = long-uint *field-value-pair
field-value-pair = field-name field-value
field-name = short-string
field-value = 't' boolean
            / 'b' short-short-int
            / 'B' short-short-uint
            / 'U' short-int
            / 'u' short-uint
            / 'I' long-int
            / 'i' long-uint
            / 'L' long-long-int
            / 'l' long-long-uint
            / 'f' float
            / 'd' double
            / 'D' decimal-value
            / 's' short-string
            / 'S' long-string
            / 'A' field-array
            / 'T' timestamp
            / 'F' field-table
            / 'V' ; no field
boolean = OCTET ; 0 = FALSE, else TRUE
short-short-int = OCTET
short-short-uint = OCTET
short-int = 2*OCTET
long-int = 4*OCTET
long-long-int = 8*OCTET
float = 4*OCTET ; IEEE-754
double = 8*OCTET ; rfc1832 XDR double
decimal-value = scale long-uint
scale = OCTET ; number of decimal digits
field-array = long-int *field-value ; array of values
frame-end = %xCE
content = %d2 content-header *content-body
content-header = frame-properties header-payload frame-end
header-payload = content-class content-weight content-body-size
 property-flags property-list
content-class = OCTET
content-weight = %x00
content-body-size = long-long-uint
property-flags = 15*BIT %b0 / 15*BIT %b1 property-flags
property-list = *amqp-field
content-body = %d3 frame-properties body-payload frame-end
body-payload = *OCTET

heartbeat = %d8 %d0 %d0 frame-end


Мы используем расширенный синтаксис BNF, определенный в IETF RFC 2234. В заключение:

  • Название правила - это просто само название
  • Терминалы задаются одним или несколькими числовыми символами с базовой интерпретацией этих символов, обозначенных как " d " или "x"
  • Правило может определять простую упорядоченную строку значений путем перечисления последовательности имен правил
  • Диапазон альтернативных числовых значений можно задать компактно, используя тире ( " - " ) для указания диапазона
  • Элементы, заключенные в круглые скобки, рассматриваются как единый элемент, содержимое которого строго упорядочено.
  • Элементы, разделенные прямой косой чертой ( " / " ), являются альтернативами.
  • Оператор *, предшествующий элементу, указывает на повторение. Полная форма: "<a>*< b>элемент", где <a> и <b> являются необязательными десятичными значениями, как минимум <a> и как максимум <b> вхождений элемента.

Заголовок протокола[править | править код]

Клиент должен запустить новое соединение, отправив заголовок протокола. Это 8-октетная последовательность:

+---+---+---+---+---+---+---+---+
|'A'|'M'|'Q'|'P'| 0 | 0 | 9 | 1 |
+---+---+---+---+---+---+---+---+
 8 octets

Заголовок протокола состоит из прописных букв "AMQP", за которыми следует константа %d0 затем:

  1. Основная версия протокола, используемая в соответствии с разделом 1.4.2. (оф. документации версии 0-9-1)
  2. Второстепенная версия протокола, используемая в соответствии с разделом 1.4.2. (оф. документации версии 0-9-1)
  3. Пересмотр протокола, используемый в соответствии с разделом 1.4.2. (оф. документации версии 0-9-1)

Модель согласования протоколов совместима с существующими протоколами, такими как HTTP, которые инициируют соединение с постоянной текстовой строкой, и с брандмауэрами, которые отслеживают начало протокола, чтобы решить, какие правила к нему применить.

Клиент и сервер договариваются о протоколе и версии следующим образом:

  • Клиент открывает новое соединение сокета с сервером AMQP и отправляет заголовок протокола.
  • Сервер либо принимает, либо отклоняет заголовок протокола. Если он отклоняет заголовок протокола, то записывает допустимый заголовок протокола в сокет и затем закрывает сокет.
  • В противном случае он оставляет сокет открытым и реализует протокол соответствующим образом.

Пример:

Client sends:   Server responds:
AMQP%d0.0.9.1   Connection.Start method
AMQP%d0.1.0.0   AMQP%d0.0.9.1<Close connection>
HTTP            AMQP%d0.0.9.1<Close connection>

Принципы для реализации протокола:

  • Сервер может принимать протоколы, отличные от AMQP, такие как HTTP
  • Если сервер не распознает первые 5 октетов данных в сокете или не поддерживает конкретную версию протокола, которую запрашивает клиент, он должен написать допустимый заголовок протокола в сокет, затем очистить (flush) сокет (чтобы гарантировать, что клиентское приложение получит данные) и затем закрыть соединение с сокетом. Сервер может распечатать диагностическое сообщение для облегчения отладки.
  • Клиент может определить версию протокола сервера, попытавшись подключиться к его самой высокой поддерживаемой версии и повторно подключиться к более низкой версии, если он получает такую информацию обратно от сервера.
  • Клиенты и серверы, реализующие несколько версий AMQP, должны использовать все восемь октетов заголовка протокола для идентификации протокола.


Основной формат кадра[править | править код]

Все кадры начинаются с 7-октетного заголовка, состоящего из поля типа (октет), поля канала (короткое целое число) и поля размера (длинное целое число):

0      1         3         7                size+7     size+8
+------+---------+---------+ +-------------+ +-----------+
| type | channel |    size | |     payload | | frame-end |
+------+---------+---------+ +-------------+ +-----------+
 octet    short      long      'size' octets     octet

AMQP определяет следующие типы фреймов:

  • Type = 1, "METHOD": method frame.
  • Type = 2, "HEADER": content header frame
  • Type = 3, "BODY": content body frame.
  • Type = 4, "HEARTBEAT": heartbeat frame.

Номер канала равен 0 для всех кадров, которые являются глобальными для соединения, и 1-65535 для кадров, которые ссылаются на определенные каналы.

Поле size - это размер полезной нагрузки, исключая октет конца кадра. В то время как AMQP предполагает надежный подключенный протокол, мы используем конец кадра для обнаружения ошибок кадрирования, вызванных неправильными реализациями клиента или сервера.

Принципы для реализации протокола:

  • Октет конца кадра всегда должен иметь шестнадцатеричное значение %xCE.
  • Если одноранговый узел получает кадр с типом, который не является одним из этих определенных типов, он должен рассматривать это как фатальную ошибку протокола и закрыть соединение без отправки каких-либо дополнительных данных о нем
  • Когда одноранговый узел читает кадр, он должен проверить, что конец кадра допустим, прежде чем пытаться декодировать кадр. Если конец кадра не является допустимым, он должен рассматривать это как фатальную ошибку протокола и закрыть соединение без отправки каких-либо дополнительных данных о нем. Он должен регистрировать информацию о проблеме, так как это указывает на ошибку в реализации кода фрейминга сервера или клиента.
  • Одноранговый узел не должен отправлять кадры, превышающие согласованный размер. Одноранговый узел, принимающий слишком большой кадр, должен сигнализировать об исключении соединения с кодом ответа 501 (ошибка кадра).
  • Номер канала должен быть равен нулю для всех heartbeat кадров, а также для фреймов метода, заголовка и тела, которые ссылаются на класс Connection. Одноранговый узел, который получает ненулевой номер канала для одного из этих кадров, должен сигнализировать об исключении соединения с кодом ответа 503 (команда недопустима).

Полезная нагрузка метода (Method Payloads)[править | править код]

Тела фреймов метода состоят из инвариантного списка полей данных, называемых "аргументами". Все тела методов начинаются с идентификаторов класса и метода:

0          2           4
+----------+-----------+-------------- - -
| class-id | method-id | arguments...
+----------+-----------+-------------- - -
   short        short   ...

Принципы для реализации протокола:

  • Идентификатор класса и идентификатор метода - это константы, определенные в спецификациях класса и метода AMQP.
  • Аргументы представляют собой набор полей AMQP, специфичных для каждого метода
  • Идентификатор класса значения от %x00.01 до %xEF.FF зарезервированы для стандартных классов AMQP.
  • ID классов от %xF0.00 до %xFF.FF (%d61440-%d65535) могут использоваться при реализации для нестандартных классов расширений.

Поля данных AMQP[править | править код]

AMQP имеет два уровня спецификации полей данных: собственные поля данных, используемые для аргументов метода, и поля данных, передаваемые между приложениями в таблицах полей. Таблицы полей содержат Надстрочный набор собственных полей данных.

Integers[править | править код]

AMQP определяет следующие нативные типы целого числа:

  • Unsigned octet (8 bits).
  • Unsigned short integers (16 bits).
  • Unsigned long integers (32 bits).
  • Unsigned long long integers (64 bits).

Целые числа и длины строк всегда беззнаковы и хранятся в сетевом порядке байтов. Мы не пытаемся оптимизировать случай, когда две низко-высокие системы (например, два процессора Intel) разговаривают друг с другом.

Принципы для реализации протокола:

  • Разработчики не должны предполагать, что целые числа, закодированные во фрейме, выровнены по границам слов памяти.
Bits[править | править код]

AMQP определяет собственный тип битового поля. Биты накапливаются в целые октеты. Когда два или более бита соприкасаются в кадре, они будут упакованы в один или несколько октетов, начиная с нижнего бита в каждом октете. Нет никакого требования, чтобы все битовые значения в кадре были смежными, но это обычно делается для минимизации размеров кадра.

Strings[править | править код]

Строки AMQP имеют переменную длину и представлены целочисленной длиной, за которой следует ноль или более октетов данных. AMQP определяет два собственных типа строк:

  • Короткие строки, хранящиеся в виде 8-битной целочисленной длины без знака, за которой следует ноль или более октетов данных. Короткие строки могут содержать до 255 октетов данных UTF-8, но не могут содержать двоичные нулевые октеты.
  • Длинные строки, хранящиеся в виде 32-разрядной целочисленной длины без знака, за которой следует ноль или более октетов данных. Длинные строки могут содержать любые данные
Timestamps[править | править код]

Временные метки хранятся в 64-битном формате POSIX time_t с точностью до одной секунды. Используя 64 бита, мы избегаем будущих проблем обертывания, связанных с 31-битными и 32-битными значениями time_t.

Поля таблиц[править | править код]

Поля таблиц - это длинные строки, содержащие упакованные пары имя-значение. Пары значений name кодируются в виде короткой строки, определяющей имя, и октета, определяющего тип значений, а затем само значение. Допустимые типы полей для таблиц являются продолжением собственных типов integer, bit, string и timestamp и показаны в грамматике. Целочисленные поля с несколькими октетами всегда хранятся в сетевом порядке байтов.

Принципы для реализации протокола:

  • Имена полей должны начинаться с буквы " $ " или " # "и могут продолжаться буквами" $ "или"#", цифрами или подчеркиванием до максимальной длины 128 символов.
  • Сервер должен проверить имена полей и при получении недопустимого имени Поля он должен сигнализировать об исключении соединения с кодом ответа 503 (синтаксическая ошибка).
  • Десятичные значения предназначены не для поддержки значений с плавающей запятой, а для поддержки бизнес-значений с фиксированной запятой, таких как курсы валют и суммы. Они кодируются в виде октета, представляющего собой количество мест, за которыми следует длинное целое число со знаком. Октет "decimals" - not signed.
  • Дубликаты полей являются незаконными. Поведение однорангового узла по отношению к таблице, содержащей повторяющиеся поля, не определено.

Кадрирование контента[править | править код]

Определенные специфические методы (Publish, Deliver и т. д.) обрабатывают контент. Пожалуйста, обратитесь к главе ""Functional Specifications" для получения спецификаций каждого метода. Методы, которые обрабатывают контент, делают это безусловно.

Контент состоит из списка из 1 или более фреймов следующим образом:

  1. Ровно один кадр заголовка содержимого, который предоставляет свойства для содержимого.
  2. Опционально, один или несколько фреймов тела контента

Кадры контента на определенном канале строго последовательны. То есть они могут быть смешаны с кадрами для других каналов, но никакие два кадра контента из одного канала не могут быть смешаны и не могут "перекрывать" друг друга, а также кадры контента для одного контента не могут быть смешаны с кадрами метода на одном канале. (ориг. Content frames on a specific channel are strictly sequential. That is, they may be mixed with frames for other channels, but no two content frames from the same channel may be mixed or overlapped, nor may content frames for a single content be mixed with method frames on the same channel.)

Обратите внимание, что любой фрейм, не относящийся к контенту, явно отмечает конец содержимого. Хотя размер контента хорошо известен из заголовка контента (а следовательно, и количество фреймов контента), это позволяет отправителю прервать отправку контента без необходимости закрывать канал.

Принципы для реализации протокола:

  • Одноранговый узел, получающий неполное или плохо отформатированное содержимое, должен вызвать исключение соединения с кодом ответа 505 (неожиданный кадр). Это включает в себя отсутствующие заголовки контента, неправильные идентификаторы классов в заголовках контента, отсутствующие фреймы тела контента и т. д.
Заголовок контента[править | править код]

Заголовок полезной нагрузки контента имеет следующий формат:

0          2        4           12               14
+----------+--------+-----------+----------------+------------- - -
| class-id | weight | body size | property flags | property list...
+----------+--------+-----------+----------------+------------- - -
   short      short   long long        short         remainder...

Принципы для реализации протокола:

  • Идентификатор класса должен совпадать с идентификатором класса фрейма метода. Одноранговый узел должен ответить на недопустимый идентификатор класса, вызвав исключение соединения с кодом ответа 501 (ошибка кадра).
  • Поле веса не используется и должно быть равно нулю.
  • Размер тела - это 64-разрядное значение, определяющее общий размер тела содержимого, то есть сумму размеров тела для следующих фреймов тела содержимого. Ноль указывает на отсутствие фреймов тела содержимого.
  • Флаги свойств представляют собой массив битов, которые указывают на наличие или отсутствие каждого значения свойства в последовательности. Биты упорядочены от самых высоких до самых низких. Бит 15 указывает на первое свойство.
  • Флаги свойств могут указывать более 16 свойств. Если установлен последний бит (0), это означает, что за ним следует еще одно поле флагов свойств. Существует множество полей флагов свойств.
  • Значения свойств являются специфичными для класса полями данных AMQP.
  • Битовые свойства обозначаются только соответствующим флагом свойства (1 или 0) и никогда не присутствуют в списке свойств.
  • Номер канала в кадрах контента не должен быть равен нулю. Одноранговый узел, получающий нулевой номер канала в кадре содержимого, должен сигнализировать об исключении соединения с кодом ответа 504 (ошибка канала).
Тело контента[править | править код]

Полезная нагрузка тела контента - это "непрозрачный" бинарный блок заканчивающийся frame end октетом:

+-----------------------+ +-----------+
| Opaque binary payload | | frame-end |
+-----------------------+ +-----------+

Тело содержимого может быть разделено на столько кадров, сколько необходимо. Максимальный размер полезной нагрузки кадра согласовывается обоими одноранговыми узлами во время согласования соединения.

Принципы для реализации протокола:

  • Одноранговый узел должен обрабатывать тело содержимого, которое разбивается на несколько фреймов, сохраняя эти фреймы как единый набор и либо повторно передавая их как есть, разбивая на меньшие фреймы, либо объединяя в один блок для доставки в приложение.

Heartbeat frames[править | править код]

Heartbeat frames говорят получателю, что отправитель все еще жив. Частота и время Heartbeat frames согласовываются во время настройки соединения.

Принципы для реализации протокола:

  • Heartbeat frames должны иметь номер канала, равный нулю. Одноранговый узел, получающий недопустимый кадр Heartbeat frame, должен вызвать исключение соединения с кодом ответа 501 (ошибка кадра).
  • Если одноранговый узел не поддерживает Heartbeat, он должен отбросить Heartbeat frame, не сигнализируя о какой-либо ошибке или неисправности.
  • Клиент должен начать посылать Heartbeat после получения метода Connection.Tune и начать следить за Heartbeat после получения Connection.Open. Сервер должен начать отправлять и мониторить Heartbeat после получения Connection.Tune-Ok
  • Узел должен приложить максимум усилий, чтобы посылать Heartbeat через определенные интервалы. Heartbeat может быть послано в любое время. Любой отправленный октет является допустимой заменой Heartbeat, таким образом, Heartbeat должны быть отправлены только в том случае, если трафик AMQP без Heartbeat не отправляется дольше одного интервала Heartbeat. Если одноранговый узел не обнаруживает входящего трафика (т. е. принятых октетов) в течение двух или более интервалов Heartbeat, он должен закрыть соединение, не вызывая Connection.Close/Close-Ok handshaking,и логировать ошибку
  • Heartbeat должно продолжаться до тех пор, сокет не будет закрыт, в том числе во время и после Connection.Close/Close-Ok handshaking

Channel Multiplexing[править | править код]

AMQP позволяет одноранговым узлам создавать несколько независимых потоков управления. Каждый канал действует как виртуальное соединение, которое совместно использует один сокет:

   frames      frames     frames       frames
+-----------+-----------+-----------+-----------+
| channel   | channel   | channel   | channel   |
+-----------+-----------+-----------+-----------+
|                     socket                    |
+-----------------------------------------------+

Принципы для реализации протокола:

  • Одноранговый узел AMQP МОЖЕТ поддерживать несколько каналов. Максимальное количество каналов определяется при согласовании соединения, и одноранговый узел может согласовать это количество до 1.
  • Каждый одноранговый узел ДОЛЖЕН сбалансировать трафик на всех открытых каналах справедливым образом. Эта балансировка может быть выполнена на основе каждого кадра или на основе количества трафика на канал. Одноранговый узел НЕ ДОЛЖЕН позволять одному очень занятому каналу ограничивать прогресс менее занятого канала.

Гарантия видимости[править | править код]

Сервер должен убедиться в том, что клиентские наблюдения о состоянии сервера являются согласованными.

В следующем примере показано, что означает наблюдение клиента в данном контексте:

  • Клиент 1 и клиент 2 подключены к одному виртуальному хосту
  • Клиент 1 декларирует очередь
  • Клиент 1 получает Declare.Ok
  • Клиент 1 говорит клиенту 2 об этом
  • Клиент 2 делает пассивную декларацию той же очереди

Гарантия видимости гарантирует то, что клиент 2 видит очередь

Закрытие канала[править | править код]

Сервер будет считать канал закрытым если произойдёт что-то из следующего:

  • Либо одноранговый узел закрывает канал, либо родительское соединение используя Close/Close-Ok handshake
  • Либо одноранговый узел вызывает исключение на канале, либо родительское соединение.
  • Либо узел закрывает родителькое соединение без Close/Close-Ok handshaking

Когда сервер закрывает канал, все неподтвержденные сообщения на канале помечаются для повторной доставки. Когда сервер закрывает соединение, он удаляет все авто-удаляемые сущности принадлежащие этому соединению.

Синхронизация контента[править | править код]

В некоторых случаях синхронные методы request-response оказывают влияние на асинхронную доставку контента по одному и тому же каналу, в том числе:

  • Методы Basic.Consume и Basic.Cancel который стартуют и останавливают поток сообщений из очереди сообщений
  • метод Basic.Recover который запрашивает повторную доставку сообщений в канал
  • методы Queue.Bind, Queue.Unbind, and Queue.Purge которые влияют на поток сообщений направленный в очередь сообщений

Принципы для реализации протокола:

  • Эффекты request-response не должны быть видны на канале до метода response и должны быть видны после него.

Гарантия упорядоченности контента[править | править код]

Порядок прохождения методов через канал стабилен: методы принимаются в том же порядке, в каком они отправляются. На транспортном уровне это обеспечивается протоколом TCP/IP. Кроме того, содержимое стабильно обрабатывается сервером. В частности, содержимое, проходящее по одному пути внутри сервера, будет оставаться упорядоченным. Для содержимого заданного приоритета, проходящего через один путь, мы определяем путь обработки содержимого как состоящий из одного входящего канала, одного exchange, одной очереди и одного исходящего канала.

Принципы для реализации протокола:

  • Сервер должен сохранять порядок содержимого, проходящего через один путь обработки содержимого, если только поле повторной доставки не было изменено в методах Basic.Deliver или Basic.Get-Ok и в соответствии с правилами, определяющими условия, при которых это поле может быть установлено.

Обработка ошибок[править | править код]

Исключения[править | править код]

Используя стандартную модель программирования "exceptions", AMQP не сигнализирует об успехе, а только о неудаче. AMQP определяет два уровня исключений:

  1. Исключения каналов. Они закрывают канал, который вызвал ошибку. Исключения каналов обычно происходят из-за "мягких" ошибок, которые не влияют на остальную часть приложения.
  2. Исключения соединений. Они закрывают соединение сокета и обычно происходят из-за "жестких" ошибок, которые указывают на ошибку программирования, плохую конфигурацию или другой случай, требующий вмешательства.

Мы формально документируем утверждения в определении каждого класса и метода.

Формат кода ответа[править | править код]

Коды ответов AMQP соответствуют определению "Reply Code Severities and Theory" из IETF RFC 2821.

Реализации[править | править код]

Особенности протокола AMQP[править | править код]

  • Строки в AMQP - регистро-зависимые
  • Соглашение о версионировании - номер версии состоит из двух или трёх цифр: major.minor.revision При этом revision указывать необязательно. Числа могут принимать значения от 0 до 99. Числа от 100 и выше зарезервированны для внутреннего использования. Версия 1.1 эквивалентна версии 1.1.0

Примечания[править | править код]

Литература[править | править код]

  • Emrah Ayanoglu; Yusuf Aytaş; Dotan Nahum. Mastering RabbitMQ. — Packt Publishing, 2016. — 286 с. — ISBN 978-1-78398-153-3.

Ссылки[править | править код]