1
2
3
4
5
6
7
8
9
10
11 """ Various storage (molecular and otherwise) functionality
12
13 """
14 from rdkit import RDConfig
15 from rdkit.Dbase import DbModule
16 from rdkit.Dbase.DbConnection import DbConnect
17
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    An RDId looks like leadText-xxx-xxx-y (underscores are accepted in
    place of dashes).  It is valid when every character of the number
    blocks is a digit and the sum of those digits, modulo 10, equals the
    trailing checksum y.

    Returns a true value for a valid id and a false value otherwise; the
    function never raises for malformed strings.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0

    A non-numeric checksum makes the id invalid:
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    splitId = id.replace('_', '-').split('-')
    # lead text, at least two number blocks and the checksum are required
    if len(splitId) < 4:
        return 0
    try:
        accum = sum(int(char) for entry in splitId[1:-1] for char in entry)
        # the checksum is parsed inside the try so that a non-numeric
        # checksum is reported as "invalid" instead of raising ValueError
        crc = int(splitId[-1])
    except ValueError:
        return 0
    return accum % 10 == crc
45
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    The lead text and the trailing checksum are dropped; the remaining
    blocks are read as base-1000 digits, most significant block first.
    Unless validate is false the id is first checked with ValidateRDId.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(id):
        raise ValueError("Bad RD Id")
    blocks = id.replace('_', '-').split('-')[1:-1]
    res = 0
    # Horner-style accumulation: each further block shifts the running
    # value up by three decimal digits
    for block in blocks:
        res = res * 1000 + int(block)
    return res
83
84
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (the sum of the decimal digits of idx, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # pure integer arithmetic: dividing by the float 1e6 loses precision
    # once the index no longer fits exactly in a double
    millions, rest = divmod(idx, 1000000)
    thousands, units = divmod(rest, 1000)

    blocks = [leadText]
    if millions:
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    # zero padding does not change the digit sum, so the checksum can be
    # taken directly from the decimal representation of idx
    accum = sum(int(char) for char in str(idx))
    blocks.append(str(accum % 10))
    return '-'.join(blocks)
137
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    Every value stored in idColName of table is converted to its integer
    index; the result is one more than the largest index found (1 when
    the query returns no rows).

    see RegisterItem for testing/documentation

    """
    highest = 0
    for row in conn.GetData(table=table, fields=idColName):
        # stored ids are converted without checksum validation
        idx = RDIdToInt(row[0], validate=0)
        if idx > highest:
            highest = idx
    return highest + 1
151
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    If leadText is not provided it is taken from the first id returned
    for the table; when the table holds no ids yet, 'RDCmpd' is used.

    see RegisterItem for testing/documentation

    """
    if not leadText:
        existing = conn.GetData(table=table, fields=idColName)
        try:
            firstId = existing[0][0]
        except IndexError:
            # empty table: there is no id to copy the lead text from, so
            # use the default the registration functions use
            leadText = 'RDCmpd'
        else:
            leadText = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=leadText)
