- 脚本会先去掉文本中 ASCII 字符和常用汉字(U+4E00–U+9FA5)以外的字符,再自动按照 CSV 文件的表头字段把数据上传到 MySQL;上传时会检测数据是否重复,重复的行不会上传。
代码实现
# Connection parameters; they must be assigned before being passed to connect().
host = 'localhost'
user = 'root'
password = '123456'
database = 'root'
conn = pymysql.connect(host=host, user=user, password=password, database=database)
cursor = conn.cursor()
# 'sql语句' is a placeholder for the SQL statement to run.
cursor.execute('sql语句')
- 如果执行的语句不是查询,而是写入类操作(如 INSERT、UPDATE、DELETE),执行后还需要提交事务:
# Commit the transaction; needed after statements that write rather than query.
conn.commit()
import csv
from pydoc import plain
from typing import ValuesView
from venv import create
import pymysql
import re
import time
def remove_non_utf8(text):
    """Return ``text`` with unsupported characters stripped out.

    Despite the name, this does not check UTF-8 validity: it keeps only
    ASCII characters (U+0000-U+007F) and characters in the range
    U+4E00-U+9FA5, and deletes everything else.
    """
    disallowed_run = re.compile(r'[^\x00-\x7F\u4E00-\u9FA5]+')
    return disallowed_run.sub('', text)
def _quote_identifier(name):
    """Return ``name`` wrapped in backticks for use as a MySQL identifier."""
    # A backtick inside a quoted identifier is escaped by doubling it.
    return "`" + name.replace("`", "``") + "`"


def import_csv_to_mysql(csv_file, host, user, password, database, table_name):
    """Import the rows of a CSV file into a MySQL table, skipping duplicates.

    The first CSV row supplies the column names. The table is created with
    one TEXT column per header if it does not exist yet and is converted to
    utf8mb4. Every cell is cleaned with ``remove_non_utf8``. A row is skipped
    when an identical row is already stored in the table or occurred earlier
    in the same file.

    Problems are reported with ``print`` instead of being raised: a failed
    connection makes the function return immediately, and any error during
    the import rolls the transaction back.

    Args:
        csv_file: Path of the CSV file (read as UTF-8, BOM tolerated).
        host: MySQL server host.
        user: MySQL user name.
        password: MySQL password.
        database: Name of the database to connect to.
        table_name: Target table. It is interpolated into the SQL text
            as-is, so it must come from a trusted source.

    Returns:
        None.
    """
    try:
        conn = pymysql.connect(host=host, user=user, password=password, database=database)
    except Exception as e:
        # Nothing below can work without a connection, so report and stop
        # instead of continuing with undefined conn/cursor.
        print(f"数据库连接失败: {e}")
        return
    cursor = conn.cursor()
    try:
        # newline='' lets the csv module handle line endings itself.
        with open(csv_file, 'r', encoding="utf-8-sig", newline='') as file:
            reader = csv.reader(file)
            headers = next(reader, None)
            if not headers:
                raise ValueError("CSV 文件没有表头")
            # Clean every cell of every row; blank lines are returned as []
            # by csv.reader and are dropped.
            rows = [
                tuple(remove_non_utf8(cell) for cell in row)
                for row in reader
                if row
            ]

        # Headers come from the file, so quote them before building SQL.
        columns = ','.join(_quote_identifier(header) for header in headers)
        column_defs = ','.join(f'{_quote_identifier(header)} TEXT' for header in headers)
        placeholders = ','.join(['%s'] * len(headers))

        cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({column_defs})")
        cursor.execute(
            f"ALTER TABLE {table_name} CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
        )
        print(headers)

        # Select the CSV's columns in the CSV's order so stored rows compare
        # equal to CSV rows regardless of the table's own column order.
        cursor.execute(f"SELECT {columns} FROM {table_name}")
        seen_rows = set(tuple(row) for row in cursor.fetchall())

        insert_query = f"INSERT INTO {table_name}({columns}) VALUES({placeholders})"
        inserted_rows = 0
        duplicate_rows = 0
        for row in rows:
            if row in seen_rows:
                duplicate_rows += 1
                continue
            cursor.execute(insert_query, row)
            # Remember the row so a repeat later in the same file is skipped.
            seen_rows.add(row)
            inserted_rows += 1
        conn.commit()
        print(f"数据成功导入表: {table_name}")
        print(f"新增 {inserted_rows} 行, 跳过重复 {duplicate_rows} 行")
    except Exception as e:
        print(f"导入数据出错: {str(e)}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()
# Settings for the import run.
# NOTE(review): the credentials are hard-coded; consider reading them from
# the environment or a config file before using this outside local testing.
host = 'localhost'
user = 'root'
password = '123456'
database = 'root'
csv_file = 'wtmp.csv'
table_name = 'r5'

# import_csv_to_mysql opens and closes its own connection, so no
# module-level connection is created here.
if __name__ == "__main__":
    import_csv_to_mysql(csv_file, host, user, password, database, table_name)
    # Presumably keeps the console window open long enough to read the
    # output -- TODO confirm.
    time.sleep(60)