import mysql
from mysql.connector import pooling
import schedule
import time
import configparser
import datetime
# Read the maximum ID of the last sync from the config file.
# ConfigParser.read() skips files it cannot open, and fallback=0 covers a
# missing [Sync] section or option, so a first run starts from ID 0.
config = configparser.ConfigParser()
config.read(filenames='config.ini')
last_sync_id = config.getint(section='Sync', option='last_sync_id', fallback=0)
# Connection settings for the remote MySQL database.
# Keyword arguments passed to the connection pool for the source server.
# NOTE(review): credentials are hard-coded and TLS is disabled here --
# confirm this is only intended for a trusted test network.
remote_db_config = dict(
    host="192.168.0.10",
    database="test",
    user="root",
    password="root",
    ssl_disabled=True,
)
# Connection settings for the local MySQL database.
# Keyword arguments passed to the connection pool for the destination server.
# NOTE(review): credentials are hard-coded and TLS is disabled here --
# confirm this is only intended for a local test instance.
local_db_config = dict(
    host="127.0.0.1",
    database="test",
    user="root",
    password="root",
    ssl_disabled=True,
)
# Create the connection pools.
# One pool of five connections per server; sync_data() borrows from these
# instead of opening a new connection on every run.
remote_pool = pooling.MySQLConnectionPool(
    pool_name="remote_pool",
    pool_size=5,
    **remote_db_config,
)
local_pool = pooling.MySQLConnectionPool(
    pool_name="local_pool",
    pool_size=5,
    **local_db_config,
)
def sync_data():
    """Copy rows newer than the last checkpoint from the remote DB to the local DB.

    Reads every row of the remote ``test`` table whose ``id`` is greater than
    the module-level ``last_sync_id``, inserts the rows into the local
    ``test`` table with ``INSERT IGNORE`` inside a single transaction, then
    advances ``last_sync_id`` to the largest copied ``id`` and writes it to
    ``config.ini``.

    Every exception is caught and printed, so a failed run never propagates
    to the caller, and any connection that was acquired is always closed
    (returned to its pool).

    Returns:
        None.
    """
    global last_sync_id
    # Bind both names before the try block: the finally clause inspects them
    # even when no rows were found or a pool failed to hand out a connection.
    remote_conn = None
    local_conn = None
    try:
        remote_conn = remote_pool.get_connection()
        new_data = _fetch_new_rows(remote_conn, last_sync_id)
        if not new_data:
            return

        local_conn = local_pool.get_connection()
        _insert_rows(local_conn, new_data)

        current_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"synced {len(new_data)} records at {current_datetime}.")

        # Advance the in-memory checkpoint before persisting it: if writing
        # the file fails, this process still will not re-read the same rows.
        last_sync_id = max(row[0] for row in new_data)
        _save_checkpoint(last_sync_id)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        for conn in (remote_conn, local_conn):
            if conn is not None:
                try:
                    conn.close()
                except Exception as e:
                    # A failing close must not escape and stop the caller.
                    print(f"Error: {e}")


def _fetch_new_rows(conn, after_id):
    """Return all rows of the remote ``test`` table with ``id`` > *after_id*."""
    cursor = conn.cursor()
    try:
        # Columns are listed explicitly, in the order the INSERT expects, so
        # the copy does not depend on the remote table's column order.
        # NOTE(review): assumes the remote table uses the same column names
        # as the local INSERT -- confirm against the remote schema.
        cursor.execute(
            "SELECT id,qn,mn,payload,created_at FROM test WHERE id > %s",
            (after_id,),
        )
        return cursor.fetchall()
    finally:
        cursor.close()


def _insert_rows(conn, rows):
    """Insert *rows* into the local ``test`` table in one transaction."""
    insert_query = "INSERT IGNORE INTO test (id,qn,mn,payload,created_at) VALUES (%s,%s,%s,%s,%s)"
    cursor = conn.cursor()
    try:
        conn.start_transaction()
        cursor.executemany(insert_query, rows)
        conn.commit()
    except Exception:
        # Undo the partial batch so nothing half-written is left pending.
        conn.rollback()
        raise
    finally:
        cursor.close()


def _save_checkpoint(sync_id):
    """Write *sync_id* to the ``[Sync]`` section of ``config.ini``."""
    # On a first run the section may not exist yet (the initial read falls
    # back to 0); config.set() would raise NoSectionError without it.
    if not config.has_section('Sync'):
        config.add_section('Sync')
    config.set('Sync', 'last_sync_id', str(sync_id))
    with open('config.ini', 'w') as configfile:
        config.write(configfile)
# Set up the scheduled job.
# Register sync_data to run every 10 seconds, then poll the scheduler
# forever, sleeping 10 seconds between polls.
_sync_job = schedule.every(10).seconds
_sync_job.do(sync_data)
while True:
    schedule.run_pending()
    time.sleep(10)