165
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ registers a single item, reusing the id of a matching row

    The table is searched for a row whose columnName equals value.  If
    one is found nothing is changed and (0, existing id) is returned.
    Otherwise the item gets the provided id (or the next available RDId
    when id is empty), the row [id]+data is inserted and committed if
    data is non-empty, and (1, id) is returned.

    >>> conn = DbConnect(tempDbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    # identifiers are interpolated; the looked-up value goes through the
    # driver's placeholder
    lookup = 'select {0} from {1} where {2}={3}'.format(idColName, table, columnName,
                                                        DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    match = cursor.fetchone()
    if match:
        return 0, match[0]

    if not id:
        id = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        conn.InsertData(table, [id] + list(data))
        conn.Commit()
    return 1, id
205
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ registers a batch of items, reusing the ids of values already present

    Arguments:
      - conn: database connection; GetCursor() and Commit() are used
      - table: name of the table to register into
      - values: the values to look for in columnName, one per item
      - columnName: name of the column holding the registered values
      - rows: per-item data (without the id) inserted for new items; if
        empty, ids are still assigned but nothing is inserted
      - startId: (optional) RDId for the first new item; the next
        available RDId of the table is used when it is empty
      - idColName: name of the id column
      - leadText: lead text for newly created RDIds

    Returns a 2-tuple:
      1) the number of items that were given a new id
      2) the list of ids, in the same order as values

    Raises a ValueError if rows and values differ in length or if
    startId is not a valid RDId.

    NOTE(review): values are used as dictionary keys, so they must be
    hashable, and a value repeated within values is only matched to an
    existing id at its last position - callers are presumably expected
    to pass distinct values.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)
    origOrder = {}
    for i, v in enumerate(values):
        origOrder[v] = i

    placeHolder = DbModule.placeHolder
    curs = conn.GetCursor()
    # stage the candidate values in a temporary table so that a single
    # query finds the ones that are already registered
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    try:
        curs.executemany("insert into regitemstemp values (%s)" % placeHolder,
                         [(x,) for x in values])
        curs.execute('select %s,%s from %s where %s in (select * from regitemstemp)'
                     % (columnName, idColName, table, columnName))
        dbData = curs.fetchall()
    finally:
        # without this a second call on the same connection would fail
        # because the temporary table already exists
        curs.execute("drop table regitemstemp")

    # existing ids are slotted in by input position, so the result is
    # always ordered like values rather than like the database rows
    ids = [None] * nVals
    for val, existingId in dbData:
        ids[origOrder[val]] = existingId
    if all(x is not None for x in ids):
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    nextIdx = RDIdToInt(startId)

    nAdded = 0
    rowsToInsert = []
    for i in range(nVals):
        if ids[i] is not None:
            continue
        newId = IndexToRDId(nextIdx, leadText=leadText)
        nextIdx += 1
        ids[i] = newId
        nAdded += 1
        if rows:
            row = [newId]
            row.extend(rows[i])
            rowsToInsert.append(row)

    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # join a list of placeholders: joining the repeated *string* only
        # works when the placeholder is a single character
        qs = ','.join([placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return nAdded, ids
252
253
254
255
256
257
258
259
260
# Extra doctests that do not belong to a single function: ids produced by
# IndexToRDId must validate and must convert back to the original index.
# They are exposed to doctest through the module-level __test__ dictionary.
_roundtripTests = """
>>> ValidateRDId(IndexToRDId(100))
1
>>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
1
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
1

"""
__test__ = {"roundtrip":_roundtripTests}
275
def _test():
    """ runs the doctests of the module executing as __main__

    Returns the result of doctest.testmod.

    """
    import doctest
    import sys
    mainModule = sys.modules["__main__"]
    return doctest.testmod(mainModule)
279
if __name__ == '__main__':
    import os
    import shutil
    import sys
    import tempfile

    # tempDbName has to be a module-level name: the RegisterItem doctests
    # connect to it.
    if RDConfig.useSqlLite:
        # run the tests against a scratch copy of the test database
        tmpf, tempName = tempfile.mkstemp(suffix='sqlt')
        # mkstemp hands back an open OS-level descriptor; close it right
        # away so it is not leaked (an open handle can also prevent the
        # file from being removed on some platforms)
        os.close(tmpf)
        tempDbName = tempName
        shutil.copyfile(RDConfig.RDTestDatabase, tempDbName)
    else:
        tempDbName = '::RDTests'
    try:
        failed, tried = _test()
    finally:
        # remove the scratch database even if the test run itself raised
        if RDConfig.useSqlLite and os.path.exists(tempDbName):
            try:
                os.unlink(tempDbName)
            except OSError:
                # best effort: report the problem but still exit with the
                # test status
                import traceback
                traceback.print_exc()
    sys.exit(failed)
296