All checks were successful
build-and-push / build (push) Successful in 33s
- ProjectMailMsg(헤더 저장)·ProjectMailState(동기화 상태) 모델, AutoMigrate 등록 - mailsync.FetchForDomain: nextPageToken 따라 전체 히스토리 페이지네이션(maxPerBox=0=전부) - 백그라운드 주기 동기화 StartMailSyncLoop(MAIL_SYNC_INTERVAL 기본 15m, 0=비활성) · 미동기화 프로젝트=full 백필, 이후=최신 페이지 top-up - GET /mails는 DB에서 읽어 참여자(from/to/cc) 필터 + 공동 메모 인라인 결합 + lastSyncedAt - POST /mails/sync(강제 풀싱크), PUT /mail-hide(프로젝트 단위 숨김) - mailCache 제거(DB가 캐시), config MailSyncInterval Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
346 lines
10 KiB
Go
346 lines
10 KiB
Go
// Package mailsync reads Gmail messages for a project's client domain using a
|
|
// Google Workspace service account with DOMAIN-WIDE DELEGATION — stdlib only, no
|
|
// third-party SDK (mirrors internal/push for the JWT/OAuth dance).
|
|
//
|
|
// It is OPTIONAL: with no credentials configured the Service is "disabled" and
|
|
// list calls return empty. This lets spin run without Google until the Workspace
|
|
// admin provisions a service account + domain-wide delegation (gmail.readonly).
|
|
//
|
|
// For a project the handler passes the set of member emails to impersonate; for
|
|
// each we mint a delegated token (sub=member) and search that mailbox for the
|
|
// client domain, then aggregate + dedup by RFC822 Message-ID across the team.
|
|
package mailsync
|
|
|
|
import (
|
|
"context"
|
|
"crypto"
|
|
"crypto/rsa"
|
|
"crypto/sha256"
|
|
"crypto/x509"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"encoding/pem"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const gmailScope = "https://www.googleapis.com/auth/gmail.readonly"
|
|
|
|
type serviceAccount struct {
|
|
ProjectID string `json:"project_id"`
|
|
PrivateKey string `json:"private_key"`
|
|
ClientEmail string `json:"client_email"`
|
|
TokenURI string `json:"token_uri"`
|
|
}
|
|
|
|
// Message is a slim mail summary surfaced to the project mail tab.
|
|
type Message struct {
|
|
ID string `json:"id"` // RFC822 Message-ID (stable dedup key)
|
|
ThreadID string `json:"threadId"`
|
|
From string `json:"from"`
|
|
To string `json:"to"`
|
|
Cc string `json:"cc"`
|
|
Subject string `json:"subject"`
|
|
Date string `json:"date"` // raw Date header
|
|
Snippet string `json:"snippet"`
|
|
Mailbox string `json:"mailbox"` // whose mailbox surfaced it
|
|
TS int64 `json:"ts"` // internalDate (ms) for sorting/formatting
|
|
}
|
|
|
|
// Involves reports whether the given email appears in From/To/Cc (case-insensitive).
|
|
// Used to gate per-viewer visibility: a member only sees mail they're a party to.
|
|
func (m Message) Involves(email string) bool {
|
|
if email == "" {
|
|
return false
|
|
}
|
|
hay := strings.ToLower(m.From + " " + m.To + " " + m.Cc)
|
|
return strings.Contains(hay, strings.ToLower(email))
|
|
}
|
|
|
|
type tokenEntry struct {
|
|
token string
|
|
expires time.Time
|
|
}
|
|
|
|
// Service holds the delegated service-account credentials and a per-subject
|
|
// (impersonated user) access-token cache.
|
|
type Service struct {
|
|
sa *serviceAccount
|
|
key *rsa.PrivateKey
|
|
client *http.Client
|
|
mu sync.Mutex
|
|
tokens map[string]tokenEntry
|
|
}
|
|
|
|
// New builds a Service from a service-account JSON file path. Empty path or a
|
|
// read/parse error yields a disabled Service.
|
|
func New(credsPath string) *Service {
|
|
if strings.TrimSpace(credsPath) == "" {
|
|
log.Printf("mailsync: Gmail disabled (no credentials configured)")
|
|
return &Service{}
|
|
}
|
|
raw, err := os.ReadFile(credsPath)
|
|
if err != nil {
|
|
log.Printf("mailsync: Gmail disabled (cannot read %s: %v)", credsPath, err)
|
|
return &Service{}
|
|
}
|
|
var sa serviceAccount
|
|
if err := json.Unmarshal(raw, &sa); err != nil || sa.PrivateKey == "" || sa.ClientEmail == "" {
|
|
log.Printf("mailsync: Gmail disabled (invalid service account JSON)")
|
|
return &Service{}
|
|
}
|
|
key, err := parsePrivateKey(sa.PrivateKey)
|
|
if err != nil {
|
|
log.Printf("mailsync: Gmail disabled (bad private key: %v)", err)
|
|
return &Service{}
|
|
}
|
|
if sa.TokenURI == "" {
|
|
sa.TokenURI = "https://oauth2.googleapis.com/token"
|
|
}
|
|
log.Printf("mailsync: Gmail enabled (sa=%s)", sa.ClientEmail)
|
|
return &Service{sa: &sa, key: key, client: &http.Client{Timeout: 15 * time.Second}, tokens: map[string]tokenEntry{}}
|
|
}
|
|
|
|
// Enabled reports whether real Gmail access is configured.
|
|
func (s *Service) Enabled() bool { return s != nil && s.sa != nil }
|
|
|
|
// FetchForDomain impersonates each mailbox in turn and pages through its full
|
|
// history for the client domain (maxPerBox caps pages per mailbox; <=0 = all),
|
|
// returning the aggregated, de-duplicated, newest-first list.
|
|
func (s *Service) FetchForDomain(ctx context.Context, mailboxes []string, domain string, maxPerBox int) ([]Message, error) {
|
|
if !s.Enabled() {
|
|
return nil, nil
|
|
}
|
|
domain = strings.TrimSpace(strings.TrimPrefix(domain, "@"))
|
|
if domain == "" {
|
|
return nil, nil
|
|
}
|
|
if maxPerBox <= 0 {
|
|
maxPerBox = 1 << 30 // effectively unbounded (whole history)
|
|
}
|
|
q := fmt.Sprintf("(from:%s OR to:%s OR cc:%s) -in:chats", domain, domain, domain)
|
|
seen := map[string]bool{}
|
|
var all []Message
|
|
var firstErr error
|
|
for _, box := range mailboxes {
|
|
box = strings.TrimSpace(box)
|
|
if box == "" {
|
|
continue
|
|
}
|
|
pageToken := ""
|
|
fetched := 0
|
|
for {
|
|
ids, next, err := s.listPage(ctx, box, q, pageToken)
|
|
if err != nil {
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
break
|
|
}
|
|
for _, id := range ids {
|
|
if fetched >= maxPerBox {
|
|
break
|
|
}
|
|
m, err := s.getMeta(ctx, box, id)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
key := m.ID
|
|
if key == "" {
|
|
key = box + "/" + id
|
|
}
|
|
if !seen[key] {
|
|
seen[key] = true
|
|
all = append(all, m)
|
|
}
|
|
fetched++
|
|
}
|
|
if next == "" || fetched >= maxPerBox {
|
|
break
|
|
}
|
|
pageToken = next
|
|
}
|
|
}
|
|
sort.Slice(all, func(i, j int) bool { return all[i].TS > all[j].TS })
|
|
return all, firstErr
|
|
}
|
|
|
|
func (s *Service) listPage(ctx context.Context, subject, q, pageToken string) (ids []string, next string, err error) {
|
|
tok, err := s.accessToken(ctx, subject)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
u := fmt.Sprintf("https://gmail.googleapis.com/gmail/v1/users/me/messages?maxResults=100&q=%s", url.QueryEscape(q))
|
|
if pageToken != "" {
|
|
u += "&pageToken=" + url.QueryEscape(pageToken)
|
|
}
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
|
req.Header.Set("Authorization", "Bearer "+tok)
|
|
resp, err := s.client.Do(req)
|
|
if err != nil {
|
|
return nil, "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return nil, "", fmt.Errorf("gmail list %d for %s", resp.StatusCode, subject)
|
|
}
|
|
var out struct {
|
|
Messages []struct {
|
|
ID string `json:"id"`
|
|
} `json:"messages"`
|
|
NextPageToken string `json:"nextPageToken"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
return nil, "", err
|
|
}
|
|
ids = make([]string, 0, len(out.Messages))
|
|
for _, m := range out.Messages {
|
|
ids = append(ids, m.ID)
|
|
}
|
|
return ids, out.NextPageToken, nil
|
|
}
|
|
|
|
func (s *Service) getMeta(ctx context.Context, subject, id string) (Message, error) {
|
|
tok, err := s.accessToken(ctx, subject)
|
|
if err != nil {
|
|
return Message{}, err
|
|
}
|
|
u := fmt.Sprintf("https://gmail.googleapis.com/gmail/v1/users/me/messages/%s?format=metadata&metadataHeaders=From&metadataHeaders=To&metadataHeaders=Cc&metadataHeaders=Subject&metadataHeaders=Date&metadataHeaders=Message-ID", id)
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
|
req.Header.Set("Authorization", "Bearer "+tok)
|
|
resp, err := s.client.Do(req)
|
|
if err != nil {
|
|
return Message{}, err
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode >= 300 {
|
|
return Message{}, fmt.Errorf("gmail get %d", resp.StatusCode)
|
|
}
|
|
var out struct {
|
|
ID string `json:"id"`
|
|
ThreadID string `json:"threadId"`
|
|
Snippet string `json:"snippet"`
|
|
InternalDate string `json:"internalDate"`
|
|
Payload struct {
|
|
Headers []struct {
|
|
Name string `json:"name"`
|
|
Value string `json:"value"`
|
|
} `json:"headers"`
|
|
} `json:"payload"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
return Message{}, err
|
|
}
|
|
m := Message{ThreadID: out.ThreadID, Snippet: out.Snippet, Mailbox: subject}
|
|
for _, h := range out.Payload.Headers {
|
|
switch strings.ToLower(h.Name) {
|
|
case "from":
|
|
m.From = h.Value
|
|
case "to":
|
|
m.To = h.Value
|
|
case "cc":
|
|
m.Cc = h.Value
|
|
case "subject":
|
|
m.Subject = h.Value
|
|
case "date":
|
|
m.Date = h.Value
|
|
case "message-id":
|
|
m.ID = h.Value
|
|
}
|
|
}
|
|
if m.ID == "" {
|
|
m.ID = subject + "/" + out.ID
|
|
}
|
|
fmt.Sscan(out.InternalDate, &m.TS)
|
|
return m, nil
|
|
}
|
|
|
|
// accessToken returns a cached delegated token for the given subject (impersonated
|
|
// user), minting a new one via the service-account JWT grant when expired.
|
|
func (s *Service) accessToken(ctx context.Context, subject string) (string, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if e, ok := s.tokens[subject]; ok && e.token != "" && time.Now().Before(e.expires.Add(-60*time.Second)) {
|
|
return e.token, nil
|
|
}
|
|
now := time.Now()
|
|
claims := map[string]any{
|
|
"iss": s.sa.ClientEmail,
|
|
"sub": subject, // domain-wide delegation: act as this Workspace user
|
|
"scope": gmailScope,
|
|
"aud": s.sa.TokenURI,
|
|
"iat": now.Unix(),
|
|
"exp": now.Add(time.Hour).Unix(),
|
|
}
|
|
assertion, err := s.signJWT(claims)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
form := url.Values{}
|
|
form.Set("grant_type", "urn:ietf:params:oauth:grant-type:jwt-bearer")
|
|
form.Set("assertion", assertion)
|
|
req, _ := http.NewRequestWithContext(ctx, http.MethodPost, s.sa.TokenURI, strings.NewReader(form.Encode()))
|
|
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
|
resp, err := s.client.Do(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer resp.Body.Close()
|
|
var out struct {
|
|
AccessToken string `json:"access_token"`
|
|
ExpiresIn int `json:"expires_in"`
|
|
Error string `json:"error"`
|
|
ErrorDesc string `json:"error_description"`
|
|
}
|
|
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
|
return "", err
|
|
}
|
|
if out.AccessToken == "" {
|
|
return "", fmt.Errorf("token exchange failed for %s: %s %s", subject, out.Error, out.ErrorDesc)
|
|
}
|
|
s.tokens[subject] = tokenEntry{token: out.AccessToken, expires: now.Add(time.Duration(out.ExpiresIn) * time.Second)}
|
|
return out.AccessToken, nil
|
|
}
|
|
|
|
func (s *Service) signJWT(claims map[string]any) (string, error) {
|
|
header := map[string]string{"alg": "RS256", "typ": "JWT"}
|
|
hb, _ := json.Marshal(header)
|
|
cb, _ := json.Marshal(claims)
|
|
signingInput := b64(hb) + "." + b64(cb)
|
|
h := sha256.Sum256([]byte(signingInput))
|
|
sig, err := rsa.SignPKCS1v15(nil, s.key, crypto.SHA256, h[:])
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return signingInput + "." + base64.RawURLEncoding.EncodeToString(sig), nil
|
|
}
|
|
|
|
func b64(b []byte) string { return base64.RawURLEncoding.EncodeToString(b) }
|
|
|
|
func parsePrivateKey(pemStr string) (*rsa.PrivateKey, error) {
|
|
block, _ := pem.Decode([]byte(pemStr))
|
|
if block == nil {
|
|
return nil, errors.New("no PEM block")
|
|
}
|
|
if k, err := x509.ParsePKCS1PrivateKey(block.Bytes); err == nil {
|
|
return k, nil
|
|
}
|
|
keyAny, err := x509.ParsePKCS8PrivateKey(block.Bytes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
k, ok := keyAny.(*rsa.PrivateKey)
|
|
if !ok {
|
|
return nil, errors.New("not an RSA private key")
|
|
}
|
|
return k, nil
|
|
}
|