不懂编程的同事往往拿给我们的都是csv,xls,excel等这样的文件,那么如果需要存入数据库就要进行大量的导入操作,这是实在是有点烦啊所以这里就简单的写个方法来处理这些东东.
首先需要了解的就是sqlalchemy,如果动态创建一个表呢
type("", (Base, ), table_col) type很重要,它能够将数据转换成需要的类,继承自
from sqlalchemy.ext.declarativeimport declarative_base
Base = declarative_base()
它就能动态的创建一个表,table_col是一系列的列名dict类型,比如
example = Column(String(255))
table_col还需要一个key就是__tablename__来定义表的名字
import pandasas pd
data = pd.read_csv(filepath, na_values = ['null'])
columns = data.columns
table_col['__tablename__'] = tablename
其次需要了解的包就是pandas 对,用read_csv来读取数据,获取列名
import pandasas pd
data = pd.read_csv(filepath, na_values = ['null'])
columns = data.columns
这里有个异常的问题,需要来处理,表已经定义了会报错defined extends_exists什么的
table_col['__table_args__'] = {'extend_existing':True} # 用这个来处理下
那么就可以创建一个表了
Base.metadata.create_all(engine)
最后一步,插入csv文件里所有的数据到这个表中
for k, vin data.iterrows():
item = {}
for k, vin dict(v).items():
if str(v) =='nan':
item[k] =''
else:
item[k] = vif vis not None else ''
print('item: ', item)
insert_data(table, item)
所有代码如下:
from sqlalchemy.ext.declarativeimport declarative_base
from sqlalchemyimport Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.ormimport sessionmaker, relationship
from sqlalchemyimport create_engine
from sqlalchemy.excimport IntegrityError
engine = create_engine("mssql+pymssql://sa:yourpassword@yourhost/WebApp?charset=utf8", encoding ='utf8', pool_size =100, pool_timeout =1000)
Base = declarative_base()
DBSession = sessionmaker(bind = engine)
def init_db():# 创建表
Base.metadata.create_all(engine)
def drop_db():# 删除表
Base.metadata.drop_all(engine)
def create_twe_detail_db(tablename, filepath):
'''读取csv文件用它来新建一个表将里面的数据写入表里'''
import pandasas pd
data = pd.read_csv(filepath, na_values = ['null'])
columns = data.columns
# 创建表
table_col = {}
table_col['__tablename__'] = tablename
table_col['__table_args__'] = {'extend_existing':True}
for cin columns:
if c =='统一信用代码':
table_col[c] = Column(String(255), primary_key=True)
else:
table_col[c] = Column(String(255))
table =type("", (Base, ), table_col)
init_db()
def insert_data(table, data):
"""
插入数据
:param table:表 对象类
:param data: 数据dict
:return:
"""
try:
session = DBSession()
session.add(table(**data))
session.commit()
session.close()
except IntegrityError:
pass
for k, vin data.iterrows():
item = {}
for k, vin dict(v).items():
if str(v) =='nan':
item[k] =''
else:
item[k] = vif vis not None else ''
print('item: ', item)
insert_data(table, item)
if __name__ =='__main__':
create_twe_detail_db('suspect', r'E:\Data\targetinfo_five.csv')