1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
import datetime
import os
import sys
import time
from itertools import islice

import cassandra.cluster
import cassandra.concurrent
import cx_Oracle
class TooManyArgsError(Exception):
    """Raised when the command line supplies more than one argument."""
# Python type name of a value returned by the Oracle driver -> CQL column type.
# NOTE(review): 'int' maps to CQL int (32-bit); Oracle NUMBER values outside
# that range would not fit -- consider bigint if such values can occur.
_PYTHON_TO_CQL_TYPE = {
    'str': 'text',
    'unicode': 'text',
    'int': 'int',
    'long': 'bigint',
    'float': 'double',
    'datetime': 'timestamp',
}


def _read_table_list(filename):
    """Read the table-list CSV and return one dict per table to migrate.

    The first line supplies the keys, used verbatim (e.g. ``oracleSchema``,
    ``oracleTblName``, ``cassandraKeyspace``, ``cassandraTblName``). Each
    following non-blank line supplies the values, upper-cased. Fields are
    split on plain commas, so quoted fields containing commas are not
    supported. An empty file yields an empty list.
    """
    with open(filename) as csvfile:
        header = csvfile.readline()
        if not header:
            return []
        # rstrip('\r\n') so CRLF files do not leave '\r' on the last field.
        fieldnames = tuple(header.rstrip('\r\n').split(','))
        tables = []
        for line in csvfile:
            line = line.rstrip('\r\n')
            if not line:
                # A blank line would otherwise yield a dict missing most keys.
                continue
            values = [elem.upper() for elem in line.split(',')]
            tables.append(dict(zip(fieldnames, values)))
    return tables


def _infer_cql_types(rows, col_names):
    """Return the CQL type of each column, inferred from the sample *rows*.

    A column's type is taken from its first non-None value in *rows*. The
    Python type name of that value is looked up in ``_PYTHON_TO_CQL_TYPE``.

    Raises:
        ValueError: a column is NULL in every sampled row, or its Python
            type has no CQL mapping.
    """
    cql_types = []
    for idx, name in enumerate(col_names):
        sample = next((r[idx] for r in rows if r[idx] is not None), None)
        if sample is None:
            raise ValueError(
                'Cannot infer a CQL type for column %s: it is NULL in all '
                '%d sampled rows.' % (name, len(rows)))
        type_name = type(sample).__name__
        if type_name not in _PYTHON_TO_CQL_TYPE:
            raise ValueError(
                'No CQL type mapping for column %s (Python type %s).'
                % (name, type_name))
        cql_types.append(_PYTHON_TO_CQL_TYPE[type_name])
    return cql_types


def _upload_table(table, oracle_conn, cas_session, num_fetch):
    """Copy one Oracle table, plus its ROWID, into a Cassandra table.

    Creates the keyspace and table when missing, with ``rowid`` as the
    primary key. Rows are then streamed from Oracle in batches of
    *num_fetch* and inserted concurrently. Empty Oracle tables are skipped.
    """
    oracle_schema = table['oracleSchema']
    oracle_table = table['oracleTblName']
    keyspace = table['cassandraKeyspace'].lower()
    cass_table = table['cassandraTblName'].lower()
    print('Now is upload the dataset %s.%s...'
          % (oracle_schema.lower(), oracle_table.lower()))

    cursor = oracle_conn.cursor()
    try:
        # Identifiers cannot be bound as parameters. They come from the local
        # table-list file, which is assumed to be trusted.
        cursor.execute('select rowid,t.* from %s.%s t'
                       % (oracle_schema, oracle_table))
        oracle_rows = cursor.fetchmany(num_fetch)
        if not oracle_rows:
            print('The dataset %s.%s is empty; skipped.'
                  % (oracle_schema.lower(), oracle_table.lower()))
            return
        col_names = [desc[0].lower() for desc in cursor.description]
        cql_types = _infer_cql_types(oracle_rows, col_names)

        cas_session.execute(
            "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = "
            "{'class': 'SimpleStrategy', 'replication_factor': '2'}"
            % keyspace)
        cas_session.set_keyspace(keyspace)
        cas_session.execute(
            'CREATE TABLE IF NOT EXISTS {}.{} ({}, PRIMARY KEY (rowid))'.format(
                keyspace, cass_table,
                ','.join(name + ' ' + cql_type
                         for name, cql_type in zip(col_names, cql_types))))
        prepared = cas_session.prepare(
            'INSERT INTO {}.{} ({}) VALUES ({})'.format(
                keyspace, cass_table, ','.join(col_names),
                ','.join(['?'] * len(col_names))))

        print('start upload at %s...' % datetime.datetime.now())
        # Measure wall-clock time. time.clock() measured CPU time on Unix,
        # which hides time spent waiting on Oracle/Cassandra, and it was
        # removed in Python 3.8.
        start = time.time()
        num_rows = 0
        # Insert the batch in hand *before* fetching the next one, so the
        # first batch (fetched above to infer types) is uploaded too.
        while oracle_rows:
            cassandra.concurrent.execute_concurrent_with_args(
                cas_session, prepared, oracle_rows, concurrency=50)
            num_rows += len(oracle_rows)
            oracle_rows = cursor.fetchmany(num_fetch)
        print('End upload at %s...' % datetime.datetime.now())
        print('The total upload time for %i cells is %s seconds...\n'
              % (num_rows * len(col_names), time.time() - start))
    finally:
        cursor.close()


def main(argv):
    """Migrate every table listed in a CSV file from Oracle to Cassandra.

    Args:
        argv: command-line arguments after the script name. If empty,
            ``dataTable.csv`` is used. A single element is taken as the
            table-list filename.

    Raises:
        TooManyArgsError: more than one argument was given.

    The connection settings default to the literals below. They can be
    overridden with two environment variables:

    - ORACLE_LOGIN_INFO, in the form ``user/password@host:port/service``.
    - CASSANDRA_HOSTS, a comma-separated list of addresses.
    """
    oracle_login_info = os.environ.get(
        'ORACLE_LOGIN_INFO', u'system/qscf12356@192.168.0.120:1521/orcl')
    cassandra_hosts = [
        host.strip() for host in os.environ.get(
            'CASSANDRA_HOSTS',
            u'192.168.0.161,192.168.0.162,192.168.0.163,192.168.0.164'
        ).split(',')]
    num_fetch = 5000

    if not argv:
        table_list_fn = 'dataTable.csv'
        print("The default filename of the table list is '%s'." % table_list_fn)
    elif len(argv) > 1:
        raise TooManyArgsError('Too many input arguments!')
    else:
        table_list_fn = argv[0]
    print('The filename you input is %s.' % table_list_fn)

    table_list = _read_table_list(table_list_fn)
    if not table_list:
        print('There is no data in %s.' % table_list_fn)
        return

    # Open one Oracle connection and one Cassandra cluster and reuse them for
    # every table. Both are released even if a table fails part-way.
    oracle_conn = cx_Oracle.connect(oracle_login_info)
    try:
        cluster = cassandra.cluster.Cluster(cassandra_hosts)
        try:
            session = cluster.connect()
            for table in table_list:
                _upload_table(table, oracle_conn, session, num_fetch)
        finally:
            cluster.shutdown()
    finally:
        oracle_conn.close()
if __name__ == "__main__":
    # Forward everything after the script name; main() validates the count.
    main(argv=sys.argv[1:])
|