Source code for iCount.genomes.ensembl

""".. Line to protect from pydocstyle D205, D400.

Ensembl API
-----------

Functions to query and download data from the `Ensembl`_ FTP site.

.. _Ensembl:
    http://www.ensembl.org/index.html

"""

import gzip
import logging
import os
import re
import shutil

import pysam

import iCount

BASE_URL = 'ftp.ensembl.org'

MIN_RELEASE_SUPPORTED = 59
MAX_RELEASE_SUPPORTED = 88

LOGGER = logging.getLogger(__name__)


def _docstring_parameter(*sub):
    """Pass arguments to function docstrings (to be used as decorator)."""
    def dec(obj):  # pylint: disable=missing-docstring
        obj.__doc__ = obj.__doc__.format(*sub)
        return obj
    return dec


[docs]@_docstring_parameter(MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED) def releases(): """ Get list of available ENSEMBL releases. Only allows ENSEMBL releases {0}-{1}. Returns ------- list List of available releases """ ftp = iCount.genomes.get_ftp_instance(BASE_URL) # set current working directory ftp.cwd('pub') fetch = [int(item.strip('release-')) for item in ftp.nlst() if re.match(r'release-\d+', item)] ftp.quit() out = [i for i in fetch if i >= MIN_RELEASE_SUPPORTED and i <= MAX_RELEASE_SUPPORTED] out = sorted(out, reverse=True) LOGGER.info('There are %d ENSEMBL releases available: %s', len(out), ','.join(map(str, (out)))) return out
[docs]@_docstring_parameter(MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED) def species(release=MAX_RELEASE_SUPPORTED): """ Get list of available species for given ENSEMBL release. Parameters ---------- release : int Release number. Only ENSEMBL releases {0}-{1} are available. Returns ------- list List of species. """ if release is None: release = MAX_RELEASE_SUPPORTED iCount.log_inputs(LOGGER, level=logging.INFO) if not MIN_RELEASE_SUPPORTED <= release <= MAX_RELEASE_SUPPORTED: raise ValueError('Release should be a number between {} and {}.'.format( MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED)) ftp = iCount.genomes.get_ftp_instance(BASE_URL) ftp.cwd('pub/' + 'release-' + str(release) + '/fasta/') spec_list = sorted([item for item in ftp.nlst()]) ftp.quit() LOGGER.info('There are %d species available: %s', len(spec_list), ','.join(map(str, spec_list))) return spec_list
[docs]@_docstring_parameter(MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED) # pylint: disable=redefined-outer-name def annotation(species, release=MAX_RELEASE_SUPPORTED, out_dir=None, annotation=None): """ Download ENSEMBL annotation for given release/species. Parameters ---------- species : str Species latin name. release : int Release number. Only ENSEMBL releases {0}-{1} are available. out_dir : str Download to this directory (if not given, current working directory). annotation : str Annotation filename (must have .gz file extension). If not given, species.release.gtf.gz is used. If annotation is provided as absolute path, value of out_dir parameter is ignored and file is saved to given absolute path. Returns ------- str Downloaded annotation filename. """ iCount.log_inputs(LOGGER, level=logging.INFO) if not MIN_RELEASE_SUPPORTED <= release <= MAX_RELEASE_SUPPORTED: raise ValueError('Release should be a number between {} and {}.'.format( MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED)) if species not in iCount.genomes.ensembl.species(release): raise ValueError('Invalid species name.') if annotation: assert annotation.endswith(('.gtf', '.gtf.gz')) else: annotation = '{}.{}.gtf.gz'.format(species, release) # If absolute path is given, ignore out_dir: if os.path.isabs(annotation): if out_dir: LOGGER.info('out_dir parameter has been changed, since absolute ' 'path is provided by annotation parameter.') out_dir = os.path.dirname(annotation) annotation = os.path.basename(annotation) if not out_dir: out_dir = os.getcwd() if not os.path.isdir(out_dir): raise ValueError('Directory "{}" does not exist.'.format(out_dir)) ftp = iCount.genomes.get_ftp_instance(BASE_URL) server_dir = '/pub/release-{}/gtf/{}/'.format(release, species) ftp.cwd(server_dir) server_files = ftp.nlst() # Get the desired annotation file form list of all server files: annotation_file = '' regex = r'{}\.[\w\d]+\.\d+\.gtf\.gz'.format(species.capitalize()) for file_ in server_files: if re.match(regex, file_): annotation_file = file_ break if not annotation_file: LOGGER.info('No GTF file found for species %s, release %s', species, str(release)) return None # Download to file on disk saveas_fname = os.path.join(out_dir, annotation) LOGGER.info('Downloading GTF to: %s', saveas_fname) with open(saveas_fname, 'wb') as fhandle: ftp.retrbinary('RETR ' + annotation_file, fhandle.write) ftp.quit() LOGGER.info('Done.') return os.path.abspath(saveas_fname)
[docs]def chrom_length(fasta_in): """ Compute chromosome lengths of fasta file and store them into a file. More about the .fai file format can be found here: http://www.htslib.org/doc/faidx.html Parameters ---------- fasta_in : str Path to genome FASTA file (can be .gz). Returns ------- str Absolute path to output file. """ iCount.log_inputs(LOGGER, level=logging.INFO) temp = iCount.files.decompress_to_tempfile(fasta_in) pysam.faidx(temp) # pylint: disable=no-member fai_file = os.path.abspath(fasta_in + '.fai') shutil.move(temp + '.fai', fai_file) LOGGER.info('Fai file saved to : %s', fai_file) return fai_file
[docs]@_docstring_parameter(MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED) # pylint: disable=redefined-outer-name def genome(species, release=MAX_RELEASE_SUPPORTED, out_dir=None, genome=None, chromosomes=None): """ Download ENSEMBL genome for given release/species. Parameters ---------- species : str Species latin name. release : int Release number. Only ENSEMBL releases {0}-{1} are available. out_dir : str Download to this directory (if not given, current working directory). genome : str Genome filename (must have .gz file extension). If not given, species.release.fa.gz is used. If genome is provided as absolute path, value of out_dir parameter is ignored and file is saved to given absolute path. chromosomes : list_str If given, do not download the whole genome, but listed chromosomes only. Chromosomes can be given as strings or integers. Returns ------- str Downloaded genome/sequnce filename. """ iCount.log_inputs(LOGGER, level=logging.INFO) if species not in iCount.genomes.ensembl.species(): raise ValueError('Invalid species name.') if not MIN_RELEASE_SUPPORTED <= release <= MAX_RELEASE_SUPPORTED: raise ValueError('Release should be a number between {} and {}.'.format( MIN_RELEASE_SUPPORTED, MAX_RELEASE_SUPPORTED)) if genome: assert genome.endswith(('.fa', '.fasta', '.fa.gz', '.fasta.gz')) # If absolute path is given, ignore out_dir: if genome and os.path.isabs(genome): if out_dir: LOGGER.info('out_dir parameter has been changed, since absolute ' 'path is provided by genome parameter.') out_dir = os.path.dirname(genome) genome = os.path.basename(genome) if not out_dir: out_dir = os.getcwd() if not os.path.isdir(out_dir): raise ValueError('Directory "{}" does not exist.'.format(out_dir)) ftp = iCount.genomes.get_ftp_instance(BASE_URL) server_dir = '/pub/release-{}/fasta/{}/dna'.format(release, species) ftp.cwd(server_dir) all_fasta_files = ftp.nlst() filtered_files = [] # Recognize all "whole-chromosme" files regex = r'{}\.[\w\d\-]+\.(\d+\.)*dna\.chromosome\.' \ r'[a-zA-Z0-9]+\.fa\.gz'.format(species.capitalize()) for fasta_file in all_fasta_files: if re.match(regex, fasta_file): filtered_files.append(fasta_file) def sorting_func(name): """Sort names by chromosome order.""" key = name.split('.')[-3] if key == 'MT': return 'ZZ' # this makes mitohondrial "chromosome" the last one return key # Sort the filtered_files in correct order of chromosomes: filtered_files.sort(key=sorting_func) # If parameter chromosomes is given, take only a subset of all chromosomes if chromosomes: subset_list = [] for chromosome in chromosomes: regex = r'.*chromosome\.{}\.fa\.gz'.format(str(chromosome)) chromosome_found = False for file_ in filtered_files: if re.match(regex, file_): subset_list.append(file_) chromosome_found = True if not chromosome_found: # Provide user with a list of available chromosomes: chrom_regex = r'.*chromosome\.([a-zA-Z0-9]+)\.fa\.gz' available = [re.match(chrom_regex, file_).group(1) for file_ in filtered_files] raise ValueError('Could not find chromosome {}. Available chromosomes ' 'are: {}'.format(chromosome, ' '.join(available))) filtered_files = subset_list if not genome: genome = '{}.{}.chr{}.fa.gz'.format( species, release, '_'.join(map(str, chromosomes))) if not genome: genome = '{}.{}.fa.gz'.format(species, release) target_path = os.path.abspath(os.path.join(out_dir, genome)) LOGGER.info('Downloading FASTA file into: %s', target_path) temp_dir = os.path.join(iCount.TMP_ROOT, 'ensembl') if not os.path.exists(temp_dir): os.makedirs(temp_dir) for fname in filtered_files: LOGGER.debug('Downloading file: %s', fname) # Download all files to tempdir: temp_file = os.path.join(temp_dir, fname) with open(temp_file, 'wb') as fhandle: ftp.retrbinary('RETR ' + fname, fhandle.write) # Write the content into final file (target_fname) with gzip.open(temp_file, 'rb') as f_in, gzip.open(target_path, 'ab') as f_out: shutil.copyfileobj(f_in, f_out) # Clean up: delete temporary files: ftp.quit() shutil.rmtree(temp_dir) # Compute chromosome lengths: chrom_length(target_path) LOGGER.info('Done.') return target_path