1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
import StringIO, csv, cx_Oracle, sys, os, datetime, time
class LengthNotEqualErr(Exception):
    """Raised when two collections that must have equal length do not.

    In this file it signals that the ``dataType`` mapping handed to
    ``readCsvWithTypeConv`` has a different number of entries than the CSV
    header has columns.
    """
def getFieldnames(csvFile):
    """Return the header row of a CSV file as a tuple of column names.

    Only the first line of the file is read. It is split on plain commas;
    quoted fields are not interpreted.

    Args:
        csvFile: Path of the CSV file to inspect.

    Returns:
        A tuple with one string per header column, in file order. An empty
        file yields an empty tuple.
    """
    with open(csvFile) as csvfile:
        # readline() reads exactly one line. The previous readlines(1) passed
        # a size *hint*, not a line count, and could read more than needed.
        header = csvfile.readline()

    if not header:
        # Empty file: there is no header row at all.
        return ()

    # Strip both '\r' and '\n' so files with Windows line endings do not
    # leave a stray '\r' glued to the last column name.
    return tuple(header.rstrip('\r\n').split(","))
def parseValues(values, dataType = None):
    """Convert numeric-looking strings in ``values`` to numbers, in place.

    Args:
        values: List of strings (one CSV row). The list is modified in place.
        dataType: Optional sequence parallel to ``values`` holding a type
            name per position. Only positions declared ``'int'`` or
            ``'float'`` are converted; any other name leaves the raw string
            untouched. When ``None``, every position is a candidate.

    Returns:
        The same ``values`` list. Strings that parse as a plain integer or
        float literal are replaced by the number; everything else is kept as
        the original string. A position declared ``'float'`` always receives
        a ``float``, even when the text is an integer literal.
    """
    from ast import literal_eval
    import re

    # A digit run followed by anything other than a digit or '.' (for example
    # '5cm', '1e5', '2014-01-01') marks text that is not treated as a number.
    notPlainNumber = re.compile(r'\d+[^.\d]+')

    for i, value in enumerate(values):
        if dataType is not None and dataType[i] not in ('int', 'float'):
            continue
        if notPlainNumber.search(value):
            continue

        try:
            parsed = literal_eval(value)
        except (ValueError, SyntaxError):
            # Not a Python literal at all (e.g. 'setosa', '?', ''): keep text.
            continue

        # literal_eval also accepts True/None/quoted strings/tuples. Only real
        # numbers may replace the text; 'long' covers Python 2 big integers.
        if type(parsed).__name__ not in ('int', 'long', 'float'):
            continue

        if dataType is not None and dataType[i] == 'float':
            # Keep a float-typed column homogeneous ('3' -> 3.0).
            parsed = float(parsed)
        values[i] = parsed

    return values
def readCsvWithTypeConv(csvFileName, dataType = None):
    """Read a CSV file into a list of dictionaries with converted values.

    The first line supplies the column names (via ``getFieldnames``); every
    following non-blank line becomes one dictionary keyed by those names.
    Values are converted by ``parseValues``. Lines are split on plain commas;
    quoted fields are not interpreted.

    Args:
        csvFileName: Path of the CSV file.
        dataType: Optional mapping from column name to a type name. It is
            turned into a per-column list and handed to ``parseValues``.
            When ``None``, ``parseValues`` decides for every value.

    Returns:
        A list with one dict per data row, in file order.

    Raises:
        LengthNotEqualErr: If ``dataType`` does not have exactly as many
            entries as the header has columns.
        KeyError: If ``dataType`` has the right size but lacks a column name.
    """
    from itertools import islice

    fieldnames = getFieldnames(csvFileName)

    dataTypeMap = None
    if dataType is not None:
        # Compare with != (value), not `is not` (identity), and validate the
        # size before the lookup so a mismatch is reported as
        # LengthNotEqualErr instead of an unrelated KeyError.
        if len(dataType) != len(fieldnames):
            raise LengthNotEqualErr('The length of dataType must be equal to the lenght of fieldnames.')
        dataTypeMap = [dataType[name] for name in fieldnames]

    columnCount = len(fieldnames)
    records = []
    with open(csvFileName) as csvFile:
        # islice(..., 1, None) skips the header line.
        for line in islice(csvFile, 1, None):
            line = line.rstrip('\r\n')
            if not line:
                # Blank lines (typically at the end of the file) carry no
                # record; skipping them avoids bogus one-key dictionaries.
                continue
            # zip() below drops surplus fields anyway; trimming them first
            # keeps parseValues from indexing past the end of dataTypeMap.
            fields = line.split(",")[:columnCount]
            values = parseValues(fields, dataTypeMap)
            records.append(dict(zip(fieldnames, values)))
    return records
