J.a.i.m’s Diary

Tulis ulang DBExpress wrapper for Python

Hari ini saya mencoba me-revisit driver DBExpress wrapper untuk Python yang kompatibel dengan DB-API. Driver ini sebenarnya telah saya buat sebelumnya, namun karena saat itu masih banyak instability issue pada project middleware server yang saya kerjakan, driver ini saya lepas dulu untuk meminimalisir permasalahan yang mungkin disebabkan olehnya.

Namun setelah host middleware server-nya mencapai tahapan maturity yang cukup baik, sepertinya ini saat yang tepat untuk melakukan revisit, dan bisa dikatakan tulis ulang karena kode yang saya buat sebelumnya banyak yang keliru dalam pemanfaatan fungsi-fungsi Python API-nya. Meskipun belum sempat saya test lebih lanjut, kode dasar ini saya posting di sini sebagai alternatif pengarsipan.

Berikut adalah source code DBExpress Wrapper dalam Delphi:


unit dbxapi;

interface

uses
  SysUtils, Classes, DBCommonTypes, SqlExpr, Variants, ECFPythonEngine,
  ecfpyinf, FMTBcd, DB, DBXCommon;

type
  // Bundles one TSQLConnection with one TCustomSQLDataSet bound to it.
  // Both are created in the constructor and released in the destructor.
  TECFDbxAccess = class(TComponent)
  private
    FConnection: TSQLConnection;   // created in Create; exposed as Connection
    FQuery: TCustomSQLDataSet;     // created in Create; its SQLConnection is FConnection
  public
    constructor Create( AOwner : TComponent ); override;
    destructor  Destroy; override;

    // Closes the dataset, assigns SQL as its command text and re-opens it.
    procedure Execute(const SQL: WideString);

    property Connection: TSQLConnection read FConnection;
    property Query: TCustomSQLDataSet read FQuery;
  end;

  // Python-visible connection object. Wraps a TECFDbxAccess and exposes the
  // cdecl methods below to Python through RegisterMethods.
  TECFDbxConn = class(TPyObject)
  private
    FSQLConn: TECFDbxAccess;   // created in Create, freed in Destroy

    // Builds a tuple of (column name, TYPE_* code) pairs for the current dataset.
    function getHeaders: PPyObject;
  public
    constructor Create( APythonType : TPythonType ); override;
    destructor  Destroy; override;

    // Connection property setters; registered as driverName, getDriverFunc,
    // libraryName, setParam and vendorLib.
    function _SetDriverName( args : PPyObject ) : PPyObject; cdecl;
    function _SetGetDriverFunc( args : PPyObject ) : PPyObject; cdecl;
    function _SetLibraryName( args : PPyObject ) : PPyObject; cdecl;
    function _SetParams( args : PPyObject ) : PPyObject; cdecl;
    function _VendorLib( args : PPyObject ) : PPyObject; cdecl;
    // Transaction control; registered as beginTrans, commit and rollback.
    function _beginTrans( args : PPyObject ) : PPyObject; cdecl;
    function _commitTrans( args : PPyObject ) : PPyObject; cdecl;
    function _rollbackTrans( args : PPyObject ) : PPyObject; cdecl;
    // Statement execution and result retrieval; registered as query,
    // fetch_array and close.
    function _query( args : PPyObject ) : PPyObject; cdecl;
    function _fetch_array( args : PPyObject ) : PPyObject; cdecl;
    function _close( args : PPyObject ) : PPyObject; cdecl;

    // Publishes the methods above on the given Python type.
    class procedure RegisterMethods( PythonType : TPythonType ); override;
  end;

  // Python type descriptor whose instances are TECFDbxConn objects
  // (PyObjectClass is set in the constructor).
  TPythonDbxConn = class( TPythonType)
  public
    constructor Create( AOwner : TComponent ); override;
  end;

  // Python extension module '_dbxapi'. It registers the module-level function
  // getdbxconn and the error object DBXError.
  TECFDbxAPI = class(TECFPythonModule)
  private
    FPythonDBXConn: TPythonDbxConn;   // created by SetupObjectType
  public
    // Implementation of _dbxapi.getdbxconn(drv, getdrvfnc, libnm, vlibnm, params).
    function PyGetConnection( pyself, args : PPyObject ) : PPyObject; cdecl;

    // Raises the module's 'DBXError' with the given message.
    procedure RaiseDBXError(const Msg: string);
  public
    constructor Create( AOwner : TComponent ); override;
    destructor  Destroy; override;

    // Registers getdbxconn before running the inherited initialisation.
    procedure Initialize; override;
    // Creates the TPythonDbxConn type object owned by this module.
    procedure SetupObjectType;
  end;

  procedure CreateComponents( AOwner : TComponent; APyEngine: TPythonEngine );

implementation
uses
  ActiveX;

type
  // Local descendant used by TECFDbxAccess.Execute to reach CommandText on a
  // TCustomSQLDataSet (presumably because the member is protected there).
  TProtCustomSQLDataSet = class(TCustomSQLDataSet);

const
  // Column type codes reported by TECFDbxConn.getHeaders.
  TYPE_STRING	= 1;
  TYPE_BINARY	= 2;
  TYPE_NUMBER	= 3;
  TYPE_DATETIME	= 4;
  TYPE_DECIMAL	= 5;

