2025-10-15

取出 MSSQL DB 中 ZIP 壓縮過的二進位 .eml 檔並取出 Subject 等資訊

Mail eml 格式檔案 ZIP 後存在 DB 裡,但某日發現資料庫中有許多 Record 的 Subject 都錯誤
推測是歸檔程式有 Bug 的關係。
因為會影響搜尋結果,所以必須從資料庫中取出 ZIP 過的 eml 檔案
解壓後分析,再修正資料庫中的資料

以下程式包含上述功能但有更多其他功能,就不多說,有需要自取

需要 
SharpZipLib
這是用來忽略解壓過程中發生錯誤,強制解壓的工具
不知道為什麼 DB 中的 ZIP 二進位資料取出後解壓會有問題,但忽略錯誤就可以

只用 Powershell 實在沒辦法對 Email 做良好的 Parser,必須丟到 Python 去處理
所以需要用 Powershell 處理到一半,用 Python 讀檔,再回 Powershell 處理
因為我實在不熟 Python,不然應該要用 Python 從頭寫到瑋才對

以下程式有許多部份都是用 Copilot 協助產生的,再加上自己修改而成

GetMSGFromDB.ps1

# SharpZipLib
# https://www.nuget.org/packages/SharpZipLib/#supportedframeworks-body-tab
#
# Powershell 7
# https://github.com/PowerShell/PowerShell

if ($PSVersionTable.PSEdition -ne 'Core') {
write-host ('Need Powershell 7');
exit;
};

$TempPath = 'Q:\Temp';
if ((Test-Path -Path $TempPath) -ne $True) {
exit;
};

$CommandPath = 'C:\Command';


$ScriptStartTime = (Get-Date).ToUniversalTime().AddHours(8).ToString('yyyy-MM-dd HH:mm:ss');
$DateTimeString = (Get-Date).ToUniversalTime().AddHours(8).ToString('yyyyMMdd HHmmss')
$TranscriptLog = $CommandPath + '\Logs\' + ('Correct MSG Subject ' + $DateTimeString + ".log")

$LogStatus = start-transcript -LiteralPath $TranscriptLog
Set-Location -Path $CommandPath

$Global:Utf8NoBomEncoding = New-Object System.Text.UTF8Encoding $False;

Add-Type -Path "D:\tmp\lib\netstandard2.0\ICSharpCode.SharpZipLib.dll"

# 設定資料庫連線字串
$connectionString = "Server=127.0.0.1;Database=Q20254;Integrated Security=True"

# 建立 SQL 連線並執行查詢
$connection = New-Object System.Data.SqlClient.SqlConnection($connectionString)

$connection.Open()

$DBNames = @();
$DBNames += "2025Q4";
$DBNames += "2025Q3";

