Python高效处理excel实例–这鱼是非摸不可了

领导静悄悄地走到我的身旁,拍了拍我的肩膀
我感觉后背发凉,有一种天将降大任于斯人也的慌张
领导嘴角微扬:“给你发了几份设备台账,尽快处理好了发到我邮箱”
我颤抖着点开微信弹窗,真想回一句:“这尼玛哪是几份的数量,难怪你的头这么反光!”

需求分析

吐槽归吐槽,活始终得干,看一下初始的excel的样子:

20250922082315955-image

类似的“几份”excel放在同一个文件夹下:

20250922082337429-image

要做以下几个处理:
  • 按照设备序列号分组,每个设备的信息合并成一行
  • 每个设备的那一行,将记录日期、类型1和类型2中的参数拼接起来
  • 批量处理文件夹下所有excel,命名规则:原文件名_xx年{上个月}月

分步搞

读取、分组合并
用pandas模块读取excel,根据第二列的设备ID来分组,处理分组数据时分为两部分(设备名称ID和设备参数信息)来处理,注意空白单元格的处理,处理后将两部分拼接后保存

注意

脚本中的两个第三方库,使用前需要安装

pip install pandas openpyxl

 

不要急着把处理结果发送给可爱的领导

适时告知领导这活可不好干,“尽快”不了

干活中途,多喝水,多走动,多摸鱼

完整代码

# -*- coding: utf-8 -*-
"""
@Time    : 2025/8/10 9:30
@Author  : 数据处理优化版
@功能说明: 批量处理Excel设备台账,支持按序列号分组合并、自定义表头与输出命名
"""
import datetime
import logging
from pathlib import Path
from typing import Union, List

import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Alignment
from openpyxl.workbook.workbook import Workbook
from openpyxl.worksheet.worksheet import Worksheet

# -------------------------- 配置参数(可根据实际需求修改) --------------------------
# Excel结构配置
HEADER_ROW_INDEX = 1  # 列名所在行(原数据第二行,索引从0开始)
DATA_START_ROW = 2    # 数据开始行(原数据第三行,索引从0开始)
GROUP_COL_INDEX = 1   # 分组列(设备序列号,第二列,索引从0开始)
MERGE_SYMBOL = "|"    # 多记录拼接符号
# 表头合并配置(行从1开始,列从1开始)
HEADER_MERGE_CONFIG = [
    {"start_row": 1, "start_col": 4, "end_row": 1, "end_col": 6, "value": "类型1"},
    {"start_row": 1, "start_col": 7, "end_row": 1, "end_col": 11, "value": "类型2"}
]
# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


def read_excel_data(file_path: Union[str, Path]) -> pd.DataFrame:
    """
    读取Excel文件,提取列名与数据
    :param file_path: Excel文件路径
    :return: 包含列名与数据的DataFrame
    :raises FileNotFoundError: 文件不存在时抛出
    :raises Exception: 其他读取错误时抛出
    """
    try:
        # 读取全部内容(不自动识别表头)
        full_df = pd.read_excel(file_path, header=None, engine="openpyxl")
        # 提取列名(指定行)与数据(指定行之后)
        columns = full_df.iloc[HEADER_ROW_INDEX].tolist()
        data_df = full_df.iloc[DATA_START_ROW:].copy()
        data_df.columns = columns
        # 过滤空行(仅保留至少有一列非空的行)
        data_df = data_df.dropna(how="all")
        logger.info(f"成功读取文件:{file_path.name},数据行数:{len(data_df)}")
        return data_df
    except FileNotFoundError:
        logger.error(f"文件不存在:{file_path}")
        raise
    except Exception as e:
        logger.error(f"读取文件{file_path.name}失败:{str(e)}")
        raise


def merge_device_data(group_df: pd.DataFrame) -> pd.Series:
    """
    按设备分组合并数据:前2列取首行,其余列拼接非空值
    :param group_df: 单设备的多条记录DataFrame
    :return: 合并后的单条记录Series
    """
    # 前2列(名称、序列号)取第一行数据
    basic_info = group_df.iloc[0, :2].copy()
    # 其余列(记录日期、参数)拼接非空值
    param_info = group_df.iloc[:, 2:].apply(
        lambda col: MERGE_SYMBOL.join(col.dropna().astype(str)),
        axis=0
    )
    # 合并基础信息与参数信息
    merged_series = pd.concat([basic_info, param_info])
    return merged_series


def process_single_excel(input_path: Union[str, Path], output_path: Union[str, Path]) -> None:
    """
    处理单个Excel文件:读取→分组合并→保存
    :param input_path: 输入文件路径
    :param output_path: 输出文件路径
    """
    try:
        # 1. 读取数据
        data_df = read_excel_data(input_path)
        if data_df.empty:
            logger.warning(f"文件{input_path.name}无有效数据,跳过处理")
            return
        
        # 2. 按设备序列号分组合并
        merged_df = data_df.groupby(
            data_df.columns[GROUP_COL_INDEX],
            sort=False  # 保持原数据顺序
        ).apply(merge_device_data).reset_index(drop=True)
        
        # 3. 保存合并后的数据
        merged_df.to_excel(output_path, index=False, engine="openpyxl")
        logger.info(f"合并数据已保存至:{output_path.name}")
        
        # 4. 添加合并表头
        add_merged_header(output_path)
    except Exception as e:
        logger.error(f"处理文件{input_path.name}失败:{str(e)}")
        raise


