Git & GitLab support

arkindex_worker.git

Helper classes for workers that interact with Git repositories and the GitLab API.

Classes

GitlabHelper

GitlabHelper(
    project_id,
    gitlab_url,
    gitlab_token,
    branch,
    rebase_wait_period=1,
    delete_source_branch=True,
    max_rebase_tries=10,
)

Helper class to save files to a GitLab repository.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| project_id | str | The ID of the GitLab project | required |
| gitlab_url | str | GitLab server URL | required |
| gitlab_token | str | GitLab private token of a user with permission to accept merge requests | required |
| branch | str | Name of the branch into which the exported branch will be merged | required |
| rebase_wait_period | Optional[int] | Seconds to wait between each poll to check whether the rebase has finished | 1 |
| delete_source_branch | Optional[bool] | Whether to delete the source branch after merging | True |
| max_rebase_tries | Optional[int] | Maximum number of rebase attempts when merging before giving up | 10 |
Source code in arkindex_worker/git.py
def __init__(
    self,
    project_id: str,
    gitlab_url: str,
    gitlab_token: str,
    branch: str,
    rebase_wait_period: Optional[int] = 1,
    delete_source_branch: Optional[bool] = True,
    max_rebase_tries: Optional[int] = 10,
):
    """
    :param project_id: the id of the gitlab project
    :param gitlab_url: gitlab server url
    :param gitlab_token: gitlab private token of user with permission to accept merge requests
    :param branch: name of the branch to where the exported branch will be merged
    :param rebase_wait_period: seconds to wait between each poll to check whether rebase has finished
    :param delete_source_branch: should delete the source branch after merging?
    :param max_rebase_tries: max number of tries to rebase when merging before giving up
    """
    self.project_id = project_id
    self.gitlab_url = gitlab_url
    self.gitlab_token = str(gitlab_token).strip()
    self.branch = branch
    self.rebase_wait_period = rebase_wait_period
    self.delete_source_branch = delete_source_branch
    self.max_rebase_tries = max_rebase_tries

    logger.info("Creating a Gitlab client")
    self._api = gitlab.Gitlab(self.gitlab_url, private_token=self.gitlab_token)
    self.project = self._api.projects.get(self.project_id)
    self.is_rebase_finished = False
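
Creating the helper immediately builds a GitLab client and fetches the project, so it is usually done once at worker startup. Below is a minimal sketch with placeholder values (the project ID, URL and token are illustrative, not real):

gitlab = GitlabHelper(
    project_id="1234",
    gitlab_url="https://gitlab.example.com",
    gitlab_token="my-private-token",
    branch="master",
)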
Functions
merge
merge(branch_name, title)

Create a merge request and try to merge it. The branch is always rebased first to avoid conflicts from merge requests created in parallel.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| branch_name | str | Source branch name | required |
| title | str | Title of the merge request | required |

Returns:

| Type | Description |
|------|-------------|
| bool | Whether the branch was successfully merged |

Source code in arkindex_worker/git.py
def merge(self, branch_name: str, title: str) -> bool:
    """
    Create a merge request and try to merge.
    Always rebase first to avoid conflicts from MRs made in parallel
    :param branch_name: Source branch name
    :param title: Title of the merge request
    :return: Whether the branch was successfully merged
    """
    mr = None
    # always rebase first, because other workers might have merged already
    for i in range(self.max_rebase_tries):
        logger.info(f"Trying to merge, try nr: {i}")
        try:
            if mr is None:
                mr = self._create_merge_request(branch_name, title)

            mr.rebase()
            rebase_success = self._wait_for_rebase_to_finish(mr.iid)
            if not rebase_success:
                logger.error("Rebase failed, won't be able to merge!")
                return False

            mr.merge(should_remove_source_branch=self.delete_source_branch)
            logger.info("Merge successful")
            return True
        except gitlab.GitlabMRClosedError as e:
            if e.response_code == MR_HAS_CONFLICTS_ERROR_CODE:
                logger.info("Merge failed, trying to rebase and merge again.")
                continue
            else:
                logger.error(f"Merge was not successful: {e}")
                return False
        except gitlab.GitlabError as e:
            logger.error(f"Gitlab error: {e}")
            if 400 <= e.response_code < 500:
                # 4XX errors shouldn't be fixed by retrying
                raise e
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Server connection error, will wait and retry: {e}")
            time.sleep(self.rebase_wait_period)

    return False
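
A hedged usage sketch (the branch name below is illustrative); in practice GitHelper.save_files calls this for you after pushing the branch:

merged = gitlab.merge("workflow_1234_2023-01-01T12.00.00", "Merge workflow_1234")
# merged is False if the rebase failed or max_rebase_tries was exceeded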

GitHelper

GitHelper(
    repo_url,
    git_dir,
    export_path,
    workflow_id,
    gitlab_helper,
    git_clone_wait_period=1,
)

A helper class for running git commands

At the beginning of the workflow, call run_clone_in_background. When all the files are ready to be added to git, call save_files to move them into the git repository and try to push them.

Examples

In worker.configure(), configure the git helper and start the cloning:

gitlab = GitlabHelper(...)
prepare_git_key(...)
self.git_helper = GitHelper(workflow_id=workflow_id, gitlab_helper=gitlab, ...)
self.git_helper.run_clone_in_background()

At the end of the workflow (at the end of worker.run()), push the files to git:

self.git_helper.save_files(self.out_dir)

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| repo_url | | The URL of the git repository where the export will be pushed | required |
| git_dir | | The directory where to clone the git repository | required |
| export_path | | The path inside the git repository where to put the exported files | required |
| workflow_id | | The process ID, used to see the workflow graph in the frontend | required |
| gitlab_helper | GitlabHelper | Helper for GitLab | required |
| git_clone_wait_period | | Check if the clone has finished every N seconds at the end of the workflow | 1 |
Source code in arkindex_worker/git.py
def __init__(
    self,
    repo_url,
    git_dir,
    export_path,
    workflow_id,
    gitlab_helper: GitlabHelper,
    git_clone_wait_period=1,
):
    """

    :param repo_url: the url of the git repository where the export will be pushed
    :param git_dir: the directory where to clone the git repository
    :param export_path: the path inside the git repository where to put the exported files
    :param workflow_id: the process id to see the workflow graph in the frontend
    :param gitlab_helper: helper for gitlab
    :param git_clone_wait_period: check if clone has finished every N seconds at the end of the workflow
    """
    logger.info("Creating git helper")
    self.repo_url = repo_url
    self.git_dir = Path(git_dir)
    self.export_path = self.git_dir / export_path
    self.workflow_id = workflow_id
    self.gitlab_helper = gitlab_helper
    self.git_clone_wait_period = git_clone_wait_period
    self.is_clone_finished = False
    self.cmd = None
    self.success = None
    self.exit_code = None

    self.git_dir.mkdir(parents=True, exist_ok=True)
    # run git commands outside of the repository (no need to change dir)
    self._git = sh.git.bake("-C", self.git_dir)
Functions
run_clone_in_background
run_clone_in_background()

Clones the git repository in the background into the self.git_dir directory.

self.is_clone_finished can be used to know whether the cloning has finished or not.

Source code in arkindex_worker/git.py
def run_clone_in_background(self):
    """
    Clones the git repository in the background in to the self.git_dir directory.

    `self.is_clone_finished` can be used to know whether the cloning has finished
    or not.
    """
    logger.info(f"Starting clone {self.repo_url} in background")
    cmd = sh.git.clone(
        self.repo_url, self.git_dir, _bg=True, _done=self._clone_done
    )
    logger.info(f"Continuing clone {self.repo_url} in background")
    return cmd
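
The clone is typically started right after the helper is created, so it runs while the worker does its processing. A sketch with placeholder values (the repository URL, directories and workflow ID below are illustrative):

git_helper = GitHelper(
    repo_url="git@gitlab.example.com:team/export-repo.git",
    git_dir="/tmp/export_repo",
    export_path="exports",
    workflow_id="workflow-1234",
    gitlab_helper=gitlab,
)
git_helper.run_clone_in_background()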
save_files
save_files(export_out_dir)

Move files in export_out_dir to the cloned git repository and try to merge the created files if possible.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| export_out_dir | Path | Path to the files to be saved | required |

Raises:

| Type | Description |
|------|-------------|
| sh.ErrorReturnCode | If the commit fails for any reason other than there being nothing to commit |
| Exception | If merging the pushed branch through the GitLab helper fails |

Source code in arkindex_worker/git.py
def save_files(self, export_out_dir: Path):
    """
    Move files in export_out_dir to the cloned git repository
    and try to merge the created files if possible.
    :param export_out_dir: Path to the files to be saved
    :raises sh.ErrorReturnCode: _description_
    :raises Exception: _description_
    """
    self._wait_for_clone_to_finish()

    # move exported files to git directory
    file_count = self._move_files_to_git(export_out_dir)

    # use timestamp to avoid branch name conflicts with multiple chunks
    current_timestamp = datetime.isoformat(datetime.now())
    # ":" is not allowed in a branch name
    branch_timestamp = current_timestamp.replace(":", ".")
    # add files to a new branch
    branch_name = f"workflow_{self.workflow_id}_{branch_timestamp}"
    self._git.checkout("-b", branch_name)
    self._git.add("-A")
    try:
        self._git.commit(
            "-m",
            f"Exported files from workflow: {self.workflow_id} at {current_timestamp}",
        )
    except sh.ErrorReturnCode as e:
        if NOTHING_TO_COMMIT_MSG in str(e.stdout):
            logger.warning("Nothing to commit (no changes)")
            return
        else:
            logger.error(f"Commit failed:: {e}")
            raise e

    # count the number of lines in the output
    wc_cmd_out = str(
        sh.wc(self._git.show("--stat", "--name-status", "--oneline", "HEAD"), "-l")
    )
    # -1 because the of the git command header
    files_committed = int(wc_cmd_out.strip()) - 1
    logger.info(f"Committed {files_committed} files")
    if file_count != files_committed:
        logger.warning(
            f"Of {file_count} added files only {files_committed} were committed"
        )

    self._git.push("-u", "origin", "HEAD")

    if self.gitlab_helper:
        try:
            self.gitlab_helper.merge(branch_name, f"Merge {branch_name}")
        except Exception as e:
            logger.error(f"Merge failed: {e}")
            raise e
    else:
        logger.info(
            "No gitlab_helper defined, not trying to merge the pushed branch"
        )
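
At the end of the worker run, the exported files can be handed over in one call (the output directory below is wherever the worker wrote its export files):

git_helper.save_files(Path("/tmp/worker_output"))  # Path comes from pathlib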

Functions

make_backup

make_backup(path)

Create a backup of the file in the same directory, using the timestamp as a “.bak_{timestamp}” suffix.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| path | str | Path to the file to be backed up | required |
Source code in arkindex_worker/git.py
def make_backup(path: str):
    """
    Create a backup file in the same directory with timestamp as suffix ".bak_{timestamp}"
    :param path: Path to the file to be backed up
    """
    path = Path(path)
    if not path.exists():
        raise ValueError(f"No file to backup! File not found: {path}")
    # timestamp with milliseconds
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")[:-3]
    backup_path = Path(str(path) + f".bak_{timestamp}")
    shutil.copy(path, backup_path)
    logger.info(f"Made a backup {backup_path}")

prepare_git_key

prepare_git_key(
    private_key,
    known_hosts,
    private_key_path="~/.ssh/id_ed25519",
    known_hosts_path="~/.ssh/known_hosts",
)

Prepare the git keys (put them in the correct place) so that git can be used. Fixes some whitespace problems that come from the Arkindex secrets store (Django admin).

Also creates a backup of the previous keys if they exist, to avoid losing the original keys of the developers.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| private_key | str | Git private key contents | required |
| known_hosts | str | Git known_hosts contents | required |
| private_key_path | Optional[str] | Path where to put the private key | '~/.ssh/id_ed25519' |
| known_hosts_path | Optional[str] | Path where to put the known_hosts | '~/.ssh/known_hosts' |
Source code in arkindex_worker/git.py
def prepare_git_key(
    private_key: str,
    known_hosts: str,
    private_key_path: Optional[str] = "~/.ssh/id_ed25519",
    known_hosts_path: Optional[str] = "~/.ssh/known_hosts",
):
    """
    Prepare the git keys (put them in to the correct place) so that git could be used.
    Fixes some whitespace problems that come from arkindex secrets store (Django admin).

    Also creates a backup of the previous keys if they exist, to avoid losing the
    original keys of the developers.

    :param private_key: git private key contents
    :param known_hosts: git known_hosts contents
    :param private_key_path: path where to put the private key
    :param known_hosts_path: path where to put the known_hosts
    """
    # secrets admin UI seems to strip the trailing whitespace
    # but git requires the key file to have a new line at the end
    # for some reason uses CRLF line endings, but git doesn't like that
    private_key = private_key.replace("\r", "") + "\n"
    known_hosts = known_hosts.replace("\r", "") + "\n"

    private_key_path = Path(private_key_path).expanduser()
    known_hosts_path = Path(known_hosts_path).expanduser()

    if private_key_path.exists():
        if private_key_path.read_text() != private_key:
            make_backup(private_key_path)

    if known_hosts_path.exists():
        if known_hosts_path.read_text() != known_hosts:
            make_backup(known_hosts_path)

    private_key_path.write_text(private_key)
    # private key must be private, otherwise git will fail
    # expecting octal for permissions
    private_key_path.chmod(0o600)
    known_hosts_path.write_text(known_hosts)

    logger.info(f"Private key size after: {private_key_path.stat().st_size}")
    logger.info(f"Known size after: {known_hosts_path.stat().st_size}")