foreach ($DBName in $DBNames) {

$Remain = $True;

while ($Remain -eq $True) {

$command = $connection.CreateCommand()
# 查詢條件
$query = ("SELECT TOP (2) [OriginalID],[MSG] FROM [" + $DBName + "].[dbo].[MailInfoV4] where [Subject] like '%from ms01.contoso.com%' and [Subject] like '%MSW.contoso.com.tw%'");

$command.CommandText = $query
$reader = $command.ExecuteReader()

$QueryResults = @();
while ($reader.Read()) {
$QueryResults += New-Object -TypeName psobject -Property @{
OriginalID = $reader["OriginalID"];
MSG = $reader["MSG"];
};
}
$reader.Close()

if ($QueryResults.count -eq 0) {
$Remain = $False;
} else {

foreach ($QueryResult in $QueryResults) {

# 取得 OriginalID
$originalID = $QueryResult.OriginalID;
$bytes = $QueryResult.MSG;

# 假設 $bytes 是從資料庫讀取的 ZIP 二進位資料
$memoryStream = [System.IO.MemoryStream]::new($bytes)
$zipInputStream = New-Object ICSharpCode.SharpZipLib.Zip.ZipInputStream($memoryStream)

$emlContents = @()  # 用來存檔案名稱與內容

try {
$entry = $zipInputStream.GetNextEntry();
if (-not $entry) { break }

$fileStream = New-Object System.IO.MemoryStream
$buffer = New-Object byte[] 4096
while (($count = $zipInputStream.Read($buffer, 0, $buffer.Length)) -gt 0) {
$fileStream.Write($buffer, 0, $count)
}

# 將 MemoryStream 轉換成文字(UTF-8)
$fileStream.Position = 0
$MEMreader = New-Object System.IO.StreamReader($fileStream, [System.Text.Encoding]::UTF8)
$content = $MEMreader.ReadToEnd()

# 存入雜湊表
$emlContents += New-Object -TypeName psobject -Property @{
OriginalID = $originalID;
content = $content;
};

$MEMreader.Close()
$fileStream.Dispose()

} catch {
# 忽略錯誤,不顯示訊息
break
}

$zipInputStream.Close()
$memoryStream.Dispose()

# 顯示結果(例如第一個檔案內容)
if ($emlContents.Count -eq 1) {

$EMLFile = ($TempPath + '\' + $emlContents.originalID + '.eml');
$JsonFile = ($TempPath + '\' + $emlContents.originalID + '.json');
[System.IO.File]::WriteAllLines($EMLFile, $emlContents.content, $Global:Utf8NoBomEncoding);

python.exe ($CommandPath + '\EmailParser.py') $EMLFile

$WaitSeconds = 0;
while ((Test-Path -Path $JsonFile) -ne $True) {
Start-Sleep 1;
$WaitSeconds++;
if ($WaitSeconds -gt 5) {
exit;
};
};

$EMLJson = ((Get-Content -LiteralPath $JsonFile -Encoding UTF8) | ConvertFrom-Json);;

write-host ("[" + $DBName + "] UPDATE " + $emlContents.originalID + " Subject: '" + $EMLJson.Subject)

# 建立 SQL 命令
$command = $connection.CreateCommand()
$command.CommandText = ("UPDATE [" + $DBName + "].[dbo].[MailInfoV4] SET [Subject] = @subject WHERE [OriginalID] = @originalID")

# 加入參數(使用 Unicode)
$subjectText = $EMLJson.Subject
$originalID = $emlContents.originalID

$command.Parameters.Add((New-Object Data.SqlClient.SqlParameter("@subject", [Data.SqlDbType]::NVarChar, 500))).Value = $EMLJson.Subject
$command.Parameters.Add((New-Object Data.SqlClient.SqlParameter("@originalID", [Data.SqlDbType]::NVarChar, 50))).Value = $emlContents.originalID

# 執行更新
$rowsAffected = $command.ExecuteNonQuery()


Remove-Item -Path $EMLFile -Confirm:$False;
Remove-Item -Path $JsonFile -Confirm:$False;

#$String = $emlContents.content
#$Bytes = [System.Text.Encoding]::UTF8.GetBytes($String)
#$EncodedString = [System.Convert]::ToBase64String($Bytes)
#write-host $EncodedString

} else {
Write-Host ($originalID + "壓縮黨內檔案大於 2 個")
};

};
};
};
};

$connection.Close | out-null

$LogStatus = stop-transcript
EmailParser.py
import os
import email
from email import policy
from email.parser import BytesParser
from email.utils import getaddresses
import re
import json
# python -m pip install eml_parser
import eml_parser

def merge_received_headers(raw_text):
    lines = raw_text.splitlines()
    merged_lines = []
    buffer = ""
    for line in lines:
        if line.startswith("Received:"):
            if buffer:
                merged_lines.append(buffer)
            buffer = line
        elif buffer and (line.startswith(" ") or line.startswith("\t")):
            buffer += " " + line.strip()
        else:
            if buffer:
                merged_lines.append(buffer)
            buffer = ""
            merged_lines.append(line)
    if buffer:
        merged_lines.append(buffer)
    return "\n".join(merged_lines)

def sanitize_id(raw_id):
    cleaned = re.sub(r'@.*', '', raw_id)
    cleaned = re.sub(r'[\\/\\\\:*?"<>|]', '', cleaned)
    cleaned = re.sub(r'[^A-Za-z0-9]', '', cleaned)
    return cleaned
    
def extract_received_info(received_headers, msg):
    first_received = received_headers[0]
    if len(received_headers) >= 2:
        if "from ms01.contoso.com" in first_received or "from ms02.contoso.com" in first_received:
            first_received = received_headers[1]

    smtp_match = re.search(r'from\s+([^\s]+)', first_received)
    id_match = re.search(r'ESMTPS?\s+id\s+([^\s]+)', first_received)
    time_match = re.search(r';\s+(.*)', first_received)

    raw_id = id_match.group(1) if id_match else msg.get("Message-ID", "None")
    esmtp_id = sanitize_id(raw_id)

    if time_match:
        time_str = time_match.group(1)
        try:
            dt = email.utils.parsedate_to_datetime(time_str)
            time_prefix = dt.strftime("%Y%m%d%H%M%S")
        except Exception:
            time_prefix = "UnknownTime"
    else:
        time_prefix = "UnknownTime"

    output_id = f"{time_prefix}_{esmtp_id}"

    return {
        "Received From": smtp_match.group(1) if smtp_match else "None",
        "Mail ID": output_id,
        "Received Time": time_match.group(1) if time_match else "None"
    }

    fallback_id = sanitize_id(msg.get("Message-ID", "None"))
    return {
        "Received From": "None",
        "Mail ID": fallback_id,
        "Received Time": "None"
    }
    
def split_addresses(field):
    if not field or field == "None":
        return []
    return [addr.strip() for addr in field.split(',') if addr.strip()]

def extract_emails(field):
    if not field:
        return []
    return [addr for name, addr in getaddresses([field]) if addr]

def parse_eml(file_path):
    with open(file_path, 'rb') as f:
        raw_bytes = f.read()
    raw_text = raw_bytes.decode(errors='ignore')
    cleaned_text = merge_received_headers(raw_text)
    msg = BytesParser(policy=policy.default).parsebytes(cleaned_text.encode())

    received_headers = re.findall(r'Received:.*?(?=Received:|$)', cleaned_text, re.DOTALL)
    received_info = extract_received_info(received_headers, msg)

    metadata = {
        "Envelope": {
            "Sender": msg.get("Return-Path", "None"),
            "Recipient": msg.get("x-receiver", "None")
        },
        "Received From": received_info["Received From"],
        "Mail ID": received_info["Mail ID"],
        "Received Time": received_info["Received Time"],
        "Letterhead": {
            "Sender": extract_emails(msg.get("From", "")),
            "Recipient": extract_emails(msg.get("To", "")),
            "Cc": extract_emails(msg.get("Cc", "")),
            "Bcc": extract_emails(msg.get("Bcc", "")),
            "Sender_O": split_addresses(msg.get("From", "None")),
            "Recipient_O": split_addresses(msg.get("To", "None")),
            "Cc_O": split_addresses(msg.get("Cc", "None")),
            "Bcc_O": split_addresses(msg.get("Bcc", "None")),
            "Reply-To": msg.get("Reply-To", "None"),
            "Delivered-To": msg.get("Delivered-To", "None")
        },
        "Message-ID": msg.get("Message-ID", "None"),
        "Return-Path": msg.get("Return-Path", "None"),
        "Subject": msg.get("Subject", "None"),
        "Sent Time": msg.get("Date", "None"),
        "Size (bytes)": os.path.getsize(file_path),
        "Attachments": []
    }

    # 解析 EML
    ep = eml_parser.EmlParser()
    parsed_eml = ep.decode_email_bytes(raw_bytes)
    # 擷取附件資訊,只保留 filename, size, extension
    attachments_info = []
    for attachment in parsed_eml.get("attachment", []):
        metadata["Attachments"].append({
            "filename": attachment.get("filename"),
            "size": attachment.get("size"),
            "extension": attachment.get("extension")
        })

    output_id = received_info["Mail ID"]
    output_filename = os.path.splitext(file_path)[0] + ".json"
    with open(output_filename, "w", encoding="utf-8") as f:
        json.dump(metadata, f, indent=4, ensure_ascii=False)
# None Compress
        # print(json.dumps(metadata, indent=4, ensure_ascii=False))
# Compress
        # print(json.dumps(metadata, separators=(',', ':'), ensure_ascii=False))
    #print(f"✅ Metadata 已儲存為 {output_filename}")
    return metadata

# 使用方式
if __name__ == "__main__":
    import sys
    if len(sys.argv) != 2:
        print("⚠️ 使用方式: python EmailParser.py <eml檔案路徑>")
        sys.exit(1)
    file_path = sys.argv[1]
    if os.path.exists(file_path):
        parse_eml(file_path)
    else:
        print(f"⚠️ 找不到 {file_path} 檔案,請確認檔案位置與名稱")






沒有留言:

張貼留言