Source code for kripodb.db

# Copyright 2016 Netherlands eScience Center
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fragments and fingerprints sqlite based data storage.

Registers `BitMap` and `molblockgz` data types in sqlite.
"""

from __future__ import absolute_import
from collections import MutableMapping
import sqlite3
import logging
import zlib
import re

import blosc
from pyroaring import BitMap
from rdkit.Chem import MolToMolBlock, MolFromMolBlock, MolToSmiles
from rdkit.Chem.rdchem import Mol
import six

ATTR_NUMBER_OF_BITS = 'number_of_bits'


def adapt_BitMap(ibs):
    """Convert a BitMap to the blob that is stored in sqlite.

    The bitmap is serialized, the serialization is compressed with blosc
    using the ``zstd`` codec and the result is wrapped in
    ``sqlite3.Binary``.

    Args:
        ibs (BitMap): bitset

    Returns:
        sqlite3.Binary: compressed serialized BitMap
    """
    serialized = ibs.serialize()
    compressed = blosc.compress(serialized, cname='zstd')
    return sqlite3.Binary(compressed)
def convert_BitMap(s):
    """Convert a stored blob back to a BitMap.

    Inverse of the BitMap adapter: the blob is decompressed with blosc and
    the result is deserialized into a BitMap.

    Args:
        s (bytes): blosc compressed, serialized BitMap

    Returns:
        BitMap: bitset
    """
    serialized = blosc.decompress(s)
    return BitMap.deserialize(serialized)
def adapt_molblockgz(mol):
    """Convert RDKit molecule to a compressed molblock.

    The molecule is written as a molblock, encoded to bytes and compressed
    with zlib.

    Args:
        mol (rdkit.Chem.Mol): molecule

    Returns:
        bytes: zlib compressed molblock
    """
    return zlib.compress(MolToMolBlock(mol).encode())
def convert_molblockgz(molgz):
    """Convert compressed molblock to RDKit molecule.

    Args:
        molgz (bytes): zlib compressed molblock

    Returns:
        rdkit.Chem.Mol: molecule
    """
    molblock = zlib.decompress(molgz)
    return MolFromMolBlock(molblock)
# Python -> sqlite: how BitMap and Mol objects are turned into storable values.
sqlite3.register_adapter(BitMap, adapt_BitMap)
sqlite3.register_adapter(Mol, adapt_molblockgz)

# sqlite -> Python: converters keyed by the 'BitMap' and 'molblockgz' type names.
sqlite3.register_converter('BitMap', convert_BitMap)
sqlite3.register_converter('molblockgz', convert_molblockgz)
class FastInserter(object):
    """Context manager which makes inserting faster, but less safe.

    On entry the journal mode is set to WAL and synchronous is turned off.
    On exit the journal mode is set back to DELETE and synchronous to FULL.
    Pending changes are committed both on entry and on exit, the latter
    also when the body of the ``with`` statement raised.

    Args:
        cursor (sqlite3.Cursor): Sqlite cursor

    Examples:

        >>> with FastInserter(cursor):
        ...     cursor.executemany('INSERT INTO table VALUES (?)', rows)
    """

    # increase insert speed, this is less safe
    _FAST_PRAGMAS = ('PRAGMA journal_mode=WAL', 'PRAGMA synchronous=OFF')
    # switch back to default journal, so db file can be read-only and is safe again
    _SAFE_PRAGMAS = ('PRAGMA journal_mode=DELETE', 'PRAGMA synchronous=FULL')

    def __init__(self, cursor):
        self.cursor = cursor

    def _switch(self, pragmas):
        """Commit pending changes, then run each pragma statement in order."""
        self.cursor.connection.commit()
        for statement in pragmas:
            self.cursor.execute(statement)

    def __enter__(self):
        self._switch(self._FAST_PRAGMAS)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._switch(self._SAFE_PRAGMAS)
class SqliteDb(object):
    """Wrapper around a sqlite database connection

    Database is created if it does not exist.
    Can be used as a context manager, the database is closed on exit.

    Args:
        filename (str): Sqlite filename

    Attributes:
        filename (str): Sqlite filename
        connection (sqlite3.Connection): Sqlite connection
        cursor (sqlite3.Cursor): Sqlite cursor

    Raises:
        NotImplementedError: When `create_tables` has not been overridden
    """

    def __init__(self, filename):
        self.filename = filename
        self.connection = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES)
        # sqlite3 defaults to unicode as text_factory, unicode can't be used for byte string
        self.connection.text_factory = str
        self.connection.row_factory = sqlite3.Row
        self.cursor = self.connection.cursor()
        try:
            self.create_tables()
        except Exception:
            # The caller never receives this object when setup fails,
            # so nobody else is able to close the connection.
            self.connection.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def commit(self):
        """Commit pending changes"""
        self.connection.commit()

    def close(self):
        """Close database"""
        self.connection.close()

    def create_tables(self):
        """Abstract method which is called after connecting to database so tables can be created.

        Use `CREATE TABLE IF NOT EXISTS ...` in method to prevent duplicate create errors.

        Raises:
            NotImplementedError: Always, sub classes must override this method
        """
        raise NotImplementedError("Please Implement this method")
def _row2fragment(row): fragment = {} for idx, v in enumerate(row.keys()): fragment[v] = row[idx] return fragment
class FragmentsDb(SqliteDb):
    """Fragments database

    Keeps fragments, their molecules and the meta data of their PDB entries
    in the ``fragments``, ``molecules`` and ``pdbs`` tables.
    """

    # Ends without whitespace, so every clause appended to it must start with a space.
    select_sql = (
        'SELECT f.rowid, * FROM fragments f'
        ' LEFT JOIN pdbs USING (pdb_code, prot_chain)'
        ' LEFT JOIN molecules USING (frag_id)'
    )

    def create_tables(self):
        """Create tables if they don't exist"""
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS fragments (
            frag_id TEXT PRIMARY KEY,
            frag_nr INT NOT NULL,
            pdb_code TEXT NOT NULL,
            prot_chain TEXT NOT NULL,
            het_chain TEXT NOT NULL,
            het_code TEXT NOT NULL,
            het_seq_nr INT,
            atom_codes TEXT,
            hash_code TEXT,
            nr_r_groups INT
        )''')
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS molecules (
            frag_id TEXT PRIMARY KEY,
            smiles TEXT,
            mol molblockgz
        )''')
        self.cursor.execute('''CREATE TABLE IF NOT EXISTS pdbs (
            pdb_code TEXT NOT NULL,
            prot_chain TEXT NOT NULL,
            pdb_title TEXT,
            prot_name TEXT,
            uniprot_acc TEXT,
            uniprot_name TEXT,
            ec_number TEXT,
            PRIMARY KEY (pdb_code, prot_chain)
        )''')

    def add_molecules(self, mols):
        """Adds molecules to the molecules table.

        Args:
            mols (list[rdkit.Chem.Mol]): List of molecules
        """
        with FastInserter(self.cursor):
            for mol in mols:
                self.add_molecule(mol)

    def add_pdbs(self, pdbs):
        """Adds pdb meta data to the pdbs table.

        Only pdbs whose lower cased ``structureId`` and ``chainId`` match the
        pdb code and protein chain of a stored fragment are added.

        Args:
            pdbs (Iterable[Dict]): List of pdb meta data
        """
        rows = self.cursor.execute('SELECT pdb_code || prot_chain FROM fragments')
        pdbs_in_fragments = frozenset(r[0] for r in rows)

        with FastInserter(self.cursor):
            for pdb in pdbs:
                if pdb['structureId'].lower() + pdb['chainId'] in pdbs_in_fragments:
                    self.add_pdb(pdb)

    def add_fragments_from_shelve(self, myshelve, skipdups=False):
        """Adds fragments from shelve to fragments table.

        Also creates index on pdb_code column.

        Args:
            myshelve (Dict[Fragment]): Dictionary with fragment identifier as key and fragment as value.
            skipdups (bool): Skip duplicates, instead of dying on the first duplicate
        """
        with FastInserter(self.cursor):
            for k, v in six.iteritems(myshelve):
                self.add_fragment_from_shelve(k, v, skipdups)

            self.cursor.execute('CREATE INDEX IF NOT EXISTS fragments_pdb_code_i ON fragments (pdb_code)')

    def add_molecule(self, mol):
        """Adds molecule to molecules table

        Uses the name of the molecule as the primary key.
        An existing molecule with the same name is replaced.
        A ``None`` molecule is skipped with a warning.
        The change is committed right away.

        Args:
            mol (rdkit.Chem.AllChem.Mol): the rdkit molecule
        """
        sql = '''INSERT OR REPLACE INTO molecules (frag_id, smiles, mol) VALUES (?, ?, ?)'''

        if mol is None:
            logging.warning('Empty molecule, skipping')
            return

        self.cursor.execute(sql, (
            mol.GetProp('_Name'),
            MolToSmiles(mol),
            mol,
        ))
        self.connection.commit()

    def add_fragment_from_shelve(self, frag_id, fragment, skipdups=False):
        """Adds a single fragment from a shelve to the fragments table.

        The identifier must consist of 3 parts separated by ``-``, of which
        the first is used as pdb code, the second as hetero code and the
        third holds the fragment number prefixed with ``frag``.
        Identifiers which do not fit are skipped with a warning.
        The stored fragment identifier has its ``-`` replaced by ``_``.

        Args:
            frag_id (str): Fragment identifier as used in the shelve
            fragment (Dict): Fragment with the keys ``ligID``, ``hashcode``,
                ``atomCodes`` and ``numRgroups``
            skipdups (bool): Skip duplicates, instead of raising on a duplicate

        Raises:
            sqlite3.IntegrityError: When fragment is a duplicate and skipdups is False
        """
        splitted_frag_id = frag_id.split('-')
        if len(splitted_frag_id) != 3:
            logging.warning('Weird id {}, skipping'.format(frag_id))
            return

        try:
            frag_nr = int(splitted_frag_id[2].replace('frag', ''))
        except ValueError:
            logging.warning('Weird id {}, skipping'.format(frag_id))
            return

        lig_id = fragment['ligID'].split('-')
        # a single trailing capital letter is dropped from the sequence number
        het_seq_nr = int(re.sub('[A-Z]$', '', lig_id[3]))
        frag_id = frag_id.replace('-', '_')

        try:
            self.add_fragment(
                frag_id=frag_id,
                pdb_code=splitted_frag_id[0],
                prot_chain=lig_id[1],
                het_code=splitted_frag_id[1],
                het_seq_nr=het_seq_nr,
                het_chain=lig_id[4],
                frag_nr=frag_nr,
                hash_code=fragment['hashcode'],
                atom_codes=fragment['atomCodes'],
                nr_r_groups=int(fragment['numRgroups']),
            )
        except sqlite3.IntegrityError:
            logging.warning('Duplicate ID: {}, skipping'.format(frag_id))
            if not skipdups:
                # bare raise keeps the original traceback
                raise

    def add_fragment(self, frag_id, pdb_code, prot_chain, het_code, frag_nr, atom_codes, hash_code, het_chain,
                     het_seq_nr, nr_r_groups):
        """Add fragment to database

        Args:
            frag_id (str): Fragment identifier
            pdb_code (str): Protein databank identifier
            prot_chain (str): Major chain of pdb on which pharmacophore is based
            het_code (str): Ligand/Hetero code
            frag_nr (int): Fragment number, whole ligand has number 1, fragments are >1
            atom_codes (str): Comma separated list of HETATOM atom names which make up the fragment (hydrogens are excluded)
            hash_code (str): Unique identifier for fragment
            het_chain (str): Chain ligand is part of
            het_seq_nr (int): Residue sequence number of ligand the fragment is a part of
            nr_r_groups (int): Number of R groups in fragment

        Raises:
            sqlite3.IntegrityError: When a table constraint is violated,
                for example when frag_id is already stored
        """
        sql = '''INSERT INTO fragments (
            frag_id,
            pdb_code,
            prot_chain,
            het_code,
            frag_nr,
            atom_codes,
            hash_code,
            het_chain,
            het_seq_nr,
            nr_r_groups
        ) VALUES (
            :frag_id,
            :pdb_code,
            :prot_chain,
            :het_code,
            :frag_nr,
            :atom_codes,
            :hash_code,
            :het_chain,
            :het_seq_nr,
            :nr_r_groups
        )'''

        fragment_row = {
            'frag_id': frag_id,
            'pdb_code': pdb_code,
            'prot_chain': prot_chain,
            'het_code': het_code,
            'frag_nr': frag_nr,
            'atom_codes': atom_codes,
            'hash_code': hash_code,
            'het_chain': het_chain,
            'het_seq_nr': het_seq_nr,
            'nr_r_groups': nr_r_groups,
        }

        self.cursor.execute(sql, fragment_row)

    def add_pdb(self, pdb):
        """Adds or replaces the meta data of a single pdb chain.

        The pdb code is stored lower cased.

        Args:
            pdb (Dict): Pdb meta data with the keys ``structureId``, ``chainId``,
                ``structureTitle``, ``compound``, ``uniprotAcc``,
                ``uniprotRecommendedName`` and ``ecNo``

        Raises:
            KeyError: When pdb contains a key other than the ones listed above
        """
        sql = '''INSERT OR REPLACE INTO pdbs (
            pdb_code,
            prot_chain,
            pdb_title,
            prot_name,
            uniprot_acc,
            uniprot_name,
            ec_number
        ) VALUES (
            :pdb_code,
            :prot_chain,
            :pdb_title,
            :prot_name,
            :uniprot_acc,
            :uniprot_name,
            :ec_number
        )'''

        pdb2col = {
            'structureId': 'pdb_code',
            'chainId': 'prot_chain',
            'structureTitle': 'pdb_title',
            'compound': 'prot_name',
            'uniprotAcc': 'uniprot_acc',
            'uniprotRecommendedName': 'uniprot_name',
            'ecNo': 'ec_number',
        }
        row = {pdb2col[k]: v for k, v in six.iteritems(pdb)}
        row['pdb_code'] = row['pdb_code'].lower()

        self.cursor.execute(sql, row)

    def __getitem__(self, key):
        """Retrieve fragment based on its identifier.

        Args:
            key (str): Fragment identifier

        Returns:
            Fragment

        Raises:
            KeyError: When no fragment with the identifier is stored
        """
        sql = self.select_sql + ' WHERE frag_id=?'
        self.cursor.execute(sql, (key,))
        row = self.cursor.fetchone()

        if row is None:
            raise KeyError(key)

        return _row2fragment(row)

    def by_pdb_code(self, pdb_code):
        """Retrieve fragments which are part of a PDB structure.

        Args:
            pdb_code (str): PDB code, is lower cased before it is looked up

        Returns:
            List[Fragment]: List of fragments, ordered by fragment identifier

        Raises:
            LookupError: When pdb_code could not be found
        """
        sql = self.select_sql + ' WHERE pdb_code=? ORDER BY frag_id'
        fragments = [_row2fragment(row) for row in self.cursor.execute(sql, (pdb_code.lower(),))]

        if not fragments:
            raise LookupError(pdb_code)

        return fragments

    def id2label(self):
        """Lookup table of fragments from a number to a label.

        Returns:
            SqliteDict
        """
        return SqliteDict(self.connection, 'fragments', 'rowid', 'frag_id')

    def label2id(self):
        """Lookup table of fragments from a label to a number.

        Returns:
            SqliteDict
        """
        return SqliteDict(self.connection, 'fragments', 'frag_id', 'rowid')

    def __len__(self):
        """Number of rows in the fragments table"""
        self.cursor.execute('SELECT count(*) FROM fragments')
        row = self.cursor.fetchone()
        return row[0]

    def __iter__(self):
        """Iterate over all fragments"""
        self.cursor.execute(self.select_sql)
        # All rows are fetched before the first one is yielded, so use of the
        # shared cursor by the consumer does not disturb the iteration.
        for row in self.cursor.fetchall():
            yield _row2fragment(row)

    def is_ligand_stored(self, pdb_code, het_code):
        """Check whether ligand is already in database

        Args:
            pdb_code (str): Protein databank identifier, is lower cased before it is looked up
            het_code (str): Ligand/hetero identifier

        Returns:
            bool
        """
        sql = 'SELECT 1 FROM fragments WHERE pdb_code=? AND het_code=?'
        self.cursor.execute(sql, (pdb_code.lower(), het_code))
        return self.cursor.fetchone() is not None
