-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathflush_shared_cache.py
More file actions
135 lines (112 loc) · 5.27 KB
/
flush_shared_cache.py
File metadata and controls
135 lines (112 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import copy, os
from multiprocessing import Lock
import utilities
# A Orchestrator is a proxying manager manages the shared cache data between processes.
# Orchestrator is managed by an internal server process
class Orchestrator(object):
def __init__(self, number_clusters):
self.cached_data_clusters = [{} for _ in range(number_clusters)]
self.lock = Lock()
def getClusterByIndex(self, cluster_index):
self.lock.acquire()
img = copy.deepcopy(self.cached_data_clusters[cluster_index])
self.lock.release()
return img
def getAllLocalCacheWithoutLock(self):
self.lock.acquire()
img = copy.deepcopy(self.cached_data_clusters)
self.lock.release()
utilities.dump_data_to_file(img, "tmp", "cache")
content = self.getClusterSizeAllWithoutLock()
print("Cache pairs " + str(content))
print("Cache (mb) " + str(os.path.getsize(os.path.join("tmp", "cache"))/(1024*1024.0)))
def getClusterSizeByIndex(self,cluster_index):
temp_size = 0
self.lock.acquire()
for _,value in self.cached_data_clusters[cluster_index].items():
temp_size += len(value)
self.lock.release()
return temp_size
def getClusterSizeAll(self):
cur_size = []
self.lock.acquire()
for cluster_index in range(len(self.cached_data_clusters)):
temp_size = 0
for _, value in self.cached_data_clusters[cluster_index].items():
temp_size += len(value)
cur_size.append(temp_size)
self.lock.release()
return cur_size
def getClusterSizeAllWithoutLock(self):
cur_size = []
for cluster_index in range(len(self.cached_data_clusters)):
temp_size = 0
for _, value in self.cached_data_clusters[cluster_index].items():
temp_size += len(value)
cur_size.append(temp_size)
# print("Test " + str(cur_size))
return sum(cur_size)
def getClusterKeyNum(self):
cur_size = []
self.lock.acquire()
for cluster_index in range(len(self.cached_data_clusters)):
cur_size.append(len(self.cached_data_clusters[cluster_index]))
self.lock.release()
return cur_size
def clearClusterByIndex(self, cluster_index):
self.lock.acquire()
self.cached_data_clusters[cluster_index].clear()
self.lock.release()
def readCacheSizeAll(self):
self.lock.acquire()
count = 0
for cluster_index in range(len(self.cached_data_clusters)):
count += len(self.cached_data_clusters[cluster_index])
self.lock.release()
return count
def updateEntriesByDocument(self, keyword, docId, cluster_index):
self.lock.acquire()
if keyword in self.cached_data_clusters[cluster_index]:
self.cached_data_clusters[cluster_index][keyword].add(docId)
else:
self.cached_data_clusters[cluster_index][keyword] = set([docId])
self.lock.release()
def updateClusterEntries(self, key_id_pairs): # (keyword,id,cluster_index)
self.lock.acquire()
for (keyword, docId, cluster_index) in key_id_pairs:
if cluster_index in self.limited_indices:
index_in_cap = self.limited_indices.index(cluster_index)
cur_threshold = self.limited_capacities[index_in_cap]
cur_cap = 0
for _, value in self.cached_data_clusters[cluster_index].items():
cur_cap += len(value)
if cur_cap < cur_threshold:
if keyword in self.cached_data_clusters[cluster_index]:
self.cached_data_clusters[cluster_index][keyword].add(docId)
else:
self.cached_data_clusters[cluster_index][keyword] = set([docId])
else:
if keyword in self.cached_data_clusters[cluster_index]:
self.cached_data_clusters[cluster_index][keyword].add(docId)
else:
self.cached_data_clusters[cluster_index][keyword] = set([docId])
self.lock.release()
def updateClusterByIndex(self, cluster_index, falling_back_data):
self.lock.acquire()
for keyword, ids in falling_back_data.items():
if keyword in self.cached_data_clusters[cluster_index]:
for docId in ids:
self.cached_data_clusters[cluster_index][keyword].add(docId)
else:
self.cached_data_clusters[cluster_index][keyword] = set([])
for docId in ids:
self.cached_data_clusters[cluster_index][keyword].add(docId)
self.lock.release()
def search_local_cache(self, keyword):
for cluster in self.cached_data_clusters:
if keyword in cluster:
return cluster[keyword]
return None
def setClusterCap(self, limited_indices, limited_capacities):
self.limited_indices = limited_indices
self.limited_capacities = limited_capacities