Source code for azure.servicebus._servicebus_sender

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import logging
import time
import uuid
import datetime
import warnings
from typing import Any, TYPE_CHECKING, Union, List, Optional, Mapping, cast, Iterable

from ._base_handler import BaseHandler
from ._common import mgmt_handlers
from ._common.message import (
    ServiceBusMessage,
    ServiceBusMessageBatch,
)
from .amqp import AmqpAnnotatedMessage
from ._common.utils import create_authentication, transform_outbound_messages
from ._common.tracing import (
    send_trace_context_manager,
    trace_message,
    is_tracing_enabled,
    get_span_links_from_batch,
    get_span_link_from_message,
    SPAN_NAME_SCHEDULE,
    TraceAttributes,
)
from ._common.constants import (
    REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
    REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION,
    MGMT_REQUEST_SEQUENCE_NUMBERS,
    MGMT_REQUEST_SESSION_ID,
    MGMT_REQUEST_MESSAGE,
    MGMT_REQUEST_MESSAGES,
    MGMT_REQUEST_MESSAGE_ID,
    MGMT_REQUEST_PARTITION_KEY,
    MAX_MESSAGE_LENGTH_BYTES,
    MAX_BATCH_SIZE_STANDARD,
    MAX_BATCH_SIZE_PREMIUM,
)

if TYPE_CHECKING:
    from azure.core.credentials import (
        TokenCredential,
        AzureSasCredential,
        AzureNamedKeyCredential,
    )

    try:
        # pylint:disable=unused-import
        from uamqp import SendClient as uamqp_SendClientSync
        from uamqp.authentication import JWTTokenAuth as uamqp_JWTTokenAuth
    except ImportError:
        pass

    from ._transport._base import AmqpTransport
    from ._pyamqp.authentication import JWTTokenAuth as pyamqp_JWTTokenAuth
    from ._pyamqp.client import SendClient as pyamqp_SendClientSync

    MessageTypes = Union[
        Mapping[str, Any],
        ServiceBusMessage,
        AmqpAnnotatedMessage,
        Iterable[Mapping[str, Any]],
        Iterable[ServiceBusMessage],
        Iterable[AmqpAnnotatedMessage],
    ]
    MessageObjTypes = Union[
        ServiceBusMessage,
        AmqpAnnotatedMessage,
        ServiceBusMessageBatch,
        List[Union[ServiceBusMessage, AmqpAnnotatedMessage]],
    ]

_LOGGER = logging.getLogger(__name__)


class SenderMixin:
    def _create_attribute(self, **kwargs):
        self._auth_uri = f"sb://{self.fully_qualified_namespace}/{self._entity_name}"
        self._entity_uri = f"amqps://{self.fully_qualified_namespace}/{self._entity_name}"
        # TODO: What's the retry overlap between servicebus and pyamqp?
        self._error_policy = self._amqp_transport.create_retry_policy(self._config)
        self._name = kwargs.get("client_identifier") or f"SBSender-{uuid.uuid4()}"
        self.entity_name: str = self._entity_name

    @classmethod
    def _build_schedule_request(cls, schedule_time_utc, amqp_transport, tracing_attributes, *messages):
        request_body = {MGMT_REQUEST_MESSAGES: []}
        trace_links = []
        for message in messages:
            if not isinstance(message, ServiceBusMessage):
                raise ValueError(
                    f"Scheduling batch messages only supports iterables containing "
                    f"ServiceBusMessage Objects. Received instead: {message.__class__.__name__}"
                )
            message.scheduled_enqueue_time_utc = schedule_time_utc
            message = transform_outbound_messages(
                message, ServiceBusMessage, to_outgoing_amqp_message=amqp_transport.to_outgoing_amqp_message
            )
            # pylint: disable=protected-access
            message._message = trace_message(
                message._message, amqp_transport=amqp_transport, additional_attributes=tracing_attributes
            )

            if is_tracing_enabled():
                link = get_span_link_from_message(message._message)
                if link:
                    trace_links.append(link)

            message_data = {}
            message_data[MGMT_REQUEST_MESSAGE_ID] = message.message_id
            if message.session_id:
                message_data[MGMT_REQUEST_SESSION_ID] = message.session_id
            if message.partition_key:
                message_data[MGMT_REQUEST_PARTITION_KEY] = message.partition_key
            message_data[MGMT_REQUEST_MESSAGE] = bytearray(amqp_transport.encode_message(message))
            request_body[MGMT_REQUEST_MESSAGES].append(message_data)
        return request_body, trace_links


