思路:
1. 获取表结构
2. 根据表字段类型利用faker生成数据
3. 拼装insert sql
4. 连接db执行插入sql
1. 获取表结构
def get_table_schema(self, table, database):
'''
获取业务库表的表结构
:param table: 表名
:param database: 数据库名
:return:
'''
try:
schema_sql ='select COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE,IS_NULLABLE from information_schema.columns where table_schema ="{database}" and table_name="{table}";'
schema_sql = schema_sql.format(database=database, table=table)
print(schema_sql)
schema_info =self.exec_sql(schema_sql)
print(schema_info)
return schema_info
except Exception as e:
print("执行sql失败:%s" % e)
traceback.print_exc()
return None
2. 根据表字段类型利用faker生成数据
def gen_data_per_schema(self, schema_info):
'''
根据业务库表的表字段类型和长度,生成插入数据
:param schema_info: 业务库的表字段数据
:return: 插入数据的list
'''
fake = Faker()
char_type = ['char','varchar']
text_type = ['text','longtext']
int_type = ['tinyint','int']# ,'smallint','bigint'
float_type = ['float']
decimal_type = ['decimal']
date_type = ['datetime','date','timestamp','time']
row_data = []
# COLUMN_NAME,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,NUMERIC_PRECISION,NUMERIC_SCALE
for field_infoin schema_info:
field_type = field_info['DATA_TYPE']
if ('id' in field_info['COLUMN_NAME'].lower()or 'guid' in field_info['COLUMN_NAME'].lower())and field_typein char_type:
row_data.append('"'+fake.uuid4()+'"')
elif field_typein char_type:
# len = int(field_info['CHARACTER_MAXIMUM_LENGTH'])
row_data.append('"'+fake.pystr(min_chars=None, max_chars=int(field_info['CHARACTER_MAXIMUM_LENGTH']))+'"')
elif field_typein text_type:
row_data.append('"'+fake.text()+'"')
elif field_typein int_type:
# numerify():三位随机数字
# random_digit():0~9随机数
# random_digit_not_null():1~9的随机数
# random_int():随机数字,默认0~9999,可以通过设置min,max来设置
# random_number():随机数字,参数digits设置生成的数字位数
# pyfloat():
# left_digits=5 #生成的整数位数, right_digits=2 #生成的小数位数, positive=True #是否只有正数
# pyint():随机Int数字(参考random_int()参数)
# pydecimal():随机Decimal数字(参考pyfloat参数)
row_data.append(fake.random_digit_not_null)
elif field_typein float_type:
row_data.append(str(fake.pyfloat(left_digits=(field_info['NUMERIC_PRECISION']-field_info['NUMERIC_SCALE']),
right_digits=field_info['NUMERIC_SCALE'],
positive=True)))
elif field_typein decimal_type:
row_data.append(
str(fake.pydecimal(left_digits=(field_info['NUMERIC_PRECISION']-field_info['NUMERIC_SCALE']),
right_digits=field_info['NUMERIC_SCALE'],
positive=True)))
elif field_typein date_type:
row_data.append('"'+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+'"')
else:
row_data.append('NULL')
# insert_data.append(row_data)
# print(row_data)
return row_data
3. 拼装insert sql
def insert_data_to_rds_table(self, table, database,record_num=1):
# 获取rds的表结构
# 表结构信息:[{'COLUMN_NAME': 'OrderGUID', 'DATA_TYPE': 'char', 'CHARACTER_MAXIMUM_LENGTH': 36}]
schema_info =self.get_table_schema(table,database)
# 根据表结构的字段类型和长度构建数据
insert_data = []
for iin range(record_num):
row_date =self.gen_data_per_schema(schema_info)
insert_data.append(row_date)
# 根据字典拼装插入sql,支持单条和批量插入,批量限制条数?
table_sql = database +'.' + table
columns = []
for columnin schema_info:
columns.append(column['COLUMN_NAME'])
columns_sql =','.join(columns)
values_sql =''
for rowin insert_data:
value_sql =','.join(row)
value_sql ='(' + value_sql +'),'
values_sql += value_sql
values_sql = values_sql[:-1]
insert_sql ='insert into %s (%s) values %s; ' % (table_sql, columns_sql, values_sql)
print('插入语句:%s' % insert_sql)
print("开始执行插入sql")
try:
result =self.insert_data(insert_sql)
print("插入结果:" +str(result))
print("插入%s条记录到sql成功到表成功:%s" % (record_num, table))
except Exception as e:
print("插入rds sql失败:%s" % e)
traceback.print_exc()