# Copyright 2016 Netherlands eScience Center
#
# Licensed under the Apache License, Version 2.0 (the 'License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Similarity matrix using pytables carray"""
from __future__ import absolute_import, print_function
from math import log10, ceil, floor
try:
# for Python >3.3
from time import process_time
except ImportError:
from time import clock as process_time
import numpy as np
import pandas as pd
from progressbar import ProgressBar
from scipy.sparse import coo_matrix
import six
import tables
[docs]class FrozenSimilarityMatrix(object):
"""Frozen similarities matrix
Can retrieve whole column of a specific row fairly quickly.
Store as compressed dense matrix.
Due to compression the zeros use up little space.
Warning! Can not be enlarged.
Compared find performance FrozenSimilarityMatrix with SimilarityMatrix::
>>> from kripodb.db import FragmentsDb
>>> db = FragmentsDb('data/feb2016/Kripo20151223.sqlite')
>>> ids = [v[0] for v in db.cursor.execute('SELECT frag_id FROM fragments ORDER BY RANDOM() LIMIT 20')]
>>> from kripodb.frozen import FrozenSimilarityMatrix
>>> fdm = FrozenSimilarityMatrix('01-01_to_13-13.out.frozen.blosczlib.h5')
>>> from kripodb.hdf5 import SimilarityMatrix
>>> dm = SimilarityMatrix('data/feb2016/01-01_to_13-13.out.h5', cache_labels=True)
>>> %timeit list(dm.find(ids[0], 0.45, None))
... 1 loop, best of 3: 1.96 s per loop
>>> %timeit list(fdm.find(ids[0], 0.45, None))
... The slowest run took 6.21 times longer than the fastest. This could mean that an intermediate result is being cached.
... 10 loops, best of 3: 19.3 ms per loop
>>> ids = [v[0] for v in db.cursor.execute('SELECT frag_id FROM fragments ORDER BY RANDOM() LIMIT 20')]
>>> %timeit -n1 [list(fdm.find(v, 0.45, None)) for v in ids]
... 1 loop, best of 3: 677 ms per loop
>>> %timeit -n1 [list(dm.find(v, 0.45, None)) for v in ids]
... 1 loop, best of 3: 29.7 s per loop
Args:
filename (str): File name of hdf5 file to write or read similarity matrix from
mode (str): Can be 'r' for reading or 'w' for writing
**kwargs: Passed though to tables.open_file()
Attributes:
h5file (tables.File): Object representing an open hdf5 file
scores (tables.CArray): HDF5 Table that contains matrix
labels (tables.CArray): Table to look up label of fragment by id or id of fragment by label
"""
filters = tables.Filters(complevel=6, complib='blosc', shuffle=True)
def __init__(self, filename, mode='r', **kwargs):
self.h5file = tables.open_file(filename, mode, filters=self.filters, **kwargs)
self.score_precision = 2**16-1
if 'labels' in self.h5file.root:
self.labels = self.h5file.root.labels
else:
self.labels = None
if 'scores' in self.h5file.root:
self.scores = self.h5file.root.scores
else:
self.scores = None
self.cache_i2l = {}
self.cache_l2i = {}
if self.labels is not None:
self.build_label_cache()
[docs] def close(self):
"""Closes the hdf5file"""
self.h5file.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def find(self, query, cutoff, limit=None):
"""Find similar fragments to query.
Args:
query (str): Query fragment identifier
cutoff (float): Cutoff, similarity scores below cutoff are discarded.
limit (int): Maximum number of hits. Default is None for no limit.
Returns:
list[tuple[str,float]]: Hit fragment identifier and similarity score
"""
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
scutoff = int(cutoff * precision)
query_id = self.cache_l2i[query]
subjects = self.h5file.root.scores[query_id, ...]
filled_subjects_ids = subjects.nonzero()[0]
filled_subjects = [(i, subjects[i]) for i in filled_subjects_ids]
hits = [(self.cache_i2l[k], ceil(precision10 * v / precision) / precision10) for k, v in filled_subjects if v >= scutoff]
sorted_hits = sorted(hits, key=lambda r: r[1], reverse=True)
if limit is not None:
sorted_hits = sorted_hits[:limit]
return sorted_hits
def __getitem__(self, item):
"""Get all similarities of fragment or the similarity score between to 2 fragments.
Self is excluded in list of similarity scores.
Args:
item (str|Tuple[str, str]): Label of a fragment or tuple of 2 fragment labels
Returns:
list[tuple[str, float]]|float: list of (fragment_label, score) or the score
Raises:
KeyError: When item can not be found
"""
if isinstance(item, tuple):
return self._fetch_cell(item[0], item[1])
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
query_id = self.cache_l2i[item]
subjects = self.h5file.root.scores[query_id, ...]
hits = [(self.cache_i2l[k], ceil(precision10 * v / precision) / precision10) for k, v in enumerate(subjects) if k != query_id]
return hits
def __iter__(self):
"""
Yields: Tuple[str, str, float] Fragment id 1, Fragment id 2, similarity score of lower triangle of matrix
"""
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
for row_id, row in enumerate(self.h5file.root.scores.iterrows()):
row_label = self.cache_i2l[row_id]
# loop through raw scores below triangle
for col_id, raw_score in enumerate(row[:row_id]):
if raw_score == 0:
# skip if below cutoff
continue
col_label = self.cache_i2l[col_id]
score = ceil(precision10 * raw_score / precision) / precision10
yield col_label, row_label, score
def _fetch_cell(self, frag_label1, frag_label2):
frag_id1 = self.cache_l2i[frag_label1]
frag_id2 = self.cache_l2i[frag_label2]
if frag_id1 == frag_id2:
return 1.0
raw_score = self.h5file.root.scores[frag_id1, frag_id2]
precision = float(self.score_precision)
precision10 = float(10**(floor(log10(precision))))
return ceil(precision10 * raw_score / precision) / precision10
def build_label_cache(self):
self.cache_i2l = {k: v.decode() for k, v in enumerate(self.labels)}
self.cache_l2i = {v: k for k, v in self.cache_i2l.items()}
[docs] def from_pairs(self, similarity_matrix, frame_size, limit=None, single_sided=False):
"""Fills self with matrix which is stored in pairs.
Also known as COOrdinate format, the 'ijv' or 'triplet' format.
Args:
similarity_matrix (kripodb.hdf5.SimilarityMatrix):
frame_size (int): Number of pairs to append in a single go
limit (int|None): Number of pairs to add, None for no limit, default is None.
single_sided (bool): If false add stored direction and reverse direction. Default is False.
time kripodb similarities freeze --limit 200000 -f 100000 data/feb2016/01-01_to_13-13.out.h5 percell.h5
47.2s
time kripodb similarities freeze --limit 200000 -f 100000 data/feb2016/01-01_to_13-13.out.h5 coo.h5
0.2m - 2m6s
.4m - 2m19s
.8m - 2m33s
1.6m - 2m48s
3.2m - 3m4s
6.4m - 3m50s
12.8m - 4m59s
25.6m - 7m27s
"""
nr_frags = len(similarity_matrix.labels)
six.print_('Filling labels ... ', end='')
id2labels = {v: k for k, v in similarity_matrix.labels.label2ids().items()}
id2nid = {v: k for k, v in enumerate(id2labels)}
labels2nid = [None] * nr_frags
for myid in id2nid:
labels2nid[id2nid[myid]] = np.string_(id2labels[myid])
self.labels = self.h5file.create_carray('/', 'labels', obj=labels2nid, filters=self.filters)
self.h5file.flush()
self.build_label_cache()
six.print_('Done')
six.print_('Filling matrix')
self.scores = self.h5file.create_carray('/', 'scores', atom=tables.UInt16Atom(),
shape=(nr_frags, nr_frags), chunkshape=(1, nr_frags),
filters=self.filters)
if limit is None:
limit = len(similarity_matrix.pairs)
self._ingest_pairs(similarity_matrix.pairs.table, id2nid, frame_size, limit, single_sided)
self.h5file.flush()
def _ingest_pairs(self, pairs, oid2nid, frame_size, limit, single_sided):
oid2nid_v = np.vectorize(oid2nid.get)
# whole pairs set does not fit in memory, so split it in frames with `frame_size` number of pairs.
for start in range(0, limit, frame_size):
stop = frame_size + start
t1 = process_time()
six.print_('Fetching pairs {0}:{1} of {2} ... '.format(start, stop, limit), end='', flush=True)
raw_frame = pairs.read(start=start, stop=stop)
t2 = process_time()
six.print_('{0}s, Parsing ... '.format(int(t2 - t1)), flush=True)
frame = self._translate_frame(raw_frame, oid2nid_v, single_sided)
t3 = process_time()
six.print_('Writing ... '.format(int(t3 - t2)), flush=True)
# alternate direction, to make use of cached chunks of prev frame
self._ingest_pairs_frame(frame)
del frame
t4 = process_time()
six.print_('{0}s, Done with {1}:{2} in {3}s'.format(int(t4 - t3), start, stop, int(t4 - t1)), flush=True)
def _translate_frame(self, raw_frame, oid2nid, single_sided):
bar = ProgressBar(max_value=4)
bar.update(0)
a = oid2nid(raw_frame['a'])
bar.update(1)
b = oid2nid(raw_frame['b'])
bar.update(2)
data = raw_frame['score']
nr_frags = len(self.labels)
smat = coo_matrix((data, (b, a)), shape=(nr_frags, nr_frags)).tocsc()
bar.update(3)
if not single_sided:
smat += smat.transpose()
bar.update(4)
return smat
def _ingest_pairs_frame(self, frame):
scores = self.scores
# loop each
bar = ProgressBar()
for row_idx in bar(six.moves.range(frame.shape[0])):
new_row = frame.getcol(row_idx)
if not new_row.nnz:
# new col has only zeros, skipping
continue
current_row = scores[row_idx, ...]
# write whole column, so chunk compression + shuffle only performed once per row_idx
scores[row_idx, ...] = current_row + new_row.toarray()[:, 0]
[docs] def to_pandas(self):
"""Pandas dataframe with labelled colums and rows.
Warning! Only use on matrices that fit in memory
Returns:
pd.DataFrame
"""
precision = float(self.score_precision)
decimals = int(log10(precision))
labels = [v.decode() for v in self.labels]
df = pd.DataFrame(self.scores.read(), index=labels, columns=labels)
df /= precision
df = df.round(decimals)
return df
[docs] def from_array(self, data, labels):
"""Fill matrix from 2 dimensional array
Args:
data (np.array): 2 dimensional square array with scores
labels (list): List of labels for each column and row index
"""
labels = [np.string_(d) for d in labels]
self.labels = self.h5file.create_carray('/', 'labels', obj=labels, filters=self.filters)
self.h5file.flush()
self.build_label_cache()
nr_frags = len(labels)
self.scores = self.h5file.create_carray('/', 'scores', atom=tables.UInt16Atom(),
shape=(nr_frags, nr_frags), chunkshape=(1, nr_frags),
filters=self.filters)
self.scores[0:nr_frags, 0:nr_frags] = (data * self.score_precision).astype('uint16')
[docs] def to_pairs(self, pairs):
"""Copies labels and scores from self to pairs matrix.
Args:
pairs (SimilarityMatrix):
"""
six.print_('copy labels', flush=True)
self.build_label_cache()
pairs.labels.update(self.cache_l2i)
six.print_('copy matrix to pairs', flush=True)
limit = self.scores.shape[0]
bar = ProgressBar()
for query_id in bar(six.moves.range(0, limit)):
subjects = self.scores[query_id, ...]
filled_subjects_ids = subjects.nonzero()[0]
filled_subjects = [(query_id, i, subjects[i]) for i in filled_subjects_ids if query_id < i]
if filled_subjects:
pairs.pairs.table.append(filled_subjects)
[docs] def count(self, frame_size=None, raw_score=False, lower_triangle=False):
"""Count occurrences of each score
Only scores are counted of the upper triangle or lower triangle.
Zero scores are skipped.
Args:
frame_size (int): Dummy argument to force same interface for thawed and frozen matrix
raw_score (bool): When true return raw int16 score else fraction score
lower_triangle (bool): When true return scores from lower triangle else return scores from upper triangle
Returns:
Tuple[(str, int)]: Score and number of occurrences
"""
nr_rows = self.scores.shape[0]
nr_bins = self.score_precision + 1
counts = np.zeros(shape=nr_bins, dtype=np.int64)
bar = ProgressBar()
for query_id in bar(six.moves.range(0, nr_rows)):
if lower_triangle:
subjects = self.scores[query_id, query_id + 1:]
else:
subjects = self.scores[query_id, :query_id + 1]
frame_counts = np.bincount(subjects[subjects.nonzero()], minlength=nr_bins)
counts += frame_counts
if raw_score:
for raw_score in counts.nonzero()[0]:
yield (raw_score, counts[raw_score])
else:
# Convert int score into fraction
precision = float(self.score_precision)
precision10 = float(10 ** (ceil(log10(precision))))
for raw_score in counts.nonzero()[0]:
score = ceil(precision10 * raw_score / precision) / precision10
count = counts[raw_score]
yield (score, count)