if __name__ == "__main__": oracleSystemLoginInfo = u'system/qscf12356@192.168.0.120:1521/orcl' pythonOracleTypeMap = dict([["str", "VARCHAR(255)"], ["int", "NUMBER"], \ ["float", "FLOAT"]])
tableList = """dataName,path,uploadShema,uploadTblName iris,testData/iris.csv,datasets_1,iris forestfires,testData/forestfires.csv,datasets_1,forestfires car,testData/car.csv,datasets_1,car credit,testData/credit.csv,datasets_2,credit adult,testData/adult.csv,datasets_2,adult"""
fid = StringIO.StringIO(tableList) reader = csv.reader(fid, delimiter=',') dataTableList = list(); for i,row in enumerate(reader): if i > 0: dataTableList.append(row)
oracleConn = cx_Oracle.connect(oracleSystemLoginInfo) oracleCursor = oracleConn.cursor()
oracleCursor.execute("SELECT USERNAME FROM all_users") orclUserNames = [x[0] for x in oracleCursor.fetchall()] uniqueUserNames = list(set([x for x in ['C##' + y[2].upper() for y in dataTableList]])) nonexistOrcl = [x not in orclUserNames for x in uniqueUserNames]
oracleCursor.execute('''SELECT t.TABLE_NAME FROM all_tables t WHERE t.OWNER NOT IN ('SYSTEM', 'SYS', 'MDSYS', 'LBACSYS', 'CTXSYS', 'WMSYS', 'XDB', 'APPQOSSYS', 'ORDSYS', 'OJVMSYS', 'DVSYS')''') oracleAllTables = [x[0] for x in oracleCursor.fetchall()]
if any(nonexistOrcl): print 'Please create users first with following SQL:' for i,x in enumerate(uniqueUserNames): if nonexistOrcl[i]: print ''' CREATE TABLESPACE userNameVar DATAFILE 'userNameVar.dat' SIZE 40M REUSE AUTOEXTEND ON;
CREATE TEMPORARY TABLESPACE userNameVar_tmp TEMPFILE 'userNameVar_tmp.dbf' SIZE 10M REUSE AUTOEXTEND ON;
CREATE USER C##userNameVar IDENTIFIED BY userNameVar DEFAULT TABLESPACE userNameVar TEMPORARY TABLESPACE userNameVar_tmp quota unlimited on userNameVar;
GRANT CREATE SESSION, CREATE TABLE, CREATE VIEW, CREATE PROCEDURE TO C##userNameVar;'''.replace('userNameVar', x.replace('C##', '')) sys.exit(0)
oracleCursor.close() oracleConn.close()
for row in dataTableList: print "Now uplaod {} data to Oracle DB...".format(row[0])
if os.path.isfile(row[1]): dataTable = readCsvWithTypeConv(row[1]) colNames = [name.replace(' ', '_').replace('-', '_').upper() for name in dataTable[0].keys()] colDataType = [type(r).__name__ for r in dataTable[0].values()] newDataType = [type(r).__name__ for r in dataTable[0].values()] convert = False for rowDataTbl in dataTable: otherRowDataType = [type(r).__name__ for r in rowDataTbl.values()] typeNotEqual = [x != y for x,y in zip(colDataType, otherRowDataType)] if any(typeNotEqual): missConversion = [(i, otherRowDataType[i], colDataType[i]) for\ i, x in enumerate(typeNotEqual) if x] for i,x,y in missConversion: if x != y: if x == 'str' or y == 'str': newDataType[i] = 'str' elif (x == 'float' and y == 'int') or (x == 'float' and y == 'int'): newDataType[i] = 'float' if any([x != y for x,y in zip(colDataType, newDataType)]): dataTable = readCsvWithTypeConv(row[1], \ dict(zip(dataTable[0].keys(), newDataType))) colDataType = newDataType oracleType = [pythonOracleTypeMap[dataType] for dataType in colDataType]
orclLogin = oracleSystemLoginInfo.replace('system', 'C##' + row[2].upper())\ .replace('qscf12356', row[2]) oracleConn = cx_Oracle.connect(orclLogin) oracleCursor = oracleConn.cursor()
if row[3].upper() not in oracleAllTables: oracleCursor.execute('''CREATE TABLE {}({})'''.format(row[3].upper(), \ ','.join([x + ' ' + y for (x,y) in zip(colNames, oracleType)])))
print 'start upload at %s...' % datetime.datetime.now() st = time.clock()
oracleCursor.executemany( '''INSERT INTO {}({}) values ({})'''.format(row[3], ','.join(colNames), \ ','.join([':{}'.format(i) for i in range(len(colNames))])), \ [d.values() for d in dataTable])
print 'End upload at %s...' % datetime.datetime.now() print 'The total upload time for %i cells is %s seconds...\n' % \ (len(dataTable) * len(dataTable[0]), time.clock() - st)
oracleConn.commit()
oracleCursor.close() oracleConn.close()
|