- 脚本会先去掉文本中 ASCII 与常用汉字(U+4E00–U+9FA5)以外的字符,再按照 CSV 文件的表头字段自动建表并把数据上传到 MySQL;上传时会检测数据是否重复,重复的行不会上传。
代码实现
# Connection settings; they must be defined before pymysql.connect() uses them.
host = 'localhost'  # MySQL host address
user = 'root'  # MySQL user name
password = '123456'  # MySQL password
database = 'root'  # MySQL database name
# Open the connection, then obtain a cursor to run statements with.
conn = pymysql.connect(host=host, user=user, password=password, database=database)
cursor = conn.cursor()
cursor.execute('sql语句')
- 如果执行的语句不是查询,而是写入(如 INSERT、UPDATE、DELETE)等操作,需要调用 commit() 提交事务:
conn.commit()
import csv
from pydoc import plain
from typing import ValuesView
from venv import create
import pymysql
import re
import time
def remove_non_utf8(text):
    """Return *text* with every character outside the kept ranges removed.

    Despite the name, this does not check UTF-8 validity. It keeps only
    ASCII characters (U+0000-U+007F) and CJK ideographs in the range
    U+4E00-U+9FA5; every other character is deleted. That includes
    full-width punctuation, accented Latin letters, emoji and a leading
    byte-order mark.

    Args:
        text: The string to clean.

    Returns:
        The cleaned string; an empty string if nothing is kept.
    """
    # The negated character class matches runs of characters that are NOT
    # in the two kept ranges; each run is replaced by the empty string.
    return re.sub(r'[^\x00-\x7F\u4E00-\u9FA5]+', '', text)
def _quote_identifier(name):
    """Return *name* as a backtick-quoted MySQL identifier."""
    # Inside a quoted identifier a literal backtick is escaped by doubling it.
    return "`" + str(name).replace("`", "``") + "`"


def import_csv_to_mysql(csv_file, host, user, password, database, table_name):
    """Import the rows of a CSV file into a MySQL table, skipping duplicates.

    The first CSV row is used as the list of column names. The table is
    created with one TEXT column per header if it does not exist yet, and is
    converted to utf8mb4. Every data cell is cleaned with
    ``remove_non_utf8`` before it is compared or inserted. A row is skipped
    when an identical row is already stored in the table or was already
    inserted earlier in the same run. Completely empty CSV lines are ignored.

    All inserts happen in one transaction: on any error the message is
    printed and the transaction is rolled back. MySQL commits DDL statements
    implicitly, so the CREATE/ALTER TABLE statements are not undone by the
    rollback.

    Args:
        csv_file: Path of the CSV file (read as UTF-8, a BOM is tolerated).
        host: MySQL host address.
        user: MySQL user name.
        password: MySQL password.
        database: MySQL database (schema) name.
        table_name: Destination table; ``schema.table`` is accepted.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password, database=database)
    except pymysql.MySQLError as e:
        # Without a connection there is nothing to import, roll back or close.
        print(f"数据库连接失败: {e}")
        return

    cursor = conn.cursor()
    try:
        with open(csv_file, 'r', encoding="utf-8-sig") as file:
            csv_data = csv.reader(file)
            headers = next(csv_data, None)  # first row holds the column names
            if not headers:
                raise ValueError("CSV文件为空或缺少表头")
            # Clean every cell of every row; blank lines (empty rows) carry
            # no data and are dropped.
            rows = [
                [remove_non_utf8(cell) for cell in row]
                for row in csv_data
                if row
            ]

        # Quote identifiers so reserved words, spaces or stray punctuation in
        # the CSV header cannot break (or inject into) the SQL statements.
        # Each dot-separated part of the table name is quoted on its own so a
        # schema-qualified name keeps working.
        table = '.'.join(_quote_identifier(part) for part in str(table_name).split('.'))
        columns = [_quote_identifier(header) for header in headers]
        column_list = ','.join(columns)
        column_defs = ','.join(f"{column} TEXT" for column in columns)

        # Create the table if needed and make sure it can store utf8mb4 text.
        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table} ({column_defs})")
        cursor.execute(
            f"ALTER TABLE {table} CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
        )
        print(headers)

        # Load the stored rows, selecting the columns in CSV header order so
        # the tuples are directly comparable with the CSV rows.
        cursor.execute(f"SELECT {column_list} FROM {table}")
        existing_rows = {tuple(stored) for stored in cursor.fetchall()}

        placeholders = ','.join(['%s'] * len(headers))
        insert_query = f"INSERT INTO {table}({column_list}) VALUES({placeholders})"
        inserted_rows = 0
        duplicate_rows = 0
        for row in rows:
            key = tuple(row)
            if key in existing_rows:
                duplicate_rows += 1
                continue
            cursor.execute(insert_query, key)
            # Remember the row so a repeat later in the same file is skipped.
            existing_rows.add(key)
            inserted_rows += 1

        # Make all inserts of this run permanent at once.
        conn.commit()
        print(f"数据成功导入表: {table_name}")
        print(f"新增 {inserted_rows} 行, 跳过重复 {duplicate_rows} 行")
    except Exception as e:
        print(f"导入数据出错: {str(e)}")
        conn.rollback()
    finally:
        # Always release the cursor and the connection.
        cursor.close()
        conn.close()
# Connection settings for the import run.
# NOTE(review): the credentials are hard-coded in the source file.
host = 'localhost'  # MySQL host address
user = 'root'  # MySQL user name
password = '123456'  # MySQL password
database = 'root'  # MySQL database name

# TODO: decide whether to add a `flag` status table that other programs can
# poll, so they can pause while this script is writing to the database.

csv_file = 'wtmp.csv'  # path of the CSV file to import
table_name = 'r5'  # destination table name

# import_csv_to_mysql opens and closes its own connection, so no
# module-level connection or cursor is created here.
import_csv_to_mysql(csv_file, host, user, password, database, table_name)

# Presumably keeps the console window open long enough to read the output
# -- TODO confirm.
time.sleep(60)