
Workflows

pipeline_fet

create_main_workflow(data_dir, masks_dir, out_dir, nipype_dir, subjects, sessions, acquisitions, cfg_path, nprocs, save_intermediates=False)

Instantiates and runs the entire workflow of the fetpype pipeline.

Parameters:

data_dir (str, required):
    Path to the BIDS directory that contains anatomical images.
masks_dir (str, required):
    Path to the BIDS directory that contains brain masks.
out_dir (str, required):
    Path to the output directory (will be created if not already existing). Previous outputs may be overridden.
nipype_dir (str, required):
    Path to the nipype directory.
subjects (list[str], required):
    List of subject IDs matching the BIDS specification (e.g., sub-[SUB1], sub-[SUB2], ...).
sessions (list[str], required):
    List of session IDs matching the BIDS specification (e.g., ses-[SES1], ses-[SES2], ...).
acquisitions (list[str], required):
    List of acquisition names matching the BIDS specification (e.g., acq-[ACQ1], ...).
cfg_path (str, required):
    Path to a hydra configuration file (YAML) specifying pipeline parameters.
nprocs (int, required):
    Number of processes to be launched by MultiProc.
save_intermediates (bool, default: False):
    If True, also save intermediate preprocessing outputs (denoised and cropped stacks) under a "preprocessing" derivatives folder.
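
A minimal call could look like the sketch below; the import path follows the source location shown under "Source code", while all paths, IDs, and the configuration file name are placeholders. Passing None for the optional lists is assumed to select all available data.

from fetpype.workflows.pipeline_fet import create_main_workflow

create_main_workflow(
    data_dir="/data/bids",                  # BIDS dataset with the T2w stacks (placeholder path)
    masks_dir=None,                         # optional BIDS derivatives folder with brain masks
    out_dir="/data/derivatives/fetpype",    # created if missing
    nipype_dir="/data/nipype_work",         # nipype working directory
    subjects=["sub-01"],                    # placeholder subject ID
    sessions=None,
    acquisitions=None,
    cfg_path="config.yaml",                 # hydra configuration file (placeholder name)
    nprocs=4,                               # MultiProc worker count
    save_intermediates=False,
)
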
Source code in fetpype/workflows/pipeline_fet.py
def create_main_workflow(
    data_dir,
    masks_dir,
    out_dir,
    nipype_dir,
    subjects,
    sessions,
    acquisitions,
    cfg_path,
    nprocs,
    save_intermediates=False,
):
    """
    Instantiates and runs the entire workflow of the fetpype pipeline.

    Args:
        data_dir (str):
            Path to the BIDS directory that contains anatomical images.
        masks_dir (str):
            Path to the BIDS directory that contains brain masks.
        out_dir (str):
            Path to the output directory (will be created if not already
            existing). Previous outputs may be overridden.
        nipype_dir (str):
            Path to the nipype directory.
        subjects (list[str], optional):
            List of subject IDs matching the BIDS specification
            (e.g., sub-[SUB1], sub-[SUB2], ...).
        sessions (list[str], optional):
            List of session IDs matching the BIDS specification
            (e.g., ses-[SES1], ses-[SES2], ...).
        acquisitions (list[str], optional):
            List of acquisition names matching the BIDS specification
            (e.g., acq-[ACQ1], ...).
        cfg_path (str):
            Path to a hydra configuration file (YAML) specifying pipeline
            parameters.
        nprocs (int):
            Number of processes to be launched by MultiProc.
        save_intermediates (bool, optional):
            If True, also save intermediate preprocessing outputs (denoised
            and cropped stacks) under a "preprocessing" derivatives folder.
            Defaults to False.

    """

    cfg = init_and_load_cfg(cfg_path)
    data_dir, out_dir, nipype_dir = check_and_update_paths(
        data_dir, out_dir, nipype_dir, cfg
    )

    # Print the three paths
    print(f"Data directory: {data_dir}")
    print(f"Output directory: {out_dir}")
    print(f"Nipype directory: {nipype_dir}")

    load_masks = False
    if masks_dir is not None:
        # Check it exists
        if not os.path.exists(masks_dir):
            raise ValueError(
                f"Path to masks directory {masks_dir} does not exist."
            )
        masks_dir = os.path.abspath(masks_dir)
        load_masks = True

    check_valid_pipeline(cfg)

    # main_workflow
    main_workflow = pe.Workflow(name=get_pipeline_name(cfg))
    main_workflow.base_dir = nipype_dir
    fet_pipe = create_full_pipeline(cfg, load_masks)

    output_query = {
        "stacks": {
            "datatype": "anat",
            "suffix": "T2w",
            "extension": ["nii", ".nii.gz"],
        },
    }
    if load_masks:
        output_query["masks"] = {
            "datatype": "anat",
            "suffix": "mask",
            "extension": ["nii", ".nii.gz"],
        }

    # datasource
    datasource = create_datasource(
        output_query,
        data_dir,
        subjects,
        sessions,
        acquisitions,
        extra_derivatives=masks_dir,
    )

    input_data = pe.Workflow(name="input")

    output_fields = ["stacks"]
    if load_masks:
        output_fields.append("masks")
    output = pe.Node(
        niu.IdentityInterface(fields=output_fields), name="outputnode"
    )
    input_data.connect(datasource, "stacks", output, "stacks")
    if load_masks:
        input_data.connect(datasource, "masks", output, "masks")

    main_workflow.connect(
        input_data, "outputnode.stacks", fet_pipe, "inputnode.stacks"
    )
    if load_masks:
        main_workflow.connect(
            input_data, "outputnode.masks", fet_pipe, "inputnode.masks"
        )

    # Reconstruction data sink:
    pipeline_name = get_pipeline_name(cfg)

    # Preprocessing data sink:
    if save_intermediates:
        datasink_path_intermediate = os.path.join(out_dir, "preprocessing")
        os.makedirs(datasink_path_intermediate, exist_ok=True)
        create_description_file(
            datasink_path_intermediate, "preprocessing", cfg=cfg.reconstruction
        )

        # Create a datasink for the preprocessing pipeline
        preprocessing_datasink_denoised = create_bids_datasink(
            out_dir=datasink_path_intermediate,
            pipeline_name="preprocessing",  # Use combined name
            strip_dir=main_workflow.base_dir,
            name="preprocessing_datasink_denoised",
            desc_label="denoised",
        )
        preprocessing_datasink_masked = create_bids_datasink(
            out_dir=datasink_path_intermediate,
            pipeline_name="preprocessing",  # Use combined name
            strip_dir=main_workflow.base_dir,
            name="preprocessing_datasink_cropped",
            desc_label="cropped",
        )

        # Connect the pipeline to the datasinks
        main_workflow.connect(
            fet_pipe,
            "Preprocessing.outputnode.stacks",
            preprocessing_datasink_denoised,
            "@stacks",
        )
        main_workflow.connect(
            fet_pipe,
            "Preprocessing.outputnode.masks",
            preprocessing_datasink_masked,
            "@masks",
        )

    recon_datasink = create_bids_datasink(
        out_dir=out_dir,
        pipeline_name=pipeline_name,
        strip_dir=main_workflow.base_dir,
        name="final_recon_datasink",
        rec_label=cfg.reconstruction.pipeline,
    )

    # Create another datasink for the segmentation pipeline
    seg_datasink = create_bids_datasink(
        out_dir=out_dir,
        pipeline_name=pipeline_name,
        strip_dir=main_workflow.base_dir,
        name="final_seg_datasink",
        rec_label=cfg.reconstruction.pipeline,
        seg_label=cfg.segmentation.pipeline,
    )

    # Connect the pipeline to the datasink
    main_workflow.connect(
        fet_pipe, "outputnode.output_srr", recon_datasink, f"@{pipeline_name}"
    )
    main_workflow.connect(
        fet_pipe,
        "outputnode.output_seg",
        seg_datasink,
        f"@{cfg.segmentation.pipeline}",
    )

    if cfg.save_graph:
        main_workflow.write_graph(
            graph2use="colored",
            format="png",
            simple_form=True,
        )

    main_workflow.config["execution"] = {"remove_unnecessary_outputs": "false"}
    main_workflow.run(plugin="MultiProc", plugin_args={"n_procs": nprocs})

pipeline_rec

create_rec_workflow(data_dir, masks_dir, out_dir, nipype_dir, subjects, sessions, acquisitions, cfg_path, nprocs)

Instantiates and runs the reconstruction workflow of the fetpype pipeline.

Parameters:

data_dir (str, required):
    Path to the BIDS directory that contains anatomical images.
masks_dir (str, required):
    Path to the BIDS directory that contains brain masks.
out_dir (str, required):
    Path to the output directory (will be created if not already existing). Previous outputs may be overridden.
nipype_dir (str, required):
    Path to the nipype directory.
subjects (list[str], required):
    List of subject IDs matching the BIDS specification (e.g., sub-[SUB1], sub-[SUB2], ...).
sessions (list[str], required):
    List of session IDs matching the BIDS specification (e.g., ses-[SES1], ses-[SES2], ...).
acquisitions (list[str], required):
    List of acquisition names matching the BIDS specification (e.g., acq-[ACQ1], ...).
cfg_path (str, required):
    Path to a hydra configuration file (YAML) specifying pipeline parameters.
nprocs (int, required):
    Number of processes to be launched by MultiProc.
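
A minimal call sketch, again with placeholder paths and assuming that None for the optional lists selects all available data; masks_dir points at precomputed brain masks, or can be set to None:

from fetpype.workflows.pipeline_rec import create_rec_workflow

create_rec_workflow(
    data_dir="/data/bids",                    # BIDS dataset with the T2w stacks
    masks_dir="/data/derivatives/masks",      # precomputed brain masks (placeholder path)
    out_dir="/data/derivatives/fetpype_rec",
    nipype_dir="/data/nipype_work",
    subjects=None,                            # assumed to mean "all subjects"
    sessions=None,
    acquisitions=None,
    cfg_path="config.yaml",                   # hydra configuration file (placeholder name)
    nprocs=8,
)
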
Source code in fetpype/workflows/pipeline_rec.py
def create_rec_workflow(
    data_dir,
    masks_dir,
    out_dir,
    nipype_dir,
    subjects,
    sessions,
    acquisitions,
    cfg_path,
    nprocs,
):
    """
    Instantiates and runs the reconstruction workflow of the fetpype pipeline.

    Args:
        data_dir (str):
            Path to the BIDS directory that contains anatomical images.
        masks_dir (str):
            Path to the BIDS directory that contains brain masks.
        out_dir (str):
            Path to the output directory (will be created if not already
            existing). Previous outputs may be overridden.
        nipype_dir (str):
            Path to the nipype directory.
        subjects (list[str], optional):
            List of subject IDs matching the BIDS specification
            (e.g., sub-[SUB1], sub-[SUB2], ...).
        sessions (list[str], optional):
            List of session IDs matching the BIDS specification
            (e.g., ses-[SES1], ses-[SES2], ...).
        acquisitions (list[str], optional):
            List of acquisition names matching the BIDS specification
            (e.g., acq-[ACQ1], ...).
        cfg_path (str):
            Path to a hydra configuration file (YAML) specifying pipeline
            parameters.
        nprocs (int):
            Number of processes to be launched by MultiProc.

    """

    cfg = init_and_load_cfg(cfg_path)

    data_dir, out_dir, nipype_dir = check_and_update_paths(
        data_dir, out_dir, nipype_dir, cfg
    )

    load_masks = False
    if masks_dir is not None:
        # Check it exists
        if not os.path.exists(masks_dir):
            raise ValueError(
                f"Path to masks directory {masks_dir} does not exist."
            )
        masks_dir = os.path.abspath(masks_dir)
        load_masks = True
    check_valid_pipeline(cfg)
    # if the general pipeline is not in params, create it and set it to niftymic

    # main_workflow
    main_workflow = pe.Workflow(name=get_pipeline_name(cfg))
    main_workflow.base_dir = nipype_dir
    fet_pipe = create_rec_pipeline(cfg, load_masks)

    output_query = {
        "stacks": {
            "datatype": "anat",
            "suffix": "T2w",
            "extension": ["nii", ".nii.gz"],
        },
    }
    if load_masks:
        output_query["masks"] = {
            "datatype": "anat",
            "suffix": "mask",
            "extension": ["nii", ".nii.gz"],
        }

    # datasource
    datasource = create_datasource(
        output_query,
        data_dir,
        subjects,
        sessions,
        acquisitions,
        extra_derivatives=masks_dir,
    )
    main_workflow.connect(datasource, "stacks", fet_pipe, "inputnode.stacks")
    if load_masks:
        main_workflow.connect(datasource, "masks", fet_pipe, "inputnode.masks")

    # DataSink

    # Reconstruction data sink:
    pipeline_name = cfg.reconstruction.pipeline
    create_description_file(out_dir, pipeline_name, cfg=cfg.reconstruction)

    datasink = create_bids_datasink(
        out_dir=out_dir,
        pipeline_name=pipeline_name,
        strip_dir=main_workflow.base_dir,
        name="final_recon_datasink",
        rec_label=cfg.reconstruction.pipeline,
    )
    # datasink.inputs.base_directory = datasink_path

    # Connect the pipeline to the datasink
    main_workflow.connect(
        fet_pipe, "outputnode.output_srr", datasink, f"@{pipeline_name}"
    )

    if cfg.save_graph:
        main_workflow.write_graph(
            graph2use="colored",
            format="png",
            simple_form=True,
        )

    main_workflow.config["execution"] = {"remove_unnecessary_outputs": "false"}
    main_workflow.run(plugin="MultiProc", plugin_args={"n_procs": nprocs})

pipeline_seg

create_seg_workflow(data_dir, out_dir, nipype_dir, subjects, sessions, acquisitions, cfg_path, nprocs, ignore_checks=False)

Instantiates and runs the segmentation workflow of the fetpype pipeline.

Parameters:

data_dir (str, required):
    Path to the BIDS directory that contains anatomical images.
out_dir (str, required):
    Path to the output directory (will be created if not already existing). Previous outputs may be overridden.
nipype_dir (str, required):
    Path to the nipype directory.
subjects (list[str], required):
    List of subject IDs matching the BIDS specification (e.g., sub-[SUB1], sub-[SUB2], ...).
sessions (list[str], required):
    List of session IDs matching the BIDS specification (e.g., ses-[SES1], ses-[SES2], ...).
acquisitions (list[str], required):
    List of acquisition names matching the BIDS specification (e.g., acq-[ACQ1], ...).
cfg_path (str, required):
    Path to a hydra configuration file (YAML) specifying pipeline parameters.
nprocs (int, required):
    Number of processes to be launched by MultiProc.
ignore_checks (bool, default: False):
    If True, skip the check that the input dataset was produced by a known reconstruction method.
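
A minimal call sketch with placeholder paths; per the check in the source below, data_dir is expected to be a BIDS derivatives directory produced by a known reconstruction method:

from fetpype.workflows.pipeline_seg import create_seg_workflow

create_seg_workflow(
    data_dir="/data/derivatives/reconstruction",  # reconstructed T2w volumes (placeholder path)
    out_dir="/data/derivatives/fetpype_seg",
    nipype_dir="/data/nipype_work",
    subjects=None,                                # assumed to mean "all subjects"
    sessions=None,
    acquisitions=None,
    cfg_path="config.yaml",                       # hydra configuration file (placeholder name)
    nprocs=4,
    ignore_checks=False,                          # keep the dataset_description.json check enabled
)
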
Source code in fetpype/workflows/pipeline_seg.py
def create_seg_workflow(
    data_dir,
    out_dir,
    nipype_dir,
    subjects,
    sessions,
    acquisitions,
    cfg_path,
    nprocs,
    ignore_checks=False,
):
    """
    Instantiates and runs the segmentation workflow of the fetpype pipeline.

    Args:
        data_dir (str):
            Path to the BIDS directory that contains anatomical images.
        out_dir (str):
            Path to the output directory (will be created if not already
            existing). Previous outputs may be overridden.
        nipype_dir (str):
            Path to the nipype directory.
        subjects (list[str], optional):
            List of subject IDs matching the BIDS specification
            (e.g., sub-[SUB1], sub-[SUB2], ...).
        sessions (list[str], optional):
            List of session IDs matching the BIDS specification
            (e.g., ses-[SES1], ses-[SES2], ...).
        acquisitions (list[str], optional):
            List of acquisition names matching the BIDS specification
            (e.g., acq-[ACQ1], ...).
        cfg_path (str):
            Path to a hydra configuration file (YAML) specifying pipeline
            parameters.
        nprocs (int):
            Number of processes to be launched by MultiProc.
        ignore_checks (bool, optional):
            If True, skip the check that the input dataset was produced by a
            known reconstruction method. Defaults to False.

    """

    cfg = init_and_load_cfg(cfg_path)
    data_dir, out_dir, nipype_dir = check_and_update_paths(
        data_dir, out_dir, nipype_dir, cfg
    )
    check_valid_pipeline(cfg)
    # if the general pipeline is not in params, create it and set it to niftymic

    data_desc = os.path.join(data_dir, "dataset_description.json")
    if not ignore_checks:
        if os.path.exists(data_desc):
            with open(data_desc, "r") as f:
                data_desc = json.load(f)
            name = data_desc.get("Name", None)
            if "_" in name:
                name = name.split("_")[0]
            if name not in VALID_RECONSTRUCTION:
                raise ValueError(
                    f"Method name <{data_desc['Name']}> is not a valid "
                    "reconstruction method. Are you sure that you are "
                    "passing a reconstructed dataset?\n"
                    "If you know what you are doing, you can ignore "
                    "this error by adding --ignore_check to the command line."
                )
        else:
            raise ValueError(
                f"dataset_description.json file not found in {data_dir}. "
                "Please provide a valid BIDS directory."
            )
    # main_workflow
    main_workflow = pe.Workflow(name=get_pipeline_name(cfg))
    main_workflow.base_dir = nipype_dir
    fet_pipe = create_seg_pipeline(cfg)

    output_query = {
        "srr_volume": {
            "datatype": "anat",
            "suffix": "T2w",
            "extension": ["nii", ".nii.gz"],
        }
    }

    # datasource
    datasource = create_datasource(
        output_query,
        data_dir,
        subjects,
        sessions,
        acquisitions,
    )

    # connect the datasource outputs to the main pipeline
    main_workflow.connect(datasource, "srr_volume",
                          fet_pipe, "inputnode.srr_volume")

    # DataSink

    # Segmentation data sink:
    pipeline_name = cfg.segmentation.pipeline
    datasink_path = os.path.join(out_dir, pipeline_name)
    # Create a json file to make it BIDS compliant if it doesn't exist
    # Eventually, add all parameters to the json file
    os.makedirs(datasink_path, exist_ok=True)

    prev_desc = os.path.join(data_dir, "dataset_description.json")
    if not os.path.exists(prev_desc):
        prev_desc = None

    create_description_file(out_dir, pipeline_name,
                            prev_desc, cfg.segmentation)
    # Create another datasink for the segmentation pipeline
    seg_datasink = create_bids_datasink(
        out_dir=out_dir,
        pipeline_name=pipeline_name,
        strip_dir=main_workflow.base_dir,
        name="final_seg_datasink",
        rec_label=cfg.reconstruction.pipeline,
        seg_label=cfg.segmentation.pipeline,
    )
    # Add the base directory

    # Connect the pipeline to the datasink
    main_workflow.connect(
        fet_pipe, "outputnode.output_seg", seg_datasink, pipeline_name
    )

    if cfg.save_graph:
        main_workflow.write_graph(
            graph2use="colored",
            format="png",
            simple_form=True,
        )
    main_workflow.config["execution"] = {"remove_unnecessary_outputs": "false"}
    main_workflow.run(plugin="MultiProc", plugin_args={"n_procs": nprocs})