李成笔记网

专注域名、站长SEO知识分享与实战技巧

Gmail API配置+Python实现google邮件发送完整指南,出海必备!

最近在做个海外项目,需要使用邮箱通知功能!最开始时候用的163邮箱发现各种收件延迟,无奈之下决定弃用国内邮箱,改用海外最大用户平台谷歌的gmail进行邮件发送。不过登录最新的Gmail邮箱设置界面,可以看到google已经把IMAP权限关闭了,只保留了POP收邮件的功能,所以无法通过IMAP里用户名密码方式直接发送邮件,经过一番折腾,终于把发件功能调通了,接下来我将详细分享下如何配置gmail以及发邮件的源码

开启Gmail的API邮件发送功能

1、google cloud创建API项目

首先进到谷歌云API控制台,创建一个Email项目:
https://console.cloud.google.com/apis/dashboard

2、配置Gmail API

接下来,进到API控制台:
https://console.cloud.google.com/apis/library/browse,搜索Gmail API

点击启用Gmail API

3、配置OAuth consent screen

上一步开启Gmail API后,Enable按钮会变成Manager,点击Manager按钮进入以下页面,选择OAuth consent screen

点击OAuth consent screen后会进到以下页面,点击Get started按钮

这里输入你的应用名,选择完你的gmail邮箱后点击Next

接下来进入到Audience页面,"Audience"选项用于指定谁可以访问您的应用程序。这有助于控制应用程序的访问范围和发布流程。以下是两个选项的作用:

  1. Internal(内部): 仅限于组织内部的用户访问。 您不需要将应用提交进行验证。 适用于仅供内部员工或团队成员使用的应用。
  2. External(外部): 可供任何拥有 Google 帐号的测试用户访问。 应用程序开始时处于测试模式,仅对您添加到测试用户列表中的用户可用。 一旦准备好推向生产环境,可能需要对应用进行验证。 适用于希望在更广泛的用户群体中进行测试或最终发布给公众的应用。

选择合适的选项可以帮助你根据应用的目标用户群体和发布策略来配置访问权限,这里我选择的是External,大家可以根据自己实际情况选择,然后继续点击Next

填写联系人信息,这里可以写个你自己的常用邮箱,点击Next

选择同意协议,点击创建

4、创建OAuth client

等待5秒左右,底下会弹出OAuth配置创建完成消息,继续点击Create OAuth client

这里输入你的OAuth客户端名称,并选择客户端的类型,我使用的Desktop,这里后面发布APP会要用到,不过我们用的Test模式,这里可以随便选个类型!

点击Create后,就会进入到Client ID的生成页面,这里Client ID可以先复制下来

5、获取OAuth Client的json密钥

点击OK后,会自动进到Clients列表,这里我们点击编辑图标

在client secret右侧点击下载图标,下载client和secret的json配置信息重命名为client_secret.json

进入到 /tmp目录,把重命名后的client_secret.json放在/tmp目录下,这个文件后面会用到

生成代码授权token

接下来,我们就可以开始写代码发送邮件了。因google要求首次调用邮件发送接口必须到页面再去授权一次,生成授权token到本地后再去调用发送邮件就无需再调用了!

1、安装python环境

