Hari ini saya mencoba revisit ulang driver DBExpress wrapper untuk python yang kompatibel dengan DB-API, driver ini sebenarnya telah saya buat sebelumnya, namun sepertinya saat itu karena masih banyak instability issue atas project middleware server yang saya kerja, saya coba lepasin untuk meminimalisir permasalahan yang mungkin disebabkan karena driver ini.
Namun setelah host midleware server-nya mencapai tahapan maturity yang cukup baik, sepertinya ini saat yang cukup tepat untuk melakukan revisit, dan sepertinya bisa dikatakan tulis ulang karena kode yang saya buat sebelumnya banyak yang keliru dalam pemanfaatan Python API functionya. Meskipun belum sempat saya test lebih lanjut, kode dasar ini saya postingkan di sini sebagai alternatif pengarsipan.
Berikut adalah source code DBExpress Wrapper dalam Delphi:
unit dbxapi;
interface
uses
SysUtils, Classes, DBCommonTypes, SqlExpr, Variants, ECFPythonEngine,
ecfpyinf, FMTBcd, DB, DBXCommon;
type
TECFDbxAccess = class(TComponent)
private
FConnection: TSQLConnection;
FQuery: TCustomSQLDataSet;
public
constructor Create( AOwner : TComponent ); override;
destructor Destroy; override;
procedure Execute(const SQL: WideString);
property Connection: TSQLConnection read FConnection;
property Query: TCustomSQLDataSet read FQuery;
end;
TECFDbxConn = class(TPyObject)
private
FSQLConn: TECFDbxAccess;
function getHeaders: PPyObject;
public
constructor Create( APythonType : TPythonType ); override;
destructor Destroy; override;
function _SetDriverName( args : PPyObject ) : PPyObject; cdecl;
function _SetGetDriverFunc( args : PPyObject ) : PPyObject; cdecl;
function _SetLibraryName( args : PPyObject ) : PPyObject; cdecl;
function _SetParams( args : PPyObject ) : PPyObject; cdecl;
function _VendorLib( args : PPyObject ) : PPyObject; cdecl;
function _beginTrans( args : PPyObject ) : PPyObject; cdecl;
function _commitTrans( args : PPyObject ) : PPyObject; cdecl;
function _rollbackTrans( args : PPyObject ) : PPyObject; cdecl;
function _query( args : PPyObject ) : PPyObject; cdecl;
function _fetch_array( args : PPyObject ) : PPyObject; cdecl;
function _close( args : PPyObject ) : PPyObject; cdecl;
class procedure RegisterMethods( PythonType : TPythonType ); override;
end;
TPythonDbxConn = class( TPythonType)
public
constructor Create( AOwner : TComponent ); override;
end;
TECFDbxAPI = class(TECFPythonModule)
private
FPythonDBXConn: TPythonDbxConn;
public
function PyGetConnection( pyself, args : PPyObject ) : PPyObject; cdecl;
procedure RaiseDBXError(const Msg: string);
public
constructor Create( AOwner : TComponent ); override;
destructor Destroy; override;
procedure Initialize; override;
procedure SetupObjectType;
end;
procedure CreateComponents( AOwner : TComponent; APyEngine: TPythonEngine );
implementation
uses
ActiveX;
type
TProtCustomSQLDataSet = class(TCustomSQLDataSet);
const
TYPE_STRING = 1;
TYPE_BINARY = 2;
TYPE_NUMBER = 3;
TYPE_DATETIME = 4;
TYPE_DECIMAL = 5;
var
DbxAPIModule: TECFDbxAPI;
procedure CreateComponents( AOwner : TComponent; APyEngine: TPythonEngine );
begin
with TECFDbxAPI.Create(AOwner) do
begin
Engine := APyEngine;
SetupObjectType;
end;
end;
{ TECFDbxAPI }
constructor TECFDbxAPI.Create(AOwner: TComponent);
begin
inherited Create(AOwner);
ModuleName := '_dbxapi';
Name := 'ECFDbxAPI';
with DocString do
begin
Add( 'This module contains several Object Types that' );
Add( 'will let you work with the DBExpress and access' );
Add( 'a database in a python DB-API compatible implementation.' );
Add( '' );
Add( 'getdbxconn(drv,getdrvfnc,libnm,vlibnm,params) -> Get dbx connection object' );
end;
with Errors.Add do
Name := 'DBXError';
DbxAPIModule := Self;
end;
destructor TECFDbxAPI.Destroy;
begin
inherited Destroy;
end;
procedure TECFDbxAPI.Initialize;
begin
AddDelphiMethod('getdbxconn',
PyGetConnection,
'getdbxconn(drv,getdrvfnc,libnm,vlibnm,params) -- get connection object with driver attached '+
'on it.');
inherited Initialize;
end;
function TECFDbxAPI.PyGetConnection(pyself, args: PPyObject): PPyObject;
var
drv,getdrvfnc,libnm,vlibnm,params:PChar;
AThreadState: PPyThreadState;
begin
Result := nil;
with GetPythonEngine do
begin
if Assigned(args) and (PyTuple_Size(args) > 0) and
(PyArg_ParseTuple( args, 'sssss:', [@drv,@getdrvfnc,@libnm,@vlibnm,@params]) <> 0) then
begin
Result := FPythonDBXConn.CreateInstance;
if (Result <> nil) then
begin
with TECFDbxConn(PythonToDelphi(Result)).FSQLConn do
begin
if Connection.Connected then
Connection.Close;
Connection.DriverName := String(drv);
Connection.GetDriverFunc := String(getdrvfnc);
Connection.LibraryName := String(libnm);
Connection.VendorLib := String(vlibnm);
Connection.Params.Text := String(params);
Connection.LoginPrompt := False;
try
AThreadState := PyEval_SaveThread;
try
Connection.Open;
finally
PyEval_RestoreThread(AThreadState);
end;
except
on E:Exception do
begin
Py_XDECREF(Result);
RaiseDBXError(E.Message);
Result := nil;
end;
end;
end;
end;
end else
RaiseDBXError('Connection could not be established');
end;
end;
procedure TECFDbxAPI.RaiseDBXError(const Msg: string);
begin
RaiseError('DBXError', Msg);
end;
procedure TECFDbxAPI.SetupObjectType;
begin
FPythonDBXConn := TPythonDbxConn.Create(Self);
end;
{ TECFDbxAccess }
constructor TECFDbxAccess.Create(AOwner: TComponent);
begin
inherited Create(AOwner);
CoInitialize(nil);
FConnection := TSQLConnection.Create(Self);
FConnection.LoginPrompt := False;
FQuery := TCustomSQLDataSet.Create(Self);
FQuery.SQLConnection := FConnection;
end;
destructor TECFDbxAccess.Destroy;
begin
FQuery.Free;
FConnection.Free;
inherited Destroy;
end;
procedure TECFDbxAccess.Execute(const SQL: WideString);
begin
FConnection.ConnectionState := csStateExecuting;
try
CoInitialize(nil);
FQuery.Active := False;
TProtCustomSQLDataSet(FQuery).CommandText := SQL;
FQuery.Active := True;
finally
FConnection.ConnectionState := csStateOpen;
end;
end;
{ TPythonDbxConn }
constructor TPythonDbxConn.Create(AOwner: TComponent);
begin
inherited Create(AOwner);
if AOwner is TECFPythonModule then
begin
Module := TECFPythonModule(AOwner);
Engine := Module.Engine;
end;
Name := 'typeDbxConn';
TypeName := 'DBXConn';
PyObjectClass := TECFDbxConn;
GenerateCreateFunction := False;
end;
{ TECFDbxConn }
constructor TECFDbxConn.Create(APythonType: TPythonType);
begin
inherited Create(APythonType);
FSQLConn := TECFDbxAccess.Create(nil);
end;
destructor TECFDbxConn.Destroy;
begin
FSQLConn.Free;
inherited Destroy;
end;
function TECFDbxConn.getHeaders: PPyObject;
var
APyHeader, APyColumn: PPyObject;
AColType: integer;
AColName: string;
I: Integer;
begin
with GetPythonEngine do
begin
APyHeader := PyTuple_New(FSQLConn.Query.FieldCount);
if APyHeader = nil then
CheckError(False);
for I := 0 to FSQLConn.Query.FieldCount - 1 do
begin
AColType := 0;
case FSQLConn.Query.Fields[i].DataType of
ftString, ftFixedChar, ftWideString, // 19..24
ftFixedWideChar, ftWideMemo
: AColType := TYPE_STRING;
ftBlob, ftMemo, ftGraphic, ftFmtMemo, // 12..18
ftBytes, ftVarBytes, ftADT, ftArray, ftTypedBinary
: AColType := TYPE_BINARY;
ftSmallint, ftInteger, ftWord, // 0..4
ftBoolean, ftAutoInc, ftLargeint
: AColType := TYPE_NUMBER;
ftDate, ftTime, ftDateTime, // 5..11
ftTimeStamp
: AColType := TYPE_DATETIME;
ftFloat, ftCurrency, ftBCD, ftFMTBcd
: AColType := TYPE_DECIMAL;
end;
AColName := FSQLConn.Query.Fields[i].FieldName;
APyColumn := PyTuple_New(2);
if APyColumn = nil then
begin
Py_XDECREF(APyHeader);
CheckError(False);
end;
PyTuple_SetItem(APyColumn, 0, PyString_FromString(PChar(AColName)));
PyTuple_SetItem(APyColumn, 1, PyLong_FromLong(AColType));
PyTuple_SetItem(APyHeader, I, APyColumn);
end;
Result := APyHeader;
end;
end;
class procedure TECFDbxConn.RegisterMethods(PythonType: TPythonType);
begin
inherited RegisterMethods(PythonType);
with PythonType do
begin
AddMethod( 'driverName', @TECFDbxConn._SetDriverName,
'DBXConn.driverName(drvname) -> None' );
AddMethod( 'getDriverFunc', @TECFDbxConn._SetGetDriverFunc,
'DBXConn.getDriverFunc(getdrvfunc) -> None' );
AddMethod( 'libraryName', @TECFDbxConn._SetLibraryName,
'DBXConn.libraryName(libname) -> None' );
AddMethod( 'setParam', @TECFDbxConn._SetParams,
'DBXConn.setParam(name, value) -> None' );
AddMethod( 'vendorLib', @TECFDbxConn._VendorLib,
'DBXConn.vendorLib(vdrlib) -> None' );
AddMethod( 'beginTrans', @TECFDbxConn._beginTrans,
'DBXConn.beginTrans() -> TransactionID' );
AddMethod( 'commit', @TECFDbxConn._commitTrans,
'DBXConn.commit(TransactionID) -> None' );
AddMethod( 'rollback', @TECFDbxConn._rollbackTrans,
'DBXConn.rollback(TransactionID) -> None' );
AddMethod( 'query', @TECFDbxConn._query,
'DBXConn.query(SQL) -> affected rows' );
AddMethod( 'fetch_array', @TECFDbxConn._fetch_array,
'DBXConn._fetch_array() -> array of result' );
AddMethod( 'close', @TECFDbxConn._close,
'DBXConn.close) -> None' );
end;
end;
function TECFDbxConn._beginTrans(args: PPyObject): PPyObject;
var
AThreadState: PPyThreadState;
ATransactionID: Integer;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
try
AThreadState := PyEval_SaveThread;
try
ATransactionID := Integer(FSQLConn.Connection.BeginTransaction);
finally
PyEval_RestoreThread(AThreadState);
end;
Result := PyLong_FromLong(ATransactionID);
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end;
end;
end;
function TECFDbxConn._close(args: PPyObject): PPyObject;
var
AThreadState: PPyThreadState;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
try
AThreadState := PyEval_SaveThread;
try
if FSQLConn.Query.Active then
FSQLConn.Query.Close;
if FSQLConn.Connection.Connected then
FSQLConn.Connection.Close;
finally
PyEval_RestoreThread(AThreadState);
end;
Result := ReturnNone;
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end;
end;
end;
function TECFDbxConn._commitTrans(args: PPyObject): PPyObject;
var
AThreadState: PPyThreadState;
ATransactionID: Integer;
ATrans: TDBXTransaction;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
if (PyArg_ParseTuple( args, 'L:DBXConn.commit', [@ATransactionID] ) <> 0) then
begin
ATrans := TDBXTransaction(Ptr(ATransactionID));
if ATrans = nil then
DbxAPIModule.RaiseDBXError('Invalid Transaction ID')
else
begin
try
AThreadState := PyEval_SaveThread;
try
FSQLConn.Connection.CommitFreeAndNil(ATrans);
finally
PyEval_RestoreThread(AThreadState);
end;
Result := ReturnNone;
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end;
end else
begin
DbxAPIModule.RaiseDBXError('TransactionID should be specified');
Result := nil;
end;
end;
end;
end;
function TECFDbxConn._fetch_array(args: PPyObject): PPyObject;
var
APyResult, APyHeader, APyRecord, APyRecordLst, APyRecordTuple: PPyObject;
AThreadState: PPyThreadState;
ACount: Int64;
I: integer;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
try
APyResult := PyTuple_New(3);
if APyResult = nil then
CheckError(False);
try
APyHeader := getHeaders;
PyTuple_SetItem(APyResult, 0, APyHeader);
except
Py_XDECREF(APyResult);
raise;
end;
ACount := 0;
APyRecordLst := PyList_New(0);
while not FSQLConn.Query.Eof do
begin
APyRecord := PyTuple_New(FSQLConn.Query.FieldCount);
if APyRecord = nil then
begin
Py_XDECREF(APyRecordLst);
Py_XDECREF(APyResult);
CheckError(False);
end;
for i := 0 to FSQLConn.Query.FieldCount - 1 do
PyTuple_SetItem(APyRecord, i, VariantAsPyObject(FSQLConn.Query.Fields[i].Value));
PyList_Append(APyRecordLst, APyRecord);
Inc(ACount);
AThreadState := PyEval_SaveThread;
FSQLConn.Query.Next;
PyEval_RestoreThread(AThreadState);
end;
APyRecordTuple := PyList_AsTuple(APyRecordLst);
Py_XDECREF(APyRecordLst);
if APyRecordTuple = nil then
begin
Py_XDECREF(APyResult);
CheckError(False);
end;
PyTuple_SetItem(APyResult, 1, PyLong_FromLong(ACount));
PyTuple_SetItem(APyResult, 2, APyRecordTuple);
Result := APyResult;
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end;
end;
end;
function TECFDbxConn._query(args: PPyObject): PPyObject;
var
AThreadState: PPyThreadState;
ASQL: PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
if (PyArg_ParseTuple( args, 's:DBXConn.query', [@ASQL] ) <> 0) then
begin
try
AThreadState := PyEval_SaveThread;
try
FSQLConn.Execute(String(ASQL));
finally
PyEval_RestoreThread(AThreadState);
end;
// Result := PyLong_FromLong(TProtCustomSQLDataSet(FSQLConn.Query).RowsAffected);
Result := PyLong_FromLong(1);
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end else
begin
DbxAPIModule.RaiseDBXError('TransactionID should be specified');
Result := nil;
end;
end;
end;
end;
function TECFDbxConn._rollbackTrans(args: PPyObject): PPyObject;
var
AThreadState: PPyThreadState;
ATransactionID: Integer;
ATrans: TDBXTransaction;
begin
with GetPythonEngine do
begin
Adjust(@Self);
Result := nil;
if not FSQLConn.Connection.Connected then
DbxAPIModule.RaiseDBXError('Not connected to the database')
else
begin
if (PyArg_ParseTuple( args, 'L:DBXConn.rollback', [@ATransactionID] ) <> 0) then
begin
ATrans := TDBXTransaction(Ptr(ATransactionID));
if ATrans = nil then
DbxAPIModule.RaiseDBXError('Invalid Transaction ID')
else
begin
try
AThreadState := PyEval_SaveThread;
try
FSQLConn.Connection.RollbackFreeAndNil(ATrans);
finally
PyEval_RestoreThread(AThreadState);
end;
Result := ReturnNone;
except
on E:Exception do
begin
DbxAPIModule.RaiseDBXError(E.Message);
end;
end;
end;
end else
begin
DbxAPIModule.RaiseDBXError('TransactionID should be specified');
Result := nil;
end;
end;
end;
end;
function TECFDbxConn._SetDriverName(args: PPyObject): PPyObject;
var
AValue : PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
if (PyArg_ParseTuple( args, 's:DBXConn.driverName', [@AValue] ) <> 0) then
begin
FSQLConn.Connection.DriverName := String(AValue);
Result := ReturnNone;
end else
begin
DbxAPIModule.RaiseDBXError('Driver name should be specified');
Result := nil;
end;
end;
end;
function TECFDbxConn._SetGetDriverFunc(args: PPyObject): PPyObject;
var
AValue : PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
if (PyArg_ParseTuple( args, 's:DBXConn.getDriverFunc', [@AValue] ) <> 0) then
begin
FSQLConn.Connection.GetDriverFunc := String(AValue);
Result := ReturnNone;
end else
begin
DbxAPIModule.RaiseDBXError('GetDriverFunction name should be specified');
Result := nil;
end;
end;
end;
function TECFDbxConn._SetLibraryName(args: PPyObject): PPyObject;
var
AValue : PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
if (PyArg_ParseTuple( args, 's:DBXConn.libraryName', [@AValue] ) <> 0) then
begin
FSQLConn.Connection.LibraryName := String(AValue);
Result := ReturnNone;
end else
begin
DbxAPIModule.RaiseDBXError('Library name should be specified');
Result := nil;
end;
end;
end;
function TECFDbxConn._SetParams(args: PPyObject): PPyObject;
var
AKey, AValue : PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
if (PyArg_ParseTuple( args, 'ss:DBXConn.setParam', [@AKey, @AValue] ) <> 0) then
begin
FSQLConn.Connection.Params.Add(Format('%s=%s', [String(AKey), String(AValue)]));
Result := ReturnNone;
end else
begin
DbxAPIModule.RaiseDBXError('Parameter name, and value should be specified');
Result := nil;
end;
end;
end;
function TECFDbxConn._VendorLib(args: PPyObject): PPyObject;
var
AValue : PChar;
begin
with GetPythonEngine do
begin
Adjust(@Self);
if (PyArg_ParseTuple( args, 's:DBXConn.vendorLib', [@AValue] ) <> 0) then
begin
FSQLConn.Connection.VendorLib := String(AValue);
Result := ReturnNone;
end else
begin
DbxAPIModule.RaiseDBXError('Vendor Library name should be specified');
Result := nil;
end;
end;
end;
end.
Sementara Python DBExpress API Wrappernya adalah sebagai berikut:
__author__ = "Jaimy Azle"
__version__ = '0.0.1'
import _dbxapi
import types
import string
import time
import datetime
# compliant with DB SIG 2.0
apilevel = '2.0'
# module may be shared, but not connections
threadsafety = 1
# this module use extended python format codes
paramstyle = 'pyformat'
class DBAPITypeObject:
def __init__(self,*values):
self.values = values
def __cmp__(self,other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
STRING = DBAPITypeObject(1)
BINARY = DBAPITypeObject(2)
NUMBER = DBAPITypeObject(3)
DATETIME = DBAPITypeObject(4)
DECIMAL = DBAPITypeObject(5)
class Warning(StandardError):
pass
class Error(StandardError):
pass
class InterfaceError(Error):
pass
class DatabaseError(Error):
pass
class DataError(DatabaseError):
pass
class OperationalError(DatabaseError):
pass
class IntegrityError(DatabaseError):
pass
class InternalError(DatabaseError):
pass
class ProgrammingError(DatabaseError):
pass
class NotSupportedError(DatabaseError):
pass
### cursor object
class pydbxCursor:
def __init__(self, src):
self.__source = src
self.description = None
self.rowcount = -1
self.arraysize = 1
self._result = []
self.__fetchpos = 0
def close(self):
self.__source = None
self.description = None
self.result = []
self.rowcount = -1
def execute(self, operation, params = None):
# "The parameters may also be specified as list of
# tuples to e.g. insert multiple rows in a single
# operation, but this kind of usage is depreciated:
if params and type(params) == types.ListType and \
type(params[0]) == types.TupleType:
self.executemany(operation, params)
else:
# not a list of tuples
self.executemany(operation, (params,))
def executemany(self, operation, param_seq):
self.description = None
self.rowcount = -1
self.__fetchpos = 0
# first try to execute all queries
totrows = 0
sql = ""
try:
for params in param_seq:
if params != None:
sql = _quoteparams(operation, params)
else:
sql = operation
# print sql
ret = self.__source.query(sql)
if ret == 1:
self._result = self.__source.fetch_array()
totrows = totrows + self._result[1]
else:
self._result = None
raise DatabaseError, "error: %s" % self.__source.errmsg()
except:
raise DatabaseError, "internal error: %s" % self.__source.errmsg()
# then initialize result raw count and description
self.description = None
self.rowcount = self._result[1]
def fetchone(self):
ret = self.fetchmany(1)
if ret: return ret[0]
else: return None
def fetchall(self):
return self._result[2][self.__fetchpos:]
def fetchmany(self, size = None, keep = 1):
if size == None:
size = self.arraysize
if keep == 1:
self.arraysize = size
res = self._result
if res[1]==self.__fetchpos:
return []
reslen = len(res[2][self.__fetchpos:])
if reslen < size:
size = res[1]
ret = res[2][self.__fetchpos:self.__fetchpos+size]
self.__fetchpos = self.__fetchpos + size
return ret
def setinputsizes(self, sizes):
pass
def setoutputsize(self, size, col = 0):
pass
def _quote(x):
if type(x) == types.StringType:
x = "'" + string.replace(str(x), "'", "''") + "'"
elif type(x) in (types.IntType, types.LongType, types.FloatType):
pass
elif x is None:
x = 'NULL'
# datetime quoting
# described under "Writing International Transact-SQL Statements" in BOL
# beware the order: isinstance(x,datetime.date)=True if x is
# datetime.datetime ! Also round x.microsecond to milliseconds,
# otherwise we get Msg 241, Level 16, State 1: Syntax error
elif isinstance(x, datetime.datetime):
x = "{ts '%04d-%02d-%02d %02d:%02d:%02d.%s'}" % \
(x.year,x.month, x.day,
x.hour, x.minute, x.second, x.microsecond / 1000)
elif isinstance(x, datetime.date):
x = "{d '%04d-%02d-%02d'}" % (x.year, x.month, x.day)
# alternative quoting by Luciano Pacheco
#elif hasattr(x, 'timetuple'):
# x = time.strftime('\'%Y%m%d %H:%M:%S\'', x.timetuple())
else:
#print "didn't like " + x + " " + str(type(x))
raise InterfaceError, 'do not know how to handle type %s' % type(x)
return x
def _quoteparams(s, params):
if hasattr(params, 'has_key'):
x = {}
for k, v in params.items():
x[k] = _quote(v)
params = x
else:
params = tuple(map(_quote, params))
return s % params
### connection object
class pydbxCnx:
def __init__(self, cnx):
self.__cnx = cnx
self.transid = None
try:
transid = self.__cnx.beginTrans()
self.__cnx.commit(transid)
except:
raise OperationalError, "invalid connection."
def close(self):
if self.__cnx == None:
raise OperationalError, "invalid connection."
self.__cnx.close()
self.__cnx = None
def commit(self):
if self.__cnx == None:
raise OperationalError, "invalid connection."
try:
if self.transid:
self.__cnx.commit(self.transid)
self.transid = self.__cnx.beginTrans()
except:
raise OperationalError, "can't commit."
def rollback(self):
if self.__cnx == None:
raise OperationalError, "invalid connection."
try:
if self.transid:
self.__cnx.rollback(self.transid)
self.transid = self.__cnx.beginTrans()
except:
raise OperationalError, "can't rollback."
def cursor(self):
if self.__cnx == None:
raise OperationalError, "invalid connection."
try:
return pydbxCursor(self.__cnx)
except:
raise OperationalError, "invalid connection."
# connects to a database
def connect(dsn = None, drv="", getdrvfnc="", libnm="", vlibnm=""):
# open the connection
dbdsn = dsn.strip()
if dbdsn != '':
dbdict = dict([x.split('=') for x in dsn.split(';')])
dbdrv = dbdict.pop('DriverName') if dbdict.has_key('DriverName') else drv
dbgetdrvfnc = dbdict.pop('GetDriverFunc') if dbdict.has_key('GetDriverFunc') else getdrvfnc
dblibnm = dbdict.pop('LibraryName') if dbdict.has_key('LibraryName') else libnm
dbvlibnm = dbdict.pop('VendorLibName') if dbdict.has_key('VendorLibName') else vlibnm
dbdsn = '\n'.join(['%s=%s' % (key, value) for key, value in dbdict.iteritems()])
else:
dbdrv = drv
dbgetdrvfnc = getdrvfnc
dblibnm = libnm
dbvlibnm = vlibnm
con = _dbxapi.getdbxconn(dbdrv, dbgetdrvfnc, dblibnm, dbvlibnm, dbdsn)
return pydbxCnx(con)
My name is 
No comments yet.
RSS feed for comments on this post. TrackBack URL