Package rdkit :: Package Dbase :: Module StorageUtils
[hide private]
[frames | no frames]

Source Code for Module rdkit.Dbase.StorageUtils

  1  # $Id$ 
  2  # 
  3  #  Copyright (C) 2003-2006 Rational Discovery LLC 
  4  # 
  5  #   @@ All Rights Reserved @@ 
  6  #  This file is part of the RDKit. 
  7  #  The contents are covered by the terms of the BSD license 
  8  #  which is included in the file license.txt, found at the root 
  9  #  of the RDKit source tree. 
 10  # 
 11  """ Various storage (molecular and otherwise) functionality 
 12   
 13  """ 
 14  from rdkit import RDConfig 
 15  from rdkit.Dbase import DbModule 
 16  from rdkit.Dbase.DbConnection import DbConnect 
 17   
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    The id is split on '-' (underscores are accepted as separators too).
    The first field is the lead text, the last field is the checksum and
    everything in between is a block of digits.  The id is valid when the
    sum of all digits in the blocks, modulo 10, equals the checksum.

    Arguments:
      - id: the RDId string to check

    Returns: a true value if the id is valid, a false value otherwise.
      Malformed ids (too few fields, non-digit characters in a block, or
      a checksum field that is not an integer) are reported as invalid
      instead of raising.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    fields = id.replace('_', '-').split('-')
    # need at least: lead text, two digit blocks and the checksum
    if len(fields) < 4:
        return 0
    try:
        crc = int(fields[-1])
        # digits are summed one character at a time, so any non-digit
        # character in a block makes the whole id invalid
        accum = sum(int(char) for entry in fields[1:-1] for char in entry)
    except ValueError:
        return 0
    return accum % 10 == crc
45
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    Arguments:
      - id: the RDId string; '_' is accepted in place of '-'
      - validate: if true, the id is first checked with ValidateRDId and
        a ValueError is raised when that check fails

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(id):
        raise ValueError("Bad RD Id")
    # drop the lead text (first field) and the checksum (last field)
    blocks = id.replace('_', '-').split('-')[1:-1]
    # every block is one base-1000 "digit"; the rightmost one is the
    # least significant
    return sum(int(block) * 1000**power
               for power, block in enumerate(reversed(blocks)))
83 84
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    Arguments:
      - idx: the index to convert, must be >= 0
      - leadText: the text placed in front of the number blocks

    Returns: the RDId as a string

    Raises: ValueError if idx is negative

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum:
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # pure integer arithmetic: going through floats (1e6) silently loses
    # precision for very large indices
    millions, remainder = divmod(idx, 1000000)
    thousands, units = divmod(remainder, 1000)

    blocks = []
    if millions:
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    # the checksum is the digit sum of the full index, modulo 10
    checksum = sum(int(char) for char in str(idx)) % 10
    return leadText + '-' + '-'.join(blocks) + '-' + str(checksum)