var
  // Assigned in TECFDbxAPI.Create; the TECFDbxConn methods use it to raise
  // DBXError.
  DbxAPIModule: TECFDbxAPI;

{ Creates the _dbxapi module component under AOwner, attaches it to APyEngine
  and sets up its connection object type. }
procedure CreateComponents( AOwner : TComponent; APyEngine: TPythonEngine );
var
  AModule: TECFDbxAPI;
begin
  AModule := TECFDbxAPI.Create(AOwner);
  AModule.Engine := APyEngine;
  AModule.SetupObjectType;
end;

{ TECFDbxAPI }

{ Names the module '_dbxapi', fills in its doc string, declares the DBXError
  error object and publishes the instance through DbxAPIModule. }
constructor TECFDbxAPI.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);
  ModuleName := '_dbxapi';
  Name := 'ECFDbxAPI';

  // Module-level documentation, one entry per line.
  DocString.Add( 'This module contains several Object Types that' );
  DocString.Add( 'will let you work with the DBExpress and access' );
  DocString.Add( 'a database in a python DB-API compatible implementation.' );
  DocString.Add( '' );
  DocString.Add( 'getdbxconn(drv,getdrvfnc,libnm,vlibnm,params) -> Get dbx connection object' );

  // Error object raised through RaiseDBXError.
  Errors.Add.Name := 'DBXError';

  DbxAPIModule := Self;
end;

{ Releases the module. The unit-level DbxAPIModule reference is cleared first
  so that it does not keep pointing at a freed instance. }
destructor TECFDbxAPI.Destroy;
begin
  // Only clear the global if it still refers to this instance; a newer
  // module instance may have replaced it in its own constructor.
  if DbxAPIModule = Self then
    DbxAPIModule := nil;
  inherited Destroy;
end;

{ Registers the module function getdbxconn, then runs the inherited
  initialisation. }
procedure TECFDbxAPI.Initialize;
const
  AGetDbxConnDoc =
    'getdbxconn(drv,getdrvfnc,libnm,vlibnm,params) -- get connection object with driver attached '+
    'on it.';
begin
  AddDelphiMethod('getdbxconn', PyGetConnection, AGetDbxConnDoc);
  inherited Initialize;
end;

{ Python entry point for _dbxapi.getdbxconn(drv, getdrvfnc, libnm, vlibnm, params).
  Creates a DBXConn instance, configures its TSQLConnection from the five
  string arguments and opens it. Returns the new object, or nil with DBXError
  set when the arguments cannot be parsed or the connection cannot be opened. }
function TECFDbxAPI.PyGetConnection(pyself, args: PPyObject): PPyObject;
var
  drv,getdrvfnc,libnm,vlibnm,params:PChar;
  ASavedState: PPyThreadState;
  ADbConn: TSQLConnection;
  AArgsOk: Boolean;
begin
  Result := nil;
  with GetPythonEngine do
  begin
    AArgsOk := Assigned(args) and (PyTuple_Size(args) > 0) and
      (PyArg_ParseTuple( args, 'sssss:', [@drv,@getdrvfnc,@libnm,@vlibnm,@params]) <> 0);
    if not AArgsOk then
    begin
      RaiseDBXError('Connection could not be established');
      Exit;
    end;

    Result := FPythonDBXConn.CreateInstance;
    if Result = nil then
      Exit;

    // Configure the connection owned by the freshly created Python object.
    ADbConn := TECFDbxConn(PythonToDelphi(Result)).FSQLConn.Connection;
    if ADbConn.Connected then
      ADbConn.Close;
    ADbConn.DriverName := String(drv);
    ADbConn.GetDriverFunc := String(getdrvfnc);
    ADbConn.LibraryName := String(libnm);
    ADbConn.VendorLib := String(vlibnm);
    ADbConn.Params.Text := String(params);
    ADbConn.LoginPrompt := False;

    try
      // The thread state is saved around Open and restored in the finally,
      // so the except handler below runs after the restore.
      ASavedState := PyEval_SaveThread;
      try
        ADbConn.Open;
      finally
        PyEval_RestoreThread(ASavedState);
      end;
    except
      on E:Exception do
        begin
          // Drop the half-initialised object and report the failure.
          Py_XDECREF(Result);
          RaiseDBXError(E.Message);
          Result := nil;
        end;
    end;
  end;
end;

{ Raises the error object named 'DBXError' (declared in Create) with Msg as
  its message, by delegating to RaiseError. }
procedure TECFDbxAPI.RaiseDBXError(const Msg: string);
begin
  RaiseError('DBXError', Msg);
end;

{ Creates the DBXConn Python type with this module as its owner.
  NOTE(review): nothing here guards against a second call, which would create
  another TPythonDbxConn with the same component name - confirm callers invoke
  this once. }
procedure TECFDbxAPI.SetupObjectType;
begin
  FPythonDBXConn := TPythonDbxConn.Create(Self);
end;

{ TECFDbxAccess }

{ Creates the connection and the dataset (both owned by Self) and binds the
  dataset to the connection. Login prompting is disabled. }
