Source code for pandas_plink._write

from pathlib import Path
from typing import Optional, Union

from numpy import arange, full
from pandas import DataFrame, array
from xarray import DataArray

from ._bed_write import write_bed

__all__ = ["write_plink1_bin"]


[docs]def write_plink1_bin( G: DataArray, bed: Union[str, Path], bim: Optional[Union[str, Path]] = None, fam: Optional[Union[str, Path]] = None, major: str = "variant", verbose: bool = True, ): """ Write PLINK 1 binary files into a data array. A PLINK 1 binary file set consists of three files: - BED: containing the genotype. - BIM: containing variant information. - FAM: containing sample information. The user must provide the genotype (dosage) via a :class:`xarray.DataArray` matrix with data type :const:`numpy.float32` or :const:`numpy.float64`. That matrix must have two named dimensions: **sample** and **variant**. The only allowed values for the genotype are: :const:`0`, :const:`1`, :const:`2`, and :data:`math.nan`. Examples -------- .. testsetup: >>> import os >>> import shutil >>> import tempfile >>> >>> old_path = os.getcwd() >>> tmp_path = tempfile.mkdtemp() >>> os.chdir(tmp_path) The following example produces a BED file with data. .. doctest:: >>> from xarray import DataArray >>> from pandas_plink import read_plink1_bin, write_plink1_bin >>> >>> G = DataArray( ... [[3.0, 2.0, 2.0], [0.0, 0.0, 1.0]], ... dims=["sample", "variant"], ... coords = dict( ... sample = ["boffy", "jolly"], ... fid = ("sample", ["humin"] * 2 ), ... ... variant = ["not", "sure", "what"], ... snp = ("variant", ["rs1", "rs2", "rs3"]), ... chrom = ("variant", ["1", "1", "2"]), ... a0 = ("variant", ['A', 'T', 'G']), ... a1 = ("variant", ['C', 'A', 'T']), ... ) ... ) >>> >>> print(G) <xarray.DataArray (sample: 2, variant: 3)> array([[3.00, 2.00, 2.00], [0.00, 0.00, 1.00]]) Coordinates: * sample (sample) <U5 'boffy' 'jolly' fid (sample) <U5 'humin' 'humin' * variant (variant) <U4 'not' 'sure' 'what' snp (variant) <U3 'rs1' 'rs2' 'rs3' chrom (variant) <U1 '1' '1' '2' a0 (variant) <U1 'A' 'T' 'G' a1 (variant) <U1 'C' 'A' 'T' >>> write_plink1_bin(G, "sample.bed", verbose=False) >>> >>> G = read_plink1_bin("sample.bed", verbose=False) >>> print(G) <xarray.DataArray 'genotype' (sample: 2, variant: 3)> dask.array<transpose, shape=(2, 3), dtype=float32, chunksize=(2, 3), chunktype=numpy.ndarray> Coordinates: (12/14) * sample (sample) object 'boffy' 'jolly' * variant (variant) <U8 'variant0' 'variant1' 'variant2' fid (sample) object 'humin' 'humin' iid (sample) object 'boffy' 'jolly' father (sample) object '?' '?' mother (sample) object '?' '?' ... ... chrom (variant) object '1' '1' '2' snp (variant) object 'rs1' 'rs2' 'rs3' cm (variant) float64 0.0 0.0 0.0 pos (variant) int32 0 0 0 a0 (variant) object 'A' 'T' 'G' a1 (variant) object 'C' 'A' 'T' The following example reads two BED files and two BIM files correspondig to chromosomes 11 and 12, and read a single FAM file whose filename is inferred from the BED filenames. It then saves the resulting matrix to disk. .. doctest:: >>> from os.path import join >>> from pandas_plink import read_plink1_bin, write_plink1_bin >>> from pandas_plink import get_data_folder >>> >>> G = read_plink1_bin(join(get_data_folder(), "chr*.bed"), verbose=False) >>> write_plink1_bin(G, "all.bed", verbose=False) >>> G = read_plink1_bin("all.bed", verbose=False) >>> print(G) <xarray.DataArray 'genotype' (sample: 14, variant: 1252)> dask.array<transpose, shape=(14, 1252), dtype=float32, chunksize=(14, 1024), chunktype=numpy.ndarray> Coordinates: (12/14) * sample (sample) object 'B001' 'B002' 'B003' ... 'B012' 'B013' 'B014' * variant (variant) <U11 'variant0' 'variant1' ... 'variant1251' fid (sample) object 'B001' 'B002' 'B003' ... 'B012' 'B013' 'B014' iid (sample) object 'B001' 'B002' 'B003' ... 'B012' 'B013' 'B014' father (sample) object '0' '0' '0' '0' '0' '0' ... '0' '0' '0' '0' '0' '0' mother (sample) object '0' '0' '0' '0' '0' '0' ... '0' '0' '0' '0' '0' '0' ... ... chrom (variant) object '11' '11' '11' '11' '11' ... '12' '12' '12' '12' snp (variant) object '316849996' '316874359' ... '373081507' cm (variant) float64 0.0 0.0 0.0 0.0 0.0 0.0 ... 0.0 0.0 0.0 0.0 0.0 pos (variant) int32 157439 181802 248969 ... 27163741 27205125 27367844 a0 (variant) object 'C' 'G' 'G' 'C' 'C' 'T' ... 'A' 'G' 'A' 'T' 'G' a1 (variant) object 'T' 'C' 'C' 'T' 'T' 'A' ... 'G' 'A' 'T' 'C' 'A' .. testcleanup:: >>> os.chdir(old_path) >>> shutil.rmtree(tmp_path, ignore_errors=True) Parameters ---------- G Genotype matrix with metainformation about samples and variants. bed Path to a BED file. bim Path to a BIM file.It defaults to :const:`None`, in which case it will try to be inferred. fam Path to a FAM file. It defaults to :const:`None`, in which case it will try to be inferred. major It can be either :const:`"sample"` or :const:`"variant"` (recommended and default). Specify the matrix layout on the BED file. verbose :const:`True` for progress information; :const:`False` otherwise. """ if G.ndim != 2: raise ValueError("G has to be bidimensional") if set(list(G.dims)) != set(["sample", "variant"]): raise ValueError("G has to have both `sample` and `variant` dimensions.") if major not in ["sample", "variant"]: raise ValueError("Major can be either `sample` or `variant`.") G = G.transpose("sample", "variant") bed = Path(bed) if bim is None: bim = bed.with_suffix(".bim") if fam is None: fam = bed.with_suffix(".fam") bim = Path(bim) fam = Path(fam) G = _fill_sample(G) G = _fill_variant(G) write_bed(bed, G, major, verbose) _echo("Writing FAM... ", end="", disable=not verbose) _write_fam(fam, G) _echo("done.", disable=not verbose) _echo("Writing BIM... ", end="", disable=not verbose) _write_bim(bim, G) _echo("done.", disable=not verbose)
def _echo(msg: str, end: str = "\n", disable: bool = False): if not disable: print(msg, end=end, flush=True) def _fill_sample(G: DataArray) -> DataArray: from ._type import fam nsamples = G.sample.shape[0] if "fid" not in G.sample.coords: G = G.assign_coords(fid=("sample", G.sample.values)) if "iid" not in G.sample.coords: G = G.assign_coords(iid=("sample", G.sample.values)) if "father" not in G.sample.coords: G = G.assign_coords(father=("sample", array(["?"] * nsamples, fam["father"]))) if "mother" not in G.sample.coords: G = G.assign_coords(mother=("sample", array(["?"] * nsamples, fam["mother"]))) if "gender" not in G.sample.coords: G = G.assign_coords(gender=("sample", array(["0"] * nsamples, fam["gender"]))) if "trait" not in G.sample.coords: G = G.assign_coords(trait=("sample", array(["-0"] * nsamples, fam["trait"]))) return G def _fill_variant(G: DataArray) -> DataArray: from ._type import bim nvariants = G.variant.shape[0] if "chrom" not in G.variant.coords: G = G.assign_coords(chrom=("variant", array(["?"] * nvariants, bim["chrom"]))) if "snp" not in G.variant.coords: G = G.assign_coords(snp=("variant", array(arange(nvariants), bim["snp"]))) if "cm" not in G.variant.coords: G = G.assign_coords(cm=("variant", full(nvariants, 0.0, bim["cm"]))) if "pos" not in G.variant.coords: G = G.assign_coords(pos=("variant", full(nvariants, 0.0, bim["pos"]))) if "a0" not in G.variant.coords: G = G.assign_coords(a0=("variant", array(["?"] * nvariants, bim["a0"]))) if "a1" not in G.variant.coords: G = G.assign_coords(a1=("variant", array(["?"] * nvariants, bim["a1"]))) return G def _write_fam(filepath: Path, G: DataArray): from ._type import fam df = DataFrame() cols = [ ("fid", fam["fid"]), ("iid", fam["iid"]), ("father", fam["father"]), ("mother", fam["mother"]), ("gender", fam["gender"]), ("trait", fam["trait"]), ] for col, col_type in cols: df[col] = G.sample[col].values df[col] = df[col].astype(col_type) df.to_csv( filepath, index=False, sep="\t", header=False, encoding="ascii", line_terminator="\n", ) def _write_bim(filepath: Path, G: DataArray): from ._type import bim df = DataFrame() cols = [ ("chrom", bim["chrom"]), ("snp", bim["snp"]), ("cm", bim["cm"]), ("pos", bim["pos"]), ("a0", bim["a0"]), ("a1", bim["a1"]), ] for col, col_type in cols: df[col] = G.variant[col].values df[col] = df[col].astype(col_type) df.to_csv( filepath, index=False, sep="\t", header=False, encoding="ascii", line_terminator="\n", )