From d2f7c1c87f1c52f6abff1e024dc24bde00962b2d Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Thu, 26 Feb 2026 10:13:32 +0000 Subject: [PATCH 1/2] chore(deps): update dependency poetry to v4.2.1 (#995) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 266a27297..c0d409889 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2.1 orbs: linter: talkiq/linter@4.0.1 - poetry: talkiq/poetry@4.2.0 + poetry: talkiq/poetry@4.2.1 executors: python310: From 915c5916fe4bd2843384b4022257d862083419f0 Mon Sep 17 00:00:00 2001 From: Kevin James Date: Thu, 26 Feb 2026 12:32:39 +0000 Subject: [PATCH 2/2] feat(pubsub): [BREAKING] remove deprecated metrics agent --- pubsub/gcloud/aio/pubsub/__init__.py | 23 ------------- pubsub/gcloud/aio/pubsub/metrics_agent.py | 19 ----------- pubsub/gcloud/aio/pubsub/subscriber.py | 41 +---------------------- pubsub/tests/unit/subscriber_test.py | 25 ++------------ 4 files changed, 3 insertions(+), 105 deletions(-) delete mode 100644 pubsub/gcloud/aio/pubsub/metrics_agent.py diff --git a/pubsub/gcloud/aio/pubsub/__init__.py b/pubsub/gcloud/aio/pubsub/__init__.py index f0da93b39..c060a1f6f 100644 --- a/pubsub/gcloud/aio/pubsub/__init__.py +++ b/pubsub/gcloud/aio/pubsub/__init__.py @@ -143,29 +143,6 @@ async def handler(message): - ``subscriber_messages_received`` - [counter] the number of messages pulled from pubsub -Metrics Agent (Deprecated) -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -``subscribe`` has also an optional ``metrics_client`` argument which will be -removed in a future release. You can provide any metrics agent that implements -the same interface as ``MetricsAgent`` (Datadog client will do ;) ) and get the -following metrics: - -- ``pubsub.producer.batch`` - [histogram] actual size of a batch retrieved from - pubsub. -- ``pubsub.consumer.failfast`` - [increment] a message was dropped due to its - lease being expired. -- ``pubsub.consumer.latency.receive`` - [histogram] how many seconds it took - for a message to reach handler after it was published. -- ``pubsub.consumer.succeeded`` - [increment] ``handler`` call was successfull. -- ``pubsub.consumer.failed`` - [increment] ``handler`` call raised an - exception. -- ``pubsub.consumer.latency.runtime`` - [histogram] ``handler`` execution time - in seconds. -- ``pubsub.acker.batch.failed`` - [increment] ack request failed. -- ``pubsub.acker.batch`` - [histogram] actual number of messages that was acked - in a single request. - Publisher --------- diff --git a/pubsub/gcloud/aio/pubsub/metrics_agent.py b/pubsub/gcloud/aio/pubsub/metrics_agent.py deleted file mode 100644 index 447c3b181..000000000 --- a/pubsub/gcloud/aio/pubsub/metrics_agent.py +++ /dev/null @@ -1,19 +0,0 @@ -class MetricsAgent: - """ - Any metric client should implement this interface - to be compatible with subscriber.subscribe - """ - - def histogram( - self, - metric: str, - value: float, - ) -> None: - pass - - def increment( - self, - metric: str, - value: float = 1, - ) -> None: - pass diff --git a/pubsub/gcloud/aio/pubsub/subscriber.py b/pubsub/gcloud/aio/pubsub/subscriber.py index 2f7838ffa..29b328e53 100644 --- a/pubsub/gcloud/aio/pubsub/subscriber.py +++ b/pubsub/gcloud/aio/pubsub/subscriber.py @@ -8,7 +8,6 @@ import asyncio import logging import time - import warnings from collections.abc import Awaitable from collections.abc import Callable from typing import Optional @@ -18,7 +17,6 @@ from . import metrics from .subscriber_client import SubscriberClient from .subscriber_message import SubscriberMessage - from .metrics_agent import MetricsAgent log = logging.getLogger(__name__) @@ -97,7 +95,6 @@ async def acker( ack_queue: 'asyncio.Queue[str]', subscriber_client: 'SubscriberClient', ack_window: float, - metrics_client: MetricsAgent, ) -> None: ack_ids: list[str] = [] while True: @@ -160,7 +157,6 @@ async def maybe_ack(ack_id: str) -> None: exc_info=e, extra={'exc_message': str(e)}, ) - metrics_client.increment('pubsub.acker.batch.failed') metrics.BATCH_STATUS.labels( component='acker', outcome='failed', @@ -175,7 +171,6 @@ async def maybe_ack(ack_id: str) -> None: exc_info=e, extra={'exc_message': str(e)}, ) - metrics_client.increment('pubsub.acker.batch.failed') metrics.BATCH_STATUS.labels( component='acker', outcome='failed', @@ -183,7 +178,6 @@ async def maybe_ack(ack_id: str) -> None: continue - metrics_client.histogram('pubsub.acker.batch', len(ack_ids)) metrics.BATCH_STATUS.labels( component='acker', outcome='succeeded', @@ -199,7 +193,6 @@ async def nacker( nack_queue: 'asyncio.Queue[str]', subscriber_client: 'SubscriberClient', nack_window: float, - metrics_client: MetricsAgent, ) -> None: ack_ids: list[str] = [] while True: @@ -264,7 +257,6 @@ async def maybe_nack(ack_id: str) -> None: exc_info=e, extra={'exc_message': str(e)}, ) - metrics_client.increment('pubsub.nacker.batch.failed') metrics.BATCH_STATUS.labels( component='nacker', outcome='failed', ).inc() @@ -278,14 +270,12 @@ async def maybe_nack(ack_id: str) -> None: exc_info=e, extra={'exc_message': str(e)}, ) - metrics_client.increment('pubsub.nacker.batch.failed') metrics.BATCH_STATUS.labels( component='nacker', outcome='failed', ).inc() continue - metrics_client.histogram('pubsub.nacker.batch', len(ack_ids)) metrics.BATCH_STATUS.labels( component='nacker', outcome='succeeded', @@ -302,7 +292,6 @@ async def _execute_callback( ack_queue: 'asyncio.Queue[str]', nack_queue: 'Optional[asyncio.Queue[str]]', insertion_time: float, - metrics_client: MetricsAgent, ) -> None: try: start = time.perf_counter() @@ -312,11 +301,6 @@ async def _execute_callback( with metrics.CONSUME_LATENCY.labels(phase='runtime').time(): await callback(message) await ack_queue.put(message.ack_id) - metrics_client.histogram( - 'pubsub.consumer.latency.runtime', - time.perf_counter() - start, - ) - metrics_client.increment('pubsub.consumer.succeeded') metrics.CONSUME.labels(outcome='succeeded').inc() except asyncio.CancelledError: @@ -324,7 +308,6 @@ async def _execute_callback( await nack_queue.put(message.ack_id) log.warning('application callback was cancelled') - metrics_client.increment('pubsub.consumer.cancelled') metrics.CONSUME.labels(outcome='cancelled').inc() except Exception as e: if nack_queue: @@ -335,7 +318,6 @@ async def _execute_callback( exc_info=e, extra={'exc_message': str(e)}, ) - metrics_client.increment('pubsub.consumer.failed') metrics.CONSUME.labels(outcome='failed').inc() async def consumer( # pylint: disable=too-many-locals @@ -345,7 +327,6 @@ async def consumer( # pylint: disable=too-many-locals ack_deadline_cache: AckDeadlineCache, max_tasks: int, nack_queue: 'Optional[asyncio.Queue[str]]', - metrics_client: MetricsAgent, ) -> None: try: semaphore = asyncio.Semaphore(max_tasks) @@ -358,7 +339,6 @@ async def _consume_one( ack_deadline = await ack_deadline_cache.get() if (time.perf_counter() - pulled_at) >= ack_deadline: - metrics_client.increment('pubsub.consumer.failfast') metrics.CONSUME.labels(outcome='failfast').inc() message_queue.task_done() semaphore.release() @@ -367,9 +347,6 @@ async def _consume_one( # publish_time is in UTC Zulu # https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage recv_latency = time.time() - message.publish_time.timestamp() - metrics_client.histogram( - 'pubsub.consumer.latency.receive', recv_latency, - ) metrics.CONSUME_LATENCY.labels(phase='receive').observe( recv_latency, ) @@ -381,7 +358,6 @@ async def _consume_one( ack_queue, nack_queue, time.perf_counter(), - metrics_client, ), ) task.add_done_callback(lambda _f: semaphore.release()) @@ -407,7 +383,6 @@ async def producer( message_queue: MessageQueue, subscriber_client: 'SubscriberClient', max_messages: int, - metrics_client: MetricsAgent, ) -> None: try: while True: @@ -429,9 +404,6 @@ async def producer( except (asyncio.TimeoutError, KeyError): continue - metrics_client.histogram( - 'pubsub.producer.batch', len(new_messages), - ) metrics.MESSAGES_RECEIVED.inc(len(new_messages)) metrics.BATCH_SIZE.observe(len(new_messages)) @@ -473,7 +445,6 @@ async def subscribe( num_tasks_per_consumer: int = 1, enable_nack: bool = True, nack_window: float = 0.3, - metrics_client: MetricsAgent | None = None, ) -> None: # pylint: disable=too-many-locals ack_queue: 'asyncio.Queue[str]' = asyncio.Queue( @@ -487,13 +458,6 @@ async def subscribe( ack_deadline, ) - if metrics_client is not None: - warnings.warn( - 'Using MetricsAgent in subscribe() is deprecated. ' - 'Refer to Prometheus metrics instead.', - DeprecationWarning, - ) - metrics_client = metrics_client or MetricsAgent() acker_tasks = [] consumer_tasks = [] producer_tasks = [] @@ -502,7 +466,7 @@ async def subscribe( asyncio.ensure_future( acker( subscription, ack_queue, subscriber_client, - ack_window=ack_window, metrics_client=metrics_client, + ack_window=ack_window, ), ), ) @@ -515,7 +479,6 @@ async def subscribe( nacker( subscription, nack_queue, subscriber_client, nack_window=nack_window, - metrics_client=metrics_client, ), ), ) @@ -532,7 +495,6 @@ async def subscribe( ack_deadline_cache, num_tasks_per_consumer, nack_queue, - metrics_client=metrics_client, ), ), ) @@ -543,7 +505,6 @@ async def subscribe( q, subscriber_client, max_messages=max_messages_per_producer, - metrics_client=metrics_client, ), ), ) diff --git a/pubsub/tests/unit/subscriber_test.py b/pubsub/tests/unit/subscriber_test.py index b21d37f1e..86694ceba 100644 --- a/pubsub/tests/unit/subscriber_test.py +++ b/pubsub/tests/unit/subscriber_test.py @@ -221,7 +221,6 @@ async def test_producer_fetches_messages(subscriber_client): queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) message, pulled_at = await asyncio.wait_for(queue.get(), 0.1) @@ -246,7 +245,6 @@ async def f(*args, **kwargs): queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) await asyncio.sleep(0) @@ -274,7 +272,6 @@ async def f(*args, **kwargs): queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) await asyncio.sleep(0) @@ -302,7 +299,6 @@ async def f(*args, **kwargs): queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) await asyncio.sleep(0) @@ -328,7 +324,6 @@ async def test_producer_gracefully_shutsdown(subscriber_client): queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) await asyncio.sleep(0) @@ -353,7 +348,6 @@ async def test_producer_fetches_once_then_waits_for_consumer( queue, subscriber_client, max_messages=1, - metrics_client=MagicMock(), ), ) await asyncio.sleep(0) @@ -380,7 +374,7 @@ async def test_consumer_calls_none_means_ack( consumer_task = asyncio.ensure_future( consumer( queue, application_callback, ack_queue, - ack_deadline_cache, 1, nack_queue, MagicMock(), + ack_deadline_cache, 1, nack_queue, ), ) @@ -418,7 +412,6 @@ async def callback(mock): consumer_task = asyncio.ensure_future( consumer( queue, callback, ack_queue, ack_deadline_cache, 2, None, - MagicMock(), ), ) @@ -461,7 +454,7 @@ async def test_consumer_drops_expired_messages( consumer_task = asyncio.ensure_future( consumer( queue, application_callback, ack_queue, - ack_deadline_cache, 1, nack_queue, MagicMock(), + ack_deadline_cache, 1, nack_queue, ), ) @@ -489,7 +482,6 @@ async def f(*args): consumer_task = asyncio.ensure_future( consumer( queue, f, ack_queue, ack_deadline_cache, 1, None, - MagicMock(), ), ) await queue.put((message, 0.0)) @@ -516,7 +508,6 @@ async def f(*args): consumer_task = asyncio.ensure_future( consumer( queue, f, ack_queue, ack_deadline_cache, 1, nack_queue, - MagicMock(), ), ) @@ -558,7 +549,6 @@ async def f(*args): ack_deadline_cache, 1, nack_queue, - MagicMock(), ), ) await queue.put((message, 0.0)) @@ -591,7 +581,6 @@ async def test_consumer_gracefull_shutdown_without_pending_tasks( ack_deadline_cache, 1, nack_queue, - MagicMock(), ), ) await asyncio.sleep(0.1) @@ -612,7 +601,6 @@ async def test_acker_does_ack(subscriber_client): queue, subscriber_client, 0.0, - MagicMock(), ), ) await queue.put('ack_id') @@ -640,7 +628,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.0, - MagicMock(), ), ) await queue.put('ack_id') @@ -660,7 +647,6 @@ async def test_acker_does_batching(subscriber_client): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1') @@ -689,7 +675,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1') @@ -730,7 +715,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1') @@ -766,7 +750,6 @@ async def test_nacker_does_modify_ack_deadline(subscriber_client): queue, subscriber_client, 0.0, - MagicMock(), ), ) await queue.put('ack_id') @@ -794,7 +777,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.0, - MagicMock(), ), ) await queue.put('ack_id') @@ -814,7 +796,6 @@ async def test_nacker_does_batching(subscriber_client): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1') @@ -845,7 +826,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1') @@ -893,7 +873,6 @@ async def f(*args, **kwargs): queue, subscriber_client, 0.1, - MagicMock(), ), ) await queue.put('ack_id_1')