[docs] class ServiceBusSender(BaseHandler, SenderMixin): """The ServiceBusSender class defines a high level interface for sending messages to the Azure Service Bus Queue or Topic. **Please use the `get_<queue/topic>_sender` method of ~azure.servicebus.ServiceBusClient to create a ServiceBusSender instance.** :ivar fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `<yournamespace>.servicebus.windows.net`. :vartype fully_qualified_namespace: str :ivar entity_name: The name of the entity that the client connects to. :vartype entity_name: str :param str fully_qualified_namespace: The fully qualified host name for the Service Bus namespace. The namespace format is: `<yournamespace>.servicebus.windows.net`. :param credential: The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the `get_token(self, *scopes)` method, or alternatively, an AzureSasCredential can be provided too. :type credential: ~azure.core.credentials.TokenCredential or ~azure.core.credentials.AzureSasCredential or ~azure.core.credentials.AzureNamedKeyCredential :keyword str queue_name: The path of specific Service Bus Queue the client connects to. :keyword str topic_name: The path of specific Service Bus Topic the client connects to. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword transport_type: The type of transport protocol that will be used for communicating with the Service Bus service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.servicebus.TransportType :keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. :keyword str client_identifier: A string-based identifier to uniquely identify the client instance. Service Bus will associate it with some error messages for easier correlation of errors. If not specified, a unique id will be generated. :keyword float socket_timeout: The time in seconds that the underlying socket on the connection should wait when sending and receiving data before timing out. The default value is 0.2 for TransportType.Amqp and 1 for TransportType.AmqpOverWebsocket. If connection errors are occurring due to write timing out, a larger than default value may need to be passed in. """ def __init__( self, fully_qualified_namespace: str, credential: Union["TokenCredential", "AzureSasCredential", "AzureNamedKeyCredential"], *, queue_name: Optional[str] = None, topic_name: Optional[str] = None, **kwargs: Any, ) -> None: self._amqp_transport: "AmqpTransport" if kwargs.get("entity_name"): super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, **kwargs ) else: if queue_name and topic_name: raise ValueError("Queue/Topic name can not be specified simultaneously.") entity_name = queue_name or topic_name if not entity_name: raise ValueError("Queue/Topic name is missing. Please specify queue_name/topic_name.") super(ServiceBusSender, self).__init__( fully_qualified_namespace=fully_qualified_namespace, credential=credential, entity_name=str(entity_name), queue_name=queue_name, topic_name=topic_name, **kwargs, ) self._max_message_size_on_link = 0 self._max_batch_size_on_link = 0 self._create_attribute(**kwargs) self._connection = kwargs.get("connection") self._handler: Union["pyamqp_SendClientSync", "uamqp_SendClientSync"] def __enter__(self) -> "ServiceBusSender": if self._shutdown.is_set(): raise ValueError( "The handler has already been shutdown. Please use ServiceBusClient to create a new instance." ) self._open_with_retry() return self @classmethod def _from_connection_string(cls, conn_str: str, **kwargs: Any) -> "ServiceBusSender": """Create a ServiceBusSender from a connection string. :param conn_str: The connection string of a Service Bus. :type conn_str: str :keyword str queue_name: The path of specific Service Bus Queue the client connects to. Only one of queue_name or topic_name can be provided. :keyword str topic_name: The path of specific Service Bus Topic the client connects to. Only one of queue_name or topic_name can be provided. :keyword bool logging_enable: Whether to output network trace logs to the logger. Default is `False`. :keyword transport_type: The type of transport protocol that will be used for communicating with the Service Bus service. Default is `TransportType.Amqp`. :paramtype transport_type: ~azure.servicebus.TransportType :keyword Dict http_proxy: HTTP proxy settings. This must be a dictionary with the following keys: `'proxy_hostname'` (str value) and `'proxy_port'` (int value). Additionally the following keys may also be present: `'username', 'password'`. :keyword str user_agent: If specified, this will be added in front of the built-in user agent string. :returns: The ServiceBusSender. :rtype: ~azure.servicebus.ServiceBusSender :raises ~azure.servicebus.ServiceBusAuthenticationError: Indicates an issue in token/identity validity. :raises ~azure.servicebus.ServiceBusAuthorizationError: Indicates an access/rights related failure. .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START create_servicebus_sender_from_conn_str_sync] :end-before: [END create_servicebus_sender_from_conn_str_sync] :language: python :dedent: 4 :caption: Create a new instance of the ServiceBusSender from connection string. """ constructor_args = cls._convert_connection_string_to_kwargs(conn_str, **kwargs) return cls(**constructor_args) def _create_handler(self, auth: Union["uamqp_JWTTokenAuth", "pyamqp_JWTTokenAuth"]) -> None: self._handler = self._amqp_transport.create_send_client( config=self._config, target=self._entity_uri, auth=auth, properties=self._properties, retry_policy=self._error_policy, client_name=self._name, ) def _open(self): # pylint: disable=protected-access if self._running: return if self._handler: self._handler.close() auth = None if self._connection else create_authentication(self) self._create_handler(auth) try: self._handler.open(connection=self._connection) while not self._handler.client_ready(): time.sleep(0.05) self._running = True self._max_message_size_on_link = ( self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES ) if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM: self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM else: self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD except: self._close_handler() raise def _send( self, message: Union[ServiceBusMessage, ServiceBusMessageBatch], timeout: Optional[float] = None, last_exception: Optional[Exception] = None, # pylint: disable=unused-argument ) -> None: self._amqp_transport.send_messages(self, message, _LOGGER, timeout=timeout, last_exception=last_exception)
[docs] def schedule_messages( self, messages: "MessageTypes", schedule_time_utc: datetime.datetime, *, timeout: Optional[float] = None, **kwargs: Any, ) -> List[int]: """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. :type messages: Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.amqp.AmqpAnnotatedMessage, List[Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.amqp.AmqpAnnotatedMessage]]] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. :returns: A list of the sequence numbers of the enqueued messages. :rtype: list[int] .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START scheduling_messages] :end-before: [END scheduling_messages] :language: python :dedent: 4 :caption: Schedule a message to be sent in future """ if kwargs: warnings.warn(f"Unsupported keyword args: {kwargs}") # pylint: disable=protected-access self._check_live() obj_messages = transform_outbound_messages( messages, ServiceBusMessage, to_outgoing_amqp_message=self._amqp_transport.to_outgoing_amqp_message ) if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") tracing_attributes = { TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, } if isinstance(obj_messages, ServiceBusMessage): request_body, trace_links = self._build_schedule_request( schedule_time_utc, self._amqp_transport, tracing_attributes, obj_messages ) else: if len(obj_messages) == 0: return [] # No-op on empty list. request_body, trace_links = self._build_schedule_request( schedule_time_utc, self._amqp_transport, tracing_attributes, *obj_messages ) with send_trace_context_manager(self, span_name=SPAN_NAME_SCHEDULE, links=trace_links): return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, mgmt_handlers.schedule_op, timeout=timeout, )
[docs] def cancel_scheduled_messages( self, sequence_numbers: Union[int, List[int]], *, timeout: Optional[float] = None, **kwargs: Any ) -> None: """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. :type sequence_numbers: int or list[int] :keyword float timeout: The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already cancelled or enqueued. .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START cancel_scheduled_messages] :end-before: [END cancel_scheduled_messages] :language: python :dedent: 4 :caption: Cancelling messages scheduled to be sent in future """ if kwargs: warnings.warn(f"Unsupported keyword args: {kwargs}") self._check_live() if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") if isinstance(sequence_numbers, int): numbers = [self._amqp_transport.AMQP_LONG_VALUE(sequence_numbers)] else: numbers = [self._amqp_transport.AMQP_LONG_VALUE(s) for s in sequence_numbers] if len(numbers) == 0: return None # no-op on empty list. request_body = {MGMT_REQUEST_SEQUENCE_NUMBERS: self._amqp_transport.AMQP_ARRAY_VALUE(numbers)} return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION, request_body, mgmt_handlers.default, timeout=timeout, )
[docs] def send_messages( self, message: Union["MessageTypes", ServiceBusMessageBatch], *, timeout: Optional[float] = None, **kwargs: Any ) -> None: """Sends message and blocks until acknowledgement is received or operation times out. If a list of messages was provided, attempts to send them as a single batch, throwing a `ValueError` if they cannot fit in a single batch. :param message: The ServiceBus message to be sent. :type message: Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.ServiceBusMessageBatch, ~azure.servicebus.amqp.AmqpAnnotatedMessage, List[Union[~azure.servicebus.ServiceBusMessage, ~azure.servicebus.amqp.AmqpAnnotatedMessage]]] :keyword Optional[float] timeout: The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout. :rtype: None :raises: :class: ~azure.servicebus.exceptions.OperationTimeoutError if sending times out. :class: ~azure.servicebus.exceptions.MessageSizeExceededError if the size of the message is over service bus frame size limit. :class: ~azure.servicebus.exceptions.ServiceBusError when other errors happen such as connection error, authentication error, and any unexpected errors. It's also the top-level root class of above errors. .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START send_sync] :end-before: [END send_sync] :language: python :dedent: 4 :caption: Send message. """ if kwargs: warnings.warn(f"Unsupported keyword args: {kwargs}") self._check_live() if timeout is not None and timeout <= 0: raise ValueError("The timeout must be greater than 0.") try: # Short circuit noop if an empty list or batch is provided. if len(cast(Union[List, ServiceBusMessageBatch], message)) == 0: # pylint: disable=len-as-condition return except TypeError: # continue if ServiceBusMessage pass obj_message: Union[ServiceBusMessage, ServiceBusMessageBatch] if isinstance(message, ServiceBusMessageBatch): # If AmqpTransports are not the same, create batch with correct BatchMessage. if self._amqp_transport.KIND != message._amqp_transport.KIND: # pylint: disable=protected-access # pylint: disable=protected-access batch = self.create_message_batch() batch._from_list(message._messages) # type: ignore obj_message = batch else: obj_message = message else: obj_message = transform_outbound_messages( # type: ignore message, ServiceBusMessage, self._amqp_transport.to_outgoing_amqp_message ) try: batch = self.create_message_batch() batch._from_list(obj_message) # type: ignore # pylint: disable=protected-access obj_message = batch except TypeError: # Message was not a list or generator. Do needed tracing. # pylint: disable=protected-access obj_message._message = trace_message( obj_message._message, amqp_transport=self._amqp_transport, additional_attributes={ TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, }, ) trace_links = [] if is_tracing_enabled(): if isinstance(obj_message, ServiceBusMessageBatch): trace_links = get_span_links_from_batch(obj_message) else: link = get_span_link_from_message(obj_message._message) # pylint: disable=protected-access if link: trace_links.append(link) with send_trace_context_manager(self, links=trace_links): self._do_retryable_operation( self._send, message=obj_message, timeout=timeout, operation_requires_timeout=True, require_last_exception=True, )
[docs] def create_message_batch(self, max_size_in_bytes: Optional[int] = None) -> ServiceBusMessageBatch: """Create a ServiceBusMessageBatch object with the max size of all content being constrained by max_size_in_bytes. The max_size should be no greater than the max allowed message size defined by the service. :param Optional[int] max_size_in_bytes: The maximum size of bytes data that a ServiceBusMessageBatch object can hold. By default, the value is determined by your Service Bus tier. :return: A ServiceBusMessageBatch object :rtype: ~azure.servicebus.ServiceBusMessageBatch .. admonition:: Example: .. literalinclude:: ../samples/sync_samples/sample_code_servicebus.py :start-after: [START create_batch_sync] :end-before: [END create_batch_sync] :language: python :dedent: 4 :caption: Create ServiceBusMessageBatch object within limited size """ self._check_live() if not self._max_message_size_on_link: self._open_with_retry() if max_size_in_bytes and max_size_in_bytes > self._max_batch_size_on_link: raise ValueError( f"Max message size: {max_size_in_bytes} is too large, " f"acceptable max batch size is: {self._max_batch_size_on_link} bytes." ) return ServiceBusMessageBatch( max_size_in_bytes=(max_size_in_bytes or self._max_batch_size_on_link), amqp_transport=self._amqp_transport, tracing_attributes={ TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace, TraceAttributes.TRACE_MESSAGING_DESTINATION_ATTRIBUTE: self.entity_name, }, )
@property def client_identifier(self) -> str: """ Get the ServiceBusSender client_identifier associated with the sender instance. :rtype: str """ return self._name def __str__(self) -> str: return f"Sender client id: {self.client_identifier}, entity: {self.entity_name}"