# Copyright 2016 Netherlands eScience Center
#
# Licensed under the Apache License, Version 2.0 (the 'License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Similarity matrix using hdf5 as storage backend."""
from __future__ import absolute_import
from math import log10, ceil, floor
import numpy as np
from progressbar import ProgressBar
import tables
import six
[docs]class SimilarityMatrix(object):
"""Similarity matrix
Args:
filename (str): File name of hdf5 file to write or read similarity matrix from
mode (str): Can be 'r' for reading or 'w' for writing
expectedpairrows (int): Expected number of pairs to be added.
Required when similarity matrix is opened in write mode, helps optimize storage
expectedlabelrows (int): Expected number of labels to be added.
Required when similarity matrix is opened in write mode, helps optimize storage
cache_labels (bool): Cache labels, speed up label lookups
Attributes:
h5file (tables.File): Object representing an open hdf5 file
pairs (PairsTable): HDF5 Table that contains pairs
labels (LabelsLookup): Table to look up label of fragment by id or id of fragment by label
"""
filters = tables.Filters(complevel=6, complib='blosc')
def __init__(self, filename, mode='r', expectedpairrows=None, expectedlabelrows=None, cache_labels=False, **kwargs):
self.h5file = tables.open_file(filename, mode, filters=self.filters, **kwargs)
self.pairs = PairsTable(self.h5file, expectedpairrows)
self.labels = LabelsLookup(self.h5file, expectedlabelrows)
self.cache_i2l = {}
self.cache_l2i = {}
if cache_labels:
self._build_label_cache()
def _build_label_cache(self):
if not self.cache_l2i:
self.cache_l2i = self.labels.label2ids()
self.cache_i2l = {v: k for k, v in six.iteritems(self.cache_l2i)}
[docs] def close(self):
"""Closes the hdf5file"""
self.h5file.close()
[docs] def append(self, other):
"""Append data from other similarity matrix to me
Args:
other (SimilarityMatrix): Other similarity matrix
"""
if len(self.labels) == 0:
# copy labels when self has no labels
self.labels.append(other.labels)
if self.labels == other.labels:
# same id 2 labels mapping, can safely copy pairs
self.pairs.append(other.pairs)
else:
# different id 2 labels mapping, must remap labels of other into labels of self
# for labels missing in self, generate new id and add to self
self.labels.merge(other.labels)
# for other pairs map a,b to labels of other and map other labels to self id using self labels
self.pairs.update(other, self.labels.label2ids())
def __iter__(self):
self._build_label_cache()
for pair in self.pairs:
yield self.cache_i2l[pair['a']], self.cache_i2l[pair['b']], pair['score']
[docs] def update(self, similarities_iter, label2id):
"""Store pairs of fragment identifier with their similarity score and label 2 id lookup
Args:
similarities_iter (iterator): Iterator which yields (label1, label2, similarity_score)
label2id (dict): Dictionary with fragment label as key and fragment identifier as value.
"""
self.pairs.update(similarities_iter, label2id)
self.labels.update(label2id)
[docs] def find(self, query, cutoff, limit=None):
"""Find similar fragments to query.
Args:
query (str): Query fragment identifier
cutoff (float): Cutoff, similarity scores below cutoff are discarded.
limit (int): Maximum number of hits. Default is None for no limit.
Yields:
(str, float): Hit fragment idenfier and similarity score
"""
if self.cache_l2i:
frag_id = self.cache_l2i[query]
for hit_frag_id, score in self.pairs.find(frag_id, cutoff, limit):
yield self.cache_i2l[hit_frag_id], score
else:
frag_id = self.labels.by_label(query)
for hit_frag_id, score in self.pairs.find(frag_id, cutoff, limit):
yield self.labels.by_id(hit_frag_id), score
[docs] def count(self, frame_size, raw_score=False, lower_triangle=False):
"""Count occurrences of each score
Args:
frame_size (int): Size of matrix loaded each time. Larger requires more memory and smaller is slower.
raw_score (bool): Return raw int16 score or fraction score
lower_triangle (bool): Dummy argument to force same interface for thawed and frozen matrix
Returns:
(str, int): Score and number of occurrences
"""
return self.pairs.count(frame_size, raw_score)
[docs] def keep(self, other, keep):
"""Copy content of self to other and only keep given fragment labels and the labels they pair with
Args:
other (SimilarityMatrix): Writable matrix to fill
keep (set[str]): Fragment labels to keep
"""
frag_ids2keep = self.labels.by_labels(keep)
all_frag_ids2keep = self.pairs.keep(other.pairs, frag_ids2keep)
self.labels.keep(other.labels, all_frag_ids2keep)
[docs] def skip(self, other, skip):
"""Copy content of self to other and skip all given fragment labels
Args:
other (SimilarityMatrix): Writable matrix to fill
skip (set[str]): Fragment labels to skip
"""
frag_ids2skip = self.labels.by_labels(skip)
self.pairs.skip(other.pairs, frag_ids2skip)
self.labels.skip(other.labels, frag_ids2skip)
[docs]class AbstractSimpleTable(object):
"""Abstract wrapper around a HDF5 table
Args:
table (tables.Table): HDF5 table
append_chunk_size (int): Size of chunk to append in one go.
Defaults to 1e8, which when table description is 10bytes will require 2Gb during append.
Attributes
table (tables.Table): HDF5 table
append_chunk_size (int): Number of rows to read from other table during append.
"""
def __init__(self, table, append_chunk_size=int(1e8)):
self.table = table
self.append_chunk_size = append_chunk_size
[docs] def append(self, other):
"""Append rows of other table to self
Args:
other: Table of same type as self
"""
limit = len(other.table)
for start in range(0, limit, self.append_chunk_size):
stop = self.append_chunk_size + start
self.table.append(other.table.read(start=start, stop=stop))
def __len__(self):
"""
Returns:
int: Number of rows in table
"""
return len(self.table)
def __iter__(self):
return self.table.__iter__()
class SimilarityPair(tables.IsDescription):
"""Table description for similarity pair"""
a = tables.UInt32Col()
b = tables.UInt32Col()
score = tables.UInt16Col()
[docs]class PairsTable(AbstractSimpleTable):
"""Tabel to store similarity score of a pair of fragment fingerprints
When table does not exist in h5file it is created.
Args:
h5file (tables.File): Object representing an open hdf5 file
expectedrows (int): Expected number of pairs to be added.
Required when similarity matrix is opened in write mode, helps optimize storage
Attributes:
score_precision (int): Similarity score is a fraction,
the score is converted to an int by multiplying it with the precision
full_matrix (bool): Matrix is filled above and below diagonal.
"""
table_name = 'pairs'
filters = tables.Filters(complevel=6, complib='blosc')
def __init__(self, h5file, expectedrows=0):
if self.table_name in h5file.root:
table = h5file.root.__getattr__(self.table_name)
else:
table = h5file.create_table('/',
self.table_name,
SimilarityPair,
'Similarity pairs',
expectedrows=expectedrows)
super(PairsTable, self).__init__(table)
self.score_precision = 2 ** 16 - 1
@property
def full_matrix(self):
try:
return self.table.attrs['full_matrix']
except KeyError:
return False
@full_matrix.setter
def full_matrix(self, value):
self.table.attrs['full_matrix'] = value
[docs] def update(self, similarities_iter, label2id):
"""Store pairs of fragment identifier with their similarity score
Args:
similarities_iter (Iterator): Iterator which yields (label1, label2, similarity_score)
label2id (Dict): Lookup with fragment label as key and fragment identifier as value
"""
hit = self.table.row
for label1, label2, similarity in similarities_iter:
hit['a'] = label2id[label1]
hit['b'] = label2id[label2]
hit['score'] = int(similarity * self.score_precision)
hit.append()
self.table.flush()
[docs] def find(self, frag_id, cutoff, limit):
"""Find fragment hits which has a similarity score with frag_id above cutoff.
Args:
frag_id (int): query fragment identifier
cutoff (float): Cutoff, similarity scores below cutoff are discarded.
limit (int): Maximum number of hits. Default is None for no limit.
Returns:
List[Tuple]: Where first tuple value is hit fragment identifier and second value is similarity score
"""
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
scutoff = int(cutoff * precision)
hits = {}
query1 = '(a == {0}) & (score >= {1})'.format(frag_id, scutoff)
for row in self.table.where(query1):
hit_id, score = row[1], row[2]
score = ceil(precision10 * score / precision) / precision10
hits[hit_id] = score
if not self.full_matrix:
query2 = '(b == {0}) & (score >= {1})'.format(frag_id, scutoff)
for row in self.table.where(query2):
hit_id, score = row[0], row[2]
score = ceil(precision10 * score / precision) / precision10
hits[hit_id] = score
# highest score==most similar first
sorted_hits = sorted(six.iteritems(hits), reverse=True, key=lambda r: r[1])
if limit is not None:
sorted_hits = sorted_hits[:limit]
return sorted_hits
[docs] def append(self, other):
"""Append rows of other table to self
Args:
other: Table of same type as self
"""
super(PairsTable, self).append(other)
if self.score_precision is None:
self.score_precision = other.score_precision
def __iter__(self):
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
for pair in super(PairsTable, self).__iter__():
score = ceil(precision10 * pair['score'] / precision) / precision10
yield {'a': pair['a'], 'b': pair['b'], 'score': score}
[docs] def count(self, frame_size, raw_score=False):
"""Count occurrences of each score
Args:
frame_size (int): Size of matrix loaded each time. Larger requires more memory and smaller is slower.
raw_score (bool): Return raw int16 score or fraction score
Returns:
Tuple[(str, int)]: Score and number of occurrences
"""
# Count occurrences of each score by reading pairs table in frames
nr_rows = len(self.table)
nr_bins = self.score_precision + 1
counts = np.zeros(shape=nr_bins, dtype=np.int64)
bar = ProgressBar()
for start in bar(six.moves.range(0, nr_rows, frame_size)):
stop = frame_size + start
frame = self.table.read(start=start, stop=stop)
frame_counts = np.bincount(frame['score'], minlength=nr_bins)
counts += frame_counts
if raw_score:
for raw_score in counts.nonzero()[0]:
yield (raw_score, counts[raw_score])
else:
# Convert int score into fraction
precision = float(self.score_precision)
precision10 = float(10 ** (ceil(log10(precision))))
for raw_score in counts.nonzero()[0]:
score = ceil(precision10 * raw_score / precision) / precision10
count = counts[raw_score]
yield (score, count)
[docs] def keep(self, other, keep):
"""Copy pairs from self to other and keep given fragment identifiers and the identifiers they pair with.
Args:
other (PairsTable): Pairs table to fill
keep (set[int]): Fragment identifiers to keep
Returns:
set[int]: Fragment identifiers that have been copied to other
"""
all_frags2keep = set(keep)
hit = other.table.row
for row in self.table:
if row['a'] in keep and row['b'] in keep:
hit['a'] = row['a']
hit['b'] = row['b']
hit['score'] = row['score']
hit.append()
elif row['a'] in keep:
hit['a'] = row['a']
hit['b'] = row['b']
hit['score'] = row['score']
hit.append()
all_frags2keep.add(row['b'])
elif row['a'] in keep:
hit['a'] = row['a']
hit['b'] = row['b']
hit['score'] = row['score']
hit.append()
all_frags2keep.add(row['a'])
other.table.flush()
return all_frags2keep
[docs] def skip(self, other, skip):
"""Copy content from self to other and skip given fragment identifiers
Args:
other (PairsTable): Pairs table to fill
skip (set[int]): Fragment identifiers to skip
"""
hit = other.table.row
for row in self.table:
if row['a'] not in skip and row['b'] not in skip:
hit['a'] = row['a']
hit['b'] = row['b']
hit['score'] = row['score']
hit.append()
other.table.flush()
class Id2Label(tables.IsDescription):
"""Table description of id 2 label table."""
frag_id = tables.UInt32Col()
label = tables.StringCol(16)
[docs]class LabelsLookup(AbstractSimpleTable):
"""Table to look up label of fragment by id or id of fragment by label
When table does not exist in h5file it is created.
Args:
h5file (tables.File): Object representing an open hdf5 file
expectedrows (int): Expected number of pairs to be added.
Required when similarity matrix is opened in write mode, helps optimize storage
"""
table_name = 'labels'
filters = tables.Filters(complevel=6, complib='blosc')
def __init__(self, h5file, expectedrows=0):
if self.table_name in h5file.root:
table = h5file.root.__getattr__(self.table_name)
else:
table = h5file.create_table('/',
self.table_name,
Id2Label,
'Labels lookup',
expectedrows=expectedrows)
table.cols.frag_id.create_index(filters=self.filters)
table.cols.label.create_index(filters=self.filters)
super(LabelsLookup, self).__init__(table)
[docs] def by_id(self, frag_id):
"""Look up label of fragment by id
Args:
frag_id (int): Fragment identifier
Raises:
IndexError: When id of fragment is not found
Returns:
str: Label of fragment
"""
query = 'frag_id == {}'.format(frag_id)
result = self.table.where(query)
first_row = next(result)
label_col = first_row[1].decode()
return label_col
[docs] def by_label(self, label):
"""Look up id of fragment by label
Args:
label (str): Fragment label
Raises:
IndexError: When label of fragment is not found
Returns:
int: Fragment identifier
"""
query = 'label == b"{}"'.format(label)
result = self.table.where(query)
first_row = next(result)
id_col = first_row[0]
return id_col
[docs] def by_labels(self, labels):
"""Look up ids of fragments by label
Args:
labels (set[str]): Set of fragment labels
Raises:
IndexError: When label of fragment is not found
Returns:
set[int]: Set of fragment identifiers
"""
ids = set()
for frag_label, frag_id in six.iteritems(self.label2ids()):
if frag_label in labels:
ids.add(frag_id)
return ids
[docs] def label2ids(self):
"""Return whole table as a dictionary
Returns:
dict: Dictionary with label as key and frag_id as value.
"""
return {r['label'].decode(): r['frag_id'] for r in self.table}
[docs] def update(self, label2id):
"""Update labels lookup by adding labels in label2id.
Args:
label2id (dict): Dictionary with fragment label as key and fragment identifier as value.
"""
for label, frag_id in six.iteritems(label2id):
self.table.row['frag_id'] = frag_id
self.table.row['label'] = label
self.table.row.append()
self.table.flush()
def __eq__(self, other):
# same length
if len(self.table) != len(other.table):
return False
# same columns
if self.table.coldescrs != other.table.coldescrs:
return False
# same content
mydict = self.label2ids()
otherdict = other.label2ids()
return mydict == otherdict
[docs] def merge(self, label2id):
"""Merge label2id dict into self
When label does not exists an id is generated and the label/id is added.
When label does exist the id of the label in self is kept.
Args:
label2id (dict]): Dictionary with fragment label as key and fragment identifier as value.
Returns:
dict: Dictionary of label/id which where in label2id, but missing in self
"""
id_offset = max([r['frag_id'] for r in self.table]) + 1
mylabels = frozenset([r['label'].decode() for r in self.table])
missing_label2ids = {}
for row in label2id:
if row['label'] not in mylabels:
missing_label2ids[row['label']] = id_offset
id_offset += 1
self.update(missing_label2ids)
return missing_label2ids
def __iter__(self):
for r in self.table.__iter__():
yield {'frag_id': r['frag_id'], 'label': r['label'].decode()}
def _copy(self, other, condition):
hit = other.table.row
for row in self.table:
if condition(row['frag_id']):
hit['frag_id'] = row['frag_id']
hit['label'] = row['label']
hit.append()
other.table.flush()
[docs] def keep(self, other, keep):
"""Copy content of self to other and only keep given fragment identifiers
Args:
other (LabelsLookup): Labels table to fill
keep (set[int]): Fragment identifiers to keep
"""
self._copy(other, lambda d: d in keep)
[docs] def skip(self, other, skip):
"""Copy content of self to other and skip given fragment identifiers
Args:
other (LabelsLookup): Labels table to fill
skip (set[int]): Fragment identifiers to skip
"""
self._copy(other, lambda d: d not in skip)