25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342 | class GridArchive(Archive):
"""An archive that divides each dimension into a uniformly-sized cells.
The source code of this class is inspired by the GridArchive class of pyribs <https://github.com/icaros-usc/pyribs/blob/master/ribs/archives/_grid_archive.py>
This archive is the container described in `Mouret 2015
<https://arxiv.org/pdf/1504.04909.pdf>`_. It can be visualized as an
n-dimensional grid in the measure space that is divided into a certain
number of cells in each dimension. Each cell contains an elite, i.e. a
solution that `maximizes` the objective function for the measures in that
cell.
"""
def __init__(
self,
dimensions: Sequence[int],
ranges: Sequence[Tuple[float, float]],
instances: Optional[Iterable[Instance]] = None,
eps: float = 1e-6,
dtype=np.float64,
):
"""Creates a GridArchive instance
Args:
dimensions (Sequence[int]): (array-like of int): Number of cells in each dimension of the
measure space, e.g. ``[20, 30, 40]`` indicates there should be 3
dimensions with 20, 30, and 40 cells. (The number of dimensions is
implicitly defined in the length of this argument).
ranges (Sequence[Tuple[float]]): (array-like of (float, float)): Upper and lower bound of each
dimension of the measure space, e.g. ``[(-1, 1), (-2, 2)]``
indicates the first dimension should have bounds :math:`[-1,1]`
(inclusive), and the second dimension should have bounds
:math:`[-2,2]` (inclusive). ``ranges`` should be the same length as
``dims``.
instances (Optional[Iterable[Instance]], optional): Instances to pre-initialise the archive. Defaults to None.
eps (float, optional): Due to floating point precision errors, we add a small
epsilon when computing the archive indices in the :meth:`index_of`
method -- refer to the implementation `here. Defaults to 1e-6.
dtype(str or data-type): Data type of the solutions, objectives,
and measures.
Raises:
ValueError: ``dimensions`` and ``ranges`` are not the same length
"""
Archive.__init__(self, threshold=np.finfo(np.float32).max, dtype=dtype)
if len(ranges) == 0 or len(dimensions) == 0:
raise ValueError("dimensions and ranges must have length >= 1")
if len(ranges) != len(dimensions):
raise ValueError(
f"len(dimensions) = {len(dimensions)} != len(ranges) = {len(ranges)} in GridArchive.__init__()"
)
self._dimensions = np.asarray(dimensions)
ranges = list(zip(*ranges))
self._lower_bounds = np.array(ranges[0], dtype=dtype)
self._upper_bounds = np.array(ranges[1], dtype=dtype)
self._interval = self._upper_bounds - self._lower_bounds
self._eps = eps
self._cells = np.prod(self._dimensions, dtype=object)
self._grid: Dict[int, Instance] = {}
_bounds = []
for dimension, l_b, u_b in zip(
self._dimensions, self._lower_bounds, self._upper_bounds
):
_bounds.append(np.linspace(l_b, u_b, dimension))
self._boundaries = np.asarray(_bounds)
if instances is not None:
self.extend(instances)
@property
def dimensions(self):
return self._dimensions
@property
def bounds(self):
"""list of numpy.ndarray: The boundaries of the cells in each dimension.
Entry ``i`` in this list is an array that contains the boundaries of the
cells in dimension ``i``. The array contains ``self.dims[i] + 1``
entries laid out like this::
Archive cells: | 0 | 1 | ... | self.dims[i] |
boundaries[i]: 0 1 2 self.dims[i] - 1 self.dims[i]
Thus, ``boundaries[i][j]`` and ``boundaries[i][j + 1]`` are the lower
and upper bounds of cell ``j`` in dimension ``i``. To access the lower
bounds of all the cells in dimension ``i``, use ``boundaries[i][:-1]``,
and to access all the upper bounds, use ``boundaries[i][1:]``.
"""
return self._boundaries
@property
def n_cells(self):
return self._cells
@property
def coverage(self):
"""Get the coverage of the hypercube space.
The coverage is calculated has the number of cells filled over the total space available.
Returns:
float: Filled cells over the total available.
"""
if len(self._grid) == 0:
return 0.0
return len(self._grid) / self._cells
@property
def filled_cells(self):
return self._grid.keys()
@property
def instances(self):
return list(self._grid.values())
def __str__(self):
return f"GridArchive(dim={self._dimensions},cells={self._cells},bounds={self._boundaries})"
def __repr__(self):
return f"GridArchive(dim={self._dimensions},cells={self._cells},bounds={self._boundaries})"
def __len__(self):
return len(self._grid)
def __getitem__(self, key):
"""Returns a dictionary with the descriptors as the keys. The values are the instances found.
Note that some of the given keys may not be in the archive.
Args:
key (array-like or descriptor): Descriptors of the instances that want to retrieve.
Valid examples are:
- archive[[0,11], [0,5]] --> Get the instances with the descriptors (0,11) and (0, 5)
- archive[0,11] --> Get the instance with the descriptor (0,11)
Raises:
TypeError: If the key is an slice. Not allowed.
ValueError: If the shape of the keys are not valid.
Returns:
dict: Returns a dict with the found instances.
"""
if isinstance(key, slice):
raise TypeError(
"Slicing is not available in GridArchive. Use 1D index or descriptor-type indeces"
)
descriptors = np.asarray(key)
if descriptors.ndim == 1 and descriptors.shape[0] != len(self._dimensions):
raise ValueError(
f"Expected descriptors to be an array with shape "
f"(batch_size, 1) or (batch_size, dimensions) (i.e. shape "
f"(batch_size, {len(self._dimensions)})) but it had shape "
f"{descriptors.shape}"
)
indeces = self.index_of(descriptors).tolist()
if isinstance(indeces, int):
indeces = [indeces]
descriptors = [descriptors]
instances = {}
for idx, desc in zip(indeces, descriptors):
if idx not in self._grid:
print(f"There is not any instance in the cell {desc}.")
else:
instances[tuple(desc)] = copy.copy(self._grid[idx])
return instances
def __iter__(self):
"""Iterates over the dictionary of instances
Returns:
Iterator: Yields position in the hypercube and instance located in such position
"""
return iter(self._grid.values())
def lower_i(self, i):
if i < 0 or i > len(self._lower_bounds):
msg = f"index {i} is out of bounds. Valid values are [0-{len(self._boundaries)}]"
raise ValueError(msg)
return self._lower_bounds[i]
def upper_i(self, i):
if i < 0 or i > len(self._upper_bounds):
msg = f"index {i} is out of bounds. Valid values are [0-{len(self._boundaries)}]"
raise ValueError(msg)
return self._upper_bounds[i]
def append(self, instance: Instance):
"""Inserts an Instance into the Grid
Args:
instance (Instance): Instace to be inserted
Raises:
TypeError: ``instance`` is not a instance of the class Instance.
"""
if isinstance(instance, Instance):
index = self.index_of(np.asarray(instance.descriptor))
if index not in self._grid or instance > self._grid[index]:
self._grid[index] = instance.clone()
else:
msg = "Only objects of type Instance can be inserted into a GridArchive"
raise TypeError(msg)
def extend(self, iterable: Iterable[Instance], *args, **kwargs):
"""Includes all the instances in iterable into the Grid
Args:
iterable (Iterable[Instance]): Iterable of instances
"""
if not all(isinstance(i, Instance) for i in iterable):
msg = "Only objects of type Instance can be inserted into a GridArchive"
raise TypeError(msg)
indeces = self.index_of([i.descriptor for i in iterable])
for idx, instance in zip(indeces, iterable, strict=True):
if idx not in self._grid or instance.fitness > self._grid[idx].fitness:
self._grid[idx] = instance.clone()
def remove(self, iterable: Iterable[Instance]):
"""Removes all the instances in iterable from the grid"""
if not all(isinstance(i, Instance) for i in iterable):
msg = "Only objects of type Instance can be removed from a CVTArchive"
raise TypeError(msg)
indeces_to_remove = self.index_of([i.descriptor for i in iterable])
for index in indeces_to_remove:
if index in self._grid:
del self._grid[index]
def index_of(self, descriptors):
"""Computes the indeces of a batch of descriptors.
Args:
descriptors (array-like): (batch_size, dimensions) array of descriptors for each instance
Raises:
ValueError: ``descriptors`` is not shape (batch_size, dimensions)
Returns:
np.ndarray: (batch_size, ) array of integer indices representing the flattened grid coordinates.
"""
if len(descriptors) == 0:
return np.empty(0)
descriptors = np.asarray(descriptors)
if (
descriptors.ndim == 1
and descriptors.shape[0] != len(self._dimensions)
or descriptors.ndim == 2
and descriptors.shape[1] != len(self._dimensions)
):
raise ValueError(
f"Expected descriptors to be an array with shape "
f"(batch_size, dimensions) (i.e. shape "
f"(batch_size, {len(self._dimensions)})) but it had shape "
f"{descriptors.shape}"
)
grid_indices = (
(self._dimensions * (descriptors - self._lower_bounds) + self._eps)
/ self._interval
).astype(int)
# Clip the indexes to make sure they are in the expected range for each dimension
clipped = np.clip(grid_indices, 0, self._dimensions - 1)
return self._grid_to_int_index(clipped)
def _grid_to_int_index(self, grid_indices) -> np.ndarray:
grid_indices = np.asarray(grid_indices)
if len(self._dimensions) > 64:
strides = np.cumprod((1,) + tuple(self._dimensions[::-1][:-1]))[::-1]
# Reshape strides to (1, num_dimensions) to make it broadcastable with grid_indices
strides = strides.reshape(1, -1)
flattened_indeces = np.sum(grid_indices * strides, axis=1, dtype=object)
else:
flattened_indeces = np.ravel_multi_index(
grid_indices.T, self._dimensions
).astype(int)
return flattened_indeces
def int_to_grid_index(self, int_indices) -> np.ndarray:
int_indices = np.asarray(int_indices)
if len(self._dimensions) > 64:
# Manually unravel the index for dimensions > 64
unravel_indices = []
remaining_indices = int_indices.astype(object)
for dim_size in self._dimensions[::-1]:
unravel_indices.append(remaining_indices % dim_size)
remaining_indices //= dim_size
unravel_indices = np.array(unravel_indices[::-1]).T
else:
unravel_indices = np.asarray(
np.unravel_index(
int_indices,
self._dimensions,
)
).T.astype(int)
return unravel_indices
def asdict(self) -> dict:
return {
"dimensions": self._dimensions.tolist(),
"lbs": self._lower_bounds.tolist(),
"ubs": self._upper_bounds.tolist(),
"n_cells": self._cells,
"instances": {
i: instance.asdict() for i, instance in enumerate(self._grid.values())
},
}
def to_json(self) -> str:
return json.dumps(self.asdict(), indent=4)
|