constructor TECFDbxAccess.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);
  // NOTE(review): no matching CoUninitialize is visible in this unit -
  // confirm whether the COM initialisation is meant to stay unbalanced.
  CoInitialize(nil);
  FConnection             := TSQLConnection.Create(Self);
  FConnection.LoginPrompt := False;
  FQuery                  := TCustomSQLDataSet.Create(Self);
  FQuery.SQLConnection    := FConnection;
end;

{ Frees the dataset first, then the connection it is bound to, before the
  inherited destructor runs. }
destructor TECFDbxAccess.Destroy;
begin
  FQuery.Free;
  FConnection.Free;
  inherited Destroy;
end;

{ Runs SQL on the wrapped dataset: closes it, assigns SQL as its command text
  and re-opens it. The connection state is set to csStateExecuting for the
  duration and always reset to csStateOpen afterwards, also on failure. }
procedure TECFDbxAccess.Execute(const SQL: WideString);
begin
  FConnection.ConnectionState := csStateExecuting;
  try
    // NOTE(review): CoInitialize runs on every call (and once in the
    // constructor) with no CoUninitialize visible in this unit - confirm
    // whether this is intended, e.g. because calls arrive on varying threads.
    CoInitialize(nil);
    FQuery.Active := False;
    // CommandText is reached through the local TProtCustomSQLDataSet cast.
    TProtCustomSQLDataSet(FQuery).CommandText := SQL;
    FQuery.Active := True;
  finally
    FConnection.ConnectionState := csStateOpen;
  end;
end;

{ TPythonDbxConn }

{ Sets up the 'DBXConn' Python type. When the owner is a TECFPythonModule the
  type is attached to that module and to the module's engine. Instances are
  TECFDbxConn objects. }
constructor TPythonDbxConn.Create(AOwner: TComponent);
begin
  inherited Create(AOwner);
  if AOwner is TECFPythonModule then
  begin
    Module := TECFPythonModule(AOwner);
    Engine := Module.Engine;
  end;
  Name     := 'typeDbxConn';
  TypeName := 'DBXConn';
  PyObjectClass := TECFDbxConn;
  // Instances are handed out by TECFDbxAPI.PyGetConnection via CreateInstance.
  GenerateCreateFunction := False;
end;

{ TECFDbxConn }

{ Creates the Python object together with its own, ownerless TECFDbxAccess. }
constructor TECFDbxConn.Create(APythonType: TPythonType);
begin
  inherited Create(APythonType);
  // No owner is passed, so the instance is freed explicitly in Destroy.
  FSQLConn := TECFDbxAccess.Create(nil);
end;

{ Frees the TECFDbxAccess created in the constructor, then the object itself. }
destructor TECFDbxConn.Destroy;
begin
  FSQLConn.Free;
  inherited Destroy;
end;

{ Builds the column header tuple for the current dataset: one
  (field name, TYPE_* code) pair per field, in field order. Field data types
  not listed in the case statement get the code 0. Failures to allocate a
  tuple are reported through CheckError. }
function TECFDbxConn.getHeaders: PPyObject;
var
  APyHeader, APyColumn: PPyObject;
  AColType: integer;
  AColName: string;
  I: Integer;
begin
  with GetPythonEngine do
  begin
    APyHeader := PyTuple_New(FSQLConn.Query.FieldCount);
    if APyHeader = nil then
      CheckError(False);
    for I := 0 to FSQLConn.Query.FieldCount - 1 do
    begin
      // Default for field types that no branch below covers.
      AColType := 0;
      case FSQLConn.Query.Fields[i].DataType of
        ftString, ftFixedChar, ftWideString, // 19..24
        ftFixedWideChar, ftWideMemo
            : AColType := TYPE_STRING;
        ftBlob, ftMemo, ftGraphic, ftFmtMemo, // 12..18
        ftBytes, ftVarBytes, ftADT, ftArray, ftTypedBinary
            : AColType := TYPE_BINARY;
        ftSmallint, ftInteger, ftWord, // 0..4
        ftBoolean, ftAutoInc, ftLargeint
            : AColType := TYPE_NUMBER;
        ftDate, ftTime, ftDateTime, // 5..11
        ftTimeStamp
            : AColType := TYPE_DATETIME;
        ftFloat, ftCurrency, ftBCD, ftFMTBcd
            : AColType := TYPE_DECIMAL;
      end;
      AColName := FSQLConn.Query.Fields[i].FieldName;
      APyColumn := PyTuple_New(2);
      if APyColumn = nil then
      begin
        // Release the partially filled header before reporting the error.
        Py_XDECREF(APyHeader);
        CheckError(False);
      end;
      // NOTE(review): the results of PyString_FromString / PyLong_FromLong
      // are stored without a nil check - confirm this is acceptable.
      PyTuple_SetItem(APyColumn, 0, PyString_FromString(PChar(AColName)));
      PyTuple_SetItem(APyColumn, 1, PyLong_FromLong(AColType));
      PyTuple_SetItem(APyHeader, I, APyColumn);
    end;
    Result := APyHeader;
  end;
end;

{ Publishes the DBXConn methods on PythonType. The doc strings describe the
  methods under the names they are registered with and the values the
  implementations in this unit actually return. }
