Conversation
19a5443 to
ec316bf
Compare
This adds WORKER_TYPE setting. The default value is 'pulpcore'. When 'redis' is selected, the tasking system uses Redis to lock resources. Redis workers produce less load on the PostgreSQL database. closes: pulp#7210 Generated By: Claude Code.
da96f66 to
dd46ad1
Compare
Added redis connection checks to the worker so it shuts down if the connection is broken.
gerrod3
left a comment
There was a problem hiding this comment.
There's still a lot I haven't deeply reviewed yet, but this was getting long and I had a big idea around dispatch that I want to discuss
| def execute_task(task): | ||
| """Redis-aware task execution that releases Redis locks for immediate tasks.""" | ||
| # This extra stack is needed to isolate the current_task ContextVar | ||
| contextvars.copy_context().run(_execute_task, task) |
There was a problem hiding this comment.
Reading through this version and the base version there is nothing much different between the two besides that this one calls safe_release_task_locks. Could this be a wrapper of the original with a try/finally to release the redis locks?
There was a problem hiding this comment.
redis_tasks._execute_task and tasks._execute_task share the same core logic:
set_running → log_task_start → run the task function → log result → set_completed/set_failed →
send notification. The Redis version cannot directly call the PulpcoreWorker's _execute_task
because it must release Redis locks between task execution and state transition — a step that
doesn't exist in the PulpcoreWorker implementation.
| except Exception: | ||
| # Exception during execute_task() | ||
| # Atomically release all locks as safety net | ||
| safe_release_task_locks(task, lock_owner) |
There was a problem hiding this comment.
Not needed, execute_task should already handle letting go of the locks.
There was a problem hiding this comment.
that is correct. i'll update the comment to more accurately state that the except block is for the case where using_workdir() fails before the execute function gets a chance to run and release locks itself.
There was a problem hiding this comment.
Can you create a redis_using_workdir(task, app_lock) that handles this in a finally block?
gerrod3
left a comment
There was a problem hiding this comment.
Lots of comment/logging statements to remove. Need more specificity on the try/except blocks. And finally there are gaps in the task logic that need to be addressed.
| local resource_name = ARGV[2 + i] | ||
|
|
||
| -- Remove from set | ||
| local removed = redis.call("srem", key, lock_owner) |
There was a problem hiding this comment.
This call can fail if the item at key is no longer a set, i.e. is now a string for an exclusive lock.
| # Log debug for successful releases | ||
| num_released_exclusive = len(exclusive_resources) - len(not_owned_exclusive) | ||
| num_released_shared = len(shared_resources) - len(not_in_shared) | ||
| if num_released_exclusive > 0: | ||
| _logger.debug("Released %d exclusive lock(s)", num_released_exclusive) | ||
| if num_released_shared > 0: | ||
| _logger.debug("Released %d shared lock(s)", num_released_shared) | ||
| if not task_lock_not_owned: | ||
| _logger.debug("Released task lock %s", task_lock_key) |
There was a problem hiding this comment.
Do we still need these debug logs?
| def _release_resource_locks(self, task_lock_key, resources, shared_resources=None): | ||
| """ | ||
| Atomically release task lock and resource locks. | ||
|
|
||
| Uses a Lua script to ensure we only release locks that we own. | ||
|
|
||
| Args: | ||
| task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}") | ||
| resources (list): List of exclusive resource names to release locks for | ||
| shared_resources (list): Optional list of shared resource names | ||
| """ | ||
| release_resource_locks( | ||
| self.redis_conn, self.name, task_lock_key, resources, shared_resources | ||
| ) |
There was a problem hiding this comment.
Remove this method, it doesn't do anything meaningful.
This removes the need to perform a refresh of a task from the db before executing a task. This also removes some excessive logging from dispatch method.
… in Redis when tasks are not found for a missing worker
This adds WORKER_TYPE setting. The default value is 'pulpcore'. When 'redis' is selected, the tasking system uses Redis to lock resources. Redis workers produce less load on the PostgreSQL database.
closes: #7210
Generated By: Claude Code.
📜 Checklist
See: Pull Request Walkthrough