azure.storage.queue.aio package

class azure.storage.queue.aio.QueueClient(account_url, queue_name, credential=None, **kwargs)[source]

A client to interact with a specific Queue.

Parameters
  • account_url (str) – The URL to the storage account. In order to create a client given the full URI to the queue, use the from_queue_url() classmethod.

  • queue_name (str) – The name of the queue.

  • credential – The credentials with which to authenticate. This is optional if the account URL already has a SAS token. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity.

Keyword Arguments
  • api_version (str) – The Storage API version to use for requests. Default value is ‘2019-07-07’. Setting to an older version may result in reduced feature compatibility.

  • secondary_hostname (str) – The hostname of the secondary endpoint.

  • message_encode_policy – The encoding policy to use on outgoing messages. Default is not to encode messages. Other options include TextBase64EncodePolicy, BinaryBase64EncodePolicy or None.

  • message_decode_policy – The decoding policy to use on incoming messages. Default value is not to decode messages. Other options include TextBase64DecodePolicy, BinaryBase64DecodePolicy or None.

Example:

Create the queue client with url and credential.
from azure.storage.queue.aio import QueueClient
token_auth_queue = QueueClient.from_queue_url(
    queue_url=queue.url,
    credential=sas_token
)
Create the queue client with a connection string.
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue1")
async clear_messages(**kwargs)[source]

Deletes all messages from the specified queue.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Example:

Clears all messages.
await queue.clear_messages()
async close()

Closes the sockets opened by the client. Calling it is unnecessary when the client is used as a context manager.
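When the client is not used as a context manager, a try/finally block is one way to make sure close() still runs. The following is a minimal sketch, assuming client is any object that exposes the async close() method described above; run_and_close and work are hypothetical names and not part of the SDK.

```python
async def run_and_close(client, work):
    """Await work(client), then close the client even if work raises."""
    try:
        return await work(client)
    finally:
        # close() releases the sockets opened by the client
        await client.close()
```

With a real QueueClient, work would be a coroutine function that calls methods such as send_message() on the client it receives.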

async create_queue(**kwargs)[source]

Creates a new queue in the storage account.

If a queue with the same name already exists, the operation fails with a ResourceExistsError.
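Setup code that may run more than once can treat this error as success. The following is a minimal sketch of that pattern, assuming queue behaves like the QueueClient documented here; ensure_queue is a hypothetical helper name, and the fallback exception class exists only so the sketch can run where the SDK is not installed.

```python
try:
    from azure.core.exceptions import ResourceExistsError
except ImportError:
    # Stand-in so the sketch can run without the SDK installed (assumption)
    class ResourceExistsError(Exception):
        pass


async def ensure_queue(queue, metadata=None):
    """Create the queue if needed; return True if this call created it."""
    try:
        await queue.create_queue(metadata=metadata)
        return True
    except ResourceExistsError:
        # The queue is already there, which is fine for an idempotent setup step
        return False
```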

Keyword Arguments
  • metadata (dict(str,str)) – A dict containing name-value pairs to associate with the queue as metadata. Note that metadata names preserve the case with which they were created, but are case-insensitive when set or read.

  • timeout (int) – The server timeout, expressed in seconds.

Returns

None or the result of cls(response)

Return type

None

Raises

StorageErrorException

Example:

Create a queue.
await queue.create_queue()
async delete_message(message, pop_receipt=None, **kwargs)[source]

Deletes the specified message.

Normally after a client retrieves a message with the receive messages operation, the client is expected to process and delete the message. To delete the message, you must have the message object itself, or two items of data: id and pop_receipt. The id is returned from the previous receive_messages operation. The pop_receipt is returned from the most recent receive_messages() or update_message() operation. In order for the delete_message operation to succeed, the pop_receipt specified on the request must match the pop_receipt returned from the receive_messages() or update_message() operation.
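The receive, process, delete cycle described above can be sketched as follows. This is an illustration under assumptions rather than SDK code: drain and handler are hypothetical names, and queue is assumed to behave like the QueueClient documented here.

```python
async def drain(queue, handler):
    """Handle each received message and delete it once handling succeeds."""
    handled = 0
    async for msg in queue.receive_messages():
        try:
            await handler(msg)
        except Exception:
            # Do not delete: the message becomes visible again after its
            # visibility timeout and can be retried
            continue
        # Passing the message object supplies both its id and its pop_receipt
        await queue.delete_message(msg)
        handled += 1
    return handled
```

Deleting only after the handler returns means a failed or crashed handler leaves the message in the queue for another attempt.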

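Parameters
  • message (str or QueueMessage) – The message object or id identifying the message to delete.

  • pop_receipt (str) – A valid pop receipt value returned from an earlier call to the receive_messages() or update_message() operation.

Keyword Arguments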

timeout (int) – The server timeout, expressed in seconds.

Example:

Delete a message.
# Get the message at the front of the queue
messages = queue.receive_messages()
async for msg in messages:
    # Delete the specified message
    await queue.delete_message(msg)
async delete_queue(**kwargs)[source]

Deletes the specified queue and any messages it contains.

When a queue is successfully deleted, it is immediately marked for deletion and is no longer accessible to clients. The queue is later removed from the Queue service during garbage collection.

Note that deleting a queue is likely to take at least 40 seconds to complete. If an operation is attempted against the queue while it is being deleted, an HttpResponseError is raised.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Return type

None

Example:

Delete a queue.
await queue.delete_queue()
classmethod from_connection_string(conn_str, queue_name, credential=None, **kwargs)[source]

Create QueueClient from a Connection String.

Parameters
  • conn_str (str) – A connection string to an Azure Storage account.

  • queue_name (str) – The queue name.

  • credential – The credentials with which to authenticate. This is optional if the account URL already has a SAS token, or the connection string already has shared access key values. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity.

Returns

A queue client.

Return type

QueueClient

Example:

Create the queue client from connection string.
from azure.storage.queue.aio import QueueClient
queue = QueueClient.from_connection_string(self.connection_string, "myqueue1")
classmethod from_queue_url(queue_url, credential=None, **kwargs)[source]

Create QueueClient from a queue URL.

Parameters
  • queue_url (str) – The full URI to the queue, including SAS token if used.

  • credential – The credentials with which to authenticate. This is optional if the account URL already has a SAS token. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity.

Returns

A queue client.

Return type

QueueClient

async get_queue_access_policy(**kwargs)[source]

Returns details about any stored access policies specified on the queue that may be used with Shared Access Signatures.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Returns

A dictionary of access policies associated with the queue.

Return type

dict(str, AccessPolicy)

async get_queue_properties(**kwargs)[source]

Returns all user-defined metadata for the specified queue.

The data returned does not include the queue’s list of messages.

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Returns

User-defined metadata for the queue.

Return type

QueueProperties

Example:

Get the properties on the queue.
properties = await queue.get_queue_properties()
async peek_messages(max_messages=None, **kwargs)[source]

Retrieves one or more messages from the front of the queue, but does not alter the visibility of the message.

Only messages that are visible may be retrieved. When a message is retrieved for the first time with a call to receive_messages(), its dequeue_count property is set to 1. If it is not deleted and is subsequently retrieved again, the dequeue_count property is incremented. The client may use this value to determine how many times a message has been retrieved. Note that a call to peek_messages does not increment the value of dequeue_count, but returns this value for the client to read.
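One common use of dequeue_count is to set aside messages that have been retrieved too many times. The following is a minimal sketch, assuming each message exposes the dequeue_count property described above; split_poison and the threshold of 5 are hypothetical choices, not SDK behavior.

```python
def split_poison(messages, max_dequeues=5):
    """Partition messages into (ok, poison) lists by their dequeue_count."""
    ok, poison = [], []
    for msg in messages:
        # dequeue_count grows each time receive_messages() hands the message out
        if msg.dequeue_count > max_dequeues:
            poison.append(msg)
        else:
            ok.append(msg)
    return ok, poison
```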

If the key-encryption-key or resolver field is set on the local service object, the messages will be decrypted before being returned.

Parameters

max_messages (int) – A nonzero integer value that specifies the number of messages to peek from the queue, up to a maximum of 32. By default, a single message is peeked from the queue with this operation.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Returns

A list of QueueMessage objects. Note that next_visible_on and pop_receipt will not be populated as peek does not pop the message and can only retrieve already visible messages.

Return type

list(QueueMessage)

Example:

Peek messages.
# Peek at one message at the front of the queue
msg = await queue.peek_messages()

# Peek at up to 5 messages from the front of the queue
messages = await queue.peek_messages(max_messages=5)

# Print the peeked messages
for message in messages:
    print(message.content)
receive_messages(**kwargs)[source]

Retrieves one or more messages from the front of the queue and hides them from other clients for the duration of the visibility timeout.

When a message is retrieved from the queue, the response includes the message content and a pop_receipt value, which is required to delete the message. The message is not automatically deleted from the queue, but after it has been retrieved, it is not visible to other clients for the time interval specified by the visibility_timeout parameter.

If the key-encryption-key or resolver field is set on the local service object, the messages will be decrypted before being returned.

Keyword Arguments
  • messages_per_page (int) – A nonzero integer value that specifies the number of messages to retrieve from the queue, up to a maximum of 32. If fewer are visible, the visible messages are returned. By default, a single message is retrieved from the queue with this operation.

  • visibility_timeout (int) – Specifies the new visibility timeout value, in seconds, relative to server time. If not specified, the service default of 30 seconds applies. The value must be larger than or equal to 1, and cannot be larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. visibility_timeout should be set to a value smaller than the time-to-live value.

  • timeout (int) – The server timeout, expressed in seconds.

Returns

Returns a message iterator of dict-like Message objects.

Return type

AsyncItemPaged[QueueMessage]

Example:

Receive messages from the queue.
# Receive messages one-by-one
messages = queue.receive_messages()
async for msg in messages:
    print(msg.content)

# Receive messages by batch
messages = queue.receive_messages(messages_per_page=5)
async for msg_batch in messages.by_page():
    for msg in msg_batch:
        print(msg.content)
        await queue.delete_message(msg)
async send_message(content, **kwargs)[source]

Adds a new message to the back of the message queue.

The visibility timeout specifies the time that the message will be invisible. After the timeout expires, the message will become visible. If a visibility timeout is not specified, the default value of 0 is used.

The message time-to-live specifies how long a message will remain in the queue. The message will be deleted from the queue when the time-to-live period expires.

If the key-encryption-key field is set on the local service object, this method will encrypt the content before uploading.

Parameters

content (obj) – Message content. Allowed type is determined by the encode_function set on the service. Default is str. The encoded message can be up to 64KB in size.
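Because the limit applies to the encoded message, Base64 encoding reduces the usable payload to roughly three quarters of 64 KB. The following is a minimal sketch of a pre-send check, assuming the text is UTF-8 encoded and then Base64 encoded (as a Base64 text encode policy would do) and that the limit is an inclusive 65,536 bytes; fits_when_base64_encoded is a hypothetical helper, not part of the SDK.

```python
import base64

MAX_ENCODED_BYTES = 64 * 1024  # 64 KB limit on the encoded message (assumed inclusive)


def fits_when_base64_encoded(text):
    """Return True if text, UTF-8 then Base64 encoded, is within the size limit."""
    encoded = base64.b64encode(text.encode("utf-8"))
    return len(encoded) <= MAX_ENCODED_BYTES
```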

Keyword Arguments
  • visibility_timeout (int) – If not specified, the default value is 0. Specifies the new visibility timeout value, in seconds, relative to server time. The value must be larger than or equal to 0, and cannot be larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. visibility_timeout should be set to a value smaller than the time-to-live value.

  • time_to_live (int) – Specifies the time-to-live interval for the message, in seconds. The time-to-live may be any positive number or -1 for infinity. If this parameter is omitted, the default time-to-live is 7 days.

  • timeout (int) – The server timeout, expressed in seconds.
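The constraints on visibility_timeout and time_to_live listed above can be checked locally before calling send_message(). The following is a minimal sketch; check_send_options is a hypothetical helper, and it only mirrors the rules stated in this section rather than anything the SDK enforces on the client side.

```python
SEVEN_DAYS = 7 * 24 * 60 * 60  # seconds


def check_send_options(visibility_timeout=0, time_to_live=SEVEN_DAYS):
    """Return a list of problems found in the given send_message options."""
    problems = []
    if not 0 <= visibility_timeout <= SEVEN_DAYS:
        problems.append("visibility_timeout must be between 0 and 7 days")
    if time_to_live != -1 and time_to_live <= 0:
        problems.append("time_to_live must be positive or -1")
    elif time_to_live != -1 and visibility_timeout >= time_to_live:
        # A message that expires before it becomes visible is never delivered
        problems.append("visibility_timeout should be smaller than time_to_live")
    return problems
```

An empty list means the options are consistent with the documented limits.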

Returns

A QueueMessage object. This object is also populated with the content although it is not returned from the service.

Return type

QueueMessage

Example:

Send messages.
await asyncio.gather(
    queue.send_message(u"message1"),
    queue.send_message(u"message2", visibility_timeout=30),  # wait 30s before becoming visible
    queue.send_message(u"message3"),
    queue.send_message(u"message4"),
    queue.send_message(u"message5")
)
async set_queue_access_policy(signed_identifiers, **kwargs)[source]

Sets stored access policies for the queue that may be used with Shared Access Signatures.

When you set permissions for a queue, the existing permissions are replaced. To update the queue’s permissions, call get_queue_access_policy() to fetch all access policies associated with the queue, modify the access policy that you wish to change, and then call this function with the complete set of data to perform the update.

When you establish a stored access policy on a queue, it may take up to 30 seconds to take effect. During this interval, requests that use a shared access signature associated with the stored access policy raise an HttpResponseError until the access policy becomes active.
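Because setting policies replaces the whole set, changing a single policy is a read, modify, write sequence. The following is a minimal sketch, assuming queue behaves like the QueueClient documented here; upsert_access_policy is a hypothetical helper name.

```python
MAX_POLICIES = 5  # limit stated for signed_identifiers


async def upsert_access_policy(queue, policy_id, policy):
    """Add or replace one stored access policy without dropping the others."""
    # Start from the policies currently stored on the queue
    identifiers = dict(await queue.get_queue_access_policy())
    identifiers[policy_id] = policy
    if len(identifiers) > MAX_POLICIES:
        raise ValueError("a queue can hold at most 5 stored access policies")
    # Write back the complete set, since the call replaces existing permissions
    await queue.set_queue_access_policy(identifiers)
    return identifiers
```

Two callers running this sequence at the same time can still overwrite each other's changes; the sketch does not guard against that.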

Parameters

signed_identifiers (dict(str, AccessPolicy)) – SignedIdentifier access policies to associate with the queue. This may contain up to 5 elements. An empty dict will clear the access policies set on the service.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Example:

Set an access policy on the queue.
# Create an access policy
from datetime import datetime, timedelta
from azure.storage.queue import AccessPolicy, QueueSasPermissions
access_policy = AccessPolicy()
access_policy.start = datetime.utcnow() - timedelta(hours=1)
access_policy.expiry = datetime.utcnow() + timedelta(hours=1)
access_policy.permission = QueueSasPermissions(read=True)
identifiers = {'my-access-policy-id': access_policy}

# Set the access policy
await queue.set_queue_access_policy(identifiers)
async set_queue_metadata(metadata=None, **kwargs)[source]

Sets user-defined metadata on the specified queue.

Metadata is associated with the queue as name-value pairs.

Parameters

metadata (dict(str, str)) – A dict containing name-value pairs to associate with the queue as metadata.

Keyword Arguments

timeout (int) – The server timeout, expressed in seconds.

Example:

Set metadata on the queue.
metadata = {'foo': 'val1', 'bar': 'val2', 'baz': 'val3'}
await queue.set_queue_metadata(metadata=metadata)
async update_message(message, pop_receipt=None, content=None, **kwargs)[source]

Updates the visibility timeout of a message. You can also use this operation to update the contents of a message.

This operation can be used to continually extend the invisibility of a queue message. This functionality can be useful if you want a worker role to “lease” a queue message. For example, if a worker role calls receive_messages() and recognizes that it needs more time to process a message, it can continually extend the message’s invisibility until it is processed. If the worker role were to fail during processing, eventually the message would become visible again and another worker role could process it.
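The lease pattern described above can be sketched as follows. This is an illustration under assumptions, not SDK code: process_with_lease, work, lease_seconds, and renew_every are hypothetical names, queue is assumed to behave like the QueueClient documented here, and the object returned by update_message() is assumed to carry the id and the new pop_receipt.

```python
import asyncio


async def process_with_lease(queue, message, work, lease_seconds=30, renew_every=20):
    """Run work(message), extending the message's invisibility until it finishes."""
    task = asyncio.ensure_future(work(message))
    current = message
    while True:
        # Wait for the work to finish, but wake up in time to renew the lease
        done, _pending = await asyncio.wait([task], timeout=renew_every)
        if done:
            break
        # Each update yields a new pop_receipt, so keep the returned message
        current = await queue.update_message(current, visibility_timeout=lease_seconds)
    result = task.result()  # re-raises if work failed; the message is then not deleted
    await queue.delete_message(current)
    return result
```

Keeping renew_every below lease_seconds leaves a margin so the message does not become visible between renewals. If work raises, the message is left in the queue and becomes visible again when the last lease runs out.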

If the key-encryption-key field is set on the local service object, this method will encrypt the content before uploading.

Parameters
  • message (str or QueueMessage) – The message object or id identifying the message to update.

  • pop_receipt (str) – A valid pop receipt value returned from an earlier call to the receive_messages() or update_message() operation.

  • content (obj) – Message content. Allowed type is determined by the encode_function set on the service. Default is str.

Keyword Arguments
  • visibility_timeout (int) – Specifies the new visibility timeout value, in seconds, relative to server time. The new value must be larger than or equal to 0, and cannot be larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. A message can be updated until it has been deleted or has expired.

  • timeout (int) – The server timeout, expressed in seconds.

Returns

A QueueMessage object. For convenience, this object is also populated with the content, although it is not returned by the service.

Return type

QueueMessage

Example:

Update a message.
# Send a message
await queue.send_message(u"update me")

# Receive the message
messages = queue.receive_messages()

# Update the message
async for message in messages:
    message = await queue.update_message(
        message,
        visibility_timeout=0,
        content=u"updated")
property api_version

The version of the Storage API used for requests.

Type

str

property location_mode

The location mode that the client is currently using.

By default this will be “primary”. Options include “primary” and “secondary”.

Type

str

property primary_endpoint

The full primary endpoint URL.

Type

str

property primary_hostname

The hostname of the primary endpoint.

Type

str

property secondary_endpoint

The full secondary endpoint URL if configured.

If no secondary endpoint is available, a ValueError is raised. To explicitly specify a secondary hostname, use the optional secondary_hostname keyword argument on instantiation.

Type

str

Raises

ValueError
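Code that only optionally uses the secondary endpoint can turn the ValueError into a None result. The following is a minimal sketch, assuming client exposes the secondary_endpoint property described above; secondary_endpoint_or_none is a hypothetical helper name.

```python
def secondary_endpoint_or_none(client):
    """Return client.secondary_endpoint, or None when no secondary is configured."""
    try:
        return client.secondary_endpoint
    except ValueError:
        # Raised by the property when no secondary hostname is available
        return None
```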

property secondary_hostname

The hostname of the secondary endpoint.

If no secondary hostname is available, this is None. To explicitly specify a secondary hostname, use the optional secondary_hostname keyword argument on instantiation.

Type

str or None

property url

The full endpoint URL to this entity, including SAS token if used.

This is either the primary endpoint or the secondary endpoint, depending on the current location_mode.

class azure.storage.queue.aio.QueueServiceClient(account_url, credential=None, **kwargs)[source]

A client to interact with the Queue Service at the account level.

This client provides operations to retrieve and configure the account properties as well as list, create and delete queues within the account. For operations relating to a specific queue, a client for this entity can be retrieved using the get_queue_client() function.

Parameters
  • account_url (str) – The URL to the queue service endpoint. Any other entities included in the URL path (e.g. queue) will be discarded. This URL can be optionally authenticated with a SAS token.

  • credential – The credentials with which to authenticate. This is optional if the account URL already has a SAS token. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity.

Keyword Arguments
  • api_version (str) – The Storage API version to use for requests. Default value is ‘2019-07-07’. Setting to an older version may result in reduced feature compatibility.

  • secondary_hostname (str) – The hostname of the secondary endpoint.

Example:

Creating the QueueServiceClient with an account url and credential.
from azure.storage.queue.aio import QueueServiceClient
queue_service = QueueServiceClient(account_url=self.account_url, credential=self.access_key)
Creating the QueueServiceClient with Azure Identity credentials.
# Get a token credential for authentication
from azure.identity.aio import ClientSecretCredential
token_credential = ClientSecretCredential(
    self.active_directory_tenant_id,
    self.active_directory_application_id,
    self.active_directory_application_secret
)

# Instantiate a QueueServiceClient using a token credential
from azure.storage.queue.aio import QueueServiceClient
queue_service = QueueServiceClient(account_url=self.account_url, credential=token_credential)
async close()

Closes the sockets opened by the client. Calling it is unnecessary when the client is used as a context manager.

async create_queue(name, metadata=None, **kwargs)[source]

Creates a new queue under the specified account.

If a queue with the same name already exists, the operation fails. Returns a client with which to interact with the newly created queue.

Parameters
  • name (str) – The name of the queue to create.

  • metadata (dict(str, str)) – A dict with name-value pairs to associate with the queue as metadata. Example: {‘Category’: ‘test’}

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Return type

QueueClient

Example:

Create a queue in the service.
await queue_service.create_queue("myqueue1")
async delete_queue(queue, **kwargs)[source]

Deletes the specified queue and any messages it contains.

When a queue is successfully deleted, it is immediately marked for deletion and is no longer accessible to clients. The queue is later removed from the Queue service during garbage collection.

Note that deleting a queue is likely to take at least 40 seconds to complete. If an operation is attempted against the queue while it is being deleted, an HttpResponseError is raised.

Parameters

queue (str or QueueProperties) – The queue to delete. This can either be the name of the queue, or an instance of QueueProperties.

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Return type

None

Example:

Delete a queue in the service.
await queue_service.delete_queue("myqueue1")
classmethod from_connection_string(conn_str, credential=None, **kwargs)[source]

Create QueueServiceClient from a Connection String.

Parameters
  • conn_str (str) – A connection string to an Azure Storage account.

  • credential – The credentials with which to authenticate. This is optional if the account URL already has a SAS token, or the connection string already has shared access key values. The value can be a SAS token string, an account shared access key, or an instance of a TokenCredentials class from azure.identity.

Returns

A Queue service client.

Return type

QueueServiceClient

Example:

Creating the QueueServiceClient with a connection string.
from azure.storage.queue.aio import QueueServiceClient
queue_service = QueueServiceClient.from_connection_string(conn_str=self.connection_string)
get_queue_client(queue, **kwargs)[source]

Get a client to interact with the specified queue.

The queue need not already exist.

Parameters

queue (str or QueueProperties) – The queue. This can either be the name of the queue, or an instance of QueueProperties.

Returns

A QueueClient object.

Return type

QueueClient

Example:

Get the queue client.
# Get the queue client to interact with a specific queue
queue = queue_service.get_queue_client(queue="myqueue2")
async get_service_properties(**kwargs)[source]

Gets the properties of a storage account’s Queue service, including Azure Storage Analytics.

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Returns

An object containing queue service properties such as analytics logging, hour/minute metrics, and CORS rules.

Return type

Dict[str, Any]

Example:

Getting queue service properties.
properties = await queue_service.get_service_properties()
async get_service_stats(**kwargs)[source]

Retrieves statistics related to replication for the Queue service.

It is only available when read-access geo-redundant replication is enabled for the storage account.

With geo-redundant replication, Azure Storage keeps your data durable in two locations. In both locations, Azure Storage constantly maintains multiple healthy replicas of your data. The location where you read, create, update, or delete data is the primary storage account location. The primary location exists in the region you choose when you create an account through the Azure classic portal, for example, North Central US. The location to which your data is replicated is the secondary location. The secondary location is determined automatically from the location of the primary; it is in a second data center that resides in the same region as the primary location. Read-only access to the secondary location is available if read-access geo-redundant replication is enabled for your storage account.

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Returns

The queue service stats.

Return type

Dict[str, Any]
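A caller might use the returned stats to decide whether reads from the secondary location are worth attempting. The following is a minimal sketch only: the "geo_replication" and "status" key names and the "live" status value are assumptions about the layout of the returned dict and should be checked against the SDK version in use; secondary_is_live is a hypothetical helper name.

```python
def secondary_is_live(stats):
    """Return True if the stats dict reports the secondary location as live."""
    # Key names and the "live" value are assumptions about the dict layout
    geo = stats.get("geo_replication") or {}
    return geo.get("status") == "live"
```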

list_queues(name_starts_with=None, include_metadata=False, **kwargs)[source]

Returns a generator to list the queues under the specified account.

The generator will lazily follow the continuation tokens returned by the service and stop when all queues have been returned.

Parameters
  • name_starts_with (str) – Filters the results to return only queues whose names begin with the specified prefix.

  • include_metadata (bool) – Specifies that queue metadata be returned in the response.

Keyword Arguments
  • results_per_page (int) – The maximum number of queue names to retrieve per API call. If the request does not specify a value, the server returns up to 5,000 items.

  • timeout (int) – The server timeout, expressed in seconds. This function may make multiple calls to the service in which case the timeout value specified will be applied to each individual call.

Returns

An iterable (auto-paging) of QueueProperties.

Return type

AsyncItemPaged[QueueProperties]

Example:

List queues in the service.
# List all the queues in the service
list_queues = queue_service.list_queues()
async for queue in list_queues:
    print(queue)

# List the queues in the service that start with the name "my_"
list_my_queues = queue_service.list_queues(name_starts_with="my_")
async for queue in list_my_queues:
    print(queue)
async set_service_properties(analytics_logging=None, hour_metrics=None, minute_metrics=None, cors=None, **kwargs)[source]

Sets the properties of a storage account’s Queue service, including Azure Storage Analytics.

If an element (e.g. analytics_logging) is left as None, the existing settings on the service for that functionality are preserved.

Parameters
  • analytics_logging (QueueAnalyticsLogging) – Groups the Azure Analytics Logging settings.

  • hour_metrics (Metrics) – The hour metrics settings provide a summary of request statistics grouped by API in hourly aggregates for queues.

  • minute_metrics (Metrics) – The minute metrics settings provide request statistics for each minute for queues.

  • cors (list(CorsRule)) – You can include up to five CorsRule elements in the list. If an empty list is specified, all CORS rules will be deleted, and CORS will be disabled for the service.

Keyword Arguments

timeout (int) – The timeout parameter is expressed in seconds.

Return type

None

Example:

Setting queue service properties.
# Create service properties
from azure.storage.queue import QueueAnalyticsLogging, Metrics, CorsRule, RetentionPolicy

# Create logging settings
logging = QueueAnalyticsLogging(read=True, write=True, delete=True, retention_policy=RetentionPolicy(enabled=True, days=5))

# Create metrics for requests statistics
hour_metrics = Metrics(enabled=True, include_apis=True, retention_policy=RetentionPolicy(enabled=True, days=5))
minute_metrics = Metrics(enabled=True, include_apis=True, retention_policy=RetentionPolicy(enabled=True, days=5))

# Create CORS rules
cors_rule1 = CorsRule(['www.xyz.com'], ['GET'])
allowed_origins = ['www.xyz.com', "www.ab.com", "www.bc.com"]
allowed_methods = ['GET', 'PUT']
max_age_in_seconds = 500
exposed_headers = ["x-ms-meta-data*", "x-ms-meta-source*", "x-ms-meta-abc", "x-ms-meta-bcd"]
allowed_headers = ["x-ms-meta-data*", "x-ms-meta-target*", "x-ms-meta-xyz", "x-ms-meta-foo"]
cors_rule2 = CorsRule(
    allowed_origins,
    allowed_methods,
    max_age_in_seconds=max_age_in_seconds,
    exposed_headers=exposed_headers,
    allowed_headers=allowed_headers
)

cors = [cors_rule1, cors_rule2]

# Set the service properties
await queue_service.set_service_properties(logging, hour_metrics, minute_metrics, cors)
property api_version

The version of the Storage API used for requests.

Type

str

property location_mode

The location mode that the client is currently using.

By default this will be “primary”. Options include “primary” and “secondary”.

Type

str

property primary_endpoint

The full primary endpoint URL.

Type

str

property primary_hostname

The hostname of the primary endpoint.

Type

str

property secondary_endpoint

The full secondary endpoint URL if configured.

If no secondary endpoint is available, a ValueError is raised. To explicitly specify a secondary hostname, use the optional secondary_hostname keyword argument on instantiation.

Type

str

Raises

ValueError

property secondary_hostname

The hostname of the secondary endpoint.

If no secondary hostname is available, this is None. To explicitly specify a secondary hostname, use the optional secondary_hostname keyword argument on instantiation.

Type

str or None

property url

The full endpoint URL to this entity, including SAS token if used.

This is either the primary endpoint or the secondary endpoint, depending on the current location_mode.