class procedure TECFDbxConn.RegisterMethods(PythonType: TPythonType);
begin
  inherited RegisterMethods(PythonType);
  with PythonType do
  begin
    AddMethod( 'driverName',  @TECFDbxConn._SetDriverName,
      'DBXConn.driverName(drvname) -> None' );
    AddMethod( 'getDriverFunc',  @TECFDbxConn._SetGetDriverFunc,
      'DBXConn.getDriverFunc(getdrvfunc) -> None' );
    AddMethod( 'libraryName',  @TECFDbxConn._SetLibraryName,
      'DBXConn.libraryName(libname) -> None' );
    AddMethod( 'setParam',  @TECFDbxConn._SetParams,
      'DBXConn.setParam(name, value) -> None' );
    AddMethod( 'vendorLib',  @TECFDbxConn._VendorLib,
      'DBXConn.vendorLib(vdrlib) -> None' );
    AddMethod( 'beginTrans',  @TECFDbxConn._beginTrans,
      'DBXConn.beginTrans() -> TransactionID' );
    AddMethod( 'commit',  @TECFDbxConn._commitTrans,
      'DBXConn.commit(TransactionID) -> None' );
    AddMethod( 'rollback',  @TECFDbxConn._rollbackTrans,
      'DBXConn.rollback(TransactionID) -> None' );
    // _query returns the constant 1 on success, not a row count.
    AddMethod( 'query',  @TECFDbxConn._query,
      'DBXConn.query(SQL) -> 1 on success' );
    // The method is registered as fetch_array (no leading underscore).
    AddMethod( 'fetch_array',  @TECFDbxConn._fetch_array,
      'DBXConn.fetch_array() -> (headers, row count, rows)' );
    AddMethod( 'close',  @TECFDbxConn._close,
      'DBXConn.close() -> None' );
  end;
end;

{ DBXConn.beginTrans(): starts a transaction on the connection and returns
  the transaction object cast to an integer as the transaction ID. Returns nil
  with DBXError set when not connected or when starting the transaction fails. }
function TECFDbxConn._beginTrans(args: PPyObject): PPyObject;
var
  ASavedState: PPyThreadState;
  AHandle: Integer;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    try
      // The thread state is saved around the database call and restored in
      // the finally, before any Python API is used again.
      ASavedState := PyEval_SaveThread;
      try
        AHandle := Integer(FSQLConn.Connection.BeginTransaction);
      finally
        PyEval_RestoreThread(ASavedState);
      end;
      Result := PyLong_FromLong(AHandle);
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.close(): closes the dataset if it is active and then the
  connection. Returns None, or nil with DBXError set when the connection is
  not open or closing fails. }
function TECFDbxConn._close(args: PPyObject): PPyObject;
var
  ASavedState: PPyThreadState;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    try
      ASavedState := PyEval_SaveThread;
      try
        // Dataset first, then the connection it depends on.
        if FSQLConn.Query.Active then
          FSQLConn.Query.Close;
        if FSQLConn.Connection.Connected then
          FSQLConn.Connection.Close;
      finally
        PyEval_RestoreThread(ASavedState);
      end;
      Result := ReturnNone;
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.commit(TransactionID): commits and frees the transaction whose ID
  was returned by beginTrans. Returns None, or nil with DBXError set when not
  connected, when the argument is missing/invalid, or when the commit fails.

  NOTE(review): the ID is the transaction object's address, so a value that
  did not come from beginTrans is dereferenced as an object - callers must
  only pass IDs obtained from beginTrans. }
function TECFDbxConn._commitTrans(args: PPyObject): PPyObject;
var
  AThreadState: PPyThreadState;
  ATransactionID: Integer;
  ATrans: TDBXTransaction;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    ATransactionID := 0;
    // 'i' stores a C int, which matches the 32-bit Integer variable. The
    // previous 'L' format stores a 64-bit value and wrote past the variable.
    if PyArg_ParseTuple( args, 'i:DBXConn.commit', [@ATransactionID] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('TransactionID should be specified');
      Exit;
    end;

    ATrans := TDBXTransaction(Ptr(ATransactionID));
    if ATrans = nil then
    begin
      DbxAPIModule.RaiseDBXError('Invalid Transaction ID');
      Exit;
    end;

    try
      AThreadState := PyEval_SaveThread;
      try
        FSQLConn.Connection.CommitFreeAndNil(ATrans);
      finally
        PyEval_RestoreThread(AThreadState);
      end;
      Result := ReturnNone;
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.fetch_array(): reads the remaining rows of the current dataset and
  returns the tuple (headers, row count, rows), where headers comes from
  getHeaders and rows is a tuple with one tuple of field values per record.
  Returns nil with DBXError set when not connected or when anything fails.

  Reference handling: the row list and the result tuple are released in a
  single finally block, so no error path leaks them. Each row tuple is
  released after PyList_Append, because appending adds its own reference. }
