Skip to content

RecvClient#

Bases: Container

Messaging receiver.

Source code in pubtools/sign/clients/msg_recv_client.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class RecvClient(Container):
    """Messaging receiver."""

    def __init__(
        self,
        uid: str,
        topic: str,
        message_ids: List[str],
        id_key: str,
        broker_urls: List[str],
        cert: str,
        ca_cert: str,
        timeout: int,
        retries: int,
        errors: List[MsgError],
        received: Dict[Any, Any],
    ) -> None:
        """Recv Client Initializer.

        Args:
            topic (str): Topic where to listen for incoming messages
            message_ids (List[str]): List of awaited message ids
            id_key (str): Attribute name in message body which is considered as id
            broker_urls (List[str]): List of broker urls
            cert (str): Messaging client certificate
            ca_cert (str): Messaging ca certificate
            timeout (int): Timeout for the messaging receiver
            retries (int): How many attempts to retry receiving messages
            errors (List[MsgError]): List of errors which occured during the process
            received (Dict[Any, Any]): Mapping of received messages
            uid (str): Unique identifier for the receiver
        """
        self.message_ids = message_ids
        self.recv: Dict[Any, Any] = received
        self._errors: List[MsgError] = errors
        self.topic = topic
        self.message_ids = message_ids
        self.id_key = id_key
        self.broker_urls = broker_urls
        self.cert = cert
        self.ca_cert = ca_cert
        self.timeout = timeout
        self.uid = uid
        self._retries = retries
        handler = _RecvClient(
            uid=uid,
            topic=topic,
            message_ids=message_ids,
            id_key=id_key,
            broker_urls=broker_urls,
            cert=cert,
            ca_cert=ca_cert,
            timeout=timeout,
            recv=self.recv,
            errors=self._errors,
        )
        super().__init__(handler)
        self._handler = handler

    def get_errors(self) -> List[MsgError]:
        """Get errors from receiver.

        This method doesn't have any meaningfull usecase, it's only used for testing

        Returns:
            List[MsgError]: List of errors which occured during the process
        """
        return self._errors  # pragma: no cover

    def get_received(self) -> Dict[Any, Any]:
        """Get received messages.

        This method doesn't have any meaningfull usecase, it's only used for testing

        Returns:
            Dict[Any, Any]: Dictionary of received messages
        """
        return self.recv  # pragma: no cover

    def run(self) -> Union[Dict[Any, Any], List[MsgError]]:  # type: ignore[override]
        """Run the receiver.

        This method starts the receiver and waits for messages to be received.

        Returns:
            Union[Dict[Any, Any], List[MsgError]]: Dictionary of received messages if successful,
            or a list of errors if any occurred.
        """
        LOG.info("Running messaging receiver")
        if not len(self.message_ids):
            LOG.warning("No messages to receive")
            return []
        super().run()
        if self._errors:
            return self._errors
        return self.recv

    def close(self) -> None:
        """Close receiver."""
        LOG.info("Closing messaging receiver")
        if self._handler:
            cast(_RecvClient, self._handler).close()

__init__(uid, topic, message_ids, id_key, broker_urls, cert, ca_cert, timeout, retries, errors, received) #

Recv Client Initializer.

Parameters:

Name Type Description Default
topic str

Topic where to listen for incoming messages

required
message_ids List[str]

List of awaited message ids

required
id_key str

Attribute name in message body which is considered as id

required
broker_urls List[str]

List of broker urls

required
cert str

Messaging client certificate

required
ca_cert str

Messaging ca certificate

required
timeout int

Timeout for the messaging receiver

required
retries int

How many attempts to retry receiving messages

required
errors List[MsgError]

List of errors which occured during the process

required
received Dict[Any, Any]

Mapping of received messages

required
uid str

Unique identifier for the receiver