这里选择python因为它代码可以直接pip安装google-api的包,使用其余语言可以参考我下面代码用AI工具(
https://ai.quanyouhulian.com/)再去转一道,这里推荐使用python3.11,安装教程可以参考:
https://mp.weixin.qq.com/s/Me5H_ESVk4SvzqTrm8RqMg

2、安装谷歌google-api以及google-auth依赖

安装完python3.11后,进入到venv虚拟环境中,直接使用以下命令安装相关的依赖包

pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib

安装如下

3、编写python发送邮件的代码

这里我使用的IDE工具是Cursor,先写一个谷歌邮件发送功能类

# -*- coding: utf-8 -*-
import base64
import logging
import os
import sys
import traceback
import time
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from types import TracebackType
from typing import Optional

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import webbrowser

logging.basicConfig(level=logging.INFO)


class GoogleEmail:
    def __init__(self, client_secret_path: str = "", token_path: str = "", headless: bool = False) -> None:
        """
        Initialize an instance of the GoogleEmail class.

        Args:
            client_secret_path (str): The path of the client secret json file. Defaults to "".
            token_path (str): The path for saving/loading the token. If empty, uses default path. Defaults to "".
            headless (bool): Whether to use console mode for OAuth (for servers without browser). Defaults to False.

        Returns:
            None
        """
        self._client_secret_path = client_secret_path
        self._token_path = token_path
        self._headless = headless
        self._gmail_service = None
        self._init()

    def _init(self) -> None:
        """
        Initialize the gmail_service.

        Returns:
            None
        """
        if not os.path.exists(self._client_secret_path):
            logging.error(f"客户端密钥文件不存在: {self._client_secret_path}")
            sys.exit(1)
            
        tmp_dir = os.path.join(os.path.dirname(__file__), "tmp")
        os.makedirs(tmp_dir, exist_ok=True)
        
        # 使用用户指定的token路径或默认路径
        google_token_path = self._token_path if self._token_path else os.path.abspath(os.path.join(tmp_dir, "google_email_token.json"))
        
        credentials = None
        if os.path.exists(google_token_path):
            try:
                credentials = Credentials.from_authorized_user_file(
                    filename=google_token_path,
                    scopes=["https://www.googleapis.com/auth/gmail.send"]
                )
            except Exception as e:
                logging.error(f"读取token文件失败: {e}\n{traceback.format_exc()}")
                sys.exit(1)
                
        if not credentials or not credentials.valid:
            if credentials and credentials.expired and credentials.refresh_token:
                try:
                    credentials.refresh(Request())
                except Exception as e:
                    logging.error(f"刷新token失败: {e}\n{traceback.format_exc()}")
                    # 如果刷新失败,删除旧token文件并重新授权
                    if os.path.exists(google_token_path):
                        os.remove(google_token_path)
            else:
                try:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        client_secrets_file=self._client_secret_path,
                        scopes=["https://www.googleapis.com/auth/gmail.send"]
                    )
                    
                    if self._headless:
                        # 无头模式下显示URL并接收输入的验证码
                        logging.info(f"请在浏览器中打开以下URL进行授权:")
                        logging.info(flow.authorization_url()[0])
                        code = input("请输入授权码: ")
                        flow.fetch_token(code=code)
                        credentials = flow.credentials
                    else:
                        try:
                            # 尝试检测是否有可用的浏览器
                            webbrowser.get()
                            credentials = flow.run_local_server(port=0)
                        except webbrowser.Error:
                            logging.warning("未检测到可用的浏览器,切换到手动授权模式")
                            logging.info(f"请在浏览器中打开以下URL进行授权:")
                            logging.info(flow.authorization_url()[0])
                            code = input("请输入授权码: ")
                            flow.fetch_token(code=code)
                            credentials = flow.credentials
                except Exception as e:
                    logging.error(f"OAuth授权失败: {e}\n{traceback.format_exc()}")
                    sys.exit(1)
                    
            with open(google_token_path, "w") as f:
                f.write(credentials.to_json())

        try:
            # 设置超时参数
            self._gmail_service = build("gmail", "v1", credentials=credentials, cache_discovery=False)
        except Exception as e:
            logging.error(f"创建Gmail服务失败: {e}\n{traceback.format_exc()}")
            sys.exit(1)

    def send(self,
             subject: str,
             body: str,
             to_recipients: str,
             cc_recipients: Optional[str] = None,
             bcc_recipients: Optional[str] = None,
             attachment_path: Optional[str] = None,
             sender_name: Optional[str] = None,
             sender_email: Optional[str] = None,
             retry_times: int = 3,
             retry_interval: int = 5,
             timeout: int = 30
             ) -> None:
        """
        Send an email using Gmail API.

        Args:
            subject (str): The email subject.
            body (str): The email body.
            to_recipients (str): Comma-separated email addresses of the primary recipients.
            cc_recipients (str): Comma-separated email addresses of the CC recipients. Default is None.
            bcc_recipients (str): Comma-separated email addresses of the BCC recipients. Default is None.
            attachment_path (str): Path to the file to be attached. Default is None (no attachment).
            sender_name (str): Display name for the sender. Default is None (uses email address).
            sender_email (str): Email address to display with sender name. Default is None (uses "me").
            retry_times (int): Number of retries if sending fails. Default is 3.
            retry_interval (int): Interval between retries in seconds. Default is 5.
            timeout (int): Timeout for HTTP requests in seconds. Default is 30.

        Returns:
            None
        """
        message = MIMEMultipart()
        message["subject"] = subject
        message.attach(MIMEText(body, "plain"))

        message["to"] = to_recipients

        if cc_recipients:
            message["cc"] = cc_recipients

        if bcc_recipients:
            message["bcc"] = bcc_recipients
            
        if sender_name:
            # 使用formataddr格式化发件人信息为"Name <email>"格式
            email = sender_email if sender_email else "me"
            message["from"] = formataddr((sender_name, email))

        if attachment_path:
            if not os.path.exists(attachment_path):
                logging.error(f"附件不存在: {attachment_path}")
                return
                
            attachment_name = os.path.basename(attachment_path)
            attachment = MIMEBase("application", "octet-stream")
            with open(attachment_path, "rb") as f:
                attachment.set_payload(f.read())
            encoders.encode_base64(attachment)
            attachment.add_header("Content-Disposition", f"attachment; filename={attachment_name}")
            message.attach(attachment)

        raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8")
        
        if self._gmail_service is None:
            logging.error("Gmail服务未初始化")
            return
        
        for attempt in range(retry_times):
            try:
                logging.info(f"尝试发送邮件 (尝试 {attempt+1}/{retry_times})...")
                request = self._gmail_service.users().messages().send(
                    userId="me", 
                    body={"raw": raw_message}
                )
                # 设置请求超时
                request.http.timeout = timeout
                response = request.execute()
                logging.info(f"邮件发送成功, 响应: {response}")
                return
            except HttpError as e:
                logging.error(f"发送邮件时出现HTTP错误: {e.status_code} - {e.reason}")
                if e.status_code == 429 or e.status_code >= 500:  # 限流或服务器错误
                    if attempt < retry_times - 1:
                        logging.info(f"将在 {retry_interval} 秒后重试...")
                        time.sleep(retry_interval)
                    else:
                        logging.error("达到最大重试次数,放弃发送")
                else:
                    logging.error(f"发送邮件失败: {e}")
                    break
            except Exception as e:
                logging.error(f"发送邮件失败: {e}\n{traceback.format_exc()}")
                if attempt < retry_times - 1:
                    logging.info(f"将在 {retry_interval} 秒后重试...")
                    time.sleep(retry_interval)
                else:
                    logging.error("达到最大重试次数,放弃发送")

    @staticmethod
    def generate_token(client_secret_path: str, token_output_path: str) -> None:
        """
        在本地生成令牌文件,用于上传到无浏览器环境的服务器

        Args:
            client_secret_path (str): 客户端密钥文件路径
            token_output_path (str): 令牌输出路径

        Returns:
            None
        """
        if not os.path.exists(client_secret_path):
            logging.error(f"客户端密钥文件不存在: {client_secret_path}")
            sys.exit(1)
            
        try:
            flow = InstalledAppFlow.from_client_secrets_file(
                client_secrets_file=client_secret_path,
                scopes=["https://www.googleapis.com/auth/gmail.send"]
            )
            credentials = flow.run_local_server(port=0)
            with open(token_output_path, "w") as f:
                f.write(credentials.to_json())
            logging.info(f"令牌已成功生成到: {token_output_path}")
        except Exception as e:
            logging.error(f"生成令牌失败: {e}\n{traceback.format_exc()}")
            sys.exit(1)