function TECFDbxConn._fetch_array(args: PPyObject): PPyObject;
var
  APyResult, APyRecord, APyRecordLst, APyRecordTuple: PPyObject;
  AThreadState: PPyThreadState;
  ACount: Integer;
  AAppendRes: Integer;
  I: integer;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    APyResult := nil;
    APyRecordLst := nil;
    try
      try
        APyResult := PyTuple_New(3);
        if APyResult = nil then
          CheckError(False);
        PyTuple_SetItem(APyResult, 0, getHeaders);

        APyRecordLst := PyList_New(0);
        if APyRecordLst = nil then
          CheckError(False);

        ACount := 0;
        while not FSQLConn.Query.Eof do
        begin
          APyRecord := PyTuple_New(FSQLConn.Query.FieldCount);
          if APyRecord = nil then
            CheckError(False);
          try
            for I := 0 to FSQLConn.Query.FieldCount - 1 do
              PyTuple_SetItem(APyRecord, I,
                VariantAsPyObject(FSQLConn.Query.Fields[I].Value));
          except
            // A value could not be converted: drop the unfinished row.
            Py_XDECREF(APyRecord);
            raise;
          end;

          // The list takes its own reference, so ours is released either way.
          AAppendRes := PyList_Append(APyRecordLst, APyRecord);
          Py_XDECREF(APyRecord);
          if AAppendRes <> 0 then
            CheckError(False);
          Inc(ACount);

          // Restore the thread state even when Next raises, so the except
          // handler below never runs without it.
          AThreadState := PyEval_SaveThread;
          try
            FSQLConn.Query.Next;
          finally
            PyEval_RestoreThread(AThreadState);
          end;
        end;

        APyRecordTuple := PyList_AsTuple(APyRecordLst);
        if APyRecordTuple = nil then
          CheckError(False);
        PyTuple_SetItem(APyResult, 1, PyLong_FromLong(ACount));
        PyTuple_SetItem(APyResult, 2, APyRecordTuple);

        // Ownership moves to the caller; keep the finally from releasing it.
        Result := APyResult;
        APyResult := nil;
      finally
        Py_XDECREF(APyRecordLst);
        Py_XDECREF(APyResult);
      end;
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.query(SQL): executes SQL through TECFDbxAccess.Execute. Returns the
  constant 1 on success (the Python wrapper tests for that value), or nil with
  DBXError set when not connected, when the SQL argument is missing, or when
  execution fails. }
function TECFDbxConn._query(args: PPyObject): PPyObject;
var
  AThreadState: PPyThreadState;
  ASQL: PChar;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    if PyArg_ParseTuple( args, 's:DBXConn.query', [@ASQL] ) = 0 then
    begin
      // The old message spoke of a TransactionID, which this method does not take.
      DbxAPIModule.RaiseDBXError('SQL statement should be specified');
      Exit;
    end;

    try
      AThreadState := PyEval_SaveThread;
      try
        FSQLConn.Execute(String(ASQL));
      finally
        PyEval_RestoreThread(AThreadState);
      end;
      // Result := PyLong_FromLong(TProtCustomSQLDataSet(FSQLConn.Query).RowsAffected);
      Result := PyLong_FromLong(1);
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.rollback(TransactionID): rolls back and frees the transaction whose
  ID was returned by beginTrans. Returns None, or nil with DBXError set when
  not connected, when the argument is missing/invalid, or when the rollback
  fails.

  NOTE(review): the ID is the transaction object's address, so a value that
  did not come from beginTrans is dereferenced as an object - callers must
  only pass IDs obtained from beginTrans. }
