Refactor Google Meet Transcripts Extension for Local Use

- Removed all cloud-related functionalities, including login prompts and token handling.
- Disabled Laxis cloud features, ensuring no data is sent to external servers.
- Updated manifest to reflect the new local-only functionality.
- Added a new Python server to handle transcripts locally, including WebSocket support.
- Implemented storage management for transcripts, including deduplication and file writing.
- Created a smoke test for the WebSocket server to simulate transcript updates.
- Updated README with setup instructions and usage details for the new local server.
This commit is contained in:
vahidaskari
2026-06-12 00:31:32 +03:30
parent 602dcb7430
commit 7bc34c79ed
35 changed files with 1069 additions and 840 deletions
+234
View File
@@ -0,0 +1,234 @@
"""
storage.py — مدل داده، dedup، و خواندن/نوشتن transcript روی دیسک.
این ماژول هیچ I/O شبکه‌ای ندارد؛ فقط منطقِ segment و فایل‌ها:
- segmentهای هر جلسه را نگه می‌دارد (با dedup بر اساس startedAt)
- برای هر جلسه سه فایل می‌نویسد: <sid>.srt / <sid>.txt / <sid>.json
- توابع خواندن برای لایه‌ی MCP فراهم می‌کند
چرا segment‌محور با dedup؟ افزونه برای هر «حرف» چند بار پیام می‌فرستد و هر بار متن
کمی بلندتر می‌شود (snapshotِ روبه‌رشد) ولی startedAt ثابت می‌ماند. پس startedAt کلیدِ
هویتِ segment است: تکرار شد → همان segment به‌روز می‌شود (نه append). این هم تکرار را
حذف می‌کند و هم چون startedAt/endedAt داریم، زمان‌بندیِ SRT دقیق می‌شود.
"""
import json
import re
import sys
import time
from pathlib import Path
TRANSCRIPTS_DIR = Path(__file__).parent / "transcripts"
TRANSCRIPTS_DIR.mkdir(exist_ok=True)
# segmentهای هر جلسه در حافظه: { sid: [ {seq, speaker, text, start, end, start_key}, ... ] }
sessions_segments: "dict[str, list[dict]]" = {}
def log(*args) -> None:
"""همه‌ی لاگ‌ها به stderr می‌روند؛ stdout برای پروتکل MCP رزرو است."""
print(*args, file=sys.stderr, flush=True)
def safe_session_id(session_id: str) -> str:
cleaned = re.sub(r"[^a-zA-Z0-9_-]", "", session_id or "")
return cleaned or "default"
# ---------------------------------------------------------------------------
# زمان و رندر
# ---------------------------------------------------------------------------
def to_epoch_s(v) -> "float | None":
"""startedAt/endedAt را به ثانیه‌ی epoch تبدیل می‌کند (epoch ms، رشته‌ی عددی یا ISO)."""
if v is None or isinstance(v, bool):
return None
if isinstance(v, (int, float)):
return v / 1000.0 if v > 1e11 else float(v)
if isinstance(v, str):
s = v.strip()
if not s:
return None
if s.isdigit():
n = float(s)
return n / 1000.0 if n > 1e11 else n
try:
from datetime import datetime
return datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()
except ValueError:
return None
return None
def is_continuation(prev: str, cur: str) -> bool:
"""fallback وقتی timestamp نداریم: آیا cur ادامه/سوپرستِ همان حرفِ prev است؟"""
if not prev or not cur:
return False
short, long = (prev, cur) if len(prev) <= len(cur) else (cur, prev)
common = 0
for a, b in zip(short, long):
if a == b:
common += 1
else:
break
return common >= max(8, int(len(short) * 0.7))
def _fmt_srt(seconds: float) -> str:
if seconds < 0:
seconds = 0
ms = int(round(seconds * 1000))
h, ms = divmod(ms, 3600000)
m, ms = divmod(ms, 60000)
s, ms = divmod(ms, 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def fmt_clock(seconds: float) -> str:
if seconds < 0:
seconds = 0
total = int(seconds)
h, rem = divmod(total, 3600)
m, s = divmod(rem, 60)
return f"{h:02d}:{m:02d}:{s:02d}" if h else f"{m:02d}:{s:02d}"
def render_srt(segments: "list[dict]") -> str:
if not segments:
return ""
base = min(seg["start"] for seg in segments)
out = []
for i, seg in enumerate(segments, 1):
start = seg["start"] - base
end = max(seg["end"], seg["start"]) - base
if end <= start:
end = start + 1.0 # حداقل ۱ ثانیه نمایش
speaker = (seg.get("speaker") or "").strip()
text = (seg.get("text") or "").strip()
line = f"{speaker}: {text}" if speaker else text
out.append(f"{i}\n{_fmt_srt(start)} --> {_fmt_srt(end)}\n{line}\n")
return "\n".join(out)
def render_txt(segments: "list[dict]") -> str:
if not segments:
return ""
base = min(seg["start"] for seg in segments)
lines = []
for seg in segments:
ts = fmt_clock(seg["start"] - base)
speaker = (seg.get("speaker") or "").strip()
text = (seg.get("text") or "").strip()
lines.append(f"[{ts}] {speaker}: {text}" if speaker else f"[{ts}] {text}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# نوشتن / به‌روزرسانی
# ---------------------------------------------------------------------------
def ensure_loaded(sid: str) -> "list[dict]":
"""segmentهای جلسه را می‌دهد؛ اگر در حافظه نبود از <sid>.json می‌خواند (resume)."""
if sid not in sessions_segments:
p = TRANSCRIPTS_DIR / f"{sid}.json"
if p.exists():
try:
sessions_segments[sid] = json.loads(p.read_text(encoding="utf-8"))
except Exception:
sessions_segments[sid] = []
else:
sessions_segments[sid] = []
return sessions_segments[sid]
def persist(sid: str) -> None:
segs = sessions_segments.get(sid, [])
(TRANSCRIPTS_DIR / f"{sid}.json").write_text(
json.dumps(segs, ensure_ascii=False), encoding="utf-8")
(TRANSCRIPTS_DIR / f"{sid}.srt").write_text(render_srt(segs), encoding="utf-8")
(TRANSCRIPTS_DIR / f"{sid}.txt").write_text(render_txt(segs), encoding="utf-8")
def upsert_segment(session_id, speaker, text, started_at, ended_at) -> None:
"""یک caption را اضافه یا (اگر همان حرف باشد) به‌روز می‌کند، سپس روی دیسک می‌نویسد."""
sid = safe_session_id(session_id)
text = (text or "").strip()
if not text:
return
segs = ensure_loaded(sid)
start_s = to_epoch_s(started_at)
end_s = to_epoch_s(ended_at)
recv = time.time()
matched = False
if segs:
last = segs[-1]
if last.get("speaker") == speaker:
lk = last.get("start_key")
if start_s is not None and lk is not None:
matched = abs(lk - start_s) < 0.001
elif start_s is None and lk is None:
matched = is_continuation(last.get("text", ""), text)
if matched:
last = segs[-1]
last["text"] = text # جدیدترین snapshot کامل‌ترین است
last["end"] = end_s if end_s is not None else recv
else:
seq = (segs[-1]["seq"] + 1) if segs else 1
segs.append({
"seq": seq,
"speaker": speaker,
"text": text,
"start": start_s if start_s is not None else recv,
"end": end_s if end_s is not None else (start_s if start_s is not None else recv),
"start_key": start_s,
})
persist(sid)
# ---------------------------------------------------------------------------
# خواندن (برای لایه‌ی MCP)
# ---------------------------------------------------------------------------
def _read(sid: str, ext: str) -> "str | None":
p = TRANSCRIPTS_DIR / f"{safe_session_id(sid)}.{ext}"
if p.exists():
try:
return p.read_text(encoding="utf-8")
except Exception:
return None
return None
def read_txt(sid: str) -> "str | None":
return _read(sid, "txt")
def read_srt(sid: str) -> "str | None":
return _read(sid, "srt")
def read_segments(sid: str) -> "list[dict]":
p = TRANSCRIPTS_DIR / f"{safe_session_id(sid)}.json"
if p.exists():
try:
return json.loads(p.read_text(encoding="utf-8"))
except Exception:
return []
return []
def json_mtime(sid: str) -> float:
p = TRANSCRIPTS_DIR / f"{safe_session_id(sid)}.json"
return p.stat().st_mtime if p.exists() else 0.0
def latest_session() -> "str | None":
files = list(TRANSCRIPTS_DIR.glob("*.txt"))
if not files:
return None
return max(files, key=lambda p: p.stat().st_mtime).stem
def list_session_files():
"""فایل‌های .txt جلسه‌ها را به‌ترتیبِ جدیدترین برمی‌گرداند."""
return sorted(TRANSCRIPTS_DIR.glob("*.txt"),
key=lambda p: p.stat().st_mtime, reverse=True)