Skip to content

Utils

arkindex_worker.utils

Attributes

CHUNK_SIZE module-attribute

CHUNK_SIZE = 1024

Chunk Size used for ZSTD compression

Functions

parse_source_id

parse_source_id(value: str) -> bool | str | None

Parse a UUID argument (Worker Version, Worker Run, …) to use it directly in the API. Arkindex API filters generally expect False to filter manual sources.

Source code in arkindex_worker/utils.py
16
17
18
19
20
21
22
23
def parse_source_id(value: str) -> bool | str | None:
    """
    Parse a UUID argument (Worker Version, Worker Run, ...) to use it directly in the API.
    Arkindex API filters generally expect `False` to filter manual sources.
    """
    if value == MANUAL_SOURCE:
        return False
    return value or None

decompress_zst_archive

decompress_zst_archive(
    compressed_archive: Path,
) -> tuple[int, Path]

Decompress a ZST-compressed tar archive in data dir. The tar archive is not extracted. This returns the path to the archive and the file descriptor.

Beware of closing the file descriptor explicitly or the main process will keep the memory held even if the file is deleted.

Parameters:

Name Type Description Default
compressed_archive Path

Path to the target ZST-compressed archive

required

Returns:

Type Description
tuple[int, Path]

File descriptor and path to the uncompressed tar archive

Source code in arkindex_worker/utils.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def decompress_zst_archive(compressed_archive: Path) -> tuple[int, Path]:
    """
    Decompress a ZST-compressed tar archive in data dir. The tar archive is not extracted.
    This returns the path to the archive and the file descriptor.

    Beware of closing the file descriptor explicitly or the main
    process will keep the memory held even if the file is deleted.

    :param compressed_archive: Path to the target ZST-compressed archive
    :return: File descriptor and path to the uncompressed tar archive
    """
    dctx = zstandard.ZstdDecompressor()
    archive_fd, archive_path = tempfile.mkstemp(suffix=".tar")
    archive_path = Path(archive_path)

    logger.debug(f"Uncompressing file to {archive_path}")
    try:
        with (
            compressed_archive.open("rb") as compressed,
            archive_path.open("wb") as decompressed,
        ):
            dctx.copy_stream(compressed, decompressed)
        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
    except zstandard.ZstdError as e:
        raise Exception(f"Couldn't uncompressed archive: {e}") from e

    return archive_fd, archive_path

extract_tar_archive

extract_tar_archive(archive_path: Path, destination: Path)

Extract the tar archive’s content to a specific destination

Parameters:

Name Type Description Default
archive_path Path

Path to the archive

required
destination Path

Path where the archive’s data will be extracted

required
Source code in arkindex_worker/utils.py
59
60
61
62
63
64
65
66
67
68
69
70
def extract_tar_archive(archive_path: Path, destination: Path):
    """
    Extract the tar archive's content to a specific destination

    :param archive_path: Path to the archive
    :param destination: Path where the archive's data will be extracted
    """
    try:
        with tarfile.open(archive_path) as tar_archive:
            tar_archive.extractall(destination)
    except tarfile.ReadError as e:
        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}") from e

extract_tar_zst_archive

extract_tar_zst_archive(
    compressed_archive: Path, destination: Path
) -> tuple[int, Path]

Extract a ZST-compressed tar archive’s content to a specific destination

Parameters:

Name Type Description Default
compressed_archive Path

Path to the target ZST-compressed archive

required
destination Path

Path where the archive’s data will be extracted

required

Returns:

Type Description
tuple[int, Path]

File descriptor and path to the uncompressed tar archive

Source code in arkindex_worker/utils.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def extract_tar_zst_archive(
    compressed_archive: Path, destination: Path
) -> tuple[int, Path]:
    """
    Extract a ZST-compressed tar archive's content to a specific destination

    :param compressed_archive: Path to the target ZST-compressed archive
    :param destination: Path where the archive's data will be extracted
    :return: File descriptor and path to the uncompressed tar archive
    """

    archive_fd, archive_path = decompress_zst_archive(compressed_archive)
    extract_tar_archive(archive_path, destination)

    return archive_fd, archive_path

close_delete_file

close_delete_file(file_descriptor: int, file_path: Path)

Close the file descriptor of the file and delete the file

Parameters:

Name Type Description Default
file_descriptor int

File descriptor of the archive

required
file_path Path

Path to the archive

required
Source code in arkindex_worker/utils.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def close_delete_file(file_descriptor: int, file_path: Path):
    """
    Close the file descriptor of the file and delete the file

    :param file_descriptor: File descriptor of the archive
    :param file_path: Path to the archive
    """
    try:
        os.close(file_descriptor)
        file_path.unlink()
    except OSError as e:
        logger.warning(f"Unable to delete file {file_path}: {e}")

zstd_compress

zstd_compress(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]

Compress a file using the Zstandard compression algorithm.

Parameters:

