diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 43d9dc1fab..2f6a44659e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -18,7 +18,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +57,12 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + private final ConcurrentSkipListMap cachedVersions = + new ConcurrentSkipListMap<>(); + private final Map cache = new ConcurrentHashMap<>(); private final boolean comparableResourceVersions; - private String latestResourceVersion; + private volatile String latestResourceVersion; private final Map activeUpdates = new HashMap<>(); @@ -139,7 +144,7 @@ private synchronized EventHandling onEvent( "Removing resource from temp cache. comparison: {} unknown state: {}", comp, unknownState); - cache.remove(resourceId); + cacheRemove(resourceId); // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event @@ -148,6 +153,7 @@ private synchronized EventHandling onEvent( result = EventHandling.OBSOLETE; } } + checkStaleResources(); var ed = activeUpdates.get(resourceId); if (ed != null && result != EventHandling.OBSOLETE) { log.debug("Setting last event for id: {} delete: {}", resourceId, delete); @@ -161,6 +167,21 @@ private synchronized EventHandling onEvent( } } + private void checkStaleResources() { + // todo add only once within an interval check + CompletableFuture.runAsync( + () -> { + var longLatest = Long.parseLong(latestResourceVersion); + var head = cachedVersions.headMap(longLatest); + for (var entry : head.entrySet()) { + synchronized (this) { + cache.remove(entry.getValue()); + cachedVersions.remove(entry.getKey()); + } + } + }); + } + /** put the item into the cache if it's for a later state than what has already been observed. */ public synchronized void putResource(T newResource) { if (!comparableResourceVersions) { @@ -204,11 +225,24 @@ public synchronized void putResource(T newResource) { "Temporarily moving ahead to target version {} for resource id: {}", newResource.getMetadata().getResourceVersion(), resourceId); - cache.put(resourceId, newResource); + cacheResource(resourceId, newResource); } } public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + private void cacheResource(ResourceID resourceId, T resource) { + var prevValue = cache.put(resourceId, resource); + if (prevValue != null) { + cachedVersions.remove(Long.parseLong(prevValue.getMetadata().getResourceVersion())); + } + cachedVersions.put(Long.parseLong(resource.getMetadata().getResourceVersion()), resourceId); + } + + private void cacheRemove(ResourceID resourceId) { + var removed = cache.remove(resourceId); + cachedVersions.remove(Long.parseLong(removed.getMetadata().getResourceVersion())); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java similarity index 99% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 592a552433..a40259c17c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -30,7 +30,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; -class TemporaryPrimaryResourceCacheTest { +class TemporaryResourceCacheTest { public static final String RESOURCE_VERSION = "2";