function TECFDbxConn._rollbackTrans(args: PPyObject): PPyObject;
var
  AThreadState: PPyThreadState;
  ATransactionID: Integer;
  ATrans: TDBXTransaction;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    Result := nil;
    if not FSQLConn.Connection.Connected then
    begin
      DbxAPIModule.RaiseDBXError('Not connected to the database');
      Exit;
    end;

    ATransactionID := 0;
    // 'i' stores a C int, which matches the 32-bit Integer variable. The
    // previous 'L' format stores a 64-bit value and wrote past the variable.
    if PyArg_ParseTuple( args, 'i:DBXConn.rollback', [@ATransactionID] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('TransactionID should be specified');
      Exit;
    end;

    ATrans := TDBXTransaction(Ptr(ATransactionID));
    if ATrans = nil then
    begin
      DbxAPIModule.RaiseDBXError('Invalid Transaction ID');
      Exit;
    end;

    try
      AThreadState := PyEval_SaveThread;
      try
        FSQLConn.Connection.RollbackFreeAndNil(ATrans);
      finally
        PyEval_RestoreThread(AThreadState);
      end;
      Result := ReturnNone;
    except
      on E:Exception do
        DbxAPIModule.RaiseDBXError(E.Message);
    end;
  end;
end;

{ DBXConn.driverName(drvname): stores drvname as the connection's DriverName.
  Returns None, or nil with DBXError set when the argument is missing. }
function TECFDbxConn._SetDriverName(args: PPyObject): PPyObject;
var
  ADriverName : PChar;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    if PyArg_ParseTuple( args, 's:DBXConn.driverName', [@ADriverName] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('Driver name should be specified');
      Result := nil;
      Exit;
    end;
    FSQLConn.Connection.DriverName := String(ADriverName);
    Result := ReturnNone;
  end;
end;

{ DBXConn.getDriverFunc(getdrvfunc): stores getdrvfunc as the connection's
  GetDriverFunc. Returns None, or nil with DBXError set when the argument is
  missing. }
function TECFDbxConn._SetGetDriverFunc(args: PPyObject): PPyObject;
var
  AFuncName : PChar;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    if PyArg_ParseTuple( args, 's:DBXConn.getDriverFunc', [@AFuncName] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('GetDriverFunction name should be specified');
      Result := nil;
      Exit;
    end;
    FSQLConn.Connection.GetDriverFunc := String(AFuncName);
    Result := ReturnNone;
  end;
end;

{ DBXConn.libraryName(libname): stores libname as the connection's
  LibraryName. Returns None, or nil with DBXError set when the argument is
  missing. }
function TECFDbxConn._SetLibraryName(args: PPyObject): PPyObject;
var
  ALibName : PChar;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    if PyArg_ParseTuple( args, 's:DBXConn.libraryName', [@ALibName] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('Library name should be specified');
      Result := nil;
      Exit;
    end;
    FSQLConn.Connection.LibraryName := String(ALibName);
    Result := ReturnNone;
  end;
end;

{ DBXConn.setParam(name, value): sets one connection parameter. An existing
  entry with the same name is replaced in place; otherwise 'name=value' is
  appended. Returns None, or nil with DBXError set when the arguments are
  missing. }
function TECFDbxConn._SetParams(args: PPyObject): PPyObject;
var
  AKey, AValue : PChar;
  ALine: string;
  AIndex: Integer;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    if (PyArg_ParseTuple( args, 'ss:DBXConn.setParam', [@AKey, @AValue] ) <> 0) then
    begin
      ALine := Format('%s=%s', [String(AKey), String(AValue)]);
      // Always appending left the older entry first in the list, so a repeated
      // setParam for the same name never took effect on lookup by name.
      AIndex := FSQLConn.Connection.Params.IndexOfName(String(AKey));
      if AIndex >= 0 then
        FSQLConn.Connection.Params[AIndex] := ALine
      else
        FSQLConn.Connection.Params.Add(ALine);
      Result := ReturnNone;
    end else
    begin
      DbxAPIModule.RaiseDBXError('Parameter name, and value should be specified');
      Result := nil;
    end;
  end;
end;

{ DBXConn.vendorLib(vdrlib): stores vdrlib as the connection's VendorLib.
  Returns None, or nil with DBXError set when the argument is missing. }
function TECFDbxConn._VendorLib(args: PPyObject): PPyObject;
var
  AVendorLib : PChar;
begin
  with GetPythonEngine do
  begin
    Adjust(@Self);
    if PyArg_ParseTuple( args, 's:DBXConn.vendorLib', [@AVendorLib] ) = 0 then
    begin
      DbxAPIModule.RaiseDBXError('Vendor Library name should be specified');
      Result := nil;
      Exit;
    end;
    FSQLConn.Connection.VendorLib := String(AVendorLib);
    Result := ReturnNone;
  end;
end;

end.

Sementara Python DBExpress API Wrappernya adalah sebagai berikut:


__author__ = "Jaimy Azle"
__version__ = '0.0.1'

import _dbxapi
import types
import string
import time
import datetime

# DB-API module globals.
# API level this module declares (DB SIG 2.0).
apilevel = '2.0'

# The module may be shared between threads, but connections may not.
threadsafety = 1

# Parameters are substituted with Python's extended "%" format codes
# (see _quoteparams).
paramstyle = 'pyformat'

class DBAPITypeObject:
  """Type object that compares equal to any of the type codes it was given."""

  def __init__(self, *values):
    # Keep the accepted type codes as a tuple.
    self.values = values

  def __cmp__(self, other):
    """Return 0 when `other` is one of the stored codes, else 1 or -1."""
    if other in self.values:
      return 0
    return 1 if other < self.values else -1

# Type objects for column type codes; the numbers mirror the TYPE_*
# constants of the Delphi unit that builds the column headers.
STRING = DBAPITypeObject(1)    # TYPE_STRING
BINARY = DBAPITypeObject(2)    # TYPE_BINARY
NUMBER = DBAPITypeObject(3)    # TYPE_NUMBER
DATETIME = DBAPITypeObject(4)  # TYPE_DATETIME
DECIMAL = DBAPITypeObject(5)   # TYPE_DECIMAL

# Exception hierarchy of the module. Warning and Error derive from
# StandardError; every other class derives from Error, most of them through
# DatabaseError. In the code visible here only InterfaceError (by _quote),
# DatabaseError (by the cursor) and OperationalError (by the connection
# wrapper) are raised.
class Warning(StandardError):
  pass

# Root of all error exceptions of this module.
class Error(StandardError):
  pass

# Raised by _quote for parameter values of an unsupported type.
class InterfaceError(Error):
  pass

# Raised by pydbxCursor.executemany when a statement fails.
class DatabaseError(Error):
  pass

class DataError(DatabaseError):
  pass

# Raised by pydbxCnx for an invalid connection and for failed
# commit/rollback calls.
class OperationalError(DatabaseError):
  pass

class IntegrityError(DatabaseError):
  pass

class InternalError(DatabaseError):
  pass

class ProgrammingError(DatabaseError):
  pass

class NotSupportedError(DatabaseError):
  pass

### cursor object

class pydbxCursor:
  """Cursor over a `_dbxapi` connection object.

  `src` must provide `query(sql)`, returning 1 on success, and
  `fetch_array()`, returning `(headers, row_count, rows)` where `headers` is
  a sequence of `(name, type_code)` pairs and `rows` a sequence of row
  tuples. The last fetched result is kept in `self._result`.

  Attributes:
    description: None, or one 7-item tuple per column
      `(name, type_code, None, None, None, None, None)`.
    rowcount: -1 before any execute, otherwise the number of rows fetched
      by the last execute/executemany call (summed over all parameter sets).
    arraysize: default number of rows returned by fetchmany().
  """

  def __init__(self, src):
    self.__source = src
    self.description = None
    self.rowcount = -1
    self.arraysize = 1
    self._result = []
    self.__fetchpos = 0

  def close(self):
    """Detach from the connection and drop any pending result."""
    self.__source = None
    self.description = None
    # The old code assigned to a different attribute (`result`), so the
    # cached rows were never released.
    self._result = []
    self.rowcount = -1
    self.__fetchpos = 0

  def execute(self, operation, params = None):
    """Execute `operation`, optionally formatted with `params`.

    A non-empty list whose first item is a tuple is treated as a sequence
    of parameter sets and forwarded to executemany() (deprecated usage).
    """
    if params and isinstance(params, list) and isinstance(params[0], tuple):
      self.executemany(operation, params)
    else:
      # not a list of tuples
      self.executemany(operation, (params,))

  def executemany(self, operation, param_seq):
    """Execute `operation` once per entry of `param_seq`.

    A `None` entry runs `operation` unformatted. Only the result of the
    last statement stays available for fetching.

    Raises:
      DatabaseError: the cursor is closed, or any statement failed; the
        underlying error text is included in the message.
    """
    import sys

    self.description = None
    self.rowcount = -1
    self.__fetchpos = 0
    self._result = []

    if self.__source is None:
      raise DatabaseError("internal error: cursor is closed")

    totrows = 0
    try:
      for params in param_seq:
        if params is not None:
          sql = _quoteparams(operation, params)
        else:
          sql = operation
        ret = self.__source.query(sql)
        if ret != 1:
          raise DatabaseError("error: query returned %r" % (ret,))
        self._result = self.__source.fetch_array()
        totrows = totrows + self._result[1]
    except DatabaseError:
      self._result = []
      raise
    except Exception:
      # The connection object has no errmsg() method; report the text of
      # the exception that was actually raised instead.
      self._result = []
      raise DatabaseError("internal error: %s" % (sys.exc_info()[1],))

    # then initialize result row count and description
    if self._result and self._result[0]:
      self.description = tuple(
        [(col[0], col[1], None, None, None, None, None)
         for col in self._result[0]])
    self.rowcount = totrows

  def _rows(self):
    """Return the rows of the current result; DatabaseError if there is none."""
    if not self._result:
      raise DatabaseError("no result set available")
    return self._result[2]

  def fetchone(self):
    """Return the next row, or None when the result is exhausted."""
    # keep = 0: fetching a single row must not change arraysize.
    ret = self.fetchmany(1, keep = 0)
    if ret:
      return ret[0]
    return None

  def fetchall(self):
    """Return all remaining rows and mark the result as consumed."""
    rows = self._rows()
    ret = rows[self.__fetchpos:]
    self.__fetchpos = len(rows)
    return ret

  def fetchmany(self, size = None, keep = 1):
    """Return up to `size` of the remaining rows (default: arraysize).

    With `keep` equal to 1 the requested size becomes the new arraysize.
    Returns an empty list when no rows are left.
    """
    if size is None:
      size = self.arraysize
    if keep == 1:
      self.arraysize = size
    rows = self._rows()
    remaining = len(rows) - self.__fetchpos
    if remaining <= 0:
      return []
    # Clamp to what is left so the position never runs past the end.
    size = max(0, min(size, remaining))
    ret = rows[self.__fetchpos:self.__fetchpos + size]
    self.__fetchpos = self.__fetchpos + len(ret)
    return ret

  def setinputsizes(self, sizes):
    """No-op."""
    pass

  def setoutputsize(self, size, col = 0):
    """No-op."""
    pass

def _quote(x):
  if type(x) == types.StringType:
    x = "'" + string.replace(str(x), "'", "''") + "'"
  elif type(x) in (types.IntType, types.LongType, types.FloatType):
    pass
  elif x is None:
    x = 'NULL'
  # datetime quoting
  # described under "Writing International Transact-SQL Statements" in BOL
  # beware the order: isinstance(x,datetime.date)=True if x is
  # datetime.datetime ! Also round x.microsecond to milliseconds,
  # otherwise we get Msg 241, Level 16, State 1: Syntax error
  elif isinstance(x, datetime.datetime):
    x = "{ts '%04d-%02d-%02d %02d:%02d:%02d.%s'}" % \
      (x.year,x.month, x.day,
      x.hour, x.minute, x.second, x.microsecond / 1000)
  elif isinstance(x, datetime.date):
    x = "{d '%04d-%02d-%02d'}" % (x.year, x.month, x.day)
  # alternative quoting by Luciano Pacheco
  #elif hasattr(x, 'timetuple'):
  # x = time.strftime('\'%Y%m%d %H:%M:%S\'', x.timetuple())
  else:
    #print "didn't like " + x + " " + str(type(x))
    raise InterfaceError, 'do not know how to handle type %s' % type(x)

  return x

def _quoteparams(s, params):
  if hasattr(params, 'has_key'):
    x = {}
    for k, v in params.items():
      x[k] = _quote(v)
    params = x
  else:
    params = tuple(map(_quote, params))
  return s % params

### connection object

class pydbxCnx:
  """Connection wrapper around a `_dbxapi` connection object.

  `cnx` must provide `beginTrans()`, `commit(id)`, `rollback(id)` and
  `close()`. `transid` holds the ID of the currently open transaction, or
  None when no transaction has been started through this wrapper.
  """

  def __init__(self, cnx):
    """Wrap `cnx` and probe it with an empty transaction.

    Raises:
      OperationalError: the probe transaction could not be started or
        committed.
    """
    self.__cnx = cnx
    self.transid = None
    try:
      transid = self.__cnx.beginTrans()
      self.__cnx.commit(transid)
    except Exception:
      raise OperationalError("invalid connection.")

  def close(self):
    """Close the underlying connection; the wrapper is unusable afterwards."""
    if self.__cnx is None:
      raise OperationalError("invalid connection.")
    self.__cnx.close()
    self.__cnx = None
    self.transid = None

  def commit(self):
    """Commit the open transaction (if any) and start a new one."""
    if self.__cnx is None:
      raise OperationalError("invalid connection.")
    try:
      if self.transid:
        self.__cnx.commit(self.transid)
        # The ID is spent once the commit went through; forget it so a
        # failing beginTrans below cannot leave a stale ID behind.
        self.transid = None
      self.transid = self.__cnx.beginTrans()
    except Exception:
      raise OperationalError("can't commit.")

  def rollback(self):
    """Roll back the open transaction (if any) and start a new one."""
    if self.__cnx is None:
      raise OperationalError("invalid connection.")
    try:
      if self.transid:
        self.__cnx.rollback(self.transid)
        # Same as in commit(): never keep an ID that was already consumed.
        self.transid = None
      self.transid = self.__cnx.beginTrans()
    except Exception:
      raise OperationalError("can't rollback.")

  def cursor(self):
    """Return a new pydbxCursor bound to this connection."""
    if self.__cnx is None:
      raise OperationalError("invalid connection.")
    try:
      return pydbxCursor(self.__cnx)
    except Exception:
      raise OperationalError("invalid connection.")

# connects to a database
# connects to a database
def connect(dsn = None, drv="", getdrvfnc="", libnm="", vlibnm=""):
  """Open a connection and return it wrapped in a pydbxCnx.

  `dsn` is a ';'-separated list of `key=value` entries. The keys
  DriverName, GetDriverFunc, LibraryName and VendorLibName override the
  corresponding keyword arguments; every other entry is passed on as a
  connection parameter, one `key=value` line each, in the order given.
  Empty entries (for example after a trailing ';') are ignored, a later
  entry replaces an earlier one with the same key, and only the first '='
  of an entry separates key from value.

  Raises:
    ValueError: a non-empty dsn entry contains no '='.
  """
  dbdict = {}
  order = []
  # `dsn` defaults to None, which is treated like an empty string.
  for entry in (dsn or '').split(';'):
    if not entry.strip():
      continue
    if '=' not in entry:
      raise ValueError("malformed dsn entry (expected key=value): %r" % (entry,))
    key, value = entry.split('=', 1)
    key = key.strip()
    if key not in dbdict:
      order.append(key)
    dbdict[key] = value

  dbdrv = dbdict.pop('DriverName', drv)
  dbgetdrvfnc = dbdict.pop('GetDriverFunc', getdrvfnc)
  dblibnm = dbdict.pop('LibraryName', libnm)
  dbvlibnm = dbdict.pop('VendorLibName', vlibnm)
  dbdsn = '\n'.join(['%s=%s' % (key, dbdict[key]) for key in order if key in dbdict])

  # open the connection
  con = _dbxapi.getdbxconn(dbdrv, dbgetdrvfnc, dblibnm, dbvlibnm, dbdsn)
  return pydbxCnx(con)

C.O.M.M.E.N.T.S:

Listed below are comments from my friends regarding this entry. If you wish, you may also add a comment by clicking new comment.

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment:








 

About Me

Jaimy Azle My name is
Jaimy Azle, hufflepuff, I am a software developer with Delphi and C/C++ as major programming language. Living in Ungaran, Jawa Tengah.

Search

Categories:

Tag Cloud

Archives:

Meta:


Get Firefox!
Perhatian: Informasi yang terdapat di sini benar-benar hanya merepresentasikan cerita, pengalaman, ataupun pandangan yang merupakan opini pribadi dari saya sebagai ego. Segala hal yang bersifat pandangan ke depan sebenarnya lebih bersifat spekulatif dan bisa saja berubah meski saya bukan bunglon.