if __name__ == "__main__":
    import argparse
    
    parser = argparse.ArgumentParser(description="Google Email Sender")
    parser.add_argument("--generate-token", action="store_true", help="仅生成令牌文件")
    parser.add_argument("--client-secret", type=str, default="/tmp/client_secret.json", help="客户端密钥文件路径")
    parser.add_argument("--token-path", type=str, default="", help="令牌文件路径")
    parser.add_argument("--headless", action="store_true", help="使用无头模式(无浏览器环境)")
    parser.add_argument("--to", type=str, default="", help="收件人邮箱")
    parser.add_argument("--subject", type=str, default="Test Email", help="邮件主题")
    parser.add_argument("--body", type=str, default="This is a test email.", help="邮件内容")
    parser.add_argument("--sender-name", type=str, default="", help="发件人显示名称")
    parser.add_argument("--sender-email", type=str, default="", help="发件人邮箱地址")
    parser.add_argument("--timeout", type=int, default=30, help="请求超时时间(秒)")
    
    args = parser.parse_args()
    
    if args.generate_token:
        token_path = args.token_path if args.token_path else os.path.join(os.path.dirname(__file__), "tmp", "google_email_token.json")
        GoogleEmail.generate_token(args.client_secret, token_path)
    else:
        if not args.to:
            logging.error("请指定收件人邮箱地址")
            sys.exit(1)
            
        # 发送邮件
        GoogleEmail(
            client_secret_path=args.client_secret,
            token_path=args.token_path,
            headless=args.headless
        ).send(
            to_recipients=args.to,
            subject=args.subject,
            body=args.body,
            sender_name=args.sender_name,
            sender_email=args.sender_email,
            timeout=args.timeout
        )

