Source code for genometools.misc.functions

# Copyright (c) 2015, 2016 Florian Wagner
#
# This file is part of GenomeTools.
#
# GenomeTools is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License, Version 3,
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Miscellaneous functions that are useful in many different contexts.

"""

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)
_oldstr = str
from builtins import *

import os
import io
import errno
import shutil
import sys
import bisect
import gzip
import logging
import contextlib
import subprocess as subproc
import locale
import ftplib
import hashlib

import six

if six.PY3:
    from urllib import parse as urlparse
else:
    import urlparse


import unicodecsv as csv
import requests

logger = logging.getLogger(__name__)


def try_open_gzip(path):
    """Open a file with gzip if it can be read as a gzip stream.

    The file is probed by reading one decompressed byte through a temporary
    handle, which is always closed again before returning.

    Parameters
    ----------
    path: str
        The file path.

    Returns
    -------
    file-like or None
        A freshly opened gzip handle (``gzip.open`` default mode, i.e.
        binary reading), or ``None`` if opening or probing the file raised
        an ``IOError``.
    """
    try:
        probe = gzip.open(path)
    except IOError:
        return None

    try:
        # read(1) forces the gzip header to be parsed; unlike next(), it does
        # not raise StopIteration for a valid stream without any content
        probe.read(1)
    except IOError:
        return None
    finally:
        # never leak the probe handle
        probe.close()

    return gzip.open(path)


def get_file_md5sum(path):
    """Calculate the MD5 hash for a file.

    The file is read in fixed-size chunks, so that large files do not have
    to fit into memory.

    Parameters
    ----------
    path: str
        The file path.

    Returns
    -------
    str
        The hexadecimal MD5 digest of the file contents.
    """
    md5 = hashlib.md5()
    with open(path, 'rb') as fh:
        # iter() with a sentinel stops at the first empty read (end of file)
        for chunk in iter(lambda: fh.read(1 << 20), b''):
            md5.update(chunk)
    return str(md5.hexdigest())
@contextlib.contextmanager
def smart_open_read(path=None, mode='rb', encoding=None, try_gzip=False):
    """Open a file for reading or return ``stdin``.

    Adapted from StackOverflow user "Wolph"
    (http://stackoverflow.com/a/17603000).

    Parameters
    ----------
    path: str, optional
        The file path. If ``None``, standard input is read instead. [None]
    mode: str, optional
        Either ``'r'`` (text) or ``'rb'`` (binary). ['rb']
    encoding: str, optional
        The text encoding, passed on to the underlying open call or text
        wrapper. [None]
    try_gzip: bool, optional
        Whether to try opening the file as a gzip stream first. [False]

    Yields
    ------
    file-like
        The opened handle. For gzip'ed files this is a buffered reader (binary
        mode) or a text wrapper around it (text mode).
    """
    assert mode in ('r', 'rb')
    assert path is None or isinstance(path, (str, _oldstr))
    assert isinstance(mode, (str, _oldstr))
    assert encoding is None or isinstance(encoding, (str, _oldstr))
    assert isinstance(try_gzip, bool)

    fh = None
    binfh = None
    gzfh = None
    if path is None:
        # open stdin; closefd=False guarantees that neither close() nor
        # garbage collection of the wrapper closes the process's stdin
        fh = io.open(sys.stdin.fileno(), mode=mode, encoding=encoding,
                     closefd=False)
    else:
        # open an actual file
        if try_gzip:
            # gzip.open defaults to mode 'rb'
            gzfh = try_open_gzip(path)
        if gzfh is not None:
            logger.debug("Opening gzip'ed file.")
            # wrap gzip stream
            binfh = io.BufferedReader(gzfh)
            if 'b' not in mode:
                # add a text wrapper on top
                logger.debug('Adding text wrapper.')
                fh = io.TextIOWrapper(binfh, encoding=encoding)
        else:
            fh = io.open(path, mode=mode, encoding=encoding)

    # in binary gzip mode there is no outer wrapper, so hand out the
    # buffered reader itself
    yield_fh = fh if fh is not None else binfh

    try:
        yield yield_fh
    finally:
        # close from the outermost wrapper inwards; the stdin wrapper can be
        # closed safely because it does not own the file descriptor
        for handle in (fh, binfh, gzfh):
            if handle is not None:
                handle.close()
@contextlib.contextmanager
def smart_open_write(path=None, mode='wb', encoding=None):
    """Open a file for writing or return ``stdout``.

    Adapted from StackOverflow user "Wolph"
    (http://stackoverflow.com/a/17603000).

    Parameters
    ----------
    path: str, optional
        The file path. If ``None``, standard output is written instead. [None]
    mode: str, optional
        The mode passed to ``io.open``. ['wb']
    encoding: str, optional
        The text encoding passed to ``io.open``. [None]

    Yields
    ------
    file-like
        The opened handle. It is closed (and thereby flushed) when the
        context exits.
    """
    if path is not None:
        # open a file
        fh = io.open(path, mode=mode, encoding=encoding)
    else:
        # open stdout; closefd=False guarantees that neither close() nor
        # garbage collection of the wrapper closes the process's stdout
        fh = io.open(sys.stdout.fileno(), mode=mode, encoding=encoding,
                     closefd=False)

    try:
        yield fh
    finally:
        # always close: this flushes buffered data, and for stdout it leaves
        # the underlying file descriptor open
        fh.close()
[docs]def test_dir_writable(path): """Test if we can write to a directory. Parameters ---------- dir: str The directory path. Returns ------- bool Whether the directory is writable or not. """ dir_ = os.path.dirname(path) if dir_ == '': dir_ = '.' return os.access(dir_, os.W_OK)
[docs]def test_file_writable(path): """Test if we can write to a file. Parameters ---------- path: str The file path. Returns ------- bool Whether the file is writable or not. """ if os.path.isfile(path): # file exists, can we modify it? try: with open(path, 'ab') as fh: pass except IOError: return False else: return True else: # file does not exist, can we write to the directory? return test_dir_writable(path)
def get_fize_size(path):
    """Get the size of a file.

    Parameters
    ----------
    path: str
        The file path.

    Returns
    -------
    int
        The size of the file in bytes.
    """
    file_stat = os.stat(path)
    return file_stat.st_size
def get_url_size(url):
    """Get the size of a URL.

    Note: Uses requests, so it does not work for FTP URLs.

    Source: StackOverflow user "Burhan Khalid".
    (http://stackoverflow.com/a/24585314/5651021)

    Parameters
    ----------
    url : str
        The URL.

    Returns
    -------
    int
        The size of the URL in bytes, taken from the ``content-length``
        header of a HEAD response.
    """
    request_headers = {'Accept-Encoding': 'identity'}
    response = requests.head(url, headers=request_headers)
    return int(response.headers['content-length'])
def get_url_file_name(url):
    """Get the file name from a URL.

    The file name is everything after the last ``/`` in the path component
    of the URL.

    Parameters
    ----------
    url : str
        The URL.

    Returns
    -------
    str
        The file name.
    """
    assert isinstance(url, (str, _oldstr))

    url_path = urlparse.urlparse(url).path
    return url_path.rsplit('/', 1)[-1]
def make_sure_dir_exists(dir_, create_subfolders=False):
    """Ensures that a directory exists.

    Adapted from StackOverflow users "Bengt" and "Heikki Toivonen"
    (http://stackoverflow.com/a/5032238).

    Parameters
    ----------
    dir_: str
        The directory path.
    create_subfolders: bool, optional
        Whether to create any inexistent subfolders. [False]

    Returns
    -------
    None

    Raises
    ------
    OSError
        If a file system error occurs (other than the path already
        existing).
    """
    assert isinstance(dir_, (str, _oldstr))
    assert isinstance(create_subfolders, bool)

    # makedirs also creates missing parents, mkdir only the leaf directory
    create = os.makedirs if create_subfolders else os.mkdir
    try:
        create(dir_)
    except OSError as err:
        # an already existing path is fine, anything else is a real error
        if err.errno != errno.EEXIST:
            raise
def get_file_size(path):
    """Get the size of a file in bytes.

    Parameters
    ----------
    path: str
        The path of the file.

    Returns
    -------
    int
        The size of the file in bytes.

    Raises
    ------
    IOError
        If the file does not exist.
    OSError
        If a file system error occurs.
    """
    assert isinstance(path, (str, _oldstr))

    if not os.path.isfile(path):
        # format the message here: exception constructors do not apply
        # %-formatting to their arguments
        raise IOError('File "%s" does not exist.' % (path,))

    return os.path.getsize(path)
def get_file_checksum(path):
    """Get the checksum of a file (using ``sum``, Unix-only).

    This function is only available on certain platforms.

    Parameters
    ----------
    path: str
        The path of the file.

    Returns
    -------
    int
        The checksum.

    Raises
    ------
    OSError
        If the function is not available on the current platform.
    IOError
        If the file does not exist.
    """
    if not (sys.platform.startswith('linux') or
            sys.platform in ['darwin', 'cygwin']):
        raise OSError('This function is not available on your platform.')

    assert isinstance(path, (str, _oldstr))

    if not os.path.isfile(path):  # not a file
        raise IOError('File "%s" does not exist.' % (path))

    # calculate checksum; the path is passed as a separate argument (no
    # shell), so special characters in it cannot be interpreted as shell
    # syntax, and "--" keeps a leading "-" from being parsed as an option
    sub = subproc.Popen(['sum', '--', path], bufsize=-1,
                        stdout=subproc.PIPE)
    stdoutdata = sub.communicate()[0]
    assert sub.returncode == 0

    # in Python 3, communicate() returns bytes that need to be decoded
    encoding = locale.getpreferredencoding()
    stdoutstr = str(stdoutdata, encoding=encoding)

    # the checksum is the first whitespace-separated field of the output
    file_checksum = int(stdoutstr.split()[0])
    logger.debug('Checksum of file "%s": %d', path, file_checksum)
    return file_checksum
def test_file_checksum(path, checksum):
    """Test if a file has a given checksum (using ``sum``, Unix-only).

    Parameters
    ----------
    path: str
        The path of the file.
    checksum: int
        The checksum to compare.

    Returns
    -------
    bool
        Whether or not the file has the given checksum.

    Raises
    ------
    IOError
        If the file does not exist.
    """
    assert isinstance(path, (str, _oldstr))
    assert isinstance(checksum, int)

    # calculate file checksum and compare to given checksum
    actual = get_file_checksum(path)
    is_match = (actual == checksum)
    logger.debug('File checksum: %d. Reference checksum: %d. Match: %s.',
                 actual, checksum, str(is_match))
    return is_match
# @contextlib.contextmanager
def gzip_open_text(path, encoding=None):
    """Opens a plain-text file that may be gzip'ed.

    Parameters
    ----------
    path : str
        The file.
    encoding : str, optional
        The encoding to use. If ``None``, ``sys.getdefaultencoding()`` is
        used.

    Returns
    -------
    file-like
        A file-like object opened for reading text. The caller is
        responsible for closing it.

    Notes
    -----
    Generally, reading gzip'ed files with gzip.open is very slow, and
    it is preferable to pipe the file into the python script using
    ``gunzip -c``. The script then reads the file from stdin.
    """
    if encoding is None:
        encoding = sys.getdefaultencoding()

    assert os.path.isfile(path)

    # probe the file by reading one decompressed byte; the context manager
    # makes sure the probe handle is closed again in every case
    try:
        with gzip.open(path, mode='rb') as probe:
            probe.read(1)
    except IOError:
        is_compressed = False
    else:
        is_compressed = True

    if is_compressed:
        if six.PY2:
            # Python 2's gzip has no text mode, so decode with a codec reader
            import codecs
            zf = gzip.open(path, 'rb')
            reader = codecs.getreader(encoding)
            fh = reader(zf)
        else:
            fh = gzip.open(path, mode='rt', encoding=encoding)
    else:
        # the following works in Python 2.7, thanks to future
        fh = open(path, mode='r', encoding=encoding)

    return fh
def is_writable(path):
    """Tests if a file is writable.

    The test opens the file in append mode, which leaves existing contents
    untouched but creates the file if it does not exist yet.

    Parameters
    ----------
    path: str
        The file path.

    Returns
    -------
    bool
        ``True`` if the file could be opened for appending, ``False`` if
        the attempt raised an error.
    """
    try:
        with open(path, 'a'):
            pass
    except Exception:
        # deliberately broad (any failure means "not writable"), but unlike
        # a bare "except" this lets KeyboardInterrupt/SystemExit propagate
        return False
    return True
def flatten(l):
    """Flattens a list of lists.

    Only one level of nesting is removed.

    Parameters
    ----------
    l: list
        The list of lists.

    Returns
    -------
    list
        The flattened list.
    """
    flat = []
    for sublist in l:
        # append all items of this sublist, preserving their order
        flat.extend(sublist)
    return flat
def bisect_index(a, x):
    """ Find the leftmost index of an element in a list using binary search.

    Parameters
    ----------
    a: list
        A sorted list.
    x: arbitrary
        The element.

    Returns
    -------
    int
        The index.

    Raises
    ------
    ValueError
        If the element is not found in the list.
    """
    pos = bisect.bisect_left(a, x)
    # bisect_left returns the insertion point; the element is present only
    # if that position exists and actually holds it
    if pos == len(a) or not a[pos] == x:
        raise ValueError
    return pos
def argsort(seq):
    """ Returns a list of indices that would sort a list.

    Parameters
    ----------
    seq: List
        The list.

    Returns
    -------
    List[int]
        The list of indices that would sort the given list ``seq``.

    Notes
    -----
    If the returned list of indices can be a NumPy array, use `numpy.lexsort`
    instead. If the given list ``seq`` is a NumPy array, use `numpy.argsort`
    instead.
    """
    # see http://stackoverflow.com/questions/3382352/equivalent-of-numpy-argsort-in-basic-python/3382369#3382369
    indices = range(len(seq))
    # sort the positions by the value found at each position
    return sorted(indices, key=lambda i: seq[i])
def argmin(seq):
    """ Obtains the index of the smallest element in a list.

    If the smallest value occurs more than once, the index of its first
    occurrence is returned.

    Parameters
    ----------
    seq: List
        The list.

    Returns
    -------
    int
        The index of the smallest element.

    Raises
    ------
    IndexError
        If the list is empty.
    """
    if len(seq) == 0:
        raise IndexError('Cannot determine the argmin of an empty sequence.')
    # a single linear scan; min() returns the first minimal index on ties
    return min(range(len(seq)), key=seq.__getitem__)
def argmax(seq):
    """ Obtains the index of the largest element in a list.

    If the largest value occurs more than once, the index of its last
    occurrence is returned (the same result as taking the last entry of a
    stable ascending sort of the indices).

    Parameters
    ----------
    seq: List
        The list

    Returns
    -------
    int
        The index of the largest element.

    Raises
    ------
    IndexError
        If the list is empty.
    """
    if len(seq) == 0:
        raise IndexError('Cannot determine the argmax of an empty sequence.')
    # a single linear scan; iterating the indices from the back makes max()
    # pick the last occurrence on ties
    return max(range(len(seq) - 1, -1, -1), key=seq.__getitem__)
def read_single(path, encoding='UTF-8'):
    """ Reads the first column of a tab-delimited text file.

    The file can either be uncompressed or gzip'ed.

    Parameters
    ----------
    path: str
        The path of the file.
    encoding: str, optional
        The file encoding. ['UTF-8']

    Returns
    -------
    List of str
        A list containing the elements in the first column.
    """
    assert isinstance(path, (str, _oldstr))

    with smart_open_read(path, mode='rb', try_gzip=True) as fh:
        rows = csv.reader(fh, dialect='excel-tab', encoding=encoding)
        # NOTE(review): a row without any fields would make row[0] raise
        # IndexError -- confirm whether inputs can contain blank lines
        first_column = [row[0] for row in rows]
    return first_column
def read_all(path, encoding='UTF-8'):
    """ Reads a tab-delimited text file.

    The file can either be uncompressed or gzip'ed.

    Parameters
    ----------
    path: str
        The path of the file.
    encoding: str, optional
        The file encoding. ['UTF-8']

    Returns
    -------
    List of (tuple of str)
        A list, with each element containing the contents of a row
        (as a tuple).
    """
    assert isinstance(path, (str, _oldstr))

    with smart_open_read(path, mode='rb', try_gzip=True) as fh:
        rows = csv.reader(fh, dialect='excel-tab', encoding=encoding)
        # materialize every row while the file is still open
        table = [tuple(row) for row in rows]
    return table