Skip to content

Task

arkindex_worker.worker.task

BaseWorker methods for tasks.

Classes

TaskMixin

Functions
list_artifacts
list_artifacts(task_id: uuid.UUID) -> Iterator[Artifact]

List artifacts associated to a task.

Parameters:

Name Type Description Default
task_id UUID

Task ID to find artifacts from.

required

Returns:

Type Description
Iterator[Artifact]

An iterator of Artifact objects built from the ListArtifacts API endpoint.

Source code in arkindex_worker/worker/task.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def list_artifacts(self, task_id: uuid.UUID) -> Iterator[Artifact]:
    """
    List artifacts associated to a task.

    :param task_id: Task ID to find artifacts from.
    :returns: An iterator of ``Artifact`` objects built from the ``ListArtifacts`` API endpoint.
    """
    assert task_id and isinstance(
        task_id, uuid.UUID
    ), "task_id shouldn't be null and should be an UUID"

    results = self.api_client.request("ListArtifacts", id=task_id)

    return map(Artifact, results)
download_artifact
download_artifact(
    task_id: uuid.UUID, artifact: Artifact
) -> DownloadedFile

Download an artifact content.

Parameters:

Name Type Description Default
task_id UUID

Task ID the Artifact is from.

required
artifact Artifact

Artifact to download content from.

required

Returns:

Type Description
DownloadedFile

A temporary file containing the Artifact downloaded from the DownloadArtifact API endpoint.

Source code in arkindex_worker/worker/task.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def download_artifact(
    self, task_id: uuid.UUID, artifact: Artifact
) -> DownloadedFile:
    """
    Download an artifact content.

    :param task_id: Task ID the Artifact is from.
    :param artifact: Artifact to download content from.
    :returns: A temporary file containing the ``Artifact`` downloaded from the ``DownloadArtifact`` API endpoint.
    """
    assert task_id and isinstance(
        task_id, uuid.UUID
    ), "task_id shouldn't be null and should be an UUID"
    assert artifact and isinstance(
        artifact, Artifact
    ), "artifact shouldn't be null and should be an Artifact"

    return self.api_client.request(
        "DownloadArtifact", id=task_id, path=artifact.path
    )