# Source code for iCount.externals.star

""".. Line to protect from pydocstyle D205, D400.

STAR aligner
------------

Interface to running STAR.
"""

import os
import logging
import subprocess

from threading import Thread
from queue import Queue

import iCount

LOGGER = logging.getLogger(__name__)


def _execute(cmd):
    """Give stdout and stderr in realtime when executing the cmd as iterator."""
    def readline_output(out, queue, name):  # pylint: disable=missing-docstring
        for line in iter(out.readline, ''):
            queue.put((name, line))
        out.close()
        queue.put((name, 'readline_output finished.'))

    popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=True)

    queue = Queue()
    Thread(target=readline_output, args=(popen.stdout, queue, 'stdout'), daemon=True).start()
    Thread(target=readline_output, args=(popen.stderr, queue, 'stderr'), daemon=True).start()

    done = 0
    while True:
        out, message = queue.get()
        if message == 'readline_output finished.':
            done += 1
        else:
            yield ('{}_line'.format(out), message)

        if done >= 2:
            break

    yield ('return_code', popen.wait())


def get_version():
    """Return the STAR version string, or ``None`` when STAR is unavailable."""
    command = ['STAR', '--version']
    try:
        output = subprocess.check_output(command, shell=False, universal_newlines=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        # STAR binary missing or exited with an error: no version to report.
        return None
    return str(output).rstrip('\n\r')
def build_index(genome, genome_index, annotation='', overhang=100, overhang_min=8, threads=1,
                genome_sasparsed=1, genome_saindexnbases=14):
    """
    Call STAR to generate genome index, which is used for mapping.

    Parameters
    ----------
    genome : str
        Genome sequence to index.
    genome_index : str
        Output folder, where to store genome index.
    annotation : str
        Annotation that defines splice junctions.
    overhang : int
        Sequence length around annotated junctions to be used by STAR when
        constructing splice junction database.
    overhang_min : int
        Minimum overhang for unannotated junctions.
    threads : int
        Number of threads that STAR can use for generating index.
    genome_sasparsed : int
        STAR parameter genomeSAsparseD. Suffix array sparsity. Bigger numbers
        decrease RAM requirements at the cost of mapping speed reduction.
        Suggested values are 1 (30 GB RAM) or 2 (16 GB RAM).
    genome_saindexnbases : int
        STAR parameter genomeSAindexNbases. SA pre-indexing string length,
        typically between 10 and 15. Longer strings require more memory, but
        result in faster searches.

    Returns
    -------
    int
        Star return code.

    """
    iCount.log_inputs(LOGGER, level=logging.INFO)

    if not os.path.isdir(genome_index):
        raise FileNotFoundError('Output directory does not exist. Make sure it does.')

    LOGGER.info('Building genome index with STAR for genome %s', genome)
    genome_fname2 = iCount.files.decompress_to_tempfile(genome, 'starindex')
    args = [
        'STAR',
        '--runThreadN', '{:d}'.format(threads),
        '--genomeSAsparseD', '{:d}'.format(genome_sasparsed),
        '--genomeSAindexNbases', '{:d}'.format(genome_saindexnbases),
        '--runMode', 'genomeGenerate',
        '--genomeDir', '{:s}'.format(genome_index),
        '--genomeFastaFiles', '{:s}'.format(genome_fname2),
        '--alignSJoverhangMin', '{:d}'.format(overhang_min),
        '--outFileNamePrefix', '{:s}'.format(genome_index),
    ]
    if annotation:
        annotation2 = iCount.files.decompress_to_tempfile(annotation, 'starindex')
        args.extend([
            '--sjdbGTFfile', annotation2,
            '--sjdbOverhang', '{:d}'.format(overhang),
        ])
    else:
        annotation2 = annotation

    try:
        ret_code = 1
        for name, value in _execute(args):
            if name == 'return_code' and isinstance(value, int):
                ret_code = value
                break
            elif name == 'stdout_line':
                LOGGER.info(value.strip())
            # BUG FIX: _execute yields 'stderr_line' (one line at a time); the
            # previous check for 'stderr_lines' never matched, so STAR error
            # output was silently dropped.
            elif name == 'stderr_line':
                LOGGER.error(value.strip())
    finally:
        # Remove temporary decompressed files (only if decompression actually
        # produced a new temporary file).
        if genome != genome_fname2:
            os.remove(genome_fname2)
        if annotation != annotation2:
            os.remove(annotation2)

    LOGGER.info('Done.')
    return ret_code
def map_reads(reads, genome_index, out_dir, annotation='', multimax=10, mismatches=2, threads=1,
              genome_load=False):
    """
    Map FASTQ file reads to reference genome.

    Parameters
    ----------
    reads : str
        Sequencing reads to map to genome.
    genome_index : str
        Folder with genome index.
    out_dir : str
        Output folder, where to store mapping results.
    annotation : str
        GTF annotation needed for mapping to splice-junctions.
    multimax : int
        Number of allowed multiple hits.
    mismatches : int
        Number of allowed mismatches.
    threads : int
        Number of threads that STAR can use for generating index.
    genome_load: bool
        Load genome into shared memory. Shared memory must be available in
        the system. See Chapter 3.3 in STAR manual.

    Returns
    -------
    int
        Return code

    """
    iCount.log_inputs(LOGGER, level=logging.INFO)

    if not os.path.isdir(genome_index):
        raise FileNotFoundError('Directory with genome index does not exist. Make sure it does.')
    if not os.path.isdir(out_dir):
        raise FileNotFoundError('Output directory does not exist. Make sure it does.')

    LOGGER.info('Mapping reads from %s', reads)
    sequences_fname2 = iCount.files.decompress_to_tempfile(reads, 'starmap')
    args = [
        'STAR',
        '--runMode', 'alignReads',
        '--runThreadN', '{:d}'.format(threads),
        '--genomeDir', '{:s}'.format(genome_index),
        '--readFilesIn', '{:s}'.format(sequences_fname2),
    ]
    # STAR treats the prefix as a literal string prefix, so a trailing slash
    # is required for outputs to land inside out_dir.
    if not out_dir.endswith('/'):
        out_dir += '/'
    args.extend([
        '--outFileNamePrefix', '{:s}'.format(out_dir),
        '--outSAMprimaryFlag', 'AllBestScore',
        '--outFilterMultimapNmax', '{:d}'.format(multimax),
        '--outFilterMismatchNmax', '{:d}'.format(mismatches),
        '--alignEndsType', 'EndToEnd',  # soft-clipping of starts and ends may produce multiple hits
        '--outSAMtype', 'BAM', 'SortedByCoordinate',
        '--outSAMunmapped', 'Within', 'KeepPairs',
    ])
    if genome_load:
        args.extend([
            '--genomeLoad', 'LoadAndRemove',  # load genome into shared memory
        ])
    if annotation:
        annotation2 = iCount.files.decompress_to_tempfile(annotation, 'starmap')
        args.extend([
            '--sjdbGTFfile', annotation2,
        ])
    else:
        annotation2 = annotation

    try:
        ret_code = 1
        for name, value in _execute(args):
            if name == 'return_code' and isinstance(value, int):
                ret_code = value
                break
            elif name == 'stdout_line':
                LOGGER.info(value.strip())
            # BUG FIX: _execute yields 'stderr_line' (one line at a time); the
            # previous check for 'stderr_lines' never matched, so STAR error
            # output was silently dropped.
            elif name == 'stderr_line':
                LOGGER.error(value.strip())
    finally:
        # Remove temporary decompressed files (only if decompression actually
        # produced a new temporary file).
        if reads != sequences_fname2:
            os.remove(sequences_fname2)
        if annotation != annotation2:
            os.remove(annotation2)

    LOGGER.info('Done.')
    return ret_code