137
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    The idColName column of the table is read through conn.GetData, the
    first field of every row is converted with RDIdToInt (validation
    switched off), and one more than the largest value is returned.  If
    no rows come back the result is 1.

    see RegisterItem for testing/documentation

    """
    rows = conn.GetData(table=table, fields=idColName)
    highest = 0
    for row in rows:
        highest = max(highest, RDIdToInt(row[0], validate=0))
    return highest + 1
151
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    If leadText is empty it is taken from the first id returned by
    conn.GetData for the idColName column (everything in front of the
    first '-' or '_').  The index comes from GetNextId and is formatted
    with IndexToRDId.

    see RegisterItem for testing/documentation

    """
    if not leadText:
        # NOTE: this lookup indexes the first row directly, so it needs
        # the table to contain at least one id
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        leadText = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=leadText)
165
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ Registers a single value in a table, reusing its id if it is
    already present

    Arguments:
      - conn: the database connection object
      - table: name of the table to use
      - value: the value to look for in columnName
      - columnName: name of the column that is searched for value
      - data: (optional) the remaining fields of the row; a new row made
        of the id followed by these fields is inserted and committed
        only when this is non-empty
      - id: (optional) the id to use for a new entry; if not provided
        the next free RDId is obtained from GetNextRDId
      - idColName: name of the column holding the ids
      - leadText: lead text used when a new RDId is generated

    Returns: a 2-tuple:
      - 0 and the existing id, if value is already in the table
      - 1 and the id chosen for the new entry otherwise

    >>> conn = DbConnect(tempDbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    # table and column names cannot be bound as parameters, so only the
    # value itself goes through the placeholder
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName,
                                               DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    existing = cursor.fetchone()
    if existing:
        return 0, existing[0]

    id = id or GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        conn.InsertData(table, [id] + list(data))
        conn.Commit()
    return 1, id
205
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ Registers a batch of values, reusing the ids of values that are
    already in the table

    Arguments:
      - conn: the database connection object
      - table: name of the table to use
      - values: sequence of (hashable) values to look for in columnName
      - columnName: name of the column that is searched for the values
      - rows: sequence with the remaining fields of the row for each
        value; if it is empty, ids are still assigned but nothing is
        inserted
      - startId: (optional) RDId to use for the first new entry; if not
        provided the next free RDId is obtained from GetNextRDId
      - idColName: name of the column holding the ids
      - leadText: lead text used when new RDIds are generated

    Returns: a 2-tuple:
      - the number of distinct values that were given a new id
      - the list of ids, in the same order as values

    Raises: ValueError if rows is non-empty and its length differs from
      that of values

    Notes:
      - a value that occurs more than once in values gets the same id at
        every position and is inserted at most once (using the row at
        its first position)

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)

    # remember every position at which each value occurs, so that
    # repeated values are all filled in
    positions = {}
    for i, v in enumerate(values):
        positions.setdefault(v, []).append(i)

    curs = conn.GetCursor()
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    try:
        curs.executemany("insert into regitemstemp values (%s)" % DbModule.placeHolder,
                         [(x,) for x in values])
        query = ('select %s,%s from %s where %s in (select * from regitemstemp)' %
                 (columnName, idColName, table, columnName))
        curs.execute(query)
        dbData = curs.fetchall()
    finally:
        # drop the scratch table so that a later call that ends up on the
        # same database session can create it again
        curs.execute("drop table regitemstemp")

    # fill in the ids that are already known, in the order of values
    ids = [None] * nVals
    for val, knownId in dbData:
        for i in positions[val]:
            ids[i] = knownId

    missing = [i for i in range(nVals) if ids[i] is None]
    if not missing:
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    nextIdx = RDIdToInt(startId)

    newIds = {}
    rowsToInsert = []
    for i in missing:
        value = values[i]
        if value not in newIds:
            newIds[value] = IndexToRDId(nextIdx, leadText=leadText)
            nextIdx += 1
            if rows:
                rowsToInsert.append([newIds[value]] + list(rows[i]))
        ids[i] = newIds[value]

    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # repeat the placeholder as a list element: multiplying the string
        # and joining its characters breaks multi-character placeholders
        qs = ','.join([DbModule.placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return len(newIds), ids
#------------------------------------
#
#  doctest boilerplate
#
# Extra doctests that are not attached to any docstring: they check that
# the output of IndexToRDId passes ValidateRDId and that RDIdToInt turns
# it back into the original index.
_roundtripTests = """
>>> ValidateRDId(IndexToRDId(100))
1
>>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
1
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
1

"""
# doctest looks for a module-level __test__ mapping and runs the strings
# it contains as additional tests
__test__ = {"roundtrip":_roundtripTests}
276 -def _test():
277 import doctest,sys 278 return doctest.testmod(sys.modules["__main__"])
if __name__ == '__main__':
    import os
    import shutil
    import sys
    import tempfile
    # tempDbName has to be a module-level name: the RegisterItem doctests
    # open their connection with DbConnect(tempDbName)
    if RDConfig.useSqlLite:
        tmpf, tempName = tempfile.mkstemp(suffix='sqlt')
        # mkstemp returns an open OS-level file descriptor; only the path
        # is needed here, so close it right away instead of leaking it
        os.close(tmpf)
        tempDbName = tempName
        shutil.copyfile(RDConfig.RDTestDatabase, tempDbName)
    else:
        tempDbName = '::RDTests'
    try:
        failed, tried = _test()
    finally:
        # remove the scratch copy of the database even if the tests blew up
        if RDConfig.useSqlLite and os.path.exists(tempDbName):
            try:
                os.unlink(tempDbName)
            except Exception:
                # cleanup is best effort: report the problem and carry on
                import traceback
                traceback.print_exc()
    # the number of failed doctests is the exit status
    sys.exit(failed)