required
Source code in pubtools/sign/clients/msg_recv_client.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def __init__(
    self,
    uid: str,
    topic: str,
    message_ids: List[str],
    id_key: str,
    broker_urls: List[str],
    cert: str,
    ca_cert: str,
    timeout: int,
    retries: int,
    errors: List[MsgError],
    received: Dict[Any, Any],
) -> None:
    """Recv Client Initializer.

    Args:
        topic (str): Topic where to listen for incoming messages
        message_ids (List[str]): List of awaited message ids
        id_key (str): Attribute name in message body which is considered as id
        broker_urls (List[str]): List of broker urls
        cert (str): Messaging client certificate
        ca_cert (str): Messaging ca certificate
        timeout (int): Timeout for the messaging receiver
        retries (int): How many attempts to retry receiving messages
        errors (List[MsgError]): List of errors which occured during the process
        received (Dict[Any, Any]): Mapping of received messages
        uid (str): Unique identifier for the receiver
    """
    self.message_ids = message_ids
    self.recv: Dict[Any, Any] = received
    self._errors: List[MsgError] = errors
    self.topic = topic
    self.message_ids = message_ids
    self.id_key = id_key
    self.broker_urls = broker_urls
    self.cert = cert
    self.ca_cert = ca_cert
    self.timeout = timeout
    self.uid = uid
    self._retries = retries
    handler = _RecvClient(
        uid=uid,
        topic=topic,
        message_ids=message_ids,
        id_key=id_key,
        broker_urls=broker_urls,
        cert=cert,
        ca_cert=ca_cert,
        timeout=timeout,
        recv=self.recv,
        errors=self._errors,
    )
    super().__init__(handler)
    self._handler = handler

close() #

Close receiver.

Source code in pubtools/sign/clients/msg_recv_client.py
257
258
259
260
261
def close(self) -> None:
    """Close receiver."""
    LOG.info("Closing messaging receiver")
    if self._handler:
        cast(_RecvClient, self._handler).close()

get_errors() #

Get errors from receiver.

This method doesn't have any meaningfull usecase, it's only used for testing

Returns:

Type Description
List[MsgError]

List[MsgError]: List of errors which occured during the process

Source code in pubtools/sign/clients/msg_recv_client.py
219
220
221
222
223
224
225
226
227
def get_errors(self) -> List[MsgError]:
    """Get errors from receiver.

    This method doesn't have any meaningfull usecase, it's only used for testing

    Returns:
        List[MsgError]: List of errors which occured during the process
    """
    return self._errors  # pragma: no cover

get_received() #

Get received messages.

This method doesn't have any meaningfull usecase, it's only used for testing

Returns:

Type Description
Dict[Any, Any]

Dict[Any, Any]: Dictionary of received messages

Source code in pubtools/sign/clients/msg_recv_client.py
229
230
231
232
233
234
235
236
237
def get_received(self) -> Dict[Any, Any]:
    """Get received messages.

    This method doesn't have any meaningfull usecase, it's only used for testing

    Returns:
        Dict[Any, Any]: Dictionary of received messages
    """
    return self.recv  # pragma: no cover

run() #

Run the receiver.

This method starts the receiver and waits for messages to be received.

Returns:

Type Description
Union[Dict[Any, Any], List[MsgError]]

Union[Dict[Any, Any], List[MsgError]]: Dictionary of received messages if successful,

Union[Dict[Any, Any], List[MsgError]]

or a list of errors if any occurred.

Source code in pubtools/sign/clients/msg_recv_client.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def run(self) -> Union[Dict[Any, Any], List[MsgError]]:  # type: ignore[override]
    """Run the receiver.

    This method starts the receiver and waits for messages to be received.

    Returns:
        Union[Dict[Any, Any], List[MsgError]]: Dictionary of received messages if successful,
        or a list of errors if any occurred.
    """
    LOG.info("Running messaging receiver")
    if not len(self.message_ids):
        LOG.warning("No messages to receive")
        return []
    super().run()
    if self._errors:
        return self._errors
    return self.recv