"""Run all test cases. """ import sys import os import unittest try: # For Pythons w/distutils pybsddb import bsddb3 as bsddb except ImportError: # For Python 2.3 import bsddb if sys.version_info[0] >= 3 : charset = "iso8859-1" # Full 8 bit class logcursor_py3k(object) : def __init__(self, env) : self._logcursor = env.log_cursor() def __getattr__(self, v) : return getattr(self._logcursor, v) def __next__(self) : v = getattr(self._logcursor, "next")() if v is not None : v = (v[0], v[1].decode(charset)) return v next = __next__ def first(self) : v = self._logcursor.first() if v is not None : v = (v[0], v[1].decode(charset)) return v def last(self) : v = self._logcursor.last() if v is not None : v = (v[0], v[1].decode(charset)) return v def prev(self) : v = self._logcursor.prev() if v is not None : v = (v[0], v[1].decode(charset)) return v def current(self) : v = self._logcursor.current() if v is not None : v = (v[0], v[1].decode(charset)) return v def set(self, lsn) : v = self._logcursor.set(lsn) if v is not None : v = (v[0], v[1].decode(charset)) return v class cursor_py3k(object) : def __init__(self, db, *args, **kwargs) : self._dbcursor = db.cursor(*args, **kwargs) def __getattr__(self, v) : return getattr(self._dbcursor, v) def _fix(self, v) : if v is None : return None key, value = v if isinstance(key, bytes) : key = key.decode(charset) return (key, value.decode(charset)) def __next__(self) : v = getattr(self._dbcursor, "next")() return self._fix(v) next = __next__ def previous(self) : v = self._dbcursor.previous() return self._fix(v) def last(self) : v = self._dbcursor.last() return self._fix(v) def set(self, k) : if isinstance(k, str) : k = bytes(k, charset) v = self._dbcursor.set(k) return self._fix(v) def set_recno(self, num) : v = self._dbcursor.set_recno(num) return self._fix(v) def set_range(self, k, dlen=-1, doff=-1) : if isinstance(k, str) : k = bytes(k, charset) v = self._dbcursor.set_range(k, dlen=dlen, doff=doff) return self._fix(v) def dup(self, flags=0) : cursor = self._dbcursor.dup(flags) return dup_cursor_py3k(cursor) def next_dup(self) : v = self._dbcursor.next_dup() return self._fix(v) def next_nodup(self) : v = self._dbcursor.next_nodup() return self._fix(v) def put(self, key, data, flags=0, dlen=-1, doff=-1) : if isinstance(key, str) : key = bytes(key, charset) if isinstance(data, str) : value = bytes(data, charset) return self._dbcursor.put(key, data, flags=flags, dlen=dlen, doff=doff) def current(self, flags=0, dlen=-1, doff=-1) : v = self._dbcursor.current(flags=flags, dlen=dlen, doff=doff) return self._fix(v) def first(self) : v = self._dbcursor.first() return self._fix(v) def pget(self, key=None, data=None, flags=0) : # Incorrect because key can be a bare number, # but enough to pass testsuite if isinstance(key, int) and (data is None) and (flags == 0) : flags = key key = None if isinstance(key, str) : key = bytes(key, charset) if isinstance(data, int) and (flags==0) : flags = data data = None if isinstance(data, str) : data = bytes(data, charset) v=self._dbcursor.pget(key=key, data=data, flags=flags) if v is not None : v1, v2, v3 = v if isinstance(v1, bytes) : v1 = v1.decode(charset) if isinstance(v2, bytes) : v2 = v2.decode(charset) v = (v1, v2, v3.decode(charset)) return v def join_item(self) : v = self._dbcursor.join_item() if v is not None : v = v.decode(charset) return v def get(self, *args, **kwargs) : l = len(args) if l == 2 : k, f = args if isinstance(k, str) : k = bytes(k, "iso8859-1") args = (k, f) elif l == 3 : k, d, f = args if isinstance(k, str) : k = bytes(k, charset) if isinstance(d, str) : d = bytes(d, charset) args =(k, d, f) v = self._dbcursor.get(*args, **kwargs) if v is not None : k, v = v if isinstance(k, bytes) : k = k.decode(charset) v = (k, v.decode(charset)) return v def get_both(self, key, value) : if isinstance(key, str) : key = bytes(key, charset) if isinstance(value, str) : value = bytes(value, charset) v=self._dbcursor.get_both(key, value) return self._fix(v) class dup_cursor_py3k(cursor_py3k) : def __init__(self, dbcursor) : self._dbcursor = dbcursor class DB_py3k(object) : def __init__(self, *args, **kwargs) : args2=[] for i in args : if isinstance(i, DBEnv_py3k) : i = i._dbenv args2.append(i) args = tuple(args2) for k, v in kwargs.items() : if isinstance(v, DBEnv_py3k) : kwargs[k] = v._dbenv self._db = bsddb._db.DB_orig(*args, **kwargs) def __contains__(self, k) : if isinstance(k, str) : k = bytes(k, charset) return getattr(self._db, "has_key")(k) def __getitem__(self, k) : if isinstance(k, str) : k = bytes(k, charset) v = self._db[k] if v is not None : v = v.decode(charset) return v def __setitem__(self, k, v) : if isinstance(k, str) : k = bytes(k, charset) if isinstance(v, str) : v = bytes(v, charset) self._db[k] = v def __delitem__(self, k) : if isinstance(k, str) : k = bytes(k, charset) del self._db[k] def __getattr__(self, v) : return getattr(self._db, v) def __len__(self) : return len(self._db) def has_key(self, k, txn=None) : if isinstance(k, str) : k = bytes(k, charset) return self._db.has_key(k, txn=txn) def set_re_delim(self, c) : if isinstance(c, str) : # We can use a numeric value byte too c = bytes(c, charset) return self._db.set_re_delim(c) def set_re_pad(self, c) : if isinstance(c, str) : # We can use a numeric value byte too c = bytes(c, charset) return self._db.set_re_pad(c) def get_re_source(self) : source = self._db.get_re_source() return source.decode(charset) def put(self, key, data, txn=None, flags=0, dlen=-1, doff=-1) : if isinstance(key, str) : key = bytes(key, charset) if isinstance(data, str) : value = bytes(data, charset) return self._db.put(key, data, flags=flags, txn=txn, dlen=dlen, doff=doff) def append(self, value, txn=None) : if isinstance(value, str) : value = bytes(value, charset) return self._db.append(value, txn=txn) def get_size(self, key) : if isinstance(key, str) : key = bytes(key, charset) return self._db.get_size(key) def exists(self, key, *args, **kwargs) : if isinstance(key, str) : key = bytes(key, charset) return self._db.exists(key, *args, **kwargs) def get(self, key, default="MagicCookie", txn=None, flags=0, dlen=-1, doff=-1) : if isinstance(key, str) : key = bytes(key, charset) if default != "MagicCookie" : # Magic for 'test_get_none.py' v=self._db.get(key, default=default, txn=txn, flags=flags, dlen=dlen, doff=doff) else : v=self._db.get(key, txn=txn, flags=flags, dlen=dlen, doff=doff) if (v is not None) and isinstance(v, bytes) : v = v.decode(charset) return v def pget(self, key, txn=None) : if isinstance(key, str) : key = bytes(key, charset) v=self._db.pget(key, txn=txn) if v is not None : v1, v2 = v if isinstance(v1, bytes) : v1 = v1.decode(charset) v = (v1, v2.decode(charset)) return v def get_both(self, key, value, txn=None, flags=0) : if isinstance(key, str) : key = bytes(key, charset) if isinstance(value, str) : value = bytes(value, charset) v=self._db.get_both(key, value, txn=txn, flags=flags) if v is not None : v = v.decode(charset) return v def delete(self, key, txn=None) : if isinstance(key, str) : key = bytes(key, charset) return self._db.delete(key, txn=txn) def keys(self) : k = self._db.keys() if len(k) and isinstance(k[0], bytes) : return [i.decode(charset) for i in self._db.keys()] else : return k def items(self) : data = self._db.items() if not len(data) : return data data2 = [] for k, v in data : if isinstance(k, bytes) : k = k.decode(charset) data2.append((k, v.decode(charset))) return data2 def associate(self, secondarydb, callback, flags=0, txn=None) : class associate_callback(object) : def __init__(self, callback) : self._callback = callback def callback(self, key, data) : if isinstance(key, str) : key = key.decode(charset) data = data.decode(charset) key = self._callback(key, data) if (key != bsddb._db.DB_DONOTINDEX) : if isinstance(key, str) : key = bytes(key, charset) elif isinstance(key, list) : key2 = [] for i in key : if isinstance(i, str) : i = bytes(i, charset) key2.append(i) key = key2 return key return self._db.associate(secondarydb._db, associate_callback(callback).callback, flags=flags, txn=txn) def cursor(self, txn=None, flags=0) : return cursor_py3k(self._db, txn=txn, flags=flags) def join(self, cursor_list) : cursor_list = [i._dbcursor for i in cursor_list] return dup_cursor_py3k(self._db.join(cursor_list)) class DBEnv_py3k(object) : def __init__(self, *args, **kwargs) : self._dbenv = bsddb._db.DBEnv_orig(*args, **kwargs) def __getattr__(self, v) : return getattr(self._dbenv, v) def log_cursor(self, flags=0) : return logcursor_py3k(self._dbenv) def get_lg_dir(self) : return self._dbenv.get_lg_dir().decode(charset) def get_tmp_dir(self) : return self._dbenv.get_tmp_dir().decode(charset) def get_data_dirs(self) : return tuple( (i.decode(charset) for i in self._dbenv.get_data_dirs())) class DBSequence_py3k(object) : def __init__(self, db, *args, **kwargs) : self._db=db self._dbsequence = bsddb._db.DBSequence_orig(db._db, *args, **kwargs) def __getattr__(self, v) : return getattr(self._dbsequence, v) def open(self, key, *args, **kwargs) : return self._dbsequence.open(bytes(key, charset), *args, **kwargs) def get_key(self) : return self._dbsequence.get_key().decode(charset) def get_dbp(self) : return self._db import string string.letters=[chr(i) for i in xrange(65,91)] bsddb._db.DBEnv_orig = bsddb._db.DBEnv bsddb._db.DB_orig = bsddb._db.DB if bsddb.db.version() <= (4, 3) : bsddb._db.DBSequence_orig = None else : bsddb._db.DBSequence_orig = bsddb._db.DBSequence def do_proxy_db_py3k(flag) : flag2 = do_proxy_db_py3k.flag do_proxy_db_py3k.flag = flag if flag : bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = DBEnv_py3k bsddb.DB = bsddb.db.DB = bsddb._db.DB = DB_py3k bsddb._db.DBSequence = DBSequence_py3k else : bsddb.DBEnv = bsddb.db.DBEnv = bsddb._db.DBEnv = bsddb._db.DBEnv_orig bsddb.DB = bsddb.db.DB = bsddb._db.DB = bsddb._db.DB_orig bsddb._db.DBSequence = bsddb._db.DBSequence_orig return flag2 do_proxy_db_py3k.flag = False do_proxy_db_py3k(True) try: # For Pythons w/distutils pybsddb from bsddb3 import db, dbtables, dbutils, dbshelve, \ hashopen, btopen, rnopen, dbobj except ImportError: # For Python 2.3 from bsddb import db, dbtables, dbutils, dbshelve, \ hashopen, btopen, rnopen, dbobj try: from bsddb3 import test_support except ImportError: if sys.version_info[0] < 3 : from test import test_support else : from test import support as test_support try: if sys.version_info[0] < 3 : from threading import Thread, currentThread del Thread, currentThread else : from threading import Thread, current_thread del Thread, current_thread have_threads = True except ImportError: have_threads = False verbose = 0 if 'verbose' in sys.argv: verbose = 1 sys.argv.remove('verbose') if 'silent' in sys.argv: # take care of old flag, just in case verbose = 0 sys.argv.remove('silent') def print_versions(): print print '-=' * 38 print db.DB_VERSION_STRING print 'bsddb.db.version(): %s' % (db.version(), ) if db.version() >= (5, 0) : print 'bsddb.db.full_version(): %s' %repr(db.full_version()) print 'bsddb.db.__version__: %s' % db.__version__ print 'bsddb.db.cvsid: %s' % db.cvsid # Workaround for allowing generating an EGGs as a ZIP files. suffix="__" print 'py module: %s' % getattr(bsddb, "__file"+suffix) print 'extension module: %s' % getattr(bsddb, "__file"+suffix) print 'python version: %s' % sys.version print 'My pid: %s' % os.getpid() print '-=' * 38 def get_new_path(name) : get_new_path.mutex.acquire() try : import os path=os.path.join(get_new_path.prefix, name+"_"+str(os.getpid())+"_"+str(get_new_path.num)) get_new_path.num+=1 finally : get_new_path.mutex.release() return path def get_new_environment_path() : path=get_new_path("environment") import os try: os.makedirs(path,mode=0700) except os.error: test_support.rmtree(path) os.makedirs(path) return path def get_new_database_path() : path=get_new_path("database") import os if os.path.exists(path) : os.remove(path) return path # This path can be overriden via "set_test_path_prefix()". import os, os.path get_new_path.prefix=os.path.join(os.environ.get("TMPDIR", os.path.join(os.sep,"tmp")), "z-Berkeley_DB") get_new_path.num=0 def get_test_path_prefix() : return get_new_path.prefix def set_test_path_prefix(path) : get_new_path.prefix=path def remove_test_path_directory() : test_support.rmtree(get_new_path.prefix) if have_threads : import threading get_new_path.mutex=threading.Lock() del threading else : class Lock(object) : def acquire(self) : pass def release(self) : pass get_new_path.mutex=Lock() del Lock class PrintInfoFakeTest(unittest.TestCase): def testPrintVersions(self): print_versions() # This little hack is for when this module is run as main and all the # other modules import it so they will still be able to get the right # verbose setting. It's confusing but it works. if sys.version_info[0] < 3 : import test_all test_all.verbose = verbose else : import sys print >>sys.stderr, "Work to do!" def suite(module_prefix='', timing_check=None): test_modules = [ 'test_associate', 'test_basics', 'test_dbenv', 'test_db', 'test_compare', 'test_compat', 'test_cursor_pget_bug', 'test_dbobj', 'test_dbshelve', 'test_dbtables', 'test_distributed_transactions', 'test_early_close', 'test_fileid', 'test_get_none', 'test_join', 'test_lock', 'test_misc', 'test_pickle', 'test_queue', 'test_recno', 'test_replication', 'test_sequence', 'test_thread', ] alltests = unittest.TestSuite() for name in test_modules: #module = __import__(name) # Do it this way so that suite may be called externally via # python's Lib/test/test_bsddb3. module = __import__(module_prefix+name, globals(), locals(), name) alltests.addTest(module.test_suite()) if timing_check: alltests.addTest(unittest.makeSuite(timing_check)) return alltests def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(PrintInfoFakeTest)) return suite if __name__ == '__main__': print_versions() unittest.main(defaultTest='suite')