class FingerprintsDb(SqliteDb):
    """Fingerprints database"""

    def create_tables(self):
        """Create the bitsets and attributes tables if they don't exist"""
        ddl_statements = (
            '''CREATE TABLE IF NOT EXISTS bitsets (
                frag_id TEXT PRIMARY KEY,
                bitset BitMap
            )''',
            '''CREATE TABLE IF NOT EXISTS attributes (
                key TEXT PRIMARY KEY,
                value TEXT
            )''',
        )
        for ddl in ddl_statements:
            self.cursor.execute(ddl)

    def as_dict(self, number_of_bits=None):
        """Returns a dict-like object to query and alter fingerprints db

        Args:
            number_of_bits (Optional[int]): Number of bits that all fingerprints have

        Returns:
            IntbitsetDict
        """
        fingerprints = IntbitsetDict(self, number_of_bits)
        return fingerprints
class SqliteDict(MutableMapping):
    """Dict-like object of 2 columns of a sqlite table.

    Can be used to query and alter the table.
    Every alteration is committed right away.

    Args:
        connection (sqlite3.Connection): Sqlite connection
        table_name (str): Table name
        key_column (str): Column name used as key
        value_column (str): Column name used as value

    Attributes:
        connection (sqlite3.Connection): Sqlite connection
        cursor (sqlite3.Cursor): Sqlite cursor, used for single row queries and alterations
        sqls (dict): Sql statements of the supported operations
    """

    def __init__(self, connection, table_name, key_column, value_column):
        self.connection = connection
        self.cursor = connection.cursor()

        kwargs = {
            'key_column': key_column,
            'table_name': table_name,
            'value_column': value_column
        }
        self.sqls = {
            'iter': 'SELECT {key_column} FROM {table_name}'.format(**kwargs),
            'getitem': 'SELECT {value_column} FROM {table_name} WHERE {key_column}=?'.format(**kwargs),
            'delitem': 'DELETE FROM {table_name} WHERE {key_column}=?'.format(**kwargs),
            'setitem': 'INSERT OR REPLACE INTO {table_name} ({key_column}, {value_column}) VALUES (?, ?)'.format(**kwargs),
            'len': 'SELECT count(*) FROM {table_name}'.format(**kwargs),
            'iteritems': 'SELECT {key_column}, {value_column} FROM {table_name}'.format(**kwargs),
            'itervalues': 'SELECT {value_column} FROM {table_name}'.format(**kwargs),
            'contains': 'SELECT count(*) FROM {table_name} WHERE {key_column}=?'.format(**kwargs),
            'iteritems_startswith': 'SELECT {key_column}, {value_column} FROM {table_name} WHERE {key_column} LIKE ?'.format(**kwargs),
        }

    def _rows(self, sql, params=()):
        """Lazily yield the rows of a query from a cursor of its own.

        The iterators must not use the shared ``self.cursor``: a lookup
        during iteration, like ``for key in d: d[key]`` or ``dict(d)``, would
        re-execute that cursor and cut the iteration short.
        """
        for row in self.connection.execute(sql, params):
            yield row

    def __iter__(self):
        for row in self._rows(self.sqls['iter']):
            yield row[0]

    def __getitem__(self, key):
        self.cursor.execute(self.sqls['getitem'], (key,))
        row = self.cursor.fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]

    def __delitem__(self, key):
        self.cursor.execute(self.sqls['delitem'], (key,))
        self.connection.commit()

    def __setitem__(self, key, value):
        self.cursor.execute(self.sqls['setitem'], (key, value))
        self.connection.commit()

    def __len__(self):
        self.cursor.execute(self.sqls['len'])
        row = self.cursor.fetchone()
        return row[0]

    def items(self):
        """Iterator over the key/value pairs

        Returns:
            Iterator[Tuple[key, value]]
        """
        return self._rows(self.sqls['iteritems'])

    def iteritems(self):
        """Same as :meth:`items`"""
        return self.items()

    def values(self):
        """Iterator over the values

        Returns:
            Iterator[value]
        """
        for row in self._rows(self.sqls['itervalues']):
            yield row[0]

    def itervalues(self):
        """Same as :meth:`values`"""
        return self.values()

    def __contains__(self, key):
        self.cursor.execute(self.sqls['contains'], (key,))
        row = self.cursor.fetchone()
        # The key column is not necessarily unique, any match counts.
        return row[0] > 0

    def iteritems_startswith(self, prefix):
        """item iterator over keys with prefix

        The prefix is matched with the sql ``LIKE`` operator, so a ``%`` or
        ``_`` inside the prefix acts as a wildcard.

        Args:
            prefix (str): Prefix of key

        Examples:
            All items with key starting with letter 'a' are returned.

            >>> for frag_id, fragment in fragments.iteritems_startswith('a'):
            ...     pass  # do something with frag_id and fragment

        Returns:
            Iterator[Tuple[key, value]]
        """
        return self._rows(self.sqls['iteritems_startswith'], (prefix + '%',))

    def materialize(self):
        """Fetches all key/value pairs from the sqlite database.

        Useful when dictionary is iterated multiple times and the cost of fetching is too high.

        Returns:
            Dict: Dictionary with all key/value pairs
        """
        return {k: v for k, v in self.items()}