4、生成授权token.json文件

因为我发送邮件功能要部署到云服务器,但云服务器又打不开浏览器完成授权,所以这里我们要分三步走:

  • 4.1、先本地运行python文件,跳到web页面去做google的授权,把授权的token.json文件存储到本地
  • 4.2、再把本地的token.json文件推送到云服务器
  • 4.3、再云服务器启动时候直接读取本地的token.json方式做鉴权,就能实现直接发送了

先在本地计算机运行以下命令,这里通过本地的client_secret.json去生成google的授权token.json后,再把授权后的token.json存储在本地/tmp下

# 在本地计算机上运行
python google_email.py --generate-token --client-secret=/tmp/client_secret.json --token-path=/tmp/token.json

这段代码执行后,会直接拉起浏览器授权页面,如下:

我这里踩了个坑,没有开权限,这里授权不通过,没有做授权

所以需要重新回到API控制台(
https://console.cloud.google.com/apis/dashboard),点击左侧的OAuth consent screen界面

点击左侧Audience,选择Test users,把刚刚gmail邮件用户添加进来

注意:不要点击Publish app,如果你选择了发布app,需要去左侧Branding菜单配置一堆域名等信息,由于Gmail属于用户敏感信息,必须向Google提交申请才能用于生产环境。如果仅仅是个人或小规模企业使用,保持Testing状态即可。

添加完Test users后,再重新执行在本地计算机上运行命令

# 在本地计算机上运行
python google_email.py --generate-token --client-secret=/tmp/client_secret.json --token-path=/tmp/token.json

授权通过后会进入以下页面,点击Continue

继续点击Continue

授权完成,关闭当前页面

回到/tmp目录下,可以看到会生成一个token.json文件

5、上传token.json文件到云服务器

这里使用scp命令把本地文件上传到服务器去,user和server改成你对应的云服务器的用户名和服务器ip地址

 # 然后将生成的token.json上传到云服务器
scp /tmp/token.json user@server:/tmp/   

6、服务器运行时指定token路径

接下来,登录云服务器,执行以下命令运行发送邮件测试下

# 在服务器上运行时指定token路径
python google_email.py --token-path=/tmp/token.json --to=test@163.com

可以看到成功发送了

7、提供对外API调用接口

这里我们写个controller层,使用fastapi提供对外调用的API接口,再封装下入参

import asyncio
import json
import sys
import traceback
import unittest
import uuid
from io import StringIO
from typing import Dict, List
from unittest.mock import MagicMock, patch

import aiohttp
import requests
import uvicorn
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

# 添加邮件发送接口
@app.post("/chat/send_gmail", summary="发送Gmail邮件")
async def send_gmail(request: Request) -> Dict:
    # copilot begin
    try:
        # 获取请求数据
        body_bytes = await request.body()
        data = json.loads(body_bytes)
        
        # 从请求中获取必要参数
        subject = data.get("subject", "")
        body = data.get("body", "")
        to_recipients = data.get("to", "")
        cc_recipients = data.get("cc", None)
        bcc_recipients = data.get("bcc", None)
        attachment_path = data.get("attachment", None)
        # 这里修改发件人名称为你自己的
        sender_name = data.get("sender_name", "发件人名称")
        # 这里修改为你自己的gmail邮箱
        sender_email = data.get("sender_email", "你自己的@gmail.com")
        retry_times = data.get("retry_times", 3)
        retry_interval = data.get("retry_interval", 5)
        timeout = data.get("timeout", 30)
        
        # 从配置获取客户端密钥文件路径和token路径
        client_secret_path = get_config("gmail.client_secret_path", "/tmp/client_secret.json")
        token_path = get_config("gmail.token_path", "/tmp/token.json")
        
        # 检查必要参数
        if not subject or not body or not to_recipients:
            return {"code": 1, "msg": "邮件主题、内容和收件人不能为空"}
            
        # 检查token文件是否存在
        import os
        if not os.path.exists(token_path):
            return {"code": 1, "msg": f"Token文件不存在: {token_path}, 请先使用命令行工具生成token"}
        
        # 导入GoogleEmail类
        from ai_shop.base.google_email import GoogleEmail
        
        # 创建GoogleEmail实例并发送邮件,强制使用headless模式避免浏览器弹出
        email_sender = GoogleEmail(
            client_secret_path=client_secret_path,
            token_path=token_path,
            headless=True
        )
        
        email_sender.send(
            subject=subject,
            body=body,
            to_recipients=to_recipients,
            cc_recipients=cc_recipients,
            bcc_recipients=bcc_recipients,
            attachment_path=attachment_path,
            sender_name=sender_name,
            sender_email=sender_email,
            retry_times=retry_times,
            retry_interval=retry_interval,
            timeout=timeout
        )
        
        logger.info(f"邮件发送成功,收件人: {to_recipients}, 主题: {subject}")
        return {"code": 0, "msg": "邮件发送成功"}
    except Exception as e:
        logger.error(f"邮件发送失败: {str(e)}, {traceback.format_exc()}")
        return {"code": 1, "msg": f"邮件发送失败: {str(e)}"}
    # copilot end

# 添加邮件发送接口OPTIONS请求处理
@app.options("/chat/send_gmail")
async def options_send_gmail():
    return Response(status_code=200)

if __name__ == "__main__":
    # 从配置获取服务器设置
    host = get_config("server.host", "0.0.0.0")
    port = get_config("server.port", 8000)

    logger.info(f"正在启动简单AI Shop服务,地址: {host}:{port}")
    logger.info(f"API接口: http://localhost:{port}/chat/completions")
    logger.info(f"API接口: http://localhost:{port}/chat/knowledge/insert")
    logger.info(f"API文档: http://localhost:{port}/docs")
    logger.info("CORS已配置,允许所有来源的跨域请求")
    uvicorn.run(app, host=host, port=port)
# copilot end

测试邮件发送

postman测试发送邮件接口

上面的云服务器工程启动后,使用postman测试下接口,前面填写你自己的云服务器公网ip:端口

收件人邮箱登录验证

进到我的163邮箱收件箱,可以看到成功收到了刚发送的测试邮件

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言