From 72305954be07645cfde043a8a6a242d19ab4417b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Sat, 7 Mar 2026 22:34:27 +0100 Subject: [PATCH 1/3] improve: read cache after write corner case for missed delete event MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/TemporaryResourceCache.java | 40 +++++++++++++++++-- ...t.java => TemporaryResourceCacheTest.java} | 2 +- 2 files changed, 38 insertions(+), 4 deletions(-) rename operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/{TemporaryPrimaryResourceCacheTest.java => TemporaryResourceCacheTest.java} (99%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 43d9dc1fab..db5519bf49 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -18,7 +18,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +57,12 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + private final ConcurrentSkipListMap cachedVersions = + new ConcurrentSkipListMap<>(); + private final Map cache = new ConcurrentHashMap<>(); private final boolean comparableResourceVersions; - private String latestResourceVersion; + private volatile String latestResourceVersion; private final Map activeUpdates = new HashMap<>(); @@ -139,7 +144,7 @@ private synchronized EventHandling onEvent( "Removing resource from temp cache. comparison: {} unknown state: {}", comp, unknownState); - cache.remove(resourceId); + cacheRemove(resourceId, cached.getMetadata().getResourceVersion()); // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event @@ -155,12 +160,25 @@ private synchronized EventHandling onEvent( delete ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); + checkStaleResources(); return EventHandling.DEFER; } else { return result; } } + private void checkStaleResources() { + CompletableFuture.runAsync( + () -> { + var longLatest = Long.parseLong(latestResourceVersion); + var head = cachedVersions.headMap(longLatest); + for (var entry : head.entrySet()) { + cache.remove(entry.getValue()); + cachedVersions.remove(entry.getKey()); + } + }); + } + /** put the item into the cache if it's for a later state than what has already been observed. */ public synchronized void putResource(T newResource) { if (!comparableResourceVersions) { @@ -204,11 +222,27 @@ public synchronized void putResource(T newResource) { "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); - cache.put(resourceId, newResource); + cacheResource(resourceId, newResource); } } public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + private void cacheResource(ResourceID resourceId, T resource) { + var actualCached = cache.get(resourceId); + cache.put(resourceId, resource); + CompletableFuture.runAsync( + () -> { + cachedVersions.remove(Long.parseLong(actualCached.getMetadata().getResourceVersion())); + cachedVersions.put( + Long.parseLong(resource.getMetadata().getResourceVersion()), resourceId); + }); + } + + private void cacheRemove(ResourceID resourceId, String cachedResourceVersion) { + cache.remove(resourceId); + CompletableFuture.runAsync(() -> cachedVersions.remove(Long.parseLong(cachedResourceVersion))); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java similarity index 99% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 592a552433..a40259c17c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; -class TemporaryPrimaryResourceCacheTest { +class TemporaryResourceCacheTest { public static final String RESOURCE_VERSION = "2"; From a7f5373f3aebb4b9f2fb0431507604ed258e7a43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Sun, 8 Mar 2026 14:28:55 +0100 Subject: [PATCH 2/3] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/TemporaryResourceCache.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index db5519bf49..83173b75f1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -144,7 +144,7 @@ private synchronized EventHandling onEvent( "Removing resource from temp cache. comparison: {} unknown state: {}", comp, unknownState); - cacheRemove(resourceId, cached.getMetadata().getResourceVersion()); + cacheRemove(resourceId); // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event @@ -160,7 +160,6 @@ private synchronized EventHandling onEvent( delete ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); - checkStaleResources(); return EventHandling.DEFER; } else { return result; @@ -173,8 +172,10 @@ private void checkStaleResources() { var longLatest = Long.parseLong(latestResourceVersion); var head = cachedVersions.headMap(longLatest); for (var entry : head.entrySet()) { - cache.remove(entry.getValue()); - cachedVersions.remove(entry.getKey()); + synchronized (this) { + cache.remove(entry.getValue()); + cachedVersions.remove(entry.getKey()); + } } }); } @@ -231,18 +232,15 @@ public synchronized Optional getResourceFromCache(ResourceID resourceID) { } private void cacheResource(ResourceID resourceId, T resource) { - var actualCached = cache.get(resourceId); - cache.put(resourceId, resource); - CompletableFuture.runAsync( - () -> { - cachedVersions.remove(Long.parseLong(actualCached.getMetadata().getResourceVersion())); - cachedVersions.put( - Long.parseLong(resource.getMetadata().getResourceVersion()), resourceId); - }); + var prevValue = cache.put(resourceId, resource); + if (prevValue != null) { + cachedVersions.remove(Long.parseLong(prevValue.getMetadata().getResourceVersion())); + } + cachedVersions.put(Long.parseLong(resource.getMetadata().getResourceVersion()), resourceId); } - private void cacheRemove(ResourceID resourceId, String cachedResourceVersion) { - cache.remove(resourceId); - CompletableFuture.runAsync(() -> cachedVersions.remove(Long.parseLong(cachedResourceVersion))); + private void cacheRemove(ResourceID resourceId) { + var removed = cache.remove(resourceId); + cachedVersions.remove(Long.parseLong(removed.getMetadata().getResourceVersion())); } } From e4ececf5787e5cd0f2f4c9c86d323269d1346622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Sun, 8 Mar 2026 14:41:51 +0100 Subject: [PATCH 3/3] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/TemporaryResourceCache.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 83173b75f1..2f6a44659e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -153,6 +153,7 @@ private synchronized EventHandling onEvent( result = EventHandling.OBSOLETE; } } + checkStaleResources(); var ed = activeUpdates.get(resourceId); if (ed != null && result != EventHandling.OBSOLETE) { log.debug("Setting last event for id: {} delete: {}", resourceId, delete); @@ -167,6 +168,7 @@ private synchronized EventHandling onEvent( } private void checkStaleResources() { + // todo add only once within an interval check CompletableFuture.runAsync( () -> { var longLatest = Long.parseLong(latestResourceVersion);