Utils

arkindex_worker.utils

Attributes

CHUNK_SIZE module-attribute

CHUNK_SIZE = 1024

Chunk size used for ZSTD compression

DEFAULT_BATCH_SIZE module-attribute

DEFAULT_BATCH_SIZE = 50

Batch size used for bulk publication to Arkindex

Functions

pluralize

pluralize(singular: str, count: int) -> str

Pluralize a noun, if necessary, using simplified rules of English pluralization and a list of exceptions.

Parameters:

- singular (str, required): A singular noun describing an object
- count (int, required): The object count, to determine whether to pluralize or not

Returns:

- str: The noun in its singular or plural form

Source code in arkindex_worker/utils.py
def pluralize(singular: str, count: int) -> str:
    """Pluralize a noun, if necessary, using simplified rules of English pluralization and a list of exceptions.

    :param str singular: A singular noun describing an object
    :param int count: The object count, to determine whether to pluralize or not
    :return str: The noun in its singular or plural form
    """
    if count == 1:
        return singular

    some_exceptions = {
        "child": "children",
        "class": "classes",
        "entity": "entities",
        "metadata": "metadata",
    }
    if singular in some_exceptions:
        return some_exceptions[singular]

    return singular + "s"
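
Example — a minimal sketch of the rules above:

assert pluralize("element", 1) == "element"
assert pluralize("element", 2) == "elements"
assert pluralize("child", 2) == "children"  # one of the listed exceptions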

parse_source_id

parse_source_id(value: str) -> bool | str | None

Parse a UUID argument (Worker Version, Worker Run, …) to use it directly in the API. Arkindex API filters generally expect False to filter manual sources.

Source code in arkindex_worker/utils.py
def parse_source_id(value: str) -> bool | str | None:
    """
    Parse a UUID argument (Worker Version, Worker Run, ...) to use it directly in the API.
    Arkindex API filters generally expect `False` to filter manual sources.
    """
    if value == MANUAL_SOURCE:
        return False
    return value or None
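
Example — a sketch assuming MANUAL_SOURCE is the manual-source sentinel string used alongside this helper:

from arkindex_worker.utils import MANUAL_SOURCE, parse_source_id

assert parse_source_id(MANUAL_SOURCE) is False  # manual sources are filtered with False
assert parse_source_id("") is None              # empty value: no filtering
assert parse_source_id("worker-run-uuid") == "worker-run-uuid"  # hypothetical ID, passed through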

decompress_zst_archive

decompress_zst_archive(
    compressed_archive: Path,
) -> tuple[int, Path]

Decompress a ZST-compressed tar archive to a temporary file. The tar archive itself is not extracted. Returns the file descriptor and the path to the uncompressed archive.

Be sure to close the file descriptor explicitly, or the main process will keep the memory held even if the file is deleted.

Parameters:

- compressed_archive (Path, required): Path to the target ZST-compressed archive

Returns:

- tuple[int, Path]: File descriptor and path to the uncompressed tar archive

Source code in arkindex_worker/utils.py
def decompress_zst_archive(compressed_archive: Path) -> tuple[int, Path]:
    """
    Decompress a ZST-compressed tar archive to a temporary file. The tar archive is not extracted.
    This returns the file descriptor and the path to the uncompressed archive.

    Be sure to close the file descriptor explicitly, or the main
    process will keep the memory held even if the file is deleted.

    :param compressed_archive: Path to the target ZST-compressed archive
    :return: File descriptor and path to the uncompressed tar archive
    """
    dctx = zstd.ZstdDecompressor()
    archive_fd, archive_path = tempfile.mkstemp(suffix=".tar")
    archive_path = Path(archive_path)

    logger.debug(f"Uncompressing file to {archive_path}")
    try:
        with (
            compressed_archive.open("rb") as compressed,
            archive_path.open("wb") as decompressed,
        ):
            dctx.copy_stream(compressed, decompressed)
        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
    except zstd.ZstdError as e:
        raise Exception(f"Couldn't uncompress archive: {e}") from e

    return archive_fd, archive_path
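
Example — a minimal sketch of the decompress/cleanup lifecycle (the archive path is hypothetical):

from pathlib import Path

from arkindex_worker.utils import close_delete_file, decompress_zst_archive

fd, tar_path = decompress_zst_archive(Path("corpus_export.tar.zst"))  # hypothetical archive
try:
    ...  # read or extract tar_path here
finally:
    # Close the descriptor explicitly so the memory is actually released
    close_delete_file(fd, tar_path)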

extract_tar_archive

extract_tar_archive(archive_path: Path, destination: Path)

Extract the tar archive’s content to a specific destination

Parameters:

- archive_path (Path, required): Path to the archive
- destination (Path, required): Path where the archive's data will be extracted
Source code in arkindex_worker/utils.py
def extract_tar_archive(archive_path: Path, destination: Path):
    """
    Extract the tar archive's content to a specific destination

    :param archive_path: Path to the archive
    :param destination: Path where the archive's data will be extracted
    """
    try:
        with tarfile.open(archive_path) as tar_archive:
            tar_archive.extractall(destination)
    except tarfile.ReadError as e:
        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}") from e
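
Example — extracting a previously decompressed archive (both paths are hypothetical):

from pathlib import Path

from arkindex_worker.utils import extract_tar_archive

extract_tar_archive(Path("/tmp/export.tar"), Path("./export"))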

extract_tar_zst_archive

extract_tar_zst_archive(
    compressed_archive: Path, destination: Path
) -> tuple[int, Path]

Extract a ZST-compressed tar archive’s content to a specific destination

Parameters:

- compressed_archive (Path, required): Path to the target ZST-compressed archive
- destination (Path, required): Path where the archive's data will be extracted

Returns:

- tuple[int, Path]: File descriptor and path to the uncompressed tar archive

Source code in arkindex_worker/utils.py
def extract_tar_zst_archive(
    compressed_archive: Path, destination: Path
) -> tuple[int, Path]:
    """
    Extract a ZST-compressed tar archive's content to a specific destination

    :param compressed_archive: Path to the target ZST-compressed archive
    :param destination: Path where the archive's data will be extracted
    :return: File descriptor and path to the uncompressed tar archive
    """

    archive_fd, archive_path = decompress_zst_archive(compressed_archive)
    extract_tar_archive(archive_path, destination)

    return archive_fd, archive_path
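
Example — one-shot decompression and extraction, including the cleanup this helper still requires (paths hypothetical):

from pathlib import Path

from arkindex_worker.utils import close_delete_file, extract_tar_zst_archive

fd, tar_path = extract_tar_zst_archive(Path("model.tar.zst"), Path("./model"))
# The intermediate tar file is not removed automatically
close_delete_file(fd, tar_path)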

close_delete_file

close_delete_file(file_descriptor: int, file_path: Path)

Close the file descriptor and delete the underlying file

Parameters:

- file_descriptor (int, required): File descriptor of the archive
- file_path (Path, required): Path to the archive
Source code in arkindex_worker/utils.py
def close_delete_file(file_descriptor: int, file_path: Path):
    """
    Close the file descriptor and delete the underlying file

    :param file_descriptor: File descriptor of the archive
    :param file_path: Path to the archive
    """
    try:
        os.close(file_descriptor)
        file_path.unlink()
    except OSError as e:
        logger.warning(f"Unable to delete file {file_path}: {e}")
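
Example — cleaning up a descriptor/path pair like those returned by the helpers above:

import tempfile
from pathlib import Path

from arkindex_worker.utils import close_delete_file

fd, path = tempfile.mkstemp(suffix=".tar")
close_delete_file(fd, Path(path))  # closes the descriptor, then unlinks the file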

zstd_compress

zstd_compress(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]

Compress a file using the Zstandard compression algorithm.

Parameters:

- source (Path, required): Path to the file to compress.
- destination (Path | None, default None): Optional path for the created ZSTD archive. A tempfile will be created if this is omitted.

Returns:

- tuple[int | None, Path, str]: The file descriptor (if one was created), the path to the compressed file, and the MD5 hash of its content.

Source code in arkindex_worker/utils.py
def zstd_compress(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]:
    """Compress a file using the Zstandard compression algorithm.

    :param source: Path to the file to compress.
    :param destination: Optional path for the created ZSTD archive. A tempfile will be created if this is omitted.
    :return: The file descriptor (if one was created), the path to the compressed file, and the MD5 hash of its content.
    """
    compressor = zstd.ZstdCompressor(level=3)
    archive_hasher = hashlib.md5()

    # Parse destination and create a tmpfile if none was specified
    file_d, destination = (
        tempfile.mkstemp(prefix="teklia-", suffix=".tar.zst")
        if destination is None
        else (None, destination)
    )
    destination = Path(destination)
    logger.debug(f"Compressing file to {destination}")

    try:
        with destination.open("wb") as archive_file, source.open("rb") as model_data:
            for model_chunk in iter(lambda: model_data.read(CHUNK_SIZE), b""):
                compressed_chunk = compressor.compress(model_chunk)
                archive_hasher.update(compressed_chunk)
                archive_file.write(compressed_chunk)
        logger.debug(f"Successfully compressed {source}")
    except zstd.ZstdError as e:
        raise Exception(f"Couldn't compress archive: {e}") from e
    return file_d, destination, archive_hasher.hexdigest()
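
Example — compressing to a tempfile, then cleaning up (the input path is hypothetical):

from pathlib import Path

from arkindex_worker.utils import close_delete_file, zstd_compress

fd, zst_path, md5sum = zstd_compress(Path("model.tar"))  # hypothetical input file
# fd is not None here because no destination was given, so it must be closed
close_delete_file(fd, zst_path)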

create_tar_archive

create_tar_archive(
    path: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]

Create a tar archive using the content at the specified location.

Parameters:

- path (Path, required): Path to the file to archive
- destination (Path | None, default None): Optional path for the created TAR archive. A tempfile will be created if this is omitted.

Returns:

- tuple[int | None, Path, str]: The file descriptor (if one was created), the path to the TAR archive, and the MD5 hash of its content.

Source code in arkindex_worker/utils.py
def create_tar_archive(
    path: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]:
    """Create a tar archive using the content at specified location.

    :param path: Path to the file to archive
    :param destination: Optional path for the created TAR archive. A tempfile will be created if this is omitted.
    :return: The file descriptor (if one was created), the path to the TAR archive, and the MD5 hash of its content.
    """
    # Parse destination and create a tmpfile if none was specified
    file_d, destination = (
        tempfile.mkstemp(prefix="teklia-", suffix=".tar")
        if destination is None
        else (None, destination)
    )
    destination = Path(destination)
    logger.debug(f"Compressing file to {destination}")

    # Create an uncompressed tar archive with all the needed files
    # The file hierarchy is kept in the archive.
    files = []
    try:
        logger.debug(f"Compressing files to {destination}")
        with tarfile.open(destination, "w") as tar:
            for p in path.rglob("*"):
                x = p.relative_to(path)
                tar.add(p, arcname=x, recursive=False)
                # Only keep files when computing the hash
                if p.is_file():
                    files.append(p)
        logger.debug(f"Successfully created Tar archive from files @ {path}")
    except tarfile.TarError as e:
        raise Exception(f"Couldn't create Tar archive: {e}") from e

    # Sort by path
    files.sort()

    content_hasher = hashlib.md5()
    # Compute hash of the files
    for file_path in files:
        with file_path.open("rb") as file_data:
            for chunk in iter(lambda: file_data.read(CHUNK_SIZE), b""):
                content_hasher.update(chunk)
    return file_d, destination, content_hasher.hexdigest()
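
Example — archiving a folder (the path is hypothetical); note the returned hash covers the archived files' content, not the tar bytes:

from pathlib import Path

from arkindex_worker.utils import close_delete_file, create_tar_archive

fd, tar_path, content_md5 = create_tar_archive(Path("./model"))  # hypothetical folder
close_delete_file(fd, tar_path)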

create_tar_zst_archive

create_tar_zst_archive(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str, str]

Helper to create a TAR+ZST archive from a source folder.

Parameters:

- source (Path, required): Path to the folder whose content should be archived.
- destination (Path | None, default None): Path to the created archive. If unspecified, a temporary file will be created.

Returns:

- tuple[int | None, Path, str, str]: The file descriptor of the created tempfile (if one was created), the path to the archive, its hash, and the hash of the tar archive's content.

Source code in arkindex_worker/utils.py
def create_tar_zst_archive(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str, str]:
    """Helper to create a TAR+ZST archive from a source folder.

    :param source: Path to the folder whose content should be archived.
    :param destination: Path to the created archive, defaults to None. If unspecified, a temporary file will be created.
    :return: The file descriptor of the created tempfile (if one was created), path to the archive, its hash and the hash of the tar archive's content.
    """
    # Create tar archive
    tar_fd, tar_archive, tar_hash = create_tar_archive(source)

    zst_fd, zst_archive, zst_hash = zstd_compress(tar_archive, destination)

    close_delete_file(tar_fd, tar_archive)

    return zst_fd, zst_archive, zst_hash, tar_hash
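
Example — the typical end-to-end flow (the folder path is hypothetical):

from pathlib import Path

from arkindex_worker.utils import close_delete_file, create_tar_zst_archive

zst_fd, zst_path, zst_md5, tar_md5 = create_tar_zst_archive(Path("./model"))
# Upload zst_path here, using zst_md5 and tar_md5 as integrity checks
close_delete_file(zst_fd, zst_path)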

batch_publication

batch_publication(func: Callable) -> Callable

Decorator for functions that should raise an error when the value passed through the batch_size parameter is not a strictly positive integer.

Parameters:

- func (Callable, required): The function to wrap with the batch_size check

Returns:

- Callable: The function passing the batch_size check

Source code in arkindex_worker/utils.py
def batch_publication(func: Callable) -> Callable:
    """
    Decorator for functions that should raise an error when the value passed through the ``batch_size`` parameter is **not** a strictly positive integer.

    :param func: The function to wrap with the ``batch_size`` check
    :return: The function passing the ``batch_size`` check
    """
    signature = inspect.signature(func)

    def wrapper(self, *args, **kwargs):
        bound_func = signature.bind(self, *args, **kwargs)
        bound_func.apply_defaults()
        batch_size = bound_func.arguments.get("batch_size")
        assert (
            batch_size is not None and isinstance(batch_size, int) and batch_size > 0
        ), "batch_size shouldn't be null and should be a strictly positive integer"

        return func(self, *args, **kwargs)

    wrapper.__name__ = func.__name__
    return wrapper
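
Example — a hypothetical worker method guarded by the decorator (the wrapper binds self, so it targets methods):

from arkindex_worker.utils import DEFAULT_BATCH_SIZE, batch_publication, make_batches

class MyWorker:  # hypothetical class
    @batch_publication
    def publish_elements(self, elements: list, batch_size: int = DEFAULT_BATCH_SIZE):
        for batch in make_batches(elements, "element", batch_size):
            ...  # publish this batch through the API

MyWorker().publish_elements(["a", "b"], batch_size=0)  # raises AssertionError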

make_batches

make_batches(
    objects: list, singular_name: str, batch_size: int
) -> Generator[list[Any]]

Split an object list into successive batches of maximum size batch_size.

Parameters:

- objects (list, required): The object list to divide into batches of size batch_size
- singular_name (str, required): The singular form of the noun associated with the object list
- batch_size (int, required): The maximum size of each batch

Returns:

- Generator[list[Any]]: A generator of successive batches containing at most batch_size items from objects

Source code in arkindex_worker/utils.py
def make_batches(
    objects: list, singular_name: str, batch_size: int
) -> Generator[list[Any]]:
    """Split an object list in successive batches of maximum size ``batch_size``.

    :param objects: The object list to divide in batches of ``batch_size`` size
    :param singular_name: The singular form of the noun associated with the object list
    :param batch_size: The maximum size of each batch to split the object list
    :return: A generator of successive batches containing at most ``batch_size`` items from ``objects``
    """
    count = len(objects)
    logger.info(
        f"Creating batches of size {batch_size} to process {count} {pluralize(singular_name, count)}"
    )

    index = 1
    iterator = iter(objects)
    while batch := list(islice(iterator, batch_size)):
        count = len(batch)
        logger.info(
            f"Processing batch {index} containing {count} {pluralize(singular_name, count)}..."
        )

        yield batch

        index += 1
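
Example — batching 120 items with batch_size=50; the final batch holds the remainder:

from arkindex_worker.utils import make_batches

batches = list(make_batches(list(range(120)), "element", 50))
assert [len(batch) for batch in batches] == [50, 50, 20]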