Conversation
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
📝 WalkthroughWalkthroughModule migration to v6 with API and import updates, added context.Context to public driver/kv/plugin methods, internal state moved from pointers to atomic types, JSON tag and encoder changes, CI workflow and test infra updated (in-memory tracing, DialContext), and various map/TTL/heap refinements. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRsSuggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR updates the memory plugin from v5 to v6, migrating to new API packages (api-go/v6, api-plugins/v6), goridge v4, and jobs v6. It replaces the external otel plugin with an in-memory tracer for tests, modernizes Go idioms (atomic types, range integers, wg.Go, clear()), and removes stale comments and dependencies.
Changes:
- Migrated module from
memory/v5tomemory/v6with updated API/dependency imports and addedcontext.Contextparameters to driver methods. - Replaced otel plugin dependency in tests with an in-memory
TracerProviderand modernized test patterns (Go 1.22+ range integers,sync.WaitGroup.Go,net.Dialer.DialContext). - Cleaned up atomic operations (switched from raw
atomic.*functions toatomic.Bool/atomic.Int64/atomic.Uint64struct fields) and removed unnecessary comments and code.
Reviewed changes
Copilot reviewed 15 out of 17 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| go.mod / go.sum | Module rename to v6, updated dependencies |
| tests/go.mod / tests/go.sum | Updated test dependencies, removed otel/temporal deps |
| plugin.go | Updated imports to v6, added context params to driver constructors |
| memorykv/driver.go | Added context params, use context for spans, cleaned up code/comments |
| memorykv/map.go | Simplified Clean() with clear() |
| memoryjobs/driver.go | Switched to atomic struct fields, removed helper functions, added context params |
| memoryjobs/item.go | Switched to atomic.Bool/atomic.Int64, replaced goccy/go-json with encoding/json |
| tests/kv_memory_test.go | Updated imports, modernized test patterns |
| tests/jobs_memory_test.go | Replaced otel plugin with in-memory tracer, modernized tests |
| tests/helpers/helpers.go | Updated imports, modernized dial/loop patterns |
| tests/configs/.rr-memory-tracer.yaml | Removed otel config section |
| .golangci.yml | Added gocognit to disabled linters, removed test-specific exclusion rules |
| .github/workflows/linux_jobs.yml | Updated PHP to 8.5, fixed coverage artifact names, added awk filtering |
| .github/workflows/linux_inmemory.yml | Same CI changes as linux_jobs.yml |
| go.work.sum | Updated workspace checksums |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
memorykv/driver.go (1)
128-176:⚠️ Potential issue | 🟠 MajorOld TTL callback not stopped when overwriting existing key in
Set.Line 134 deletes the existing entry before setting the new one, but it doesn't stop the TTL callback goroutine of the old entry. The old callback will continue running and eventually call
heap.removeEntry()when its TTL expires, which could delete the newly set entry with the same key.🐛 Proposed fix to stop old callback before overwriting
for i := range items { if items[i] == nil { continue } // check for the duplicates - d.heap.Delete(items[i].Key()) + if old, ok := d.heap.LoadAndDelete(items[i].Key()); ok && old.callback != nil { + select { + case old.callback.stopCh <- struct{}{}: + default: + } + } // TTL is set🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@memorykv/driver.go` around lines 128 - 176, The Set loop currently calls d.heap.Delete(key) but doesn't stop any running TTL goroutine for the existing Item, so retrieve the existing entry from the heap (e.g., via a Get or lookup on d.heap) before deleting; if existing.Item.callback != nil then signal its stopCh (and/or close it in the same manner ttlcallback expects) and ensure the goroutine is stopped (optionally wait for acknowledgement) before calling d.heap.Delete and replacing the entry; reference the Set loop, d.heap.Delete, ttlcallback, Item.callback and cb.stopCh when making this change.memoryjobs/driver.go (2)
275-291:⚠️ Potential issue | 🟠 MajorMissing decrement of
delayedcounter after delay expires.The
delayedcounter is incremented at line 277 when a delayed job is scheduled, but it's never decremented aftertime.Sleepcompletes and the item is pushed tolocalQueue. This will cause theState()method to report incorrectDelayedcounts that accumulate over time.🐛 Proposed fix to decrement delayed counter
go func(jj *Item) { c.goroutines.Add(1) c.delayed.Add(1) defer c.goroutines.Add(^uint64(0)) + defer c.delayed.Add(-1) time.Sleep(jj.Options.DelayDuration()) select { case c.localQueue <- jj: default: c.log.Warn("can't push job", zap.String("error", "local queue closed or full")) } }(msg)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@memoryjobs/driver.go` around lines 275 - 291, The delayed counter is incremented in the anonymous goroutine started at "go func(jj *Item)" via c.delayed.Add(1) but never decremented; add a corresponding decrement (c.delayed.Add(^uint64(0))) so the counter is reduced when the delayed work completes. Place the decrement as a deferred call immediately after the increment (mirroring how goroutines uses defer c.goroutines.Add(^uint64(0))) so it always runs after time.Sleep and the attempt to push to c.localQueue, ensuring Delayed in State() is accurate even if the send fails.
150-162:⚠️ Potential issue | 🔴 CriticalFix false error wrapping in
Pushmethod that reports success as failure.Line 161 unconditionally wraps the result of
handleItemwitherrors.E(op, ...). SincehandleItemreturnsnilon successful execution (lines 291 and 296),errors.E(op, nil)incorrectly returns a non-nil error, causingPushto report failure when the operation succeeds.Wrap the result only when an error actually occurs:
if err := c.handleItem(ctx, fromJob(jb)); err != nil { return errors.E(op, err) } return nil🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@memoryjobs/driver.go` around lines 150 - 162, The Push method currently wraps the result of handleItem unconditionally using errors.E(op, ...), causing a nil success to be reported as an error; update Driver.Push so it calls c.handleItem(ctx, fromJob(jb)), captures the returned error, and only returns errors.E(op, err) when err != nil, otherwise return nil; locate the Push function and adjust the final return to conditionally wrap the error from handleItem instead of always calling errors.E(op, ...).
🧹 Nitpick comments (2)
memoryjobs/driver.go (2)
180-197: Potential race in Run/Resume between check and set oflisteners.The pattern of Load() check followed by consume() and then Store(true) at lines 187-193 (and similarly in Resume at lines 230-236) is not atomic. Two concurrent calls could both pass the check and spawn duplicate consumer goroutines. If this is protected by higher-level synchronization in the jobs plugin, this is fine. Otherwise, consider using
CompareAndSwapfor atomic state transitions.♻️ Suggested atomic state transition
func (c *Driver) Run(ctx context.Context, pipe jobs.Pipeline) error { const op = errors.Op("in_memory_jobs_run") _, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "in_memory_run") defer span.End() t := time.Now().UTC() - if c.listeners.Load() { + if !c.listeners.CompareAndSwap(false, true) { c.log.Warn("listener already in the active state") return errors.E(op, errors.Str("listener already in the active state")) } c.consume() - c.listeners.Store(true)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@memoryjobs/driver.go` around lines 180 - 197, The Run method currently does a non-atomic listeners.Load() check then calls c.consume() and c.listeners.Store(true), which can race with concurrent Run/Resume calls; change the logic in Driver.Run (and similarly in Driver.Resume) to perform an atomic state transition using c.listeners.CompareAndSwap(false, true) and only call c.consume() when the CAS succeeds, returning the existing-listener error if CAS fails; ensure you reference the listeners field, the consume() call, and the Run/Resume methods when making this change so duplicate consumer goroutines cannot be spawned.
314-342: Minor ordering consideration in prefetch accounting.The
msgInFlight.Add(1)at line 338 occurs afterc.pq.Insert(item)at line 336. In theory, if the job is processed and acknowledged extremely quickly before line 338 executes, the decrement in the ack handler could race ahead. However, this is unlikely in practice due to the execution flow. Consider moving the increment before the insert if strict accounting is required.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@memoryjobs/driver.go` around lines 314 - 342, The prefetch counter increment should occur before the item becomes visible to consumers to avoid a rare race where an extremely fast ACK decrements before we increment; move the c.msgInFlight.Add(1) call to just before c.pq.Insert(item) (while still holding c.cond.L) so the sequence is: set item options, ensure headers, inject propagation, c.msgInFlight.Add(1), then c.pq.Insert(item); no other behavior changes to c.handleItem, c.cond, or c.stopped are required.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.github/workflows/linux_inmemory.yml:
- Around line 105-112: The awk filter in the GitHub Actions job is matching
github.com/roadrunner-server/memory/v5 but your module and coverage paths are
v6; update the awk pattern in the linux_inmemory.yml block (the
/^github\.com\/roadrunner-server\/memory\/v5\// match/sub call) to use v6, and
also update all occurrences of v5 to v6 in tests/pkgs.txt so the coverage lines
for the memory package are correctly retained.
In @.github/workflows/linux_jobs.yml:
- Around line 105-112: The awk filter is matching the old module path
"github.com/roadrunner-server/memory/v5/" so coverage lines for the upgraded
module (v6) won’t be filtered; update the pattern inside the awk script (the
literal string used in the regex/sub call) from
"github.com/roadrunner-server/memory/v5/" to
"github.com/roadrunner-server/memory/v6/" so the block that tests
/^github\.com\/roadrunner-server\/memory\/v5\// and the sub(...) call both
reference the v6 prefix (leave the rest of the awk logic unchanged).
In `@memorykv/driver.go`:
- Around line 74-91: The Get method validates using keyTrimmed but performs the
lookup with the original key; update the lookup to use keyTrimmed so
whitespace-normalized keys are used consistently: in Driver.Get replace the call
to d.heap.Get(key) with d.heap.Get(keyTrimmed) (keeping the existing validation,
tracing span, and error handling intact) so stored entries and lookups match
when keys contain leading/trailing whitespace.
- Around line 265-270: The Delete loop can block when sending to
k.callback.stopCh (buffer size 1) if the TTL goroutine has exited or is blocked;
update the code in the Delete function where you call
d.heap.LoadAndDelete(keys[i]) and send to k.callback.stopCh to perform a
non-blocking send (use a select with a default) and only attempt the send when
k.callback != nil to avoid blocking; keep the existing logic to stop the
callback but ensure the send never blocks by falling through if the channel is
not ready.
---
Outside diff comments:
In `@memoryjobs/driver.go`:
- Around line 275-291: The delayed counter is incremented in the anonymous
goroutine started at "go func(jj *Item)" via c.delayed.Add(1) but never
decremented; add a corresponding decrement (c.delayed.Add(^uint64(0))) so the
counter is reduced when the delayed work completes. Place the decrement as a
deferred call immediately after the increment (mirroring how goroutines uses
defer c.goroutines.Add(^uint64(0))) so it always runs after time.Sleep and the
attempt to push to c.localQueue, ensuring Delayed in State() is accurate even if
the send fails.
- Around line 150-162: The Push method currently wraps the result of handleItem
unconditionally using errors.E(op, ...), causing a nil success to be reported as
an error; update Driver.Push so it calls c.handleItem(ctx, fromJob(jb)),
captures the returned error, and only returns errors.E(op, err) when err != nil,
otherwise return nil; locate the Push function and adjust the final return to
conditionally wrap the error from handleItem instead of always calling
errors.E(op, ...).
In `@memorykv/driver.go`:
- Around line 128-176: The Set loop currently calls d.heap.Delete(key) but
doesn't stop any running TTL goroutine for the existing Item, so retrieve the
existing entry from the heap (e.g., via a Get or lookup on d.heap) before
deleting; if existing.Item.callback != nil then signal its stopCh (and/or close
it in the same manner ttlcallback expects) and ensure the goroutine is stopped
(optionally wait for acknowledgement) before calling d.heap.Delete and replacing
the entry; reference the Set loop, d.heap.Delete, ttlcallback, Item.callback and
cb.stopCh when making this change.
---
Nitpick comments:
In `@memoryjobs/driver.go`:
- Around line 180-197: The Run method currently does a non-atomic
listeners.Load() check then calls c.consume() and c.listeners.Store(true), which
can race with concurrent Run/Resume calls; change the logic in Driver.Run (and
similarly in Driver.Resume) to perform an atomic state transition using
c.listeners.CompareAndSwap(false, true) and only call c.consume() when the CAS
succeeds, returning the existing-listener error if CAS fails; ensure you
reference the listeners field, the consume() call, and the Run/Resume methods
when making this change so duplicate consumer goroutines cannot be spawned.
- Around line 314-342: The prefetch counter increment should occur before the
item becomes visible to consumers to avoid a rare race where an extremely fast
ACK decrements before we increment; move the c.msgInFlight.Add(1) call to just
before c.pq.Insert(item) (while still holding c.cond.L) so the sequence is: set
item options, ensure headers, inject propagation, c.msgInFlight.Add(1), then
c.pq.Insert(item); no other behavior changes to c.handleItem, c.cond, or
c.stopped are required.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8acf01d9-ae64-428d-b4fe-98fcd634b4a7
⛔ Files ignored due to path filters (3)
go.sumis excluded by!**/*.sumgo.work.sumis excluded by!**/*.sumtests/go.sumis excluded by!**/*.sum
📒 Files selected for processing (14)
.github/workflows/linux_inmemory.yml.github/workflows/linux_jobs.yml.golangci.ymlgo.modmemoryjobs/driver.gomemoryjobs/item.gomemorykv/driver.gomemorykv/map.goplugin.gotests/configs/.rr-memory-tracer.yamltests/go.modtests/helpers/helpers.gotests/jobs_memory_test.gotests/kv_memory_test.go
💤 Files with no reviewable changes (1)
- tests/configs/.rr-memory-tracer.yaml
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/go.mod (1)
60-60: Complete the RoadRunner module migration by upgrading remaining v5 modules to v6.The indirect dependencies on
api/v4andgoridge/v3are transitive requirements from the v5 modules still present in the dependency tree (config/v5,http/v5,informer/v5,logger/v5,resetter/v5,rpc/v5,server/v5). To eliminate the cross-major coupling and complete the v6/v4 migration cleanly, upgrade these remaining v5 modules to their v6 equivalents.Also applies to: 64-64
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/go.mod` at line 60, Update the RoadRunner module versions in go.mod by replacing all remaining v5 module imports with their v6 counterparts (e.g., github.com/roadrunner-server/config/v5 -> github.com/roadrunner-server/config/v6, github.com/roadrunner-server/http/v5 -> github.com/roadrunner-server/http/v6, github.com/roadrunner-server/informer/v5 -> github.com/roadrunner-server/informer/v6, github.com/roadrunner-server/logger/v5 -> github.com/roadrunner-server/logger/v6, github.com/roadrunner-server/resetter/v5 -> github.com/roadrunner-server/resetter/v6, github.com/roadrunner-server/rpc/v5 -> github.com/roadrunner-server/rpc/v6, github.com/roadrunner-server/server/v5 -> github.com/roadrunner-server/server/v6), ensure transitive indirect entries for github.com/roadrunner-server/api/v4 and github.com/roadrunner-server/goridge/v3 are removed or upgraded to their v6/v4 equivalents, then run the Go tooling (go get ./... and go mod tidy) to resolve and lock the updated module graph.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@tests/go.mod`:
- Line 60: Update the RoadRunner module versions in go.mod by replacing all
remaining v5 module imports with their v6 counterparts (e.g.,
github.com/roadrunner-server/config/v5 ->
github.com/roadrunner-server/config/v6, github.com/roadrunner-server/http/v5 ->
github.com/roadrunner-server/http/v6, github.com/roadrunner-server/informer/v5
-> github.com/roadrunner-server/informer/v6,
github.com/roadrunner-server/logger/v5 ->
github.com/roadrunner-server/logger/v6, github.com/roadrunner-server/resetter/v5
-> github.com/roadrunner-server/resetter/v6, github.com/roadrunner-server/rpc/v5
-> github.com/roadrunner-server/rpc/v6, github.com/roadrunner-server/server/v5
-> github.com/roadrunner-server/server/v6), ensure transitive indirect entries
for github.com/roadrunner-server/api/v4 and
github.com/roadrunner-server/goridge/v3 are removed or upgraded to their v6/v4
equivalents, then run the Go tooling (go get ./... and go mod tidy) to resolve
and lock the updated module graph.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d518dbfe-78b4-43ee-805b-2293c456f574
⛔ Files ignored due to path filters (3)
go.sumis excluded by!**/*.sumgo.work.sumis excluded by!**/*.sumtests/go.sumis excluded by!**/*.sum
📒 Files selected for processing (9)
.github/workflows/linux_inmemory.yml.github/workflows/linux_jobs.ymltests/configs/.rr-memory-tracer.yamltests/env/docker-compose-otel.yamltests/env/otel-collector-config.ymltests/go.modtests/kv_memory_test.gotests/php_test_files/composer.jsontests/pkgs.txt
💤 Files with no reviewable changes (3)
- tests/pkgs.txt
- tests/env/otel-collector-config.yml
- tests/env/docker-compose-otel.yaml
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/kv_memory_test.go
- .github/workflows/linux_jobs.yml
- .github/workflows/linux_inmemory.yml
Reason for This PR
Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.][Reviewer TODO: Verify that these criteria are met. Request changes if not]git commit -s).CHANGELOG.md.Summary by CodeRabbit
Breaking Changes
Improvements
Configuration
Tests