# -*- coding: utf-8 -*-
from os import path
from re import sub, split, match
from six import StringIO
from tempfile import mkdtemp
from itertools import groupby
from collections import defaultdict
try:
from urllib.request import urlopen
from urllib.error import URLError
except ImportError:
from urllib2 import urlopen, URLError
from pandas import DataFrame, read_csv
from .GEOTypes import GSE, GSM, GPL, GDS, GDSSubset, GEODatabase
from six import iteritems
from . import utils
from .logger import geoparse_logger as logger
[docs]class UnknownGEOTypeException(Exception):
"""Raised when the GEO type that do not correspond to any known."""
pass
[docs]class NoEntriesException(Exception):
"""Raised when no entries could be found in the SOFT file."""
pass
[docs]def get_GEO(geo=None, filepath=None, destdir="./", how='full',
annotate_gpl=False, geotype=None, include_data=False, silent=False,
aspera=False, partial=None):
"""Get the GEO entry.
The GEO entry is taken directly from the GEO database or read it from SOFT
file.
Args:
geo (:obj:`str`): GEO database identifier.
filepath (:obj:`str`): Path to local SOFT file. Defaults to None.
destdir (:obj:`str`, optional): Directory to download data. Defaults to
None.
how (:obj:`str`, optional): GSM download mode. Defaults to "full".
annotate_gpl (:obj:`bool`, optional): Download the GPL annotation
instead of regular GPL. If not available, fallback to regular GPL
file. Defaults to False.
geotype (:obj:`str`, optional): Type of GEO entry. By default it is
inferred from the ID or the file name.
include_data (:obj:`bool`, optional): Full download of GPLs including
series and samples. Defaults to False.
silent (:obj:`bool`, optional): Do not print anything. Defaults to
False.
aspera (:obj:`bool`, optional): EXPERIMENTAL Download using Aspera
Connect. Follow Aspera instructions for further details. Defaults
to False.
partial (:obj:'iterable', optional): A list of accession IDs of GSMs
to be partially extracted from GPL, works only if a file/accession
is a GPL.
Returns:
:obj:`GEOparse.BaseGEO`: A GEO object of given type.
"""
if geo is None and filepath is None:
raise Exception("You have to specify filename or GEO accession!")
if geo is not None and filepath is not None:
raise Exception("You can specify filename or GEO accession - not both!")
if silent:
logger.setLevel(100) # More than critical
if filepath is None:
filepath, geotype = get_GEO_file(geo, destdir=destdir, how=how,
annotate_gpl=annotate_gpl,
include_data=include_data,
silent=silent,
aspera=aspera)
else:
if geotype is None:
geotype = path.basename(filepath)[:3]
logger.info("Parsing %s: " % filepath)
if geotype.upper() == "GSM":
return parse_GSM(filepath)
elif geotype.upper() == "GSE":
return parse_GSE(filepath)
elif geotype.upper() == 'GPL':
return parse_GPL(filepath, partial=partial)
elif geotype.upper() == 'GDS':
return parse_GDS(filepath)
else:
raise ValueError(("Unknown GEO type: %s. Available types: GSM, GSE, "
"GPL and GDS.") % geotype.upper())
[docs]def get_GEO_file(geo, destdir=None, annotate_gpl=False, how="full",
include_data=False, silent=False, aspera=False):
"""Download corresponding SOFT file given GEO accession.
Args:
geo (:obj:`str`): GEO database identifier.
destdir (:obj:`str`, optional): Directory to download data. Defaults to
None.
annotate_gpl (:obj:`bool`, optional): Download the GPL annotation
instead of regular GPL. If not available, fallback to regular GPL
file. Defaults to False.
how (:obj:`str`, optional): GSM download mode. Defaults to "full".
include_data (:obj:`bool`, optional): Full download of GPLs including
series and samples. Defaults to False.
silent (:obj:`bool`, optional): Do not print anything. Defaults to
False.
aspera (:obj:`bool`, optional): EXPERIMENTAL Download using Aspera
Connect. Follow Aspera instructions for further details. Defaults
to False.
Returns:
:obj:`2-tuple` of :obj:`str` and :obj:`str`: Path to downloaded file and
and the type of GEO object.
"""
geo = geo.upper()
geotype = geo[:3]
range_subdir = sub(r"\d{1,3}$", "nnn", geo)
if destdir is None:
tmpdir = mkdtemp()
logger.info("No destination directory specified."
" Temporary files will be downloaded at %s" % tmpdir)
else:
tmpdir = destdir
utils.mkdir_p(tmpdir)
if geotype == "GDS":
gseurl = ("ftp://ftp.ncbi.nlm.nih.gov/geo/"
"{root}/{range_subdir}/{record}/soft/{record_file}")
url = gseurl.format(root="datasets",
range_subdir=range_subdir,
record=geo,
record_file="%s.soft.gz" % geo)
filepath = path.join(tmpdir, "{record}.soft.gz".format(record=geo))
elif geotype == "GSE":
gseurl = ("ftp://ftp.ncbi.nlm.nih.gov/geo/"
"{root}/{range_subdir}/{record}/soft/{record_file}")
url = gseurl.format(root="series",
range_subdir=range_subdir,
record=geo,
record_file="%s_family.soft.gz" % geo)
filepath = path.join(tmpdir,
"{record}_family.soft.gz".format(record=geo))
elif geotype == "GSM":
gsmurl = ("http://www.ncbi.nlm.nih.gov/geo/query/acc.cgi"
"?targ=self&acc={record}&form=text&view={how}")
url = gsmurl.format(record=geo, how=how)
filepath = path.join(tmpdir, "{record}.txt".format(record=geo))
elif geotype == "GPL":
if annotate_gpl:
gplurl = ("ftp://ftp.ncbi.nlm.nih.gov/geo/"
"{root}/{range_subdir}/{record}/annot/{record_file}")
url = gplurl.format(root="platforms",
range_subdir=range_subdir,
record=geo,
record_file="%s.annot.gz" % geo)
filepath = path.join(tmpdir, "{record}.annot.gz".format(record=geo))
if not path.isfile(filepath):
try:
logger.info("Downloading %s to %s" % (url, filepath))
utils.download_from_url(url, filepath, silent=silent,
aspera=aspera)
return filepath, geotype
except URLError:
logger.info(("Annotations for %s are not available, "
"trying submitter GPL") % geo)
else:
logger.info("File already exist: using local version.")
return filepath, geotype
if include_data:
url = ("ftp://ftp.ncbi.nlm.nih.gov/geo/platforms/"
"{0}/{1}/soft/{1}_family.soft.gz").format(
range_subdir,
geo)
filepath = path.join(tmpdir,
"{record}_family.soft.gz".format(record=geo))
else:
gplurl = ("http://www.ncbi.nlm.nih.gov/geo/query/acc.cgi"
"?targ=self&acc={record}&form=text&view={how}")
url = gplurl.format(record=geo, how=how)
filepath = path.join(tmpdir, "{record}.txt".format(record=geo))
if not path.isfile(filepath):
logger.info("Downloading %s to %s" % (url, filepath))
utils.download_from_url(url, filepath, silent=silent, aspera=aspera)
else:
logger.info("File already exist: using local version.")
return filepath, geotype
else:
raise UnknownGEOTypeException("%s type is not known" % geotype)
if not path.isfile(filepath):
logger.info("Downloading %s to %s" % (url, filepath))
utils.download_from_url(url, filepath, silent=silent, aspera=aspera)
else:
logger.info("File already exist: using local version.")
return filepath, geotype
def __parse_entry(entry_line):
"""Parse the SOFT file entry name line that starts with '^', '!' or '#'.
Args:
entry_line (:obj:`str`): Line from SOFT to be parsed.
Returns:
:obj:`2-tuple`: Type of entry, value of entry.
"""
if entry_line.startswith("!"):
entry_line = sub(r"!\w*?_", '', entry_line)
else:
entry_line = entry_line.strip()[1:]
try:
entry_type, entry_name = [i.strip() for i in entry_line.split("=", 1)]
except ValueError:
entry_type = [i.strip() for i in entry_line.split("=", 1)][0]
entry_name = ''
return entry_type, entry_name
[docs]def parse_entry_name(nameline):
"""Parse line that starts with ^ and assign the name to it.
Args:
nameline (:obj:`str`): A line to process.
Returns:
:obj:`str`: Entry name.
"""
entry_type, entry_name = __parse_entry(nameline)
return entry_name
[docs]def parse_columns(lines):
"""Parse list of lines with columns description from SOFT file.
Args:
lines (:obj:`Iterable`): Iterator over the lines.
Returns:
:obj:`pandas.DataFrame`: Columns description.
"""
data = []
index = []
for line in lines:
line = line.rstrip()
if line.startswith("#"):
tmp = __parse_entry(line)
data.append(tmp[1])
index.append(tmp[0])
return DataFrame(data, index=index, columns=['description'])
[docs]def parse_GDS_columns(lines, subsets):
"""Parse list of line with columns description from SOFT file of GDS.
Args:
lines (:obj:`Iterable`): Iterator over the lines.
subsets (:obj:`dict` of :obj:`GEOparse.GDSSubset`): Subsets to use.
Returns:
:obj:`pandas.DataFrame`: Columns description.
"""
data = []
index = []
for line in lines:
line = line.rstrip()
if line.startswith("#"):
tmp = __parse_entry(line)
data.append(tmp[1])
index.append(tmp[0])
df = DataFrame(data, index=index, columns=['description'])
subset_ids = defaultdict(dict)
for subsetname, subset in iteritems(subsets):
for expid in subset.metadata["sample_id"][0].split(","):
try:
subset_type = subset.get_type()
subset_ids[subset_type][expid] = \
subset.metadata['description'][0]
except Exception as err:
logger.error("Error processing subsets: %s for subset %s" % (
subset.get_type(), subsetname))
return df.join(DataFrame(subset_ids))
[docs]def parse_table_data(lines):
""""Parse list of lines from SOFT file into DataFrame.
Args:
lines (:obj:`Iterable`): Iterator over the lines.
Returns:
:obj:`pandas.DataFrame`: Table data.
"""
# filter lines that do not start with symbols
data = "\n".join([i.rstrip() for i in lines
if not i.startswith(("^", "!", "#")) and i.rstrip()])
if data:
return read_csv(StringIO(data), index_col=None, sep="\t")
else:
return DataFrame()
[docs]def parse_GSM(filepath, entry_name=None):
"""Parse GSM entry from SOFT file.
Args:
filepath (:obj:`str` or :obj:`Iterable`): Path to file with 1 GSM entry
or list of lines representing GSM from GSE file.
entry_name (:obj:`str`, optional): Name of the entry. By default it is
inferred from the data.
Returns:
:obj:`GEOparse.GSM`: A GSM object.
"""
if isinstance(filepath, str):
with utils.smart_open(filepath) as f:
soft = []
has_table = False
for line in f:
if "_table_begin" in line or (not line.startswith(("^", "!", "#"))):
has_table = True
soft.append(line.rstrip())
else:
soft = []
has_table = False
for line in filepath:
if "_table_begin" in line or (not line.startswith(("^", "!", "#"))):
has_table = True
soft.append(line.rstrip())
if entry_name is None:
sets = [i for i in soft if i.startswith("^")]
if len(sets) > 1:
raise Exception("More than one entry in GPL")
if len(sets) == 0:
raise NoEntriesException(
"No entries found. Check the if accession is correct!")
entry_name = parse_entry_name(sets[0])
columns = parse_columns(soft)
metadata = parse_metadata(soft)
if has_table:
table_data = parse_table_data(soft)
else:
table_data = DataFrame()
gsm = GSM(name=entry_name,
table=table_data,
metadata=metadata,
columns=columns)
return gsm
[docs]def parse_GPL(filepath, entry_name=None, partial=None):
"""Parse GPL entry from SOFT file.
Args:
filepath (:obj:`str` or :obj:`Iterable`): Path to file with 1 GPL entry
or list of lines representing GPL from GSE file.
entry_name (:obj:`str`, optional): Name of the entry. By default it is
inferred from the data.
partial (:obj:'iterable', optional): A list of accession IDs of GSMs
to be partially extracted from GPL, works only if a file/accession
is a GPL.
Returns:
:obj:`GEOparse.GPL`: A GPL object.
"""
gsms = {}
gses = {}
gpl_soft = []
has_table = False
gpl_name = entry_name
database = None
if isinstance(filepath, str):
with utils.smart_open(filepath) as soft:
groupper = groupby(soft, lambda x: x.startswith("^"))
for is_new_entry, group in groupper:
if is_new_entry:
entry_type, entry_name = __parse_entry(next(group))
logger.debug("%s: %s" % (entry_type.upper(), entry_name))
if entry_type == "SERIES":
is_data, data_group = next(groupper)
gse_metadata = parse_metadata(data_group)
gses[entry_name] = GSE(name=entry_name,
metadata=gse_metadata)
elif entry_type == "SAMPLE":
if partial and entry_name not in partial:
continue
is_data, data_group = next(groupper)
gsms[entry_name] = parse_GSM(data_group, entry_name)
elif entry_type == "DATABASE":
is_data, data_group = next(groupper)
database_metadata = parse_metadata(data_group)
database = GEODatabase(name=entry_name,
metadata=database_metadata)
elif entry_type == "PLATFORM" or entry_type == "Annotation":
gpl_name = entry_name
is_data, data_group = next(groupper)
has_gpl_name = gpl_name or gpl_name is None
for line in data_group:
if ("_table_begin" in line or
not line.startswith(("^", "!", "#"))):
has_table = True
if not has_gpl_name:
if match("!Annotation_platform\s*=\s*", line):
gpl_name = split("\s*=\s*", line)[-1].strip()
has_gpl_name = True
gpl_soft.append(line)
else:
raise RuntimeError(
"Cannot parse {etype}. Unknown for GPL.".format(
etype=entry_type
))
else:
for line in filepath:
if "_table_begin" in line or (not line.startswith(("^", "!", "#"))):
has_table = True
gpl_soft.append(line.rstrip())
columns = None
try:
columns = parse_columns(gpl_soft)
except Exception:
pass
metadata = parse_metadata(gpl_soft)
if has_table:
table_data = parse_table_data(gpl_soft)
else:
table_data = DataFrame()
gpl = GPL(name=gpl_name,
gses=gses,
gsms=gsms,
table=table_data,
metadata=metadata,
columns=columns,
database=database
)
# link samples to series, if these were present in the GPL soft file
for gse_id, gse in gpl.gses.items():
for gsm_id in gse.metadata.get("sample_id", []):
if gsm_id in gpl.gsms:
gpl.gses[gse_id].gsms[gsm_id] = gpl.gsms[gsm_id]
return gpl
[docs]def parse_GSE(filepath):
"""Parse GSE SOFT file.
Args:
filepath (:obj:`str`): Path to GSE SOFT file.
Returns:
:obj:`GEOparse.GSE`: A GSE object.
"""
gpls = {}
gsms = {}
series_counter = 0
database = None
metadata = {}
gse_name = None
with utils.smart_open(filepath) as soft:
groupper = groupby(soft, lambda x: x.startswith("^"))
for is_new_entry, group in groupper:
if is_new_entry:
entry_type, entry_name = __parse_entry(next(group))
logger.debug("%s: %s" % (entry_type.upper(), entry_name))
if entry_type == "SERIES":
gse_name = entry_name
series_counter += 1
if series_counter > 1:
raise Exception(
"GSE file should contain only one series entry!")
is_data, data_group = next(groupper)
message = ("The key is not False, probably there is an "
"error in the SOFT file")
assert not is_data, message
metadata = parse_metadata(data_group)
elif entry_type == "SAMPLE":
is_data, data_group = next(groupper)
gsms[entry_name] = parse_GSM(data_group, entry_name)
elif entry_type == "PLATFORM":
is_data, data_group = next(groupper)
gpls[entry_name] = parse_GPL(data_group, entry_name)
elif entry_type == "DATABASE":
is_data, data_group = next(groupper)
database_metadata = parse_metadata(data_group)
database = GEODatabase(name=entry_name,
metadata=database_metadata)
else:
logger.error("Cannot recognize type %s" % entry_type)
gse = GSE(name=gse_name,
metadata=metadata,
gpls=gpls,
gsms=gsms,
database=database)
return gse
[docs]def parse_GDS(filepath):
"""Parse GDS SOFT file.
Args:
filepath (:obj:`str`): Path to GDS SOFT file.
Returns:
:obj:`GEOparse.GDS`: A GDS object.
"""
dataset_lines = []
subsets = {}
database = None
dataset_name = None
with utils.smart_open(filepath) as soft:
groupper = groupby(soft, lambda x: x.startswith("^"))
for is_new_entry, group in groupper:
if is_new_entry:
entry_type, entry_name = __parse_entry(next(group))
logger.debug("%s: %s" % (entry_type.upper(), entry_name))
if entry_type == "SUBSET":
is_data, data_group = next(groupper)
message = ("The key is not False, probably there is an "
"error in the SOFT file")
assert not is_data, message
subset_metadata = parse_metadata(data_group)
subsets[entry_name] = GDSSubset(name=entry_name,
metadata=subset_metadata)
elif entry_type == "DATABASE":
is_data, data_group = next(groupper)
message = ("The key is not False, probably there is an "
"error in the SOFT file")
assert not is_data, message
database_metadata = parse_metadata(data_group)
database = GEODatabase(name=entry_name,
metadata=database_metadata)
elif entry_type == "DATASET":
is_data, data_group = next(groupper)
dataset_name = entry_name
for line in data_group:
dataset_lines.append(line.rstrip())
else:
logger.error("Cannot recognize type %s" % entry_type)
metadata = parse_metadata(dataset_lines)
columns = parse_GDS_columns(dataset_lines, subsets)
table = parse_table_data(dataset_lines)
return GDS(name=dataset_name, metadata=metadata, columns=columns,
table=table, subsets=subsets, database=database)