class IntbitsetDict(SqliteDict):
    """Dictionary of BitMaps with sqlite3 backend.

    Fragment identifiers are the keys and the bitsets the values, both live
    in the ``bitsets`` table. The number of bits is kept in the
    ``attributes`` table.

    Args:
        db (FingerprintsDb): Fingerprints db
        number_of_bits (int): Number of bits, stored when it is not None

    Attributes:
        number_of_bits (int): Number of bits the bitsets consist of,
            None when it has not been stored
    """

    def __init__(self, db, number_of_bits=None):
        super(IntbitsetDict, self).__init__(db.connection, 'bitsets', 'frag_id', 'bitset')
        if number_of_bits is None:
            return
        self.number_of_bits = number_of_bits

    def update(*args, **kwds):
        """Same as ``MutableMapping.update``, but with fast inserts and a vacuum afterwards.

        The instance is taken from the positional arguments, which leaves
        ``self`` available as a keyword argument.
        """
        instance = args[0]

        with FastInserter(instance.cursor):
            MutableMapping.update(*args, **kwds)
            # make table and index stored contiguously
            instance.cursor.execute('VACUUM')

    @property
    def number_of_bits(self):
        lookup = 'SELECT value FROM attributes WHERE key=?'
        self.cursor.execute(lookup, (ATTR_NUMBER_OF_BITS,))
        found = self.cursor.fetchone()
        return None if found is None else int(found[0])

    @number_of_bits.setter
    def number_of_bits(self, value):
        upsert = 'INSERT OR REPLACE INTO attributes (key, value) VALUES (?, ?)'
        self.cursor.execute(upsert, (ATTR_NUMBER_OF_BITS, str(value)))
        self.connection.commit()

    @number_of_bits.deleter
    def number_of_bits(self):
        removal = 'DELETE FROM attributes WHERE key=?'
        self.cursor.execute(removal, (ATTR_NUMBER_OF_BITS,))
        self.connection.commit()