Cache

arkindex_worker.cache

Database mappings and helper methods for the experimental worker caching feature.

On methods that support caching, reads are served from the local database, and writes go to both the Arkindex API and the database, reducing network usage.
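
For example, once the cache database has been initialised (see init_cache_db below), cached rows can be read with ordinary Peewee queries instead of API calls. This is a minimal sketch; the "page" element type is only an illustration.

from arkindex_worker.cache import CachedElement

# Reads are served by the local SQLite cache through Peewee queries
pages = CachedElement.select().where(CachedElement.type == "page")
for element in pages:
    print(element.id, element.type)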

Classes

JSONField

Bases: Field

A Peewee field that stores a JSON payload as a string and parses it automatically.

Version

Bases: Model

Cache version table, used to warn about incompatible cache databases when a worker uses an outdated version of base-worker.

CachedImage

Bases: Model

Cache image table

CachedElement

Bases: Model

Cache element table

Functions
open_image
open_image(*args, max_size=None, **kwargs)

Open this element’s image as a Pillow image. This does not crop the image to the element’s polygon. IIIF servers with maxWidth, maxHeight or maxArea restrictions on image size are not supported.

Parameters:

| Name     | Type          | Description                                                      | Default |
| -------- | ------------- | ---------------------------------------------------------------- | ------- |
| *args    |               | Positional arguments passed to arkindex_worker.image.open_image  | ()      |
| max_size | Optional[int] | Subresolution of the image.                                       | None    |
| **kwargs |               | Keyword arguments passed to arkindex_worker.image.open_image     | {}      |

Returns:

| Type  | Description     |
| ----- | --------------- |
| Image | A Pillow image. |

Raises:

| Type       | Description                                                |
| ---------- | ---------------------------------------------------------- |
| ValueError | When this element does not have an image ID or a polygon. |

Source code in arkindex_worker/cache.py
def open_image(self, *args, max_size: Optional[int] = None, **kwargs) -> Image:
    """
    Open this element's image as a Pillow image.
    This does not crop the image to the element's polygon.
    IIIF servers with maxWidth, maxHeight or maxArea restrictions on image size are not supported.

    :param *args: Positional arguments passed to [arkindex_worker.image.open_image][]
    :param max_size: Subresolution of the image.
    :param **kwargs: Keyword arguments passed to [arkindex_worker.image.open_image][]
    :raises ValueError: When this element does not have an image ID or a polygon.
    :return: A Pillow image.
    """
    from arkindex_worker.image import open_image, polygon_bounding_box

    if not self.image_id or not self.polygon:
        raise ValueError(f"Element {self.id} has no image")

    # Always fetch the image from the bounding box when size differs from full image
    bounding_box = polygon_bounding_box(self.polygon)
    if (
        bounding_box.width != self.image.width
        or bounding_box.height != self.image.height
    ):
        box = f"{bounding_box.x},{bounding_box.y},{bounding_box.width},{bounding_box.height}"
    else:
        box = "full"

    if max_size is None:
        resize = "full"
    else:
        # Do not resize for polygons that do not exactly match the images
        # as the resize is made directly by the IIIF server using the box parameter
        if (
            bounding_box.width != self.image.width
            or bounding_box.height != self.image.height
        ):
            resize = "full"

        # Do not resize when the image is below the maximum size
        elif self.image.width <= max_size and self.image.height <= max_size:
            resize = "full"
        else:
            ratio = max_size / max(self.image.width, self.image.height)
            new_width, new_height = int(self.image.width * ratio), int(
                self.image.height * ratio
            )
            resize = f"{new_width},{new_height}"

    url = self.image.url
    if not url.endswith("/"):
        url += "/"

    return open_image(
        f"{url}{box}/{resize}/0/default.jpg",
        *args,
        rotation_angle=self.rotation_angle,
        mirrored=self.mirrored,
        **kwargs,
    )
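
As a usage sketch, a cached element's image can be opened and downscaled in one call. The element ID below is a placeholder, not a real identifier.

from arkindex_worker.cache import CachedElement

# Placeholder ID: look up an element already present in the cache
element = CachedElement.get(CachedElement.id == "11111111-1111-1111-1111-111111111111")

# Fetch the element's image through IIIF, capped at 1000 pixels on its largest side
image = element.open_image(max_size=1000)
image.save("element.jpg")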

CachedTranscription

Bases: Model

Cache transcription table

CachedClassification

Bases: Model

Cache classification table

CachedEntity

Bases: Model

Cache entity table

CachedTranscriptionEntity

Bases: Model

Cache transcription entity table

Functions

init_cache_db

init_cache_db(path)

Create the cache database at the given path

Parameters:

| Name | Type | Description                              | Default  |
| ---- | ---- | ---------------------------------------- | -------- |
| path | str  | Where the new database should be created | required |
Source code in arkindex_worker/cache.py
def init_cache_db(path: str):
    """
    Create the cache database on the given path
    :param path: Where the new database should be created
    """
    db.init(
        path,
        pragmas={
            # SQLite ignores foreign keys and check constraints by default!
            "foreign_keys": 1,
            "ignore_check_constraints": 0,
        },
    )
    db.connect()
    logger.info(f"Connected to cache on {path}")

create_tables

create_tables()

Creates the tables in the cache DB only if they do not already exist.

Source code in arkindex_worker/cache.py
def create_tables():
    """
    Creates the tables in the cache DB only if they do not already exist.
    """
    db.create_tables(MODELS)

create_version_table

create_version_table()

Creates the Version table in the cache DB. This step must be kept separate from the creation of the other tables, since the table and its one and only Version entry should only be created when the cache is built from scratch.

Source code in arkindex_worker/cache.py
def create_version_table():
    """
    Creates the Version table in the cache DB.
    This step must be independent from other tables creation since we only
    want to create the table and add the one and only Version entry when the
    cache is created from scratch.
    """
    db.create_tables([Version])
    Version.create(version=SQL_VERSION)
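
Putting the setup helpers together, a fresh cache is typically created in the following order; the database filename is an example.

from arkindex_worker.cache import init_cache_db, create_tables, create_version_table

# Create and connect to a new cache database
init_cache_db("db.sqlite")

# Create the data tables, then record the cache version in its own table
create_tables()
create_version_table()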

check_version

check_version(cache_path)

Check that the cache database uses the expected cache version (SQL_VERSION)

Parameters:

| Name       | Type             | Description                          | Default  |
| ---------- | ---------------- | ------------------------------------ | -------- |
| cache_path | Union[str, Path] | Path towards a local SQLite database | required |
Source code in arkindex_worker/cache.py
def check_version(cache_path: Union[str, Path]):
    """
    Check the validity of the SQLite version

    :param cache_path: Path towards a local SQLite database
    """
    with SqliteDatabase(cache_path) as provided_db:
        with provided_db.bind_ctx([Version]):
            try:
                version = Version.get().version
            except OperationalError:
                version = None

            assert (
                version == SQL_VERSION
            ), f"The SQLite database {cache_path} does not have the correct cache version, it should be {SQL_VERSION}"
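
For example, a worker can validate a parent task's database before reusing it; the path below is a placeholder.

from arkindex_worker.cache import check_version

# Raises an AssertionError when the database was built with another cache version
try:
    check_version("/data/parent_task/db.sqlite")
except AssertionError as e:
    print(f"Incompatible cache: {e}")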

retrieve_parents_cache_path

retrieve_parents_cache_path(
    parent_ids, data_dir="/data", chunk=None
)

Retrieve the paths to the given parents' cache databases in the data directory

Parameters:

| Name       | Type | Description                                                | Default  |
| ---------- | ---- | ---------------------------------------------------------- | -------- |
| parent_ids | list | List of element IDs to search                              | required |
| data_dir   | str  | Base folder to look for the databases in                   | '/data'  |
| chunk      | int  | Index of the chunk of the db that might contain the paths  | None     |

Returns:

| Type | Description                     |
| ---- | ------------------------------- |
| list | The corresponding list of paths |

Source code in arkindex_worker/cache.py
def retrieve_parents_cache_path(
    parent_ids: list, data_dir: str = "/data", chunk: int = None
) -> list:
    """
    Retrieve the paths to the given parents' cache databases in the data directory
    :param parent_ids: List of element IDs to search
    :param data_dir: Base folder to look for the databases in
    :param chunk: Index of the chunk of the db that might contain the paths
    :return: The corresponding list of paths
    """
    assert isinstance(parent_ids, list)
    assert os.path.isdir(data_dir)

    # Handle possible chunk in parent task name
    # This is needed to support the init_elements databases
    filenames = [
        "db.sqlite",
    ]
    if chunk is not None:
        filenames.append(f"db_{chunk}.sqlite")

    # Find all the paths for these databases
    return list(
        filter(
            lambda p: os.path.isfile(p),
            [
                os.path.join(data_dir, parent, name)
                for parent in parent_ids
                for name in filenames
            ],
        )
    )
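
A sketch of a typical call, with placeholder parent task IDs standing in for real ones:

from arkindex_worker.cache import retrieve_parents_cache_path

# Looks for /data/<parent_id>/db.sqlite, plus db_1.sqlite since a chunk index is given
paths = retrieve_parents_cache_path(
    ["init_elements_task_id", "previous_task_id"],
    data_dir="/data",
    chunk=1,
)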

merge_parents_cache

merge_parents_cache(paths, current_database)

Merge all the potential parent tasks' databases into the existing local one

Parameters:

| Name             | Type | Description                          | Default  |
| ---------------- | ---- | ------------------------------------ | -------- |
| paths            | list | Paths to the parent cache databases  | required |
| current_database | str  | Path to the current database         | required |
Source code in arkindex_worker/cache.py
def merge_parents_cache(paths: list, current_database: str):
    """
    Merge all the potential parent tasks' databases into the existing local one
    :param paths: Paths to the parent cache databases
    :param current_database: Path to the current database
    """
    assert os.path.exists(current_database)

    if not paths:
        logger.info("No parents cache to use")
        return

    # Open a connection on current database
    connection = sqlite3.connect(current_database)
    cursor = connection.cursor()

    # Merge each table into the local database
    for idx, path in enumerate(paths):
        # Check that the parent cache uses a compatible version
        check_version(path)

        with SqliteDatabase(path) as source:
            with source.bind_ctx(MODELS):
                source.create_tables(MODELS)

        logger.info(f"Merging parent db {path} into {current_database}")
        statements = [
            "PRAGMA page_size=80000;",
            "PRAGMA synchronous=OFF;",
            f"ATTACH DATABASE '{path}' AS source_{idx};",
            f"REPLACE INTO images SELECT * FROM source_{idx}.images;",
            f"REPLACE INTO elements SELECT * FROM source_{idx}.elements;",
            f"REPLACE INTO transcriptions SELECT * FROM source_{idx}.transcriptions;",
            f"REPLACE INTO classifications SELECT * FROM source_{idx}.classifications;",
        ]

        for statement in statements:
            cursor.execute(statement)
        connection.commit()
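
Combined with retrieve_parents_cache_path, merging the parent caches into the current database typically looks like the sketch below; all paths and IDs are placeholders.

from arkindex_worker.cache import merge_parents_cache, retrieve_parents_cache_path

# Find the parent tasks' databases, then merge their rows into the local cache
parent_paths = retrieve_parents_cache_path(["parent_task_id"], data_dir="/data")
merge_parents_cache(parent_paths, "/data/current_task/db.sqlite")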