
Nodes

dhcp

Nodes that implement the dHCP pipeline for fetal data.

Version from https://github.com/GerardMJuan/dhcp-structural-pipeline, which is a fork of the original repository https://github.com/BioMedIA/dhcp-structural-pipeline with several fixes and changes.

The Docker image in which everything works "well" is available at https://hub.docker.com/r/gerardmartijuan/dhcp-pipeline-multifact

TODO: specify the changes from one version to another.

dhcp_pipeline(T2, mask, gestational_age, pre_command='', dhcp_image='', threads=1, flag='all')

Run the dHCP segmentation pipeline on a single subject. The script needs to create the output folders and place the mask there, so that the Docker image can find it and does not run BET. TODO: simplify this step. TODO: be able to input the number of threads.

Flags can be either "-all", "-seg", or "-surf".
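
A hedged usage sketch (the file names, gestational age, and image tag below are illustrative, not taken from the pipeline itself):

>>> from fetpype.nodes.dhcp import dhcp_pipeline
>>> out_dir = dhcp_pipeline(
...     T2="sub-01_rec-irtk_T2w.nii.gz",
...     mask="sub-01_rec-irtk_mask.nii.gz",
...     gestational_age=29.0,
...     pre_command="docker run --rm ",
...     dhcp_image="gerardmartijuan/dhcp-pipeline-multifact:latest",
...     threads=4,
...     flag="-seg",
... )  # doctest: +SKIP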

Source code in fetpype/nodes/dhcp.py
def dhcp_pipeline(
    T2,
    mask,
    gestational_age,
    pre_command="",
    dhcp_image="",
    threads=1,
    flag="all",
):
    """Run the dhcp segmentation pipeline on a single subject.
    The script needs to create the output folders and put the mask
    there so that the docker image can find it and doesn't run bet.
    TODO: don't do it that convoluted.
    TODO: Be able to input the number of threads

    # Flags can be either "-all", "-seg", or "-surf"
    """
    import os
    import shutil

    output_dir = os.path.abspath("dhcp_output")
    os.makedirs(output_dir, exist_ok=True)

    # Basename of the T2 file
    recon_file_name = os.path.basename(T2)

    # Copy T2 to output dir
    shutil.copyfile(T2, os.path.join(output_dir, recon_file_name))

    # Copy mask to output dir with the correct name
    os.makedirs(os.path.join(output_dir, "segmentations"), exist_ok=True)

    # copy the mask under the filename expected by the pipeline
    shutil.copyfile(
        mask,
        os.path.join(
            output_dir,
            "segmentations",
            f"{recon_file_name.replace('.nii.gz', '')}_brain_mask.nii.gz",
        ),
    )

    if "docker" in pre_command:
        cmd = pre_command
        cmd += (
            f"-v {output_dir}:/data "
            f"{dhcp_image} "
            f"/data/{recon_file_name} "
            f"{gestational_age} "
            "-data-dir /data "
            f"-t {threads} "
            "-c 0 "
            f"{flag} "
        )

    elif "singularity" in pre_command:
        # Do we need FSL for this pipeline? add in the precommand
        cmd = pre_command + dhcp_image + " "
        cmd += (
            f"/usr/local/src/structural-pipeline/fetal-pipeline.sh "
            f"{T2} "
            f"{gestational_age} "
            f"-data-dir "
            f"{output_dir} "
            f"-t {threads} "
            "-c 0 "
            f"{flag} "
        )

    else:
        raise ValueError(
            "pre_command must either contain docker or singularity."
        )

    print(cmd)
    os.system(cmd)

    # assert if the output files exist
    assert os.path.exists(
        os.path.join(
            output_dir,
            "segmentations",
            f"{recon_file_name.replace('.nii.gz', '')}_all_labels.nii.gz",
        )
    ), "Error, segmentations file does not exist"

    return output_dir

preprocessing

CropStacksAndMasks

Bases: BaseInterface

Interface to crop the field of view of an image and its mask.

This class provides functionality to crop a Nifti image and its corresponding mask to the bounding box defined by the mask. It also allows for adding boundaries around the cropped region.

Parameters:

image (str): Input image filename. Required.
mask (input; str): Input mask filename. Required.
boundary (input; int): Padding (in mm) to be set around the cropped image and mask. Required.
is_enabled (input; bool): Whether cropping and masking are enabled. Required.
output_image (output; str): Path to the cropped image.
output_mask (output; str): Path to the cropped mask.

Examples:

>>> from fetpype.nodes.preprocessing import CropStacksAndMasks
>>> crop_input = CropStacksAndMasks()
>>> crop_input.inputs.image = 'sub-01_acq-haste_run-1_T2w.nii.gz'
>>> crop_input.inputs.mask = 'sub-01_acq-haste_run-1_T2w_mask.nii.gz'
>>> crop_input.run()
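
If the padding or the enabling flag needs to be adjusted, the corresponding inputs can be set before calling run(); a minimal sketch (the boundary value is illustrative):

>>> crop_input.inputs.boundary = 15
>>> crop_input.inputs.is_enabled = True
>>> crop_input.run()  # doctest: +SKIP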
References
  • Michael Ebner's NiftyMIC repository: https://github.com/gift-surg/NiftyMIC
Source code in fetpype/nodes/preprocessing.py
class CropStacksAndMasks(BaseInterface):
    """
    Interface to crop the field of view of an image and its mask.

    This class provides functionality to crop a Nifti image
    and its corresponding mask to the bounding box defined by
    the mask. It also allows for adding boundaries around the
    cropped region.

    Args:
        image (str): Input image filename
        mask (input; str): Input mask filename
        boundary (input; int):  Padding (in mm) to be set around
                                the cropped image and mask.
        is_enabled (input; bool): Whether cropping and masking are enabled.
        output_image (output; str): Path to the cropped image.
        output_mask (output; str): Path to the cropped mask.

    Examples:
        >>> from fetpype.nodes.preprocessing import CropStacksAndMasks
        >>> crop_input = CropStacksAndMasks()
        >>> crop_input.inputs.image = 'sub-01_acq-haste_run-1_T2w.nii.gz'
        >>> crop_input.inputs.mask = 'sub-01_acq-haste_run-1_T2w_mask.nii.gz'
        >>> crop_input.run() # doctest: +SKIP

    References:
        - Michael Ebner's NiftyMIC repository:
        https://github.com/gift-surg/NiftyMIC
    """

    input_spec = CropStacksAndMasksInputSpec
    output_spec = CropStacksAndMasksOutputSpec

    def _gen_filename(self, name):
        if name == "output_image":
            return os.path.abspath(os.path.basename(self.inputs.image))
        elif name == "output_mask":
            return os.path.abspath(os.path.basename(self.inputs.mask))
        return None

    def _crop_stack_and_mask(
        self,
        image_path,
        mask_path,
        boundary_i=0,
        boundary_j=0,
        boundary_k=0,
        unit="mm",
    ):
        """
        Crops the input image to the field of view given by the bounding box
        around its mask.

        Args:
            image_path (str): Path to a Nifti image.
            mask_path (str): Path to the corresponding Nifti mask.
            boundary_i (int):   Boundary to add to the bounding box in
                                the i direction.
            boundary_j (int):   Boundary to add to the bounding box in
                                the j direction.
            boundary_k (int):   Boundary to add to the bounding box in
                                the k direction.
            unit (str): The unit defining the dimension size in Nifti.

        Returns:
            image_cropped:  Image cropped to the bounding box of mask_ni,
                            including boundary.
            mask_cropped: Mask cropped to its bounding box.

        Notes:
            Code inspired by Michael Ebner:
            https://github.com/gift-surg/NiftyMIC/blob/master/niftymic/base/stack.py
        """
        print(f"Working on {image_path} and {mask_path}")
        image_ni = ni.load(image_path)
        mask_ni = ni.load(mask_path)

        image = image_ni.get_fdata()
        mask = mask_ni.get_fdata()

        assert all(i >= m for i, m in zip(image.shape, mask.shape)), (
            "For a correct cropping, the image should be larger "
            "or equal to the mask."
        )

        # Get rectangular region surrounding the masked voxels
        [x_range, y_range, z_range] = self._get_rectangular_masked_region(mask)

        if any(r is None for r in (x_range, y_range, z_range)):
            print("Cropping to bounding box of mask led to an empty image.")
            return None

        if unit == "mm":
            spacing = image_ni.header.get_zooms()
            boundary_i = np.round(boundary_i / float(spacing[0]))
            boundary_j = np.round(boundary_j / float(spacing[1]))
            boundary_k = np.round(boundary_k / float(spacing[2]))

        shape = [min(im, m) for im, m in zip(image.shape, mask.shape)]
        x_range[0] = np.max([0, x_range[0] - boundary_i])
        x_range[1] = np.min([shape[0], x_range[1] + boundary_i])

        y_range[0] = np.max([0, y_range[0] - boundary_j])
        y_range[1] = np.min([shape[1], y_range[1] + boundary_j])

        z_range[0] = np.max([0, z_range[0] - boundary_k])
        z_range[1] = np.min([shape[2], z_range[1] + boundary_k])

        new_origin = list(
            ni.affines.apply_affine(
                image_ni.affine, [x_range[0], y_range[0], z_range[0]]
            )
        ) + [1]

        new_affine = image_ni.affine
        new_affine[:, -1] = new_origin

        image_cropped = image[
            x_range[0] : x_range[1],  # noqa: E203
            y_range[0] : y_range[1],  # noqa: E203
            z_range[0] : z_range[1],  # noqa: E203
        ]
        mask_cropped = mask[
            x_range[0] : x_range[1],  # noqa: E203
            y_range[0] : y_range[1],  # noqa: E203
            z_range[0] : z_range[1],  # noqa: E203
        ]

        image_cropped = ni.Nifti1Image(image_cropped, new_affine)
        mask_cropped = ni.Nifti1Image(mask_cropped, new_affine)
        ni.save(image_cropped, self._gen_filename("output_image"))
        ni.save(mask_cropped, self._gen_filename("output_mask"))

    def _get_rectangular_masked_region(
        self,
        mask: np.ndarray,
    ) -> tuple:
        """
        Computes the bounding box around the given mask.
        Code inspired by Michael Ebner:
        https://github.com/gift-surg/NiftyMIC/blob/master/niftymic/base/stack.py

        Args:
            mask (np.ndarray): Input mask.

        Returns:
            tuple: A tuple containing the bounding box ranges for x, y, and z.

        """
        if np.sum(abs(mask)) == 0:
            return None, None, None
        shape = mask.shape
        # Define the dimensions along which to sum the data
        sum_axis = [(1, 2), (0, 2), (0, 1)]
        range_list = []

        # Non-zero elements of numpy array along the the 3 dimensions
        for i in range(3):
            sum_mask = np.sum(mask, axis=sum_axis[i])
            ran = np.nonzero(sum_mask)[0]

            low = np.max([0, ran[0]])
            high = np.min([shape[i], ran[-1] + 1])
            range_list.append(np.array([low, high]).astype(int))

        return range_list

    def _run_interface(self, runtime):
        if self.inputs.is_enabled:
            boundary = self.inputs.boundary
            self._crop_stack_and_mask(
                self.inputs.image,
                self.inputs.mask,
                boundary_i=boundary,
                boundary_j=boundary,
                boundary_k=boundary,
            )
        else:
            os.system(
                f"cp {self.inputs.image} "
                f"{self._gen_filename('output_image')}"
            )
            os.system(
                f"cp {self.inputs.mask} "
                f"{self._gen_filename('output_mask')}"
            )
        return runtime

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["output_image"] = self._gen_filename("output_image")
        outputs["output_mask"] = self._gen_filename("output_mask")
        return outputs

CheckAffineResStacksAndMasks

Bases: BaseInterface

Interface to check that the shapes of stacks and masks are consistent (e.g. no trailing dimensions of size 1) and that the through-plane dimension is the last one. If enabled, it also checks that the resolution, affine, and shape of each stack and its mask match, and discards the pair if they do not.

Args:

stacks (input; list): List of input stacks.
masks (input; list): List of input masks.
is_enabled (input; bool): Whether the check is enabled.
output_stacks (output; list): List of stacks that passed the check.
output_masks (output; list): List of masks that passed the check.

Examples:

>>> from fetpype.nodes.preprocessing import CheckAffineResStacksAndMasks
>>> check_input = CheckAffineResStacksAndMasks()
>>> check_input.inputs.stacks = ['sub-01_acq-haste_run-1_T2w.nii.gz']
>>> check_input.inputs.masks = ['sub-01_acq-haste_run-1_T2w_mask.nii.gz']
>>> check_input.run()
Source code in fetpype/nodes/preprocessing.py
class CheckAffineResStacksAndMasks(BaseInterface):
    """
    Interface to check that the shapes of stacks and masks are consistent
    (e.g. no trailing dimensions of size 1) and that the through-plane
    dimension is the last one. If enabled, also checks that the resolution,
    affine, and shape of each stack and its mask match, and discards the
    stack and mask if they do not.

    Args:

        stacks (input; list): List of input stacks.
        masks (input; list): List of input masks.
        is_enabled (input; bool): Whether the check is enabled.
        output_stacks (output; list): List of stacks that passed the check.
        output_masks (output; list): List of masks that passed the check.

    Examples:
        >>> from fetpype.nodes.preprocessing import CheckAffineResStacksAndMasks # noqa: E501
        >>> check_input = CheckAffineResStacksAndMasks()
        >>> check_input.inputs.stacks = ['sub-01_acq-haste_run-1_T2w.nii.gz']
        >>> check_input.inputs.masks = ['sub-01_acq-haste_run-1_T2w_mask.nii.gz']  # noqa: E501
        >>> check_input.run() # doctest: +SKIP
    """

    input_spec = CheckAffineResStacksAndMasksInputSpec
    output_spec = CheckAffineResStacksAndMasksOutputSpec
    _results = {}

    def _squeeze_dim(self, arr, dim):
        if arr.shape[dim] == 1 and len(arr.shape) > 3:
            return np.squeeze(arr, axis=dim)
        return arr

    def compare_resolution_affine(self, r1, a1, r2, a2, s1, s2) -> bool:
        r1 = np.array(r1)
        a1 = np.array(a1)
        r2 = np.array(r2)
        a2 = np.array(a2)
        if s1 != s2:
            return False
        if r1.shape != r2.shape:
            return False
        if np.amax(np.abs(r1 - r2)) > 1e-3:
            return False
        if a1.shape != a2.shape:
            return False
        if np.amax(np.abs(a1 - a2)) > 1e-3:
            return False
        return True

    def check_inplane_pos(self, path, r1):
        """
        Check if the smallest dimension of the stack is the last one.
        """
        vx_str = " x ".join([f"{v:.2f}" for v in r1])
        assert r1[0] == r1[1], (
            f"Inconsistent voxel sizes at dimensions 0 and 1 "
            f"for {path} (voxel size = ({vx_str})). "
            "Are you sure that the data are "
            f"formatted as in-plane x in-plane x through-plane?"
        )

    def _run_interface(self, runtime):
        stacks_out = []
        masks_out = []
        for i, (imp, maskp) in enumerate(
            zip(self.inputs.stacks, self.inputs.masks)
        ):
            skip_stack = False
            out_stack = os.path.join(
                self._gen_filename("output_dir"), os.path.basename(imp)
            )
            out_mask = os.path.join(
                self._gen_filename("output_dir"),
                os.path.basename(maskp),
            )
            image_ni = ni.load(self.inputs.stacks[i])
            mask_ni = ni.load(self.inputs.masks[i])
            image = self._squeeze_dim(image_ni.get_fdata(), -1)
            mask = self._squeeze_dim(mask_ni.get_fdata(), -1)
            image_ni = ni.Nifti1Image(image, image_ni.affine, image_ni.header)
            mask_ni = ni.Nifti1Image(mask, mask_ni.affine, mask_ni.header)

            if self.inputs.is_enabled:
                im_res = image_ni.header["pixdim"][1:4]
                mask_res = mask_ni.header["pixdim"][1:4]
                im_aff = image_ni.affine
                mask_aff = mask_ni.affine
                im_shape = image_ni.shape
                mask_shape = mask_ni.shape
                self.check_inplane_pos(self.inputs.stacks[i], im_res)

                if not self.compare_resolution_affine(
                    im_res, im_aff, mask_res, mask_aff, im_shape, mask_shape
                ):
                    skip_stack = True
                    print(
                        f"Resolution/shape/affine mismatch -- "
                        f"Skipping the stack {os.path.basename(imp)} "
                        f"and mask {os.path.basename(maskp)}"
                    )
                if mask.sum() == 0:
                    skip_stack = True
                    print(
                        f"Mask {os.path.basename(maskp)} is empty -- "
                        f"Skipping the stack {os.path.basename(imp)} "
                        f"and mask {os.path.basename(maskp)}"
                    )

            if not skip_stack:
                ni.save(image_ni, out_stack)
                ni.save(mask_ni, out_mask)
                stacks_out.append(str(out_stack))
                masks_out.append(str(out_mask))
        self._results["output_stacks"] = stacks_out
        self._results["output_masks"] = masks_out
        if len(stacks_out) == 0:
            raise ValueError(
                "All stacks and masks were "
                "discarded during the metadata check."
            )
        return runtime

    def _gen_filename(self, name):

        if name == "output_dir":
            return os.path.abspath("")
        return None

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["output_stacks"] = self._results.get(
            "output_stacks", self._gen_filename("output_stacks")
        )
        outputs["output_masks"] = self._results.get(
            "output_masks", self._gen_filename("output_masks")
        )
        return outputs

check_inplane_pos(path, r1)

Check if the smallest dimension of the stack is the last one.

Source code in fetpype/nodes/preprocessing.py
def check_inplane_pos(self, path, r1):
    """
    Check if the smallest dimension of the stack is the last one.
    """
    vx_str = " x ".join([f"{v:.2f}" for v in r1])
    assert r1[0] == r1[1], (
        f"Inconsistent voxel sizes at dimensions 0 and 1 "
        f"for {path} (voxel size = ({vx_str})). "
        "Are you sure that the data are "
        f"formatted as in-plane x in-plane x through-plane?"
    )

CheckAndSortStacksAndMasks

Bases: BaseInterface

Interface to check the input stacks and masks and make sure that all stacks have a corresponding mask.

Args:

stacks (input; list): List of input stacks.
masks (input; list): List of input masks.

output_stacks (output; list): List of stacks that passed the check.
output_masks (output; list): List of masks that passed the check.

Examples:

>>> from fetpype.nodes.preprocessing import CheckAndSortStacksAndMasks
>>> check_input = CheckAndSortStacksAndMasks()
>>> check_input.inputs.stacks = ['sub-01_acq-haste_run-1_T2w.nii.gz']
>>> check_input.inputs.masks = ['sub-01_acq-haste_run-1_mask.nii.gz']
>>> check_input.run() # doctest: +SKIP

Source code in fetpype/nodes/preprocessing.py
class CheckAndSortStacksAndMasks(BaseInterface):
    """
    Interface to check the input stacks and masks and make sure that
    all stacks have a corresponding mask.

    Args:

        stacks (input; list): List of input stacks.
        masks (input; list): List of input masks.

        output_stacks (output; list): List of stacks that passed the check.
        output_masks (output; list): List of masks that passed the check.
    Examples:
        >>> from fetpype.nodes.preprocessing import CheckAndSortStacksAndMasks
        >>> check_input = CheckAndSortStacksAndMasks()
        >>> check_input.inputs.stacks = ['sub-01_acq-haste_run-1_T2w.nii.gz']
        >>> check_input.inputs.masks = ['sub-01_acq-haste_run-1_mask.nii.gz']
        >>> check_input.run() # doctest: +SKIP
    """

    input_spec = CheckAndSortStacksAndMasksInputSpec
    output_spec = CheckAndSortStacksAndMasksOutputSpec
    _results = {}

    def _run_interface(self, runtime):

        # Check that stacks and masks run_ids match
        stacks_run = get_run_id(self.inputs.stacks)
        masks_run = get_run_id(self.inputs.masks)

        out_stacks = []
        out_masks = []
        for i, s in enumerate(stacks_run):
            in_stack = self.inputs.stacks[i]

            if s in masks_run:
                out_stack = os.path.join(
                    self._gen_filename("output_dir_stacks"),
                    os.path.basename(in_stack),
                )
                in_mask = self.inputs.masks[masks_run.index(s)]
                out_mask = os.path.join(
                    self._gen_filename("output_dir_masks"),
                    os.path.basename(in_mask),
                )
                out_stacks.append(out_stack)
                out_masks.append(out_mask)
            else:
                raise RuntimeError(
                    f"Stack {os.path.basename(self.inputs.stacks[i])} has "
                    f"no corresponding mask (existing IDs: {masks_run})."
                )

            os.system(f"cp {in_stack} " f"{out_stack}")
            os.system(f"cp {in_mask} " f"{out_mask}")
        self._results["output_stacks"] = out_stacks
        self._results["output_masks"] = out_masks
        return runtime

    def _gen_filename(self, name):
        if name == "output_dir_stacks":
            path = os.path.abspath("stacks")
            os.makedirs(path, exist_ok=True)
            return path
        elif name == "output_dir_masks":
            path = os.path.abspath("masks")
            os.makedirs(path, exist_ok=True)
            return path
        return None

    def _list_outputs(self):
        outputs = self._outputs().get()
        outputs["output_stacks"] = self._results["output_stacks"]
        outputs["output_masks"] = self._results["output_masks"]
        return outputs

run_prepro_cmd(input_stacks, cmd, is_enabled=True, input_masks=None, singularity_path=None, singularity_mount=None)

Run a preprocessing command on input stacks and masks.

Parameters:

input_stacks (str or list): Input stacks to process. Required.
cmd (str): Command to run, with tags for input and output. Required.
is_enabled (bool): Whether the command should be executed. Default: True.
input_masks (str or list): Input masks to process. Default: None.
singularity_path (str): Path to the Singularity executable. Default: None.
singularity_mount (str): Mount point for Singularity. Default: None.

Returns:

tuple: Output stacks and masks, if specified in the command. If only one of them is specified, returns that one. If none are specified, returns None.
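
A hedged sketch of the tag substitution (the container image and command template below are hypothetical; only the tags listed in VALID_PREPRO_TAGS are recognized):

>>> from fetpype.nodes.preprocessing import run_prepro_cmd
>>> cmd = (
...     "docker run --rm <mount> some/denoising-image "
...     "--input <input_stacks> --masks <input_masks> "
...     "--output <output_stacks>"
... )
>>> out_stacks = run_prepro_cmd(
...     input_stacks=["sub-01_run-1_T2w.nii.gz", "sub-01_run-2_T2w.nii.gz"],
...     cmd=cmd,
...     input_masks=["sub-01_run-1_mask.nii.gz", "sub-01_run-2_mask.nii.gz"],
... )  # doctest: +SKIP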

Source code in fetpype/nodes/preprocessing.py
def run_prepro_cmd(
    input_stacks,
    cmd,
    is_enabled=True,
    input_masks=None,
    singularity_path=None,
    singularity_mount=None,
):
    """
    Run a preprocessing command on input stacks and masks.

    Args:
        input_stacks (str or list): Input stacks to process.
        cmd (str): Command to run, with tags for input and output.
        is_enabled (bool): Whether the command should be executed.
        input_masks (str or list, optional): Input masks to process.
        singularity_path (str, optional): Path to the Singularity executable.
        singularity_mount (str, optional): Mount point for Singularity.
    Returns:
        tuple: Output stacks and masks, if specified in the command.
               If only one of them is specified, returns that one.
               If none are specified, returns None.

    """
    import os
    from fetpype import VALID_PREPRO_TAGS
    import subprocess

    # Important for mapnodes
    unlist_stacks = False
    unlist_masks = False

    if isinstance(input_stacks, str):
        input_stacks = [input_stacks]
        unlist_stacks = True
    if isinstance(input_masks, str):
        input_masks = [input_masks]
        unlist_masks = True

    from fetpype.nodes import is_valid_cmd, get_directory, get_mount_docker

    print(input_stacks, cmd, is_enabled, input_masks)
    is_valid_cmd(cmd, VALID_PREPRO_TAGS)
    if "<output_stacks>" not in cmd and "<output_masks>" not in cmd:
        raise RuntimeError(
            "No output stacks or masks specified in the command. "
            "Please specify <output_stacks> and/or <output_masks>."
        )

    if is_enabled:
        output_dir = os.path.join(os.getcwd(), "output")
        in_stacks_dir = get_directory(input_stacks)
        in_stacks = " ".join(input_stacks)

        in_masks = ""
        in_masks_dir = None
        if input_masks is not None:
            in_masks_dir = get_directory(input_masks)
            in_masks = " ".join(input_masks)

        output_stacks = None
        output_masks = None

        # In cmd, there will be things contained in <>.
        # Check that everything that is in <> is in valid_tags
        # If not, raise an error

        # Replace the tags in the command
        cmd = cmd.replace("<input_stacks>", in_stacks)
        cmd = cmd.replace("<input_masks>", in_masks)
        if "<output_stacks>" in cmd:
            output_stacks = [
                os.path.join(output_dir, os.path.basename(stack))
                for stack in input_stacks
            ]
            cmd = cmd.replace("<output_stacks>", " ".join(output_stacks))
        if "<output_masks>" in cmd:
            if input_masks:
                output_masks = [
                    os.path.join(output_dir, os.path.basename(mask))
                    for mask in input_masks
                ]
            else:
                output_masks = [
                    os.path.join(output_dir, os.path.basename(stack)).replace(
                        "_T2w", "_mask"
                    )
                    for stack in input_stacks
                ]
            cmd = cmd.replace("<output_masks>", " ".join(output_masks))

        if "<mount>" in cmd:
            mount_cmd = get_mount_docker(
                in_stacks_dir, in_masks_dir, output_dir
            )
            cmd = cmd.replace("<mount>", mount_cmd)
        if "<singularity_path>" in cmd:
            # assume that if we have a singularity path,
            # we are using singularity and the
            # parameter has been set in the config file
            cmd = cmd.replace("<singularity_path>", singularity_path)
        if "<singularity_mount>" in cmd:
            # assume that if we have a singularity mount path,
            # we are using singularity and the
            # parameter has been set in the config file
            cmd = cmd.replace("<singularity_mount>", singularity_mount)

        print(f"Running command:\n {cmd}")
        try:
            subprocess.run(
                cmd, shell=True, check=True, text=True, capture_output=True
            )
        except subprocess.CalledProcessError as e:
            if e.stderr:
                msg = f"Error output:\n{e.stderr.strip()}"
            elif e.stdout:
                msg = f"Container stdout:\n{e.stdout.strip()}"
            else:
                msg = "No error message from container"
            raise RuntimeError(
                f"Container call failed with exit code {e.returncode}.\n"
                f"Command: {getattr(e, 'cmd', cmd)}\n"
                f"{msg}"
            ) from e

    else:
        output_stacks = input_stacks if "<output_stacks>" in cmd else None
        output_masks = input_masks if "<output_masks>" in cmd else None

    if output_stacks is not None and unlist_stacks:
        assert (
            len(output_stacks) == 1
        ), "More than one stack was returned, but unlist_stacks is True."
        output_stacks = output_stacks[0]
    if output_masks is not None and unlist_masks:
        assert (
            len(output_masks) == 1
        ), "More than one mask was returned, but unlist_masks is True."
        output_masks = output_masks[0]
    if output_stacks is not None and output_masks is not None:
        return output_stacks, output_masks
    elif output_stacks is not None:
        return output_stacks
    elif output_masks is not None:
        return output_masks

reconstruction

run_recon_cmd(input_stacks, input_masks, cmd, cfg, singularity_path=None, singularity_mount=None)

Run a reconstruction command with the given input stacks and masks.

Args:

input_stacks (list): List of input stack file paths.
input_masks (list): List of input mask file paths.
cmd (str): Command to run, with placeholders for input and output.
cfg (object): Configuration object containing output directory and resolution.
singularity_path (str, optional): Path to the Singularity executable.
singularity_mount (str, optional): Mount point for Singularity.

Returns:

str: Path to the output volume after reconstruction.
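
A hedged sketch of how the placeholders are filled (the container image and command template are hypothetical; cfg only needs the attributes required by the tags in use, here output_resolution for <output_res>):

>>> from types import SimpleNamespace
>>> from fetpype.nodes.reconstruction import run_recon_cmd
>>> cmd = (
...     "docker run --rm <mount> some/recon-image "
...     "--input <input_stacks> --masks <input_masks> "
...     "--output <output_volume> --resolution <output_res>"
... )
>>> cfg = SimpleNamespace(output_resolution=0.8, path_to_output=None)
>>> volume = run_recon_cmd(
...     ["sub-01_run-1_T2w.nii.gz"], ["sub-01_run-1_mask.nii.gz"], cmd, cfg
... )  # doctest: +SKIP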

Source code in fetpype/nodes/reconstruction.py
def run_recon_cmd(
    input_stacks,
    input_masks,
    cmd,
    cfg,
    singularity_path=None,
    singularity_mount=None,
):
    """
    Run a reconstruction command with the given input stacks and masks.

    Args:

        input_stacks (list): List of input stack file paths.
        input_masks (list): List of input mask file paths.
        cmd (str): Command to run, with placeholders for input and output.
        cfg (object): Configuration object containing output directory
                        and resolution.
        singularity_path (str, optional): Path to the Singularity executable.
        singularity_mount (str, optional): Mount point for Singularity.
    Returns:
        str: Path to the output volume after reconstruction.
    """
    import os
    import subprocess
    import numpy as np
    import nibabel as nib
    import traceback
    from fetpype import VALID_RECON_TAGS as VALID_TAGS
    from fetpype.nodes import is_valid_cmd, get_directory, get_mount_docker

    is_valid_cmd(cmd, VALID_TAGS)
    output_dir = os.path.join(os.getcwd(), "recon")
    output_volume = os.path.join(output_dir, "recon.nii.gz")
    in_stacks_dir = get_directory(input_stacks)
    in_stacks = " ".join(input_stacks)
    in_masks_dir = get_directory(input_masks)
    in_masks = " ".join(input_masks)

    # In cmd, there will be things contained in <>.
    # Check that everything that is in <> is in valid_tags
    # If not, raise an error

    # Replace the tags in the command
    cmd = cmd.replace("<input_stacks>", in_stacks)
    cmd = cmd.replace("<input_dir>", in_stacks_dir)
    cmd = cmd.replace("<input_masks>", in_masks)
    cmd = cmd.replace("<input_masks_dir>", in_masks_dir)
    if "<output_volume>" in cmd:
        cmd = cmd.replace("<output_volume>", output_volume)
    if "<output_dir>" in cmd:
        cmd = cmd.replace("<output_dir>", output_dir)
        # Assert that args.path_to_output is defined
        assert cfg.path_to_output is not None, (
            "<output_dir> found in the command of reconstruction, "
            "but path_to_output is not defined."
        )
        output_volume = os.path.join(output_dir, cfg.path_to_output)
    if "<input_tp>" in cmd:
        try:
            input_tp = np.round(
                np.mean(
                    [
                        nib.load(stack).header.get_zooms()[2]
                        for stack in input_stacks
                    ]
                ),
                1,
            )
            cmd = cmd.replace("<input_tp>", str(input_tp))
        except Exception as e:

            raise ValueError(
                f"Error when calculating <input_tp>: {e}"
                f"\n{traceback.format_exc()}"
            )

    if "<singularity_path>" in cmd:
        # assume that if we have a singularity path,
        # we are using singularity and the
        # parameter has been set in the config file
        cmd = cmd.replace("<singularity_path>", singularity_path)
    if "<singularity_mount>" in cmd:
        # assume that if we have a singularity mount path,
        # we are using singularity and the
        # parameter has been set in the config file
        cmd = cmd.replace("<singularity_mount>", singularity_mount)

    if "<output_res>" in cmd:
        output_res = cfg.output_resolution
        cmd = cmd.replace("<output_res>", str(output_res))
    if "<mount>" in cmd:
        mount_cmd = get_mount_docker(in_stacks_dir, in_masks_dir, output_dir)
        cmd = cmd.replace("<mount>", mount_cmd)
    print(f"Running command:\n {cmd}")
    try:
        subprocess.run(
            cmd, shell=True, check=True, text=True, capture_output=True
        )
    except subprocess.CalledProcessError as e:
        if e.stderr:
            msg = f"Error output:\n{e.stderr.strip()}"
        elif e.stdout:
            msg = f"Container stdout:\n{e.stdout.strip()}"
        else:
            msg = "No error message from container"
        raise RuntimeError(
            f"Container call failed with exit code {e.returncode}.\n"
            f"Command: {getattr(e, 'cmd', cmd)}\n"
            f"{msg}"
        ) from e
    return output_volume

segmentation

run_seg_cmd(input_srr, cmd, cfg, singularity_path=None, singularity_mount=None, singularity_home=None)

Run a segmentation command with the given input SRR.

Parameters:

input_srr (str or list): Path to the input SRR file or a list containing a single SRR file. Required.
cmd (str): Command to run, with placeholders for input and output. Required.
cfg (object): Configuration object containing output directory. Required.
singularity_path (str): Path to the Singularity executable. Default: None.
singularity_mount (str): Mount point for Singularity. Default: None.
singularity_home (str): Value substituted for the <singularity_home> tag in the command. Default: None.

Returns:

str: Path to the output segmentation file after running the command.
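
A hedged sketch along the same lines (the container image and template are hypothetical; cfg.path_to_output is only required when <output_dir> appears in the command):

>>> from types import SimpleNamespace
>>> from fetpype.nodes.segmentation import run_seg_cmd
>>> cmd = (
...     "docker run --rm <mount> some/seg-image "
...     "--input <input_srr> --output <output_seg>"
... )
>>> cfg = SimpleNamespace(path_to_output=None)
>>> seg = run_seg_cmd("sub-01_rec-T2w.nii.gz", cmd, cfg)  # doctest: +SKIP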

Source code in fetpype/nodes/segmentation.py
def run_seg_cmd(
    input_srr,
    cmd,
    cfg,
    singularity_path=None,
    singularity_mount=None,
    singularity_home=None,
):
    """
    Run a segmentation command with the given input SRR.

    Args:
        input_srr (str or list): Path to the input SRR file or a list
                                containing a single SRR file.
        cmd (str): Command to run, with placeholders for input and output.
        cfg (object): Configuration object containing output directory.
        singularity_path (str, optional): Path to the Singularity executable.
        singularity_mount (str, optional): Mount point for Singularity.
        singularity_home (str, optional): Value substituted for the
                                <singularity_home> tag in the command.
    Returns:
        str: Path to the output segmentation file after running the command.

    """
    import os
    import subprocess
    from fetpype import VALID_SEG_TAGS as VALID_TAGS
    from fetpype.nodes import is_valid_cmd, get_mount_docker

    is_valid_cmd(cmd, VALID_TAGS)

    # Check if input_srr is a directory or a file
    if isinstance(input_srr, list):
        if len(input_srr) == 1:
            input_srr = input_srr[0]
        else:
            raise ValueError(
                "input_srr is a list, and contains multiple elements. "
                "It should be a single element."
            )
    # Copy input_srr to input_directory
    # Avoid mounting problematic directories
    input_srr_dir = os.path.join(os.getcwd(), "seg/input")
    os.makedirs(input_srr_dir, exist_ok=True)
    os.system(f"cp {input_srr} {input_srr_dir}/input_srr.nii.gz")
    input_srr = os.path.join(input_srr_dir, "input_srr.nii.gz")

    output_dir = os.path.join(os.getcwd(), "seg/out")
    seg = os.path.join(output_dir, "seg.nii.gz")

    # In cmd, there will be things contained in <>.
    # Check that everything that is in <> is in valid_tags
    # If not, raise an error

    # Replace the tags in the command
    cmd = cmd.replace("<input_srr>", input_srr)
    cmd = cmd.replace("<input_dir>", input_srr_dir)
    cmd = cmd.replace("<output_seg>", seg)
    if "<output_dir>" in cmd:
        cmd = cmd.replace("<output_dir>", output_dir)
        # Assert that args.path_to_output is defined
        assert cfg.path_to_output is not None, (
            "<output_dir> found in the command of reconstruction, "
            " but path_to_output is not defined."
        )

        seg = os.path.join(output_dir, cfg.path_to_output)
        if "<basename>" in seg:
            # Remove all extensions from the basename
            # (handles .nii.gz correctly)
            basename = os.path.basename(input_srr)
            # Remove all extensions (handles both .nii.gz and .nii cases)
            basename_no_ext = basename.split(".")[0]
            seg = seg.replace("<basename>", basename_no_ext)
    if "<mount>" in cmd:
        mount_cmd = get_mount_docker(input_srr_dir, output_dir)
        cmd = cmd.replace("<mount>", mount_cmd)
    if "<singularity_path>" in cmd:
        # assume that if we have a singularity path,
        # we are using singularity and the
        # parameter has been set in the config file
        cmd = cmd.replace("<singularity_path>", singularity_path)
    if "<singularity_mount>" in cmd:
        # assume that if we have a singularity mount path,
        # we are using singularity and the
        # parameter has been set in the config file
        cmd = cmd.replace("<singularity_mount>", singularity_mount)

    if "<singularity_home>" in cmd:
        # assume that if we have a singularity home path,
        # we are using singularity and the
        # parameter has been set in the config file
        cmd = cmd.replace("<singularity_home>", singularity_home)
    print(f"Running command:\n {cmd}")
    try:
        subprocess.run(
            cmd, shell=True, check=True, text=True, capture_output=True
        )
    except subprocess.CalledProcessError as e:
        if e.stderr:
            msg = f"Error output:\n{e.stderr.strip()}"
        elif e.stdout:
            msg = f"Container stdout:\n{e.stdout.strip()}"
        else:
            msg = "No error message from container"
        raise RuntimeError(
            f"Container call failed with exit code {e.returncode}.\n"
            f"Command: {getattr(e, 'cmd', cmd)}\n"
            f"{msg}"
        ) from e
    return seg

utils

get_directory(entry)

Get the directory of an entry, to be mounted on Docker. If entry is a list, it returns the common path. If entry is a string, it returns the dirname.
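
For example (hypothetical paths):

>>> from fetpype.nodes.utils import get_directory
>>> get_directory("/data/sub-01/anat/sub-01_run-1_T2w.nii.gz")
'/data/sub-01/anat'
>>> get_directory(["/data/sub-01/anat/run-1.nii.gz", "/data/sub-01/anat/run-2.nii.gz"])
'/data/sub-01/anat'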

Source code in fetpype/nodes/utils.py
def get_directory(entry):
    """
    Get the directory of an entry, to be mounted on docker
    If entry is a list, it returns the common path.
    If entry is a string, it returns the dirname.
    """
    if isinstance(entry, list):
        if len(entry) == 1:
            return os.path.dirname(entry[0])
        return os.path.commonpath(entry)

    elif isinstance(entry, str):
        return os.path.dirname(entry)
    else:
        raise TypeError(f"Type {type(entry)} not supported")

get_mount_docker(*args)

Build the string for the folders to be mounted on the Docker image. Each non-None argument is created if needed and mounted at the same path inside the container.
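
For example (hypothetical paths; the call also creates the directories, hence the skip):

>>> from fetpype.nodes.utils import get_mount_docker
>>> get_mount_docker("/data/stacks", None, "/data/out")  # doctest: +SKIP
'-v /data/stacks:/data/stacks -v /data/out:/data/out'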

Source code in fetpype/nodes/utils.py
def get_mount_docker(*args):
    """
    Build the string for the folders to be mounted on the
    docker image. Each non-None argument is created if
    needed and mounted at the same path in the container.
    """
    mount_args = []
    for arg in args:
        if arg is not None:
            os.makedirs(arg, exist_ok=True)
            mount_args.append(arg)
    return " ".join([f"-v {arg}:{arg}" for arg in mount_args])

get_run_id(file_list)

Get the run ID from the file name.
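
For example, with BIDS-style file names (illustrative):

>>> from fetpype.nodes.utils import get_run_id
>>> get_run_id(["sub-01_run-1_T2w.nii.gz", "sub-01_run-2_T2w.nii.gz"])
['1', '2']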

Source code in fetpype/nodes/utils.py
def get_run_id(file_list):
    """
    Get the run ID from the file name.
    """
    runs = []
    for file in file_list:
        try:
            runs.append(re.search(r"run-([^\W_]+)_", file).group(1))
        except Exception as e:
            raise ValueError(
                f"run ID not found in file name: {file}. Error: {e}"
            )
    return runs