def add_merged_header(excel_path: Union[str, Path]) -> None:
    """
    为Excel文件添加合并单元格表头(按配置执行)
    :param excel_path: Excel文件路径
    """
    try:
        # 打开工作簿并获取活动工作表
        wb: Workbook = load_workbook(excel_path)
        ws: Worksheet = wb.active
        
        # 在第一行插入空行(用于放置合并表头)
        ws.insert_rows(1, amount=1)
        
        # 配置表头样式(居中对齐)
        header_style = Alignment(horizontal="center", vertical="center")
        
        # 按配置合并单元格并写入内容
        for config in HEADER_MERGE_CONFIG:
            start_row = config["start_row"]
            start_col = config["start_col"]
            end_row = config["end_row"]
            end_col = config["end_col"]
            value = config["value"]
            
            # 合并单元格
            ws.merge_cells(
                start_row=start_row,
                start_column=start_col,
                end_row=end_row,
                end_column=end_col
            )
            # 写入内容并应用样式
            cell = ws.cell(row=start_row, column=start_col, value=value)
            cell.alignment = header_style
        
        # 保存修改
        wb.save(excel_path)
        logger.info(f"成功为文件{excel_path.name}添加表头")
    except Exception as e:
        logger.error(f"添加表头失败(文件:{excel_path.name}):{str(e)}")
        raise


def get_prev_month_info() -> tuple[int, int]:
    """
    获取当前时间的上一个月(年,月)
    :return: (上一年,上一月)
    """
    now = datetime.datetime.now()
    if now.month == 1:
        return now.year - 1, 12
    else:
        return now.year, now.month - 1


def batch_process_excel(folder_path: Union[str, Path]) -> None:
    """
    批量处理文件夹内所有Excel文件
    :param folder_path: 台账文件夹路径
    """
    folder = Path(folder_path)
    if not folder.is_dir():
        logger.error(f"文件夹不存在:{folder_path}")
        raise NotADirectoryError(f"无效文件夹路径:{folder_path}")
    
    # 获取上一个月信息(用于输出命名)
    prev_year, prev_month = get_prev_month_info()
    logger.info(f"批量处理开始,输出文件月份:{prev_year}年{prev_month}月")
    
    # 遍历文件夹内的Excel文件(仅处理.xlsx格式)
    excel_files: List[Path] = [f for f in folder.iterdir() if f.suffix.lower() == ".xlsx"]
    if not excel_files:
        logger.warning(f"文件夹{folder.name}内无.xlsx文件")
        return
    
    # 逐个处理文件
    for file in excel_files:
        # 生成输出文件名:原文件名_xx年xx月.xlsx
        output_filename = f"{file.stem}_{prev_year}年{prev_month}月.xlsx"
        output_path = file.parent / output_filename
        
        # 处理单个文件
        try:
            process_single_excel(file, output_path)
            logger.info(f"文件{file.name}处理完成,输出路径:{output_path}\n")
        except Exception:
            logger.error(f"文件{file.name}处理中断,跳过后续步骤\n", exc_info=False)
            continue
    
    logger.info(f"批量处理结束!共处理文件数:{len(excel_files)}")


if __name__ == "__main__":
    # 台账文件夹路径(可根据实际情况修改)
    TARGET_FOLDER = r"C:\Users\Administrator\Desktop\设备台账"
    try:
        batch_process_excel(TARGET_FOLDER)
    except Exception as e:
        logger.critical(f"批量处理整体失败:{str(e)}", exc_info=True)

大功告成

全部实现了前面的几个需求点

处理了20+文件,用时1.06秒

如果是人为手工处理的话,那就是:没有如果!

20250922083314548-image

20250922083331918-image

🎀 🌸

📜 重要提示:
如有解压密码:看下载页、看下载页、看下载页。
源码工具资源类具有可复制性: 建议具有一定思考和动手能力的用户购买。
请谨慎考虑: 小白用户和缺乏思考动手能力者不建议赞助。
虚拟商品购买须知: 虚拟类商品,一经打赏赞助,不支持退款。请谅解,谢谢合作!
邻兔跃官网:lt.lintuyue.com(如有解压密码看下载页说明)。

文章版权声明 1、本网站名称:邻兔跃lT
2、本站永久网址:https://lt.lintuyue.com/
3、本站内容主要来源于互联网优质资源整合、网友积极投稿以及部分原创内容,仅供内部学习研究软件设计思想和原理使用,学习研究后请自觉删除,请勿传播,因未及时删除所造成的任何后果责任自负,如有侵权,请联系站长进行删除处理。
4、本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
5、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
6、本站资源大多存储在云盘,如发现链接失效,请联系我们我们会第一时间更新。
© 版权声明
THE END
喜欢就支持一下吧
点赞11 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容