Name Type Description Default
source Path

Path to the file to compress.

required
destination Path | None

Optional path for the created ZSTD archive. A tempfile will be created if this is omitted.

None

Returns:

Type Description
tuple[int | None, Path, str]

The file descriptor (if one was created) and path to the compressed file, hash of its content.

Source code in arkindex_worker/utils.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def zstd_compress(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]:
    """Compress a file using the Zstandard compression algorithm.

    :param source: Path to the file to compress.
    :param destination: Optional path for the created ZSTD archive. A tempfile will be created if this is omitted.
    :return: The file descriptor (if one was created) and path to the compressed file, hash of its content.
    """
    compressor = zstd.ZstdCompressor(level=3)
    archive_hasher = hashlib.md5()

    # Parse destination and create a tmpfile if none was specified
    file_d, destination = (
        tempfile.mkstemp(prefix="teklia-", suffix=".tar.zst")
        if destination is None
        else (None, destination)
    )
    destination = Path(destination)
    logger.debug(f"Compressing file to {destination}")

    try:
        with destination.open("wb") as archive_file, source.open("rb") as model_data:
            for model_chunk in iter(lambda: model_data.read(CHUNK_SIZE), b""):
                compressed_chunk = compressor.compress(model_chunk)
                archive_hasher.update(compressed_chunk)
                archive_file.write(compressed_chunk)
        logger.debug(f"Successfully compressed {source}")
    except zstandard.ZstdError as e:
        raise Exception(f"Couldn't compress archive: {e}") from e
    return file_d, destination, archive_hasher.hexdigest()

create_tar_archive

create_tar_archive(
    path: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]

Create a tar archive using the content at specified location.

Parameters:

Name Type Description Default
path Path

Path to the file to archive

required
destination Path | None

Optional path for the created TAR archive. A tempfile will be created if this is omitted.

None

Returns:

Type Description
tuple[int | None, Path, str]

The file descriptor (if one was created) and path to the TAR archive, hash of its content.

Source code in arkindex_worker/utils.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def create_tar_archive(
    path: Path, destination: Path | None = None
) -> tuple[int | None, Path, str]:
    """Create a tar archive using the content at specified location.

    :param path: Path to the file to archive
    :param destination: Optional path for the created TAR archive. A tempfile will be created if this is omitted.
    :return: The file descriptor (if one was created) and path to the TAR archive, hash of its content.
    """
    # Parse destination and create a tmpfile if none was specified
    file_d, destination = (
        tempfile.mkstemp(prefix="teklia-", suffix=".tar")
        if destination is None
        else (None, destination)
    )
    destination = Path(destination)
    logger.debug(f"Compressing file to {destination}")

    # Create an uncompressed tar archive with all the needed files
    # Files hierarchy ifs kept in the archive.
    files = []
    try:
        logger.debug(f"Compressing files to {destination}")
        with tarfile.open(destination, "w") as tar:
            for p in path.rglob("*"):
                x = p.relative_to(path)
                tar.add(p, arcname=x, recursive=False)
                # Only keep files when computing the hash
                if p.is_file():
                    files.append(p)
        logger.debug(f"Successfully created Tar archive from files @ {path}")
    except tarfile.TarError as e:
        raise Exception(f"Couldn't create Tar archive: {e}") from e

    # Sort by path
    files.sort()

    content_hasher = hashlib.md5()
    # Compute hash of the files
    for file_path in files:
        with file_path.open("rb") as file_data:
            for chunk in iter(lambda: file_data.read(CHUNK_SIZE), b""):
                content_hasher.update(chunk)
    return file_d, destination, content_hasher.hexdigest()

create_tar_zst_archive

create_tar_zst_archive(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str, str]

Helper to create a TAR+ZST archive from a source folder.

Parameters:

Name Type Description Default
source Path

Path to the folder whose content should be archived.

required
destination Path | None

Path to the created archive, defaults to None. If unspecified, a temporary file will be created.

None

Returns:

Type Description
tuple[int | None, Path, str, str]

The file descriptor of the created tempfile (if one was created), path to the archive, its hash and the hash of the tar archive’s content.

Source code in arkindex_worker/utils.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def create_tar_zst_archive(
    source: Path, destination: Path | None = None
) -> tuple[int | None, Path, str, str]:
    """Helper to create a TAR+ZST archive from a source folder.

    :param source: Path to the folder whose content should be archived.
    :param destination: Path to the created archive, defaults to None. If unspecified, a temporary file will be created.
    :return: The file descriptor of the created tempfile (if one was created), path to the archive, its hash and the hash of the tar archive's content.
    """
    # Create tar archive
    tar_fd, tar_archive, tar_hash = create_tar_archive(source)

    zst_fd, zst_archive, zst_hash = zstd_compress(tar_archive, destination)

    close_delete_file(tar_fd, tar_archive)

    return zst_fd, zst_archive, zst_hash, tar_hash