
Utils

utils_bids

create_datasource(output_query, data_dir, subjects=None, sessions=None, acquisitions=None, derivative=None, name='bids_datasource', extra_derivatives=None)

Create a datasource node that has iterables following the BIDS format. By default, from a BIDSLayout, lists all the subjects (&lt;sub&gt;), finds their session numbers (&lt;ses&gt;, if any) and their acquisition types (&lt;acq&gt;, if any), and builds an iterable of tuples (sub, ses, acq) with all valid combinations.

If a list of subjects/sessions/acquisitions is provided, the BIDSLayout is not queried and the provided subjects/sessions/acquisitions are used as is.

If derivative is not None, the BIDSLayout will be queried for the specified derivative.

For example, if provided with subjects=["sub01", "sub02"], sessions=["01"], acquisitions=["haste", "tru"], the datagrabber will attempt to retrieve all of the following combinations:

[("sub01", "01", "haste"), ("sub01", "01","tru"),
 ("sub02", "01", "haste"), ("sub02", "01","tru")]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_query` | `dict` | A dictionary specifying the output query for the BIDSDataGrabber. | *required* |
| `data_dir` | `str` | The base directory of the BIDS dataset. | *required* |
| `subjects` | `list` | List of subject IDs to include. If None, all subjects in the dataset are included. | `None` |
| `sessions` | `list` | List of session IDs to include. If None, all sessions for each subject are included. | `None` |
| `acquisitions` | `list` | List of acquisition types to include. If None, all acquisitions for each subject/session are included. | `None` |
| `derivative` | `str` | The name of the derivative to query. If None, no derivative is queried. | `None` |
| `name` | `str` | Name for the datasource node. Defaults to "bids_datasource". | `'bids_datasource'` |
| `extra_derivatives` | `list or str` | Additional derivatives to include. If provided, these will be added to the BIDSDataGrabber. | `None` |

Returns: pe.Node: A configured BIDSDataGrabber node that retrieves data according to the specified parameters.
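
A minimal usage sketch (the dataset path and the "T2" query key are illustrative assumptions, not fetpype defaults):

```python
# Minimal sketch (assumed path and query key): grab T2w stacks for two
# subjects, one session and two acquisition types.
from fetpype.utils.utils_bids import create_datasource

output_query = {
    "T2": {"datatype": "anat", "suffix": "T2w", "extension": ["nii", ".nii.gz"]}
}

datasource = create_datasource(
    output_query,
    data_dir="/data/bids",
    subjects=["sub01", "sub02"],
    sessions=["01"],
    acquisitions=["haste", "tru"],
)
# datasource.iterables[1] holds the (subject, session, acquisition) tuples.
```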

Source code in fetpype/utils/utils_bids.py
def create_datasource(
    output_query,
    data_dir,
    subjects=None,
    sessions=None,
    acquisitions=None,
    derivative=None,
    name="bids_datasource",
    extra_derivatives=None,
):
    """Create a datasource node that have iterables following BIDS format.
    By default, from a BIDSLayout, lists all the subjects (<sub>),
    finds their session numbers (<ses>, if any) and their acquisition
    type (<acq>, if any), and builds an iterable of tuples
    (sub, ses, acq) with all valid combinations.

    If a list of subjects/sessions/acquisitions is provided, the
    BIDSLayout is not queried and the provided
    subjects/sessions/acquisitions are used as is.

    If derivative is not None, the BIDSLayout will be queried for
    the specified derivative.

    For example, if provided with subjects=["sub01", "sub02"],
    sessions=["01"], acquisitions=["haste", "tru"], the datagrabber will
    attempt to retrieve all of the following combinations:
    ```
    [("sub01", "01", "haste"), ("sub01", "01","tru"),
     ("sub02", "01", "haste"), ("sub02", "01","tru")]
    ```

    Args:
        output_query (dict): A dictionary specifying the output query
            for the BIDSDataGrabber.
        data_dir (str): The base directory of the BIDS dataset.
        subjects (list, optional): List of subject IDs to include.
            If None, all subjects in the dataset are included.
        sessions (list, optional): List of session IDs to include.
            If None, all sessions for each subject are included.
        acquisitions (list, optional): List of acquisition types to include.
            If None, all acquisitions for each subject/session are included.
        derivative (str, optional): The name of the derivative to query.
            If None, no derivative is queried.
        name (str, optional): Name for the datasource node. Defaults to
                            "bids_datasource".
        extra_derivatives (list or str, optional): Additional
            derivatives to include. If provided, these will be
            added to the BIDSDataGrabber.
    Returns:
        pe.Node: A configured BIDSDataGrabber node that retrieves data
        according to the specified parameters.
    """

    bids_datasource = pe.Node(
        interface=nio.BIDSDataGrabber(),
        name=name,
        synchronize=True,
    )

    bids_datasource.inputs.base_dir = data_dir
    if extra_derivatives is not None:
        bids_datasource.inputs.index_derivatives = True
        if isinstance(extra_derivatives, str):
            extra_derivatives = [extra_derivatives]
        bids_datasource.inputs.extra_derivatives = extra_derivatives
    bids_datasource.inputs.output_query = output_query

    layout = BIDSLayout(data_dir, validate=False)
    # Verbose
    print("BIDS layout:", layout)
    print("\t", layout.get_subjects())
    print("\t", layout.get_sessions())
    iterables = [("subject", "session", "acquisition"), []]

    existing_sub = layout.get_subjects()
    if subjects is None:
        subjects = existing_sub

    for sub in subjects:
        if sub not in existing_sub:
            print(f"Subject {sub} was not found.")

        existing_ses = layout.get_sessions(subject=sub)
        if sessions is None:
            sessions_subj = existing_ses
        else:
            sessions_subj = sessions
        # If no sessions are found, it is possible that there is no session.
        sessions_subj = [None] if len(sessions_subj) == 0 else sessions_subj
        for ses in sessions_subj:
            if ses is not None and ses not in existing_ses:
                print(
                    f"WARNING: Session {ses} was not found for subject {sub}."
                )
            existing_acq = layout.get_acquisition(subject=sub, session=ses)
            if acquisitions is None:
                acquisitions_subj = existing_acq
            else:
                acquisitions_subj = acquisitions
            # If there is no acquisition found, maybe the acquisition
            # tag was not specified.
            acquisitions_subj = (
                [None] if len(acquisitions_subj) == 0 else acquisitions_subj
            )
            for acq in acquisitions_subj:
                if acq is not None and acq not in existing_acq:
                    print(
                        f"WARNING: Acquisition {acq} was not found for "
                        f"subject {sub} session {ses}."
                    )

                iterables[1] += [(sub, ses, acq)]

    bids_datasource.iterables = iterables
    return bids_datasource

create_bids_datasink(out_dir, pipeline_name, strip_dir, datatype='anat', name=None, rec_label=None, seg_label=None, desc_label=None, custom_subs=None, custom_regex_subs=None)

Creates a BIDS-compatible datasink using parameterization and regex substitutions. Organizes outputs into: &lt;out_dir&gt;/derivatives/&lt;pipeline_name&gt;/sub-&lt;ID&gt;/[ses-&lt;ID&gt;/]&lt;datatype&gt;/&lt;BIDS_filename&gt;

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `out_dir` | `str` | Base output directory (e.g., /path/to/project/derivatives). | *required* |
| `pipeline_name` | `str` | Name of the pipeline (e.g., 'nesvor_bounti', 'preprocessing'). | *required* |
| `strip_dir` | `str` | Absolute path to the Nipype working directory base to strip (e.g., /path/to/nipype/workdir). | *required* |
| `datatype` | `str` | BIDS datatype ('anat', etc.). Defaults to "anat". | `'anat'` |
| `name` | `str` | Name for the datasink node. Defaults to None, which will use the pipeline name. | `None` |
| `rec_label` | `str` | Reconstruction label (e.g., 'nesvor') for the rec-... entity. Defaults to None. | `None` |
| `seg_label` | `str` | Segmentation label (e.g., 'bounti') for the seg-... entity. Defaults to None. | `None` |
| `desc_label` | `str` | Description label for the desc-... entity. Defaults to None. | `None` |
| `custom_subs` | `list` | List of custom simple substitutions to apply to output paths. Defaults to None. | `None` |
| `custom_regex_subs` | `list` | List of custom regex substitutions to apply to output paths. Defaults to None. | `None` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `datasink` | `Node` | A Nipype DataSink node configured for BIDS-compatible output. |
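
A minimal usage sketch (paths and labels are assumptions); in practice the datasink is wired into a Nipype workflow:

```python
# Minimal sketch (assumed paths and labels):
from fetpype.utils.utils_bids import create_bids_datasink

datasink = create_bids_datasink(
    out_dir="/data/derivatives/nesvor_bounti",
    pipeline_name="nesvor_bounti",
    strip_dir="/tmp/nipype_workdir",
    rec_label="nesvor",
    seg_label="bounti",
)
# e.g. workflow.connect(seg_node, "output_file", datasink, "@seg")
```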

Source code in fetpype/utils/utils_bids.py
def create_bids_datasink(
    out_dir,
    pipeline_name,
    strip_dir,
    datatype="anat",
    name=None,
    rec_label=None,
    seg_label=None,
    desc_label=None,
    custom_subs=None,
    custom_regex_subs=None,
):
    """
    Creates a BIDS-compatible datasink using parameterization and
    regex substitutions.
    Organizes outputs into:
    <out_dir>/derivatives/<pipeline_name>/sub-<ID>/[ses-<ID>/]
    <datatype>/<BIDS_filename>

    Args:
        out_dir (str): Base output directory
                        (e.g., /path/to/project/derivatives)
        pipeline_name (str): Name of the pipeline (e.g., 'nesvor_bounti',
                            'preprocessing')
        strip_dir (str): Absolute path to the Nipype working directory
            base to strip (e.g., /path/to/nipype/workdir).
        datatype (str, optional): BIDS datatype ('anat', etc.).
            Defaults to "anat".
        name (str, optional): Name for the datasink node.
            Defaults to None, which will use the pipeline name.
        rec_label (str, optional): Reconstruction label (e.g., 'nesvor')
            for rec-... entity. Defaults to None.
        seg_label (str, optional): Segmentation label (e.g., 'bounti')
            for seg-... entity. Defaults to None.
        desc_label (str, optional): Description label for desc-... entity
        custom_subs (list, optional): List of custom simple substitutions
            to apply to output paths. Defaults to None.
        custom_regex_subs (list, optional): List of custom regex
            substitutions to apply to output paths. Defaults to None.

    Returns:
        datasink (nipype.Node): A Nipype DataSink node configured for
        BIDS-compatible output.

    """
    if not strip_dir:
        raise ValueError(
            "`strip_dir` (Nipype work dir base path) is required."
        )
    if name is None:
        name = f"{pipeline_name}_datasink"

    datasink = pe.Node(
        nio.DataSink(
            base_directory=out_dir, parameterization=True, strip_dir=strip_dir
        ),
        name=name,
    )

    regex_subs = []

    bids_derivatives_root = out_dir  # we already pass the bids derivatives
    escaped_bids_derivatives_root = re.escape(out_dir)

    if pipeline_name == "preprocessing":
        # ** Rule 1: Preprocessing Stacks (Denoised) **
        if desc_label == "denoised":
            regex_subs.append(
                (
                    (
                        rf"^{escaped_bids_derivatives_root}/"
                        rf".*?_?session_([^/]+)"
                        rf"_subject_([^/]+).*?/?_denoising.*/"
                        rf"(sub-[^_]+_ses-[^_]+(?:_run-\d+))?"
                        rf"_T2w_noise_corrected(\.nii\.gz|\.nii)$"
                    ),
                    (
                        rf"{bids_derivatives_root}/sub-\2/ses-\1/{datatype}"
                        rf"/\3_desc-denoised_T2w\4"
                    ),
                )
            )
        # ** Rule 2: Preprocessing Masks (Cropped) **
        if desc_label == "cropped":
            regex_subs.append(
                (
                    (
                        rf"^{escaped_bids_derivatives_root}/.*?_session_"
                        rf"([^/]+)_subject_([^/]+).*/?_cropping.*/"
                        rf"(sub-[^_]+_ses-[^_]+(?:_run-\d+)?)_"
                        rf"mask(\.nii\.gz|\.nii)$"
                    ),
                    (
                        rf"{bids_derivatives_root}/sub-\2/ses-\1/{datatype}"
                        rf"/\3_desc-cropped_mask\4"
                    ),
                )
            )

    # ** Rule 3: Reconstruction Output **
    if rec_label and not seg_label and pipeline_name != "preprocessing":
        regex_subs.append(
            (
                (
                    rf"^{escaped_bids_derivatives_root}/.*?_?session_([^/]+)"
                    rf"_subject_([^/]+).*/(?:[^/]+)(\.nii\.gz|\.nii)$"
                ),
                # Groups: \1=SESS, \2=SUBJ, \3=ext
                (
                    rf"{bids_derivatives_root}/sub-\2/ses-\1/"
                    rf"{datatype}/sub-\2_ses-\1_rec-{rec_label}_T2w\3"
                ),
            )
        )

    # ** Rule 4: Segmentation Output **
    if seg_label and rec_label and pipeline_name != "preprocessing":
        regex_subs.append(
            (
                (
                    rf"^{escaped_bids_derivatives_root}/"
                    rf".*?_?session_([^/]+)_subject_([^/]+).*/"
                    rf"input_srr-mask-brain_bounti-19(\.nii\.gz|\.nii)$"
                ),
                # Groups: \1=SESS, \2=SUBJ, \3=ext
                (
                    rf"{bids_derivatives_root}/sub-\2/ses-\1/{datatype}/"
                    rf"sub-\2_ses-\1_rec-{rec_label}_seg-{seg_label}_dseg\3"
                ),
            )
        )

    # Add more specific rules here if other file types need handling
    regex_subs.extend(
        [
            (r"sub-sub-", r"sub-"),  # Fix doubled sub prefix
            (r"ses-ses-", r"ses-"),  # Fix doubled ses prefix (just in case)
            (r"_+", "_"),  # Replace multiple underscores with single
            (r"(/)_", r"\1"),  # Remove underscore after slash if present
            (r"(_)\.", r"\."),  # Remove underscore before dot if present
            (r"-+", "-"),  # Replace multiple hyphens with single
            (r"//+", "/"),  # Fix double slashes
            (r"[\\/]$", ""),  # Remove trailing slash
            (
                r"_ses-None",
                "",
            ),  # Remove ses-None if session was optional/missing
            (
                r"(\.nii\.gz)\1+$",
                r"\1",
            ),  # Fix repeated extensions like .nii.gz.nii.gz
            (r"(\.nii)\1+$", r"\1"),  # Fix repeated extensions like .nii.nii
        ]
    )

    # Add custom regex substitutions
    if custom_regex_subs:
        regex_subs.extend(custom_regex_subs)

    datasink.inputs.regexp_substitutions = regex_subs

    # Add custom simple substitutions
    final_subs = []
    if custom_subs:
        final_subs.extend(custom_subs)
    datasink.inputs.substitutions = final_subs

    return datasink

create_datasink(iterables, name='output', params_subs={}, params_regex_subs={})

Deprecated. Creates a data sink node for reformatting and organizing relevant outputs.

From: https://github.com/Macatools/macapype (adapted)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `iterables` | `list or tuple` | A collection of iterables, containing subject and session information. | *required* |
| `name` | `str` | The name for the data sink container. Defaults to "output". | `'output'` |
| `params_subs` | `dict` | A dictionary of parameter substitutions to apply to output paths. Defaults to an empty dictionary. | `{}` |
| `params_regex_subs` | `dict` | A dictionary of regular expression-based substitutions to apply to output paths. Defaults to an empty dictionary. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `pe.Node` | A Pipeline Engine Node representing the configured datasink. |
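
A minimal sketch showing how the deprecated datasink consumes the iterables built by create_datasource (output_query and the dataset path are assumed, as in the create_datasource example above):

```python
# Minimal sketch (assumed inputs); iterables[1] must hold
# (subject, session, acquisition) tuples.
datasource = create_datasource(output_query, "/data/bids")
datasink = create_datasink(datasource.iterables, name="output")
```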

Source code in fetpype/utils/utils_bids.py
def create_datasink(
    iterables, name="output", params_subs={}, params_regex_subs={}
):
    """
    Deprecated. Creates a data sink node for reformatting and organizing
    relevant outputs.

    From: https://github.com/Macatools/macapype (adapted)

    Args:
        iterables (list or tuple): A collection of iterables, containing
                                   subject and session information.
        name (str, optional): The name for the data sink container.
                              Defaults to "output".
        params_subs (dict, optional): A dictionary of parameter substitutions
                                      to apply to output paths. Defaults to
                                      an empty dictionary.
        params_regex_subs (dict, optional): A dictionary of regular
                                            expression-based substitutions
                                            to apply to output paths.
                                            Defaults to an empty dictionary.

    Returns:
        pe.Node: A Pipeline Engine Node representing the configured datasink.
    """

    print("Datasink name: ", name)

    # Create the datasink node
    datasink = pe.Node(nio.DataSink(), name=name)

    # Generate subject folders with session and subject information
    subjFolders = [
        (
            "_acquisition_%s_session_%s_subject_%s" % (acq, ses, sub),
            "sub-%s/ses-%s/anat" % (sub, ses),
        )
        for (sub, ses, acq) in iterables[1]  # doublecheck
    ]

    print("subjFolders: ", subjFolders)

    # Load parameter substitutions from the 'subs.json' file
    json_subs = op.join(op.dirname(op.abspath(__file__)), "subs.json")
    dict_subs = json.load(open(json_subs, encoding="utf-8"))
    dict_subs.update(params_subs)  # Override with any provided substitutions

    subs = [(key, value) for key, value in dict_subs.items()]
    subjFolders.extend(subs)  # Add parameter substitutions to folders

    datasink.inputs.substitutions = subjFolders

    # Load regex-based substitutions from the 'regex_subs.json' file
    json_regex_subs = op.join(
        op.dirname(op.abspath(__file__)), "regex_subs.json"
    )
    dict_regex_subs = json.load(open(json_regex_subs, encoding="utf-8"))

    # Update with provided regex substitutions
    dict_regex_subs.update(params_regex_subs)

    regex_subs = [(key, value) for key, value in dict_regex_subs.items()]
    datasink.inputs.regexp_substitutions = regex_subs

    return datasink

get_gestational_age(bids_dir, T2)

Retrieve the gestational age for a specific subject from a BIDS dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bids_dir` |  | The file path to the root of the BIDS dataset, which must contain a 'participants.tsv' file. | *required* |
| `T2` |  | The path of the image. We can get the subject id from there if it follows a BIDS format. | *required* |

Returns: gestational_age : The gestational age of the subject.
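
A minimal sketch (paths are assumptions); participants.tsv must contain 'participant_id' and 'gestational_age' columns:

```python
# Minimal sketch (assumed paths): the subject ID is parsed from the
# filename, so it must follow the BIDS naming convention.
ga = get_gestational_age(
    "/data/bids",
    "/data/bids/sub-01/anat/sub-01_T2w.nii.gz",
)
```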

Source code in fetpype/utils/utils_bids.py
def get_gestational_age(bids_dir, T2):
    """
    Retrieve the gestational age for a specific subject from a BIDS dataset.

    Args:
        bids_dir : The file path to the root of the BIDS dataset,
            which must contain a 'participants.tsv' file.
        T2 : The path of the image. We can get the subject id from there if
            it follows a BIDS format.
    Returns:
        gestational_age : The gestational age of the subject.

    """
    import pandas as pd
    import os

    participants_path = f"{bids_dir}/participants.tsv"

    try:
        df = pd.read_csv(participants_path, delimiter="\t")
    except FileNotFoundError:
        raise FileNotFoundError(f"participants.tsv not found in {bids_dir}")

    # TODO This T2[0] not really clean
    subject_id = os.path.basename(T2).split("_")[0]
    try:
        gestational_age = df.loc[
            df["participant_id"] == f"{subject_id}", "gestational_age"
        ].values[0]
    except KeyError:
        raise KeyError(
            "Column 'gestational_age' not found in participants.tsv"
        )
    except IndexError:
        raise IndexError(
            f"Subject sub-{subject_id} not found in participants.tsv"
        )

    return gestational_age

create_description_file(out_dir, algo, prev_desc=None, cfg=None)

Create a dataset_description.json file in the derivatives folder. TODO: should look for the extra parameters and also add them

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `out_dir` |  | Path to the derivatives folder where the dataset_description.json file is created. | *required* |
| `algo` |  | Name of the algorithm, used for the "Name" and "GeneratedBy" fields. | *required* |
| `prev_desc` |  | Optional path to a previous dataset_description.json; its "Name" is appended to "GeneratedBy". | `None` |
| `cfg` |  | Optional configuration, stored under the "Config" key. | `None` |
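
A minimal sketch (the path and algorithm name are assumptions):

```python
# Minimal sketch (assumed path and algorithm name): writes
# <out_dir>/dataset_description.json only if it does not already exist.
create_description_file(
    out_dir="/data/derivatives/nesvor",
    algo="nesvor",
)
```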
Source code in fetpype/utils/utils_bids.py
def create_description_file(out_dir, algo, prev_desc=None, cfg=None):
    """Create a dataset_description.json file in the derivatives folder.
    TODO: should look for the extra parameters and also add them

    Args:
        out_dir : Path to the derivatives folder where the file is created.
        algo : Name of the algorithm, used for "Name" and "GeneratedBy".
        prev_desc : Optional path to a previous dataset_description.json;
            its "Name" is appended to "GeneratedBy".
        cfg : Optional configuration, stored under the "Config" key.
    """
    if not os.path.exists(os.path.join(out_dir, "dataset_description.json")):
        description = {
            "Name": algo,
            "Version": "1.0",
            "BIDSVersion": "1.7.0",
            "PipelineDescription": {
                "Name": algo,
            },
            "GeneratedBy": [
                {
                    "Name": algo,
                }
            ],
        }

        if prev_desc is not None:
            with open(prev_desc, "r") as f:
                prev_desc = json.load(f)
                description["GeneratedBy"].append({"Name": prev_desc["Name"]})
        if cfg is not None:
            description["Config"] = OmegaConf.to_container(cfg, resolve=True)
        with open(
            os.path.join(
                out_dir,
                "dataset_description.json",
            ),
            "w",
            encoding="utf-8",
        ) as outfile:
            json.dump(description, outfile, indent=4)

utils_docker

flatten_cfg(cfg, base='')

Flatten a nested configuration dictionary into a flat dictionary with keys as paths and values as the corresponding values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `cfg` | `dict` | The configuration dictionary to flatten. | *required* |
| `base` | `str` | The base path to prepend to the keys. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `generator` | A generator that yields tuples of (path, value). |
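
For example (configuration values are assumptions):

```python
# Minimal sketch (assumed config): nested keys become slash-separated paths.
cfg = {"reconstruction": {"docker": {"cmd": "run <mount> img"}}, "verbose": True}
print(dict(flatten_cfg(cfg)))
# {'/reconstruction/docker/cmd': 'run <mount> img', '/verbose': True}
```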

Source code in fetpype/utils/utils_docker.py
def flatten_cfg(cfg, base=""):
    """
    Flatten a nested configuration dictionary into a flat dictionary
    with keys as paths and values as the corresponding values.
    Args:
        cfg (dict): The configuration dictionary to flatten.
        base (str): The base path to prepend to the keys.
    Returns:
        generator: A generator that yields tuples of (path, value).
    """
    for k, v in cfg.items():
        if isinstance(v, dict):
            yield from flatten_cfg(v, "/".join([base, k]))
        else:
            yield ("/".join([base, k]), v)

is_available_container(container_type, container_name)

Check if the container is available on the system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `container_type` | `str` | The type of container, either 'docker' or 'singularity'. | *required* |
| `container_name` | `str` | The name of the container to check. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the container is available, False otherwise. |
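
For example (the image name is an assumption):

```python
# Minimal sketch (assumed image name): pull the image only if it is missing.
if not is_available_container("docker", "ubuntu:22.04"):
    retrieve_container("docker", "ubuntu:22.04")
```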

Source code in fetpype/utils/utils_docker.py
def is_available_container(container_type, container_name):
    """
    Check if the container is available on the system.
    Args:
        container_type (str):   The type of container, either 'docker'
                                or 'singularity'
        container_name (str): The name of the container to check.
    Returns:
        bool: True if the container is available, False otherwise.
    """
    if container_type == "docker":
        try:
            subprocess.run(
                [container_type, "inspect", container_name],
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except subprocess.CalledProcessError:
            return False
        else:
            return True
    elif container_type == "singularity":
        if os.path.isfile(container_name):
            return True
        else:
            return False
    else:
        raise ValueError(
            f"Container type {container_type} not supported. "
            "Please use 'docker' or 'singularity'."
        )

retrieve_container(container_type, container_name)

Retrieve the container from the registry.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `container_type` | `str` | The type of container, either 'docker' or 'singularity'. | *required* |
| `container_name` | `str` | The name of the container to retrieve. | *required* |

Source code in fetpype/utils/utils_docker.py
def retrieve_container(container_type, container_name):
    """
        Retrieve the container from the registry.
    Args:
        container_type (str):   The type of container, either 'docker' or
                                'singularity'
        container_name (str): The name of the container to retrieve.

    """
    if container_type == "docker":

        cmd = [container_type, "pull", container_name]
        print(f"Running {' '.join(cmd)}")
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        for line in process.stdout:
            print(line, end="")

        process.wait()
        if process.returncode != 0:
            for line in process.stderr:
                print(line, end="")
            raise subprocess.CalledProcessError(
                process.returncode,
                cmd,
                output=process.stdout.read(),
                stderr=process.stderr.read(),
            )
    elif container_type == "singularity":
        raise NotImplementedError
    else:
        raise ValueError(
            f"Container type {container_type} not supported. "
            "Please use 'docker' or 'singularity'."
        )

check_container_commands(container_type, cfg)

Check if the required docker or singularity images are available on the system.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `container_type` | `str` | The type of container, either 'docker' or 'singularity'. | *required* |
| `cfg` | `dict` | The configuration dictionary containing the container names. | *required* |
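
A minimal sketch (the configuration structure and image name are assumptions); the docker image name is taken as the first token after the &lt;mount&gt; tag:

```python
# Minimal sketch (assumed config and image name): checks availability and,
# for docker, offers to pull any missing images.
cfg = {
    "reconstruction": {
        "docker": {"cmd": "docker run <mount> fetpype/nesvor:latest"},
    },
}
check_container_commands("docker", cfg)
```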
Source code in fetpype/utils/utils_docker.py
def check_container_commands(container_type, cfg):
    """
    Check if the required docker or singularity images are available
    on the system.

    Args:
        container_type (str):   The type of container, either 'docker' or
                                'singularity'
        cfg (dict): The configuration dictionary containing the
                    container names.

    """
    # Check if the container_type is valid
    if container_type not in ["docker", "singularity"]:
        raise ValueError(
            f"Container type {container_type} not supported. "
            "Please use 'docker' or 'singularity'."
        )

    # Iterate the nested config dictionary
    cfg_dict = dict(flatten_cfg(cfg))
    container_names = {}
    for k, v in cfg_dict.items():
        # Return the word that is after the <mount> tag in the string v
        if container_type == "docker" and "docker" in k:
            docker_name = v.split("<mount>")[-1].split()[0]
            container_names[k] = docker_name
        elif container_type == "singularity" and "singularity" in k:
            # Find a string that ends with .sif
            if v is None:
                continue
            print("CHECKING PATH IN", k, v)
            singularity_name = [s for s in v.split(" ") if s.endswith(".sif")][
                0
            ]
            container_names[k] = singularity_name

    container_names_list = defaultdict(list)
    for k, v in container_names.items():
        container_names_list[v].append(k)

    # Check which containers are missing
    missing_containers = []
    for k, v in container_names_list.items():
        print(f"Checking {container_type} {k} -- Used by {', '.join(v)}")
        if not is_available_container(container_type, k):
            print("\tSTATUS: NOT FOUND")
            missing_containers.append(k)
        else:
            print("\tSTATUS: AVAILABLE")

    # Retrieve the missing containers
    if len(missing_containers) > 0 and container_type == "docker":
        var = input(
            f"Would you like me to retrieve the missing containers "
            f"{', '.join(missing_containers)}? (y/n) "
        )
        if var == "y":
            for k in missing_containers:
                retrieve_container(container_type, k)

        else:
            print("Exiting...")
            sys.exit(1)
    elif len(missing_containers) > 0 and container_type == "singularity":
        raise NotImplementedError(
            "Automated container retrieval for singularity is "
            "not implemented yet."
        )
        sys.exit(1)