# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
#
# Copyright 2021 The NiPreps Developers <nipreps@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
# https://www.nipreps.org/community/licensing/
#
"""
Functional workflow
===================
.. image :: _static/functional_workflow_source.svg
The functional workflow follows the following steps:
#. Sanitize (revise data types and xforms) input data, read
associated metadata and discard non-steady state frames.
#. :abbr:`HMC (head-motion correction)` based on ``3dvolreg`` from
AFNI -- :py:func:`hmc`.
#. Skull-stripping of the time-series (AFNI) --
:py:func:`fmri_bmsk_workflow`.
#. Calculate mean time-series, and :abbr:`tSNR (temporal SNR)`.
#. Spatial Normalization to MNI (ANTs) -- :py:func:`epi_mni_align`
#. Extraction of IQMs -- :py:func:`compute_iqms`.
#. Individual-reports generation -- :py:func:`individual_reports`.
This workflow is orchestrated by :py:func:`fmri_qc_workflow`.
"""
from mriqc import config
from nipype.interfaces import io as nio
from nipype.interfaces import utility as niu
from nipype.pipeline import engine as pe
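
# Usage sketch: building any of these workflows requires an initialized MRIQC
# config; the ``mock_config`` helper used throughout the docstrings provides one:
#
#     from mriqc.testing import mock_config
#     from mriqc.workflows.functional import fmri_qc_workflow
#
#     with mock_config():
#         wf = fmri_qc_workflow()  # wf.run() would execute the pipeline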


def fmri_qc_workflow(name="funcMRIQC"):
    """
    Initialize the (f)MRIQC workflow.

    .. workflow::

        import os.path as op
        from mriqc.workflows.functional import fmri_qc_workflow
        from mriqc.testing import mock_config
        with mock_config():
            wf = fmri_qc_workflow()

    """
from nipype.algorithms.confounds import TSNR, NonSteadyStateDetector
from nipype.interfaces.afni import TStat
from niworkflows.interfaces.header import SanitizeImage
workflow = pe.Workflow(name=name)
mem_gb = config.workflow.biggest_file_gb
dataset = config.workflow.inputs.get("bold", [])
config.loggers.workflow.info(
f"""\
Building functional MRIQC workflow for files: {", ".join(dataset)}."""
)
# Define workflow, inputs and outputs
# 0. Get data, put it in RAS orientation
inputnode = pe.Node(niu.IdentityInterface(fields=["in_file"]), name="inputnode")
inputnode.iterables = [("in_file", dataset)]
outputnode = pe.Node(
niu.IdentityInterface(
fields=["qc", "mosaic", "out_group", "out_dvars", "out_fd"]
),
name="outputnode",
)
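    # Detect how many initial volumes are non-steady-state ("dummy" scans);
    # the count is used to discard those frames in ``sanitize`` and is also
    # reported with the IQMs as ``exclude_index``.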
non_steady_state_detector = pe.Node(
NonSteadyStateDetector(), name="non_steady_state_detector"
)
sanitize = pe.Node(SanitizeImage(), name="sanitize", mem_gb=mem_gb * 4.0)
sanitize.inputs.max_32bit = config.execution.float32
# Workflow --------------------------------------------------------
# 1. HMC: head motion correct
hmcwf = hmc()
# Set HMC settings
hmcwf.inputs.inputnode.fd_radius = config.workflow.fd_radius
# 2. Compute mean fmri
mean = pe.Node(
TStat(options="-mean", outputtype="NIFTI_GZ"),
name="mean",
mem_gb=mem_gb * 1.5,
)
# EPI to MNI registration
ema = epi_mni_align()
# Compute TSNR using nipype implementation
tsnr = pe.Node(TSNR(), name="compute_tsnr", mem_gb=mem_gb * 2.5)
    # Compute IQMs
iqmswf = compute_iqms()
# Reports
repwf = individual_reports()
# fmt: off
workflow.connect([
(inputnode, iqmswf, [("in_file", "inputnode.in_file")]),
(inputnode, sanitize, [("in_file", "in_file")]),
(inputnode, non_steady_state_detector, [("in_file", "in_file")]),
(non_steady_state_detector, sanitize, [("n_volumes_to_discard", "n_volumes_to_discard")]),
(sanitize, hmcwf, [("out_file", "inputnode.in_file")]),
(hmcwf, mean, [("outputnode.out_file", "in_file")]),
(hmcwf, tsnr, [("outputnode.out_file", "in_file")]),
(mean, ema, [("out_file", "inputnode.epi_mean")]),
(sanitize, iqmswf, [("out_file", "inputnode.in_ras")]),
(mean, iqmswf, [("out_file", "inputnode.epi_mean")]),
(hmcwf, iqmswf, [("outputnode.out_file", "inputnode.hmc_epi"),
("outputnode.out_fd", "inputnode.hmc_fd")]),
(tsnr, iqmswf, [("tsnr_file", "inputnode.in_tsnr")]),
(sanitize, repwf, [("out_file", "inputnode.in_ras")]),
(mean, repwf, [("out_file", "inputnode.epi_mean")]),
(tsnr, repwf, [("stddev_file", "inputnode.in_stddev")]),
(hmcwf, repwf, [("outputnode.out_fd", "inputnode.hmc_fd"),
("outputnode.out_file", "inputnode.hmc_epi")]),
(ema, repwf, [("outputnode.epi_parc", "inputnode.epi_parc"),
("outputnode.report", "inputnode.mni_report")]),
(non_steady_state_detector, iqmswf, [("n_volumes_to_discard", "inputnode.exclude_index")]),
(iqmswf, repwf, [("outputnode.out_file", "inputnode.in_iqms"),
("outputnode.out_dvars", "inputnode.in_dvars"),
("outputnode.outliers", "inputnode.outliers"),
("outputnode.meta_sidecar", "inputnode.meta_sidecar")]),
(hmcwf, outputnode, [("outputnode.out_fd", "out_fd")]),
])
# fmt: on
if config.workflow.fft_spikes_detector:
# fmt: off
workflow.connect([
(iqmswf, repwf, [("outputnode.out_spikes", "inputnode.in_spikes"),
("outputnode.out_fft", "inputnode.in_fft")]),
])
# fmt: on
if config.workflow.ica:
from niworkflows.interfaces.reportlets.segmentation import MELODICRPT
melodic = pe.Node(
MELODICRPT(
no_bet=True,
no_mask=True,
no_mm=True,
compress_report=False,
generate_report=True,
),
name="ICA",
mem_gb=max(mem_gb * 5, 8),
)
# fmt: off
workflow.connect([
(sanitize, melodic, [("out_file", "in_files")]),
(melodic, repwf, [("out_report", "inputnode.ica_report")])
])
# fmt: on
# population specific changes to brain masking
if config.workflow.species == "human":
skullstrip_epi = fmri_bmsk_workflow()
# fmt: off
workflow.connect([
(mean, skullstrip_epi, [("out_file", "inputnode.in_file")]),
(skullstrip_epi, ema, [("outputnode.out_file", "inputnode.epi_mask")]),
(skullstrip_epi, iqmswf, [("outputnode.out_file", "inputnode.brainmask")]),
(skullstrip_epi, repwf, [("outputnode.out_file", "inputnode.brainmask")]),
])
# fmt: on
if config.workflow.ica:
workflow.connect(
[(skullstrip_epi, melodic, [("outputnode.out_file", "report_mask")])]
)
else:
from .anatomical import _binarize
binarise_labels = pe.Node(
niu.Function(
input_names=["in_file", "threshold"],
output_names=["out_file"],
function=_binarize,
),
name="binarise_labels",
)
# fmt: off
workflow.connect([
(ema, binarise_labels, [("outputnode.epi_parc", "in_file")]),
(binarise_labels, iqmswf, [("out_file", "inputnode.brainmask")]),
(binarise_labels, repwf, [("out_file", "inputnode.brainmask")])
])
# fmt: on
if config.workflow.ica:
workflow.connect(
[(binarise_labels, melodic, [("out_file", "report_mask")])]
)
# Upload metrics
if not config.execution.no_sub:
from mriqc.interfaces.webapi import UploadIQMs
upldwf = pe.Node(UploadIQMs(), name="UploadMetrics")
upldwf.inputs.url = config.execution.webapi_url
upldwf.inputs.strict = config.execution.upload_strict
if config.execution.webapi_port:
upldwf.inputs.port = config.execution.webapi_port
# fmt: off
workflow.connect([
(iqmswf, upldwf, [("outputnode.out_file", "in_iqms")]),
])
# fmt: on
return workflow


def compute_iqms(name="ComputeIQMs"):
    """
    Initialize the workflow that actually computes the IQMs.

    .. workflow::

        from mriqc.workflows.functional import compute_iqms
        from mriqc.testing import mock_config
        with mock_config():
            wf = compute_iqms()

    """
from nipype.algorithms.confounds import ComputeDVARS
from nipype.interfaces.afni import OutlierCount, QualityIndex
from niworkflows.interfaces.bids import ReadSidecarJSON
from mriqc.interfaces import FunctionalQC, IQMFileSink
from mriqc.interfaces.reports import AddProvenance
from mriqc.interfaces.transitional import GCOR
from mriqc.workflows.utils import _tofloat, get_fwhmx
mem_gb = config.workflow.biggest_file_gb
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(
fields=[
"in_file",
"in_ras",
"epi_mean",
"brainmask",
"hmc_epi",
"hmc_fd",
"fd_thres",
"in_tsnr",
"metadata",
"exclude_index",
]
),
name="inputnode",
)
outputnode = pe.Node(
niu.IdentityInterface(
fields=[
"out_file",
"out_dvars",
"outliers",
"out_spikes",
"out_fft",
"meta_sidecar",
]
),
name="outputnode",
)
# Set FD threshold
inputnode.inputs.fd_thres = config.workflow.fd_thres
# Compute DVARS
dvnode = pe.Node(
ComputeDVARS(save_plot=False, save_all=True),
name="ComputeDVARS",
mem_gb=mem_gb * 3,
)
# AFNI quality measures
fwhm_interface = get_fwhmx()
fwhm = pe.Node(fwhm_interface, name="smoothness")
# fwhm.inputs.acf = True # add when AFNI >= 16
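    # 3dToutcount: fraction of outlier voxels per timepoint (summarized into the
    # "aor" IQM by ``_parse_tout`` below)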
outliers = pe.Node(
OutlierCount(fraction=True, out_file="outliers.out"),
name="outliers",
mem_gb=mem_gb * 2.5,
)
    # 3dTqual: AFNI's quality index per timepoint (summarized into the "aqi" IQM)
    quality = pe.Node(
        QualityIndex(automask=True, out_file="quality.out"),
        name="quality",
        mem_gb=mem_gb * 3,
    )
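    # Global correlation (GCOR) of the time-series (stored as the "gcor" IQM)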
gcor = pe.Node(GCOR(), name="gcor", mem_gb=mem_gb * 2)
measures = pe.Node(FunctionalQC(), name="measures", mem_gb=mem_gb * 3)
# fmt: off
workflow.connect([
(inputnode, dvnode, [("hmc_epi", "in_file"),
("brainmask", "in_mask")]),
(inputnode, measures, [("epi_mean", "in_epi"),
("brainmask", "in_mask"),
("hmc_epi", "in_hmc"),
("hmc_fd", "in_fd"),
("fd_thres", "fd_thres"),
("in_tsnr", "in_tsnr")]),
(inputnode, fwhm, [("epi_mean", "in_file"),
("brainmask", "mask")]),
(inputnode, quality, [("hmc_epi", "in_file")]),
(inputnode, outliers, [("hmc_epi", "in_file"),
("brainmask", "mask")]),
(inputnode, gcor, [("hmc_epi", "in_file"),
("brainmask", "mask")]),
(dvnode, measures, [("out_all", "in_dvars")]),
(fwhm, measures, [(("fwhm", _tofloat), "in_fwhm")]),
(dvnode, outputnode, [("out_all", "out_dvars")]),
(outliers, outputnode, [("out_file", "outliers")])
])
# fmt: on
# Add metadata
meta = pe.Node(ReadSidecarJSON(), name="metadata", run_without_submitting=True)
addprov = pe.Node(
AddProvenance(modality="bold"),
name="provenance",
run_without_submitting=True,
)
# Save to JSON file
datasink = pe.Node(
IQMFileSink(
modality="bold",
out_dir=str(config.execution.output_dir),
dataset=config.execution.dsname,
),
name="datasink",
run_without_submitting=True,
)
# fmt: off
workflow.connect([
(inputnode, datasink, [("in_file", "in_file"),
("exclude_index", "dummy_trs")]),
(inputnode, meta, [("in_file", "in_file")]),
(inputnode, addprov, [("in_file", "in_file")]),
(meta, datasink, [("subject", "subject_id"),
("session", "session_id"),
("task", "task_id"),
("acquisition", "acq_id"),
("reconstruction", "rec_id"),
("run", "run_id"),
("out_dict", "metadata")]),
(addprov, datasink, [("out_prov", "provenance")]),
(outliers, datasink, [(("out_file", _parse_tout), "aor")]),
(gcor, datasink, [(("out", _tofloat), "gcor")]),
(quality, datasink, [(("out_file", _parse_tqual), "aqi")]),
(measures, datasink, [("out_qc", "root")]),
(datasink, outputnode, [("out_file", "out_file")]),
(meta, outputnode, [("out_dict", "meta_sidecar")]),
])
# fmt: on
# FFT spikes finder
if config.workflow.fft_spikes_detector:
from .utils import slice_wise_fft
spikes_fft = pe.Node(
niu.Function(
input_names=["in_file"],
output_names=["n_spikes", "out_spikes", "out_fft"],
function=slice_wise_fft,
),
name="SpikesFinderFFT",
)
# fmt: off
workflow.connect([
(inputnode, spikes_fft, [("in_ras", "in_file")]),
(spikes_fft, outputnode, [("out_spikes", "out_spikes"),
("out_fft", "out_fft")]),
(spikes_fft, datasink, [("n_spikes", "spikes_num")])
])
# fmt: on
return workflow


def individual_reports(name="ReportsWorkflow"):
    """
    Write out individual reportlets.

    .. workflow::

        from mriqc.workflows.functional import individual_reports
        from mriqc.testing import mock_config
        with mock_config():
            wf = individual_reports()

    """
from niworkflows.interfaces.plotting import FMRISummary
from niworkflows.interfaces.morphology import BinaryDilation, BinarySubtraction
from mriqc.interfaces import PlotMosaic, PlotSpikes, Spikes
from mriqc.interfaces.reports import IndividualReport
verbose = config.execution.verbose_reports
mem_gb = config.workflow.biggest_file_gb
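    # The base report comprises five reportlets; verbose mode appends four more,
    # and the FFT-spikes and ICA reportlets are added conditionally below.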
pages = 5
extra_pages = int(verbose) * 4
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(
fields=[
"in_iqms",
"in_ras",
"hmc_epi",
"epi_mean",
"brainmask",
"hmc_fd",
"fd_thres",
"epi_parc",
"in_dvars",
"in_stddev",
"outliers",
"in_spikes",
"in_fft",
"mni_report",
"ica_report",
"meta_sidecar",
]
),
name="inputnode",
)
# Set FD threshold
inputnode.inputs.fd_thres = config.workflow.fd_thres
spmask = pe.Node(
niu.Function(
input_names=["in_file", "in_mask"],
output_names=["out_file", "out_plot"],
function=spikes_mask,
),
name="SpikesMask",
mem_gb=mem_gb * 3.5,
)
spikes_bg = pe.Node(
Spikes(no_zscore=True, detrend=False),
name="SpikesFinderBgMask",
mem_gb=mem_gb * 2.5,
)
    # Create the crown mask
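    # The "crown" is the rim of voxels just outside the brain mask, obtained by
    # subtracting the brain mask from its dilation; it is rendered as a
    # separate segment of the carpet plot.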
dilated_mask = pe.Node(BinaryDilation(), name="dilated_mask")
subtract_mask = pe.Node(BinarySubtraction(), name="subtract_mask")
parcels = pe.Node(niu.Function(function=_carpet_parcellation), name="parcels")
bigplot = pe.Node(FMRISummary(), name="BigPlot", mem_gb=mem_gb * 3.5)
# fmt: off
workflow.connect([
(inputnode, spikes_bg, [("in_ras", "in_file")]),
(inputnode, spmask, [("in_ras", "in_file")]),
(inputnode, bigplot, [("hmc_epi", "in_func"),
("hmc_fd", "fd"),
("fd_thres", "fd_thres"),
("in_dvars", "dvars"),
("outliers", "outliers"),
(("meta_sidecar", _get_tr), "tr")]),
(inputnode, parcels, [("epi_parc", "segmentation")]),
(inputnode, dilated_mask, [("brainmask", "in_mask")]),
(inputnode, subtract_mask, [("brainmask", "in_subtract")]),
(dilated_mask, subtract_mask, [("out_mask", "in_base")]),
(subtract_mask, parcels, [("out_mask", "crown_mask")]),
(parcels, bigplot, [("out", "in_segm")]),
(spikes_bg, bigplot, [("out_tsz", "in_spikes_bg")]),
(spmask, spikes_bg, [("out_file", "in_mask")]),
])
# fmt: on
mosaic_mean = pe.Node(
PlotMosaic(out_file="plot_func_mean_mosaic1.svg", cmap="Greys_r"),
name="PlotMosaicMean",
)
mosaic_stddev = pe.Node(
PlotMosaic(out_file="plot_func_stddev_mosaic2_stddev.svg", cmap="viridis"),
name="PlotMosaicSD",
)
mplots = pe.Node(
niu.Merge(
pages
+ extra_pages
+ int(config.workflow.fft_spikes_detector)
+ int(config.workflow.ica)
),
name="MergePlots",
)
rnode = pe.Node(IndividualReport(), name="GenerateReport")
# Link images that should be reported
dsplots = pe.Node(
nio.DataSink(
base_directory=str(config.execution.output_dir),
parameterization=False,
),
name="dsplots",
run_without_submitting=True,
)
# fmt: off
workflow.connect([
(inputnode, rnode, [("in_iqms", "in_iqms")]),
(inputnode, mosaic_mean, [("epi_mean", "in_file")]),
(inputnode, mosaic_stddev, [("in_stddev", "in_file")]),
(mosaic_mean, mplots, [("out_file", "in1")]),
(mosaic_stddev, mplots, [("out_file", "in2")]),
(bigplot, mplots, [("out_file", "in3")]),
(mplots, rnode, [("out", "in_plots")]),
(rnode, dsplots, [("out_file", "@html_report")]),
])
# fmt: on
if config.workflow.fft_spikes_detector:
mosaic_spikes = pe.Node(
PlotSpikes(
out_file="plot_spikes.svg",
cmap="viridis",
title="High-Frequency spikes",
),
name="PlotSpikes",
)
# fmt: off
workflow.connect([
(inputnode, mosaic_spikes, [("in_ras", "in_file"),
("in_spikes", "in_spikes"),
("in_fft", "in_fft")]),
(mosaic_spikes, mplots, [("out_file", "in4")])
])
# fmt: on
if config.workflow.ica:
page_number = 4 + config.workflow.fft_spikes_detector
# fmt: off
workflow.connect([
(inputnode, mplots, [("ica_report", "in%d" % page_number)])
])
# fmt: on
if not verbose:
return workflow
mosaic_zoom = pe.Node(
PlotMosaic(out_file="plot_anat_mosaic1_zoomed.svg", cmap="Greys_r"),
name="PlotMosaicZoomed",
)
mosaic_noise = pe.Node(
PlotMosaic(
out_file="plot_anat_mosaic2_noise.svg",
only_noise=True,
cmap="viridis_r",
),
name="PlotMosaicNoise",
)
# Verbose-reporting goes here
from ..interfaces.viz import PlotContours
plot_bmask = pe.Node(
PlotContours(
display_mode="z",
levels=[0.5],
colors=["r"],
cut_coords=10,
out_file="bmask",
),
name="PlotBrainmask",
)
# fmt: off
workflow.connect([
(inputnode, plot_bmask, [("epi_mean", "in_file"),
("brainmask", "in_contours")]),
(inputnode, mosaic_zoom, [("epi_mean", "in_file"),
("brainmask", "bbox_mask_file")]),
(inputnode, mosaic_noise, [("epi_mean", "in_file")]),
(mosaic_zoom, mplots, [("out_file", "in%d" % (pages + 1))]),
(mosaic_noise, mplots, [("out_file", "in%d" % (pages + 2))]),
(plot_bmask, mplots, [("out_file", "in%d" % (pages + 3))]),
(inputnode, mplots, [("mni_report", "in%d" % (pages + 4))]),
])
# fmt: on
return workflow


def fmri_bmsk_workflow(name="fMRIBrainMask"):
    """
    Compute a brain mask for the input :abbr:`fMRI (functional MRI)` dataset.

    .. workflow::

        from mriqc.workflows.functional import fmri_bmsk_workflow
        from mriqc.testing import mock_config
        with mock_config():
            wf = fmri_bmsk_workflow()

    """
from nipype.interfaces.afni import Automask
workflow = pe.Workflow(name=name)
inputnode = pe.Node(niu.IdentityInterface(fields=["in_file"]), name="inputnode")
outputnode = pe.Node(niu.IdentityInterface(fields=["out_file"]), name="outputnode")
afni_msk = pe.Node(Automask(outputtype="NIFTI_GZ"), name="afni_msk")
# Connect brain mask extraction
# fmt: off
workflow.connect([
(inputnode, afni_msk, [("in_file", "in_file")]),
(afni_msk, outputnode, [("out_file", "out_file")])
])
# fmt: on
return workflow


def hmc(name="fMRI_HMC"):
    """
    Create a :abbr:`HMC (head motion correction)` workflow for fMRI.

    .. workflow::

        from mriqc.workflows.functional import hmc
        from mriqc.testing import mock_config
        with mock_config():
            wf = hmc()

    """
from nipype.algorithms.confounds import FramewiseDisplacement
from nipype.interfaces.afni import Despike, Refit, Volreg
mem_gb = config.workflow.biggest_file_gb
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(fields=["in_file", "fd_radius"]),
name="inputnode",
)
outputnode = pe.Node(
niu.IdentityInterface(fields=["out_file", "out_fd"]), name="outputnode"
)
# calculate hmc parameters
hmc = pe.Node(
Volreg(args="-Fourier -twopass", zpad=4, outputtype="NIFTI_GZ"),
name="motion_correct",
mem_gb=mem_gb * 2.5,
)
# Compute the frame-wise displacement
fdnode = pe.Node(
FramewiseDisplacement(normalize=False, parameter_source="AFNI"),
name="ComputeFD",
)
# fmt: off
workflow.connect([
(inputnode, fdnode, [("fd_radius", "radius")]),
(hmc, outputnode, [("out_file", "out_file")]),
(hmc, fdnode, [("oned_file", "in_file")]),
(fdnode, outputnode, [("out_file", "out_fd")]),
])
# fmt: on
# despiking, and deoblique
deoblique_node = pe.Node(Refit(deoblique=True), name="deoblique")
despike_node = pe.Node(Despike(outputtype="NIFTI_GZ"), name="despike")
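    # Depending on the configuration, route the input through despiking and/or
    # deobliquing before head-motion correction.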
if config.workflow.despike and config.workflow.deoblique:
# fmt: off
workflow.connect([
(inputnode, despike_node, [("in_file", "in_file")]),
(despike_node, deoblique_node, [("out_file", "in_file")]),
(deoblique_node, hmc, [("out_file", "in_file")]),
])
# fmt: on
elif config.workflow.despike:
# fmt: off
workflow.connect([
(inputnode, despike_node, [("in_file", "in_file")]),
(despike_node, hmc, [("out_file", "in_file")]),
])
# fmt: on
elif config.workflow.deoblique:
# fmt: off
workflow.connect([
(inputnode, deoblique_node, [("in_file", "in_file")]),
(deoblique_node, hmc, [("out_file", "in_file")]),
])
# fmt: on
else:
# fmt: off
workflow.connect([
(inputnode, hmc, [("in_file", "in_file")]),
])
# fmt: on
return workflow


def epi_mni_align(name="SpatialNormalization"):
    """
    Estimate the transform that maps the EPI space into MNI152NLin2009cAsym.

    The input ``epi_mean`` is the averaged and brain-masked EPI timeseries.

    Returns the EPI mean resampled in MNI space (for checking out registration) and
    the associated "lobe" parcellation in EPI space.

    .. workflow::

        from mriqc.workflows.functional import epi_mni_align
        from mriqc.testing import mock_config
        with mock_config():
            wf = epi_mni_align()

    """
from nipype.interfaces.ants import ApplyTransforms, N4BiasFieldCorrection
from niworkflows.interfaces.reportlets.registration import (
SpatialNormalizationRPT as RobustMNINormalization,
)
from templateflow.api import get as get_template
# Get settings
testing = config.execution.debug
n_procs = config.nipype.nprocs
ants_nthreads = config.nipype.omp_nthreads
workflow = pe.Workflow(name=name)
inputnode = pe.Node(
niu.IdentityInterface(fields=["epi_mean", "epi_mask"]),
name="inputnode",
)
outputnode = pe.Node(
niu.IdentityInterface(fields=["epi_mni", "epi_parc", "report"]),
name="outputnode",
)
n4itk = pe.Node(
N4BiasFieldCorrection(dimension=3, copy_header=True), name="SharpenEPI"
)
norm = pe.Node(
RobustMNINormalization(
explicit_masking=False,
flavor="testing" if testing else "precise",
float=config.execution.ants_float,
generate_report=True,
moving="boldref",
num_threads=ants_nthreads,
reference="boldref",
template=config.workflow.template_id,
),
name="EPI2MNI",
num_threads=n_procs,
mem_gb=3,
)
if config.workflow.species.lower() == "human":
norm.inputs.reference_image = str(
get_template(config.workflow.template_id, resolution=2, suffix="boldref")
)
norm.inputs.reference_mask = str(
get_template(
config.workflow.template_id,
resolution=2,
desc="brain",
suffix="mask",
)
)
# adapt some population-specific settings
else:
from nirodents.workflows.brainextraction import _bspline_grid
n4itk.inputs.shrink_factor = 1
n4itk.inputs.n_iterations = [50] * 4
norm.inputs.reference_image = str(
get_template(config.workflow.template_id, suffix="T2w")
)
norm.inputs.reference_mask = str(
get_template(
config.workflow.template_id,
desc="brain",
suffix="mask",
)[0]
)
bspline_grid = pe.Node(
niu.Function(function=_bspline_grid), name="bspline_grid"
)
# fmt: off
workflow.connect([
(inputnode, bspline_grid, [('epi_mean', 'in_file')]),
(bspline_grid, n4itk, [('out', 'args')])
])
# fmt: on
# Warp segmentation into EPI space
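    # The template-space parcellation is resampled into native EPI space by
    # applying the inverse composite transform estimated by the registration.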
invt = pe.Node(
ApplyTransforms(
float=True,
dimension=3,
default_value=0,
interpolation="MultiLabel",
),
name="ResampleSegmentation",
)
if config.workflow.species.lower() == "human":
invt.inputs.input_image = str(
get_template(
config.workflow.template_id,
resolution=1,
desc="carpet",
suffix="dseg",
)
)
else:
invt.inputs.input_image = str(
get_template(
config.workflow.template_id,
suffix="dseg",
)[-1]
)
# fmt: off
workflow.connect([
(inputnode, invt, [("epi_mean", "reference_image")]),
(inputnode, n4itk, [("epi_mean", "input_image")]),
(n4itk, norm, [("output_image", "moving_image")]),
(norm, invt, [
("inverse_composite_transform", "transforms")]),
(invt, outputnode, [("output_image", "epi_parc")]),
(norm, outputnode, [("warped_image", "epi_mni"),
("out_report", "report")]),
])
# fmt: on
if config.workflow.species.lower() == "human":
workflow.connect([(inputnode, norm, [("epi_mask", "moving_mask")])])
return workflow


def spikes_mask(in_file, in_mask=None, out_file=None):
    """Calculate a mask in which to check for :abbr:`EM (electromagnetic)` spikes."""
import os.path as op
import nibabel as nb
import numpy as np
from nilearn.image import mean_img
from nilearn.plotting import plot_roi
from scipy import ndimage as nd
    fname, ext = op.splitext(op.basename(in_file))
    if ext == ".gz":
        fname, ext2 = op.splitext(fname)
        ext = ext2 + ext
    if out_file is None:
        out_file = op.abspath("{}_spmask{}".format(fname, ext))
    out_plot = op.abspath("{}_spmask.pdf".format(fname))
in_4d_nii = nb.load(in_file)
orientation = nb.aff2axcodes(in_4d_nii.affine)
if in_mask:
        mask_data = np.asanyarray(nb.load(in_mask).dataobj)
a = np.where(mask_data != 0)
bbox = (
np.max(a[0]) - np.min(a[0]),
np.max(a[1]) - np.min(a[1]),
np.max(a[2]) - np.min(a[2]),
)
longest_axis = np.argmax(bbox)
# Input here is a binarized and intersected mask data from previous section
dil_mask = nd.binary_dilation(
mask_data, iterations=int(mask_data.shape[longest_axis] / 9)
)
rep = list(mask_data.shape)
rep[longest_axis] = -1
new_mask_2d = dil_mask.max(axis=longest_axis).reshape(rep)
rep = [1, 1, 1]
rep[longest_axis] = mask_data.shape[longest_axis]
new_mask_3d = np.logical_not(np.tile(new_mask_2d, rep))
else:
        new_mask_3d = np.zeros(in_4d_nii.shape[:3], dtype=bool)
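    # Thin slabs of background at both ends of the left-right axis are always
    # added to the mask, where EM spikes tend to be most conspicuous.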
if orientation[0] in ["L", "R"]:
new_mask_3d[0:2, :, :] = True
new_mask_3d[-3:-1, :, :] = True
else:
new_mask_3d[:, 0:2, :] = True
new_mask_3d[:, -3:-1, :] = True
mask_nii = nb.Nifti1Image(
new_mask_3d.astype(np.uint8), in_4d_nii.affine, in_4d_nii.header
)
mask_nii.to_filename(out_file)
plot_roi(mask_nii, mean_img(in_4d_nii), output_file=out_plot)
return out_file, out_plot


def _mean(inlist):
    import numpy as np

    return np.mean(inlist)


def _parse_tqual(in_file):
    """Average the quality indices written by AFNI's 3dTqual, one per timepoint."""
    import numpy as np

    with open(in_file, "r") as fin:
        lines = fin.readlines()
    return np.mean([float(line.strip()) for line in lines if not line.startswith("++")])


def _parse_tout(in_file):
    """Average the timepoint-wise outlier fractions written by AFNI's 3dToutcount."""
    import numpy as np

    data = np.loadtxt(in_file)  # pylint: disable=no-member
    return data.mean()


def _carpet_parcellation(segmentation, crown_mask):
    """Collapse the input segmentation into carpet-plot groups and add the crown."""
    from pathlib import Path

    import numpy as np
    import nibabel as nb

    img = nb.load(segmentation)
lut = np.zeros((256,), dtype="uint8")
lut[100:201] = 1 # Ctx GM
lut[30:99] = 2 # dGM
lut[1:11] = 3 # WM+CSF
lut[255] = 4 # Cerebellum
# Apply lookup table
seg = lut[np.asanyarray(img.dataobj, dtype="uint16")]
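    # Voxels within the crown mask form their own group (5)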
seg[np.asanyarray(nb.load(crown_mask).dataobj, dtype=int) > 0] = 5
outimg = img.__class__(seg.astype("uint8"), img.affine, img.header)
outimg.set_data_dtype("uint8")
out_file = Path("segments.nii.gz").absolute()
outimg.to_filename(out_file)
return str(out_file)


def _get_tr(meta_dict):
    """Extract the RepetitionTime (in seconds) from the BIDS sidecar metadata."""
    return meta_dict.get("RepetitionTime", None)