mirror of
https://github.com/nexu-io/open-design.git
synced 2026-06-01 03:14:35 +07:00
* feat: pre-generation research (Tavily) for grounded generation
Adds an optional pre-generation research step so the agent can produce
slides / prototypes / decks grounded in real sources instead of guessing.
User flow:
1. Settings -> Tavily Search -> paste API key (or set TAVILY_API_KEY).
2. Click the new Research button in the chat composer.
3. On send, the daemon runs a Tavily search, prepends the findings
as a <research_context> block ahead of the system prompt, and
spawns the agent. Research progress shows up as status pills in
the chat stream; the agent cites sources inline as [1]/[2]/...
Phase 1 surface:
- Single provider (Tavily), single depth ('shallow'), no LLM
synthesis pass (Tavily's `answer` is the summary).
- Composer toggle only; no popover / depth picker yet.
- Reuses the existing `status` SSE agent payload + StatusPill UI
so no new event variants or renderer code are needed.
Layers touched:
- contracts: ResearchOptions / Source / Findings DTOs;
ChatRequest.research; export from index.
- daemon: apps/daemon/src/research/{index,tavily}.ts orchestrator
+ provider; tavily added to MEDIA_PROVIDERS and ENV_KEYS; hook
in startChatRun before prompt assembly.
- web: ChatComposer toggle + ChatSendMeta; threaded through
ChatPane / ProjectView / streamViaDaemon into ChatRequest.
Side fix (required to land the feature, but useful on its own):
contracts internal relative imports lacked the `.js` suffix that
NodeNext module resolution requires. This was already breaking
`pnpm --filter @open-design/daemon typecheck` on main; without the
fix, none of the new research types were visible to the daemon.
All internal contracts imports now carry `.js`.
Spec: specs/current/research-feature.md (phases 2-4 outlined for
follow-up: composer popover, multi-provider, deep recursion, example
skills with research_recommends).
Verified:
- pnpm --filter @open-design/contracts typecheck/test
- pnpm --filter @open-design/daemon typecheck (the chokidar
project-watchers test is a pre-existing flake, unrelated)
- pnpm --filter @open-design/web typecheck
- node scripts/verify-media-models.mjs
* fix(daemon): clamp Tavily max_results to 20
Tavily's /search endpoint requires `max_results` in [0, 20]; sending a
larger value (e.g. when `research.depth: "deep"` resolves to 30) returns
400 and `runResearch` silently falls back to no-research. Clamp at the
provider boundary so Phase 2 depth tiers above 20 still produce results
instead of failing the request.
Generated-By: looper 0.6.1 (runner=fixer, agent=claude-code)
* Remove stale research merge leftovers
* Add agent-callable research search
* Fix Indonesian locale typecheck
* Fix research command invocation edge cases
* Harden slash search prompt expansion
* Honor research source caps in command contract
* Require search reports in design files
* Add research data provider settings
* Wire web research provider fallback order
* Update research provider fallback wording
* Revert "Update research provider fallback wording"
This reverts commit 86fb6001e3.
* Revert "Wire web research provider fallback order"
This reverts commit 4c9e16036b.
* Revert "Add research data provider settings"
This reverts commit 23630d1746.
* Add Dexter and Last30Days research skills
* Add DCF and Last30Days OD skills
* Add Last30Days and Dexter skills
* Resolve research review threads
---------
Co-authored-by: a1chzt <chizblank@gmail.com>
499 lines
17 KiB
Python
499 lines
17 KiB
Python
"""Normalization of source-specific payloads into the v3 generic item model."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
from urllib.parse import urlparse
|
|
|
|
from . import dates, schema
|
|
|
|
|
|
def filter_by_date_range(
    items: list[schema.SourceItem],
    from_date: str,
    to_date: str,
    require_date: bool = False,
) -> list[schema.SourceItem]:
    """Return the items whose ``published_at`` falls inside the requested window.

    Both bounds are inclusive. ``published_at`` is compared to the bounds with
    the ordinary ``<`` / ``>`` operators; no parsing happens here, so the
    values are expected to be mutually comparable.

    Args:
        items: Normalized items to filter; input order is preserved.
        from_date: Lower bound of the window.
        to_date: Upper bound of the window.
        require_date: When true, items with a missing/empty ``published_at``
            are dropped; otherwise they are kept.

    Returns:
        A new list holding the surviving items.
    """

    def _in_window(entry: schema.SourceItem) -> bool:
        stamp = entry.published_at
        if not stamp:
            # Undated items survive only when a date is not mandatory.
            return not require_date
        return not (stamp < from_date or stamp > to_date)

    return [entry for entry in items if _in_window(entry)]
|
|
|
|
|
|
def normalize_source_items(
    source: str,
    items: list[dict[str, Any]],
    from_date: str,
    to_date: str,
    freshness_mode: str = "balanced_recent",
) -> list[schema.SourceItem]:
    """Normalize raw payloads from one source and filter them to the date window.

    Args:
        source: Source name; matched case-insensitively against the dispatch
            table below.
        items: Raw payload dicts as produced by that source.
        from_date: Lower bound of the window (passed to the normalizer and to
            ``filter_by_date_range``).
        to_date: Upper bound of the window.
        freshness_mode: When ``"evergreen_ok"`` and the source is YouTube, an
            empty in-window result falls back to every normalized item
            instead of returning nothing.

    Returns:
        The normalized items that pass the date filter (or the evergreen
        fallback described above).

    Raises:
        ValueError: If ``source`` has no registered normalizer.
    """
    source = source.lower()
    normalizers = {
        "reddit": _normalize_reddit,
        "x": _normalize_x,
        "youtube": _normalize_youtube,
        "tiktok": lambda s, i, idx, fd, td: _normalize_shortform_video(s, i, idx, fd, td, "TK", "TikTok post"),
        "instagram": lambda s, i, idx, fd, td: _normalize_shortform_video(s, i, idx, fd, td, "IG", "Instagram reel"),
        "hackernews": _normalize_hackernews,
        "bluesky": lambda s, i, idx, fd, td: _normalize_microblog(s, i, idx, fd, td, "BS", "Bluesky post"),
        "truthsocial": lambda s, i, idx, fd, td: _normalize_microblog(s, i, idx, fd, td, "TS", "Truth Social post"),
        "threads": lambda s, i, idx, fd, td: _normalize_microblog(s, i, idx, fd, td, "TH", "Threads post"),
        "xquik": _normalize_x,
        "pinterest": _normalize_pinterest,
        "polymarket": _normalize_polymarket,
        "grounding": _normalize_grounding,
        "xiaohongshu": _normalize_grounding,
        "github": _normalize_github,
        "perplexity": _normalize_grounding,
    }
    normalizer = normalizers.get(source)
    if normalizer is None:
        raise ValueError(f"Unsupported source: {source}")

    normalized = [normalizer(source, item, index, from_date, to_date) for index, item in enumerate(items)]

    # Only "grounding" results must carry a date; undated items from every
    # other source are allowed through the window filter.
    require_date = source == "grounding"
    filtered = filter_by_date_range(normalized, from_date, to_date, require_date=require_date)

    if not filtered and freshness_mode == "evergreen_ok" and source == "youtube":
        # Evergreen fallback: nothing landed in the window, so return the
        # unfiltered YouTube items. (The former `require_date` sub-branch here
        # could never run, because require_date is only true for "grounding".)
        return normalized
    return filtered
|
|
|
|
|
|
def _remap_comments(
    raw: list[Any],
    score_keys: tuple[str, ...],
    excerpt_keys: tuple[str, ...],
) -> list[dict[str, Any]]:
    """Map per-source comment dicts onto one ``score``/``excerpt`` shape.

    Args:
        raw: Comment payloads; entries that are not dicts are skipped.
        score_keys: Candidate keys for the vote count, tried in order.
        excerpt_keys: Candidate keys for the comment text, tried in order.

    Returns:
        One dict per dict-shaped input with ``score`` (int, 0 when missing or
        not convertible), ``excerpt`` (at most 400 characters), ``author`` and
        ``date`` (strings, possibly empty), plus ``url`` when the input has a
        truthy one.
    """

    def _convert(comment: dict[str, Any]) -> dict[str, Any]:
        raw_score = _first_present(comment, score_keys, default=0)
        raw_excerpt = _first_present(comment, excerpt_keys, default="")
        try:
            votes = int(raw_score or 0)
        except (TypeError, ValueError):
            # Non-numeric vote values count as zero.
            votes = 0
        mapped: dict[str, Any] = {
            "score": votes,
            "excerpt": str(raw_excerpt or "")[:400],
            "author": str(comment.get("author") or ""),
            "date": str(comment.get("date") or ""),
        }
        if comment.get("url"):
            mapped["url"] = str(comment["url"])
        return mapped

    return [_convert(comment) for comment in raw if isinstance(comment, dict)]
|
|
|
|
|
|
def _first_present(d: dict[str, Any], keys: tuple[str, ...], default: Any) -> Any:
    """Return the value of the first key in ``keys`` that holds a usable value.

    A value is usable when the key exists in ``d`` and the value is neither
    ``None`` nor the empty string (so ``0`` and ``False`` are returned as-is).
    ``default`` is returned when no key qualifies.
    """
    return next(
        (d[name] for name in keys if name in d and d[name] not in (None, "")),
        default,
    )
|
|
|
|
|
|
def _join_comment_excerpts(
    top_comments: list[Any],
    key: str,
    limit: int = 3,
) -> str:
    """Space-join the non-empty ``key`` values of the leading comments.

    Args:
        top_comments: Comment payloads; only the first ``limit`` entries are
            inspected, and entries that are not dicts are ignored.
        key: Name of the field holding the comment text.
        limit: How many leading entries to inspect.

    Returns:
        The stripped field values joined by single spaces, or ``""`` when no
        inspected comment has text.
    """
    pieces = (
        str(comment.get(key) or "").strip()
        for comment in top_comments[:limit]
        if isinstance(comment, dict)
    )
    # Skip blank pieces: joining them would produce a whitespace-only (truthy)
    # string and defeat callers' `comment_text or <fallback>` logic.
    return " ".join(piece for piece in pieces if piece)
|
|
|
|
|
|
def _domain_from_url(url: str) -> str | None:
    """Return the lower-cased network location of ``url``, or ``None``.

    ``None`` is returned for an empty URL, a URL without a network location,
    and a URL that ``urlparse`` rejects as malformed.
    """
    if not url:
        return None
    try:
        domain = urlparse(url).netloc.strip().lower()
    except ValueError:
        # urlparse raises on malformed netlocs (e.g. an unbalanced "[");
        # one bad URL should not abort normalization of a whole result set.
        return None
    return domain or None
|
|
|
|
|
|
def _date_confidence(item: dict[str, Any], from_date: str, to_date: str, default: str = "low") -> str:
    """Pick the date-confidence label for a raw item.

    Precedence: a truthy ``date_confidence`` on the item (stringified), then
    ``default`` when the item has no truthy ``date``, otherwise whatever
    ``dates.get_date_confidence`` returns for the stringified date and window.
    """
    explicit = item.get("date_confidence")
    if explicit:
        return str(explicit)
    raw_date = item.get("date")
    if raw_date:
        return dates.get_date_confidence(str(raw_date), from_date, to_date)
    return default
|
|
|
|
|
|
def _source_item(
    *,
    item_id: str,
    source: str,
    title: str,
    body: str,
    url: str,
    published_at: str | None,
    date_confidence: str,
    relevance_hint: float,
    why_relevant: str,
    author: str | None = None,
    container: str | None = None,
    engagement: dict[str, float | int] | None = None,
    snippet: str = "",
    metadata: dict[str, Any] | None = None,
) -> schema.SourceItem:
    """Build a ``schema.SourceItem`` from keyword arguments, tidying the values.

    Text fields are stripped. An empty title falls back to the first 160
    characters of the stripped body, then to ``item_id``. Blank ``author`` /
    ``container`` become ``None``; missing ``engagement`` / ``metadata``
    become empty dicts. ``relevance_hint`` is coerced to float (falsy -> 0.0)
    and clamped to ``[0.0, 1.0]``.
    """
    display_title = title.strip() or body.strip()[:160] or item_id
    clean_body = body.strip()
    clean_url = url.strip()
    clean_author = (author or "").strip() or None
    clean_container = (container or "").strip() or None
    relevance = float(relevance_hint or 0.0)
    clamped_relevance = max(0.0, min(1.0, relevance))
    return schema.SourceItem(
        item_id=item_id,
        source=source,
        title=display_title,
        body=clean_body,
        url=clean_url,
        author=clean_author,
        container=clean_container,
        published_at=published_at,
        date_confidence=date_confidence,
        engagement=engagement or {},
        relevance_hint=clamped_relevance,
        why_relevant=why_relevant.strip(),
        snippet=snippet.strip(),
        metadata=metadata or {},
    )
|
|
|
|
|
|
def _normalize_reddit(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one Reddit payload into a ``schema.SourceItem``.

    The body is the title, selftext and joined top-comment excerpts (empty
    parts skipped), one per line. The snippet prefers the comment excerpts and
    falls back to the first 400 characters of the selftext. The id falls back
    to ``R<index + 1>``.
    """
    comments = item.get("top_comments") or []
    comment_text = _join_comment_excerpts(comments, "excerpt")
    raw_title = str(item.get("title") or "")
    selftext = str(item.get("selftext") or "")
    sections = [raw_title.strip(), selftext.strip(), comment_text]
    return _source_item(
        item_id=str(item.get("id") or f"R{index + 1}"),
        source=source,
        title=raw_title,
        body="\n".join(filter(None, sections)),
        url=str(item.get("url") or ""),
        author=None,
        container=str(item.get("subreddit") or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=comment_text or selftext[:400],
        metadata={
            "top_comments": comments,
            "comment_insights": item.get("comment_insights") or [],
        },
    )
|
|
|
|
|
|
def _normalize_x(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one X-style post payload into a ``schema.SourceItem``.

    The post text is the body and its first 140 characters the title (or
    ``"X post <n>"`` when empty). A leading ``@`` is removed from the author
    handle. The id falls back to ``X<index + 1>``.
    """
    post_text = str(item.get("text") or "").strip()
    handle = str(item.get("author_handle") or "").lstrip("@")
    headline = post_text[:140] or f"X post {index + 1}"
    return _source_item(
        item_id=str(item.get("id") or f"X{index + 1}"),
        source=source,
        title=headline,
        body=post_text,
        url=str(item.get("url") or ""),
        author=handle,
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
    )
|
|
|
|
|
|
def _normalize_youtube(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one YouTube payload into a ``schema.SourceItem``.

    The body is the title, description and transcript snippet (empty parts
    skipped), one per line; the transcript snippet is also the item snippet.
    Metadata carries ``transcript_highlights`` when present and always carries
    ``top_comments`` remapped to the shared ``score``/``excerpt`` shape. The id
    falls back from ``video_id`` to ``id`` to ``YT<index + 1>``.
    """
    video_title = str(item.get("title") or "").strip()
    description = str(item.get("description") or "").strip()
    transcript = str(item.get("transcript_snippet") or "").strip()
    highlights = item.get("transcript_highlights") or []

    extras: dict[str, Any] = {"transcript_highlights": highlights} if highlights else {}
    extras["top_comments"] = _remap_comments(
        item.get("top_comments") or [],
        score_keys=("score", "likes"),
        excerpt_keys=("excerpt", "text"),
    )

    sections = (video_title, description, transcript)
    return _source_item(
        item_id=str(item.get("video_id") or item.get("id") or f"YT{index + 1}"),
        source=source,
        title=video_title,
        body="\n".join(section for section in sections if section),
        url=str(item.get("url") or ""),
        author=str(item.get("channel_name") or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=transcript,
        metadata=extras,
    )
|
|
|
|
|
|
def _normalize_shortform_video(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
    id_prefix: str,
    default_title: str,
) -> schema.SourceItem:
    """Normalize a short-form video payload (shared by TikTok and Instagram).

    Args:
        source: Source name stored on the item.
        item: Raw payload dict.
        index: Zero-based position, used for fallback ids and titles.
        from_date: Window start, used for date confidence.
        to_date: Window end, used for date confidence.
        id_prefix: Prefix of the fallback id (``<prefix><index + 1>``).
        default_title: Fallback title stem when both text and caption are empty.
    """
    post_text = str(item.get("text") or "").strip()
    caption = str(item.get("caption_snippet") or "").strip()
    comments = _remap_comments(
        item.get("top_comments") or [],
        # `digg_count` is listed for TikTok's vote field; sources without
        # comments simply contribute an empty list.
        score_keys=("score", "digg_count", "likes"),
        excerpt_keys=("excerpt", "text"),
    )
    return _source_item(
        item_id=str(item.get("id") or f"{id_prefix}{index + 1}"),
        source=source,
        title=post_text[:140] or caption[:140] or f"{default_title} {index + 1}",
        body="\n".join(filter(None, [post_text, caption])),
        url=str(item.get("url") or ""),
        author=str(item.get("author_name") or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=caption,
        metadata={
            "hashtags": item.get("hashtags") or [],
            "top_comments": comments,
        },
    )
|
|
|
|
|
|
def _normalize_pinterest(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one Pinterest pin payload into a ``schema.SourceItem``.

    The pin description supplies the body, the title (first 140 characters,
    or ``"Pinterest pin <n>"`` when empty) and the snippet (first 400
    characters). The board name becomes the container. The id falls back from
    ``pin_id`` to ``id`` to ``PI<index + 1>``.
    """
    pin_text = str(item.get("description") or "").strip()
    pin_id = item.get("pin_id") or item.get("id") or f"PI{index + 1}"
    return _source_item(
        item_id=str(pin_id),
        source=source,
        title=pin_text[:140] or f"Pinterest pin {index + 1}",
        body=pin_text,
        url=str(item.get("url") or ""),
        author=str(item.get("author") or ""),
        container=str(item.get("board") or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="low"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=pin_text[:400],
    )
|
|
|
|
|
|
def _normalize_hackernews(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one Hacker News payload into a ``schema.SourceItem``.

    The body is the title, story text and joined comment ``text`` fields
    (empty parts skipped), one per line; the joined comment text is also the
    snippet. The URL falls back to ``hn_url``, which is additionally kept in
    the metadata next to the raw ``top_comments``. The id falls back to
    ``HN<index + 1>``.
    """
    comments = item.get("top_comments") or []
    comment_text = _join_comment_excerpts(comments, "text")
    story_title = str(item.get("title") or "").strip()
    story_text = str(item.get("text") or "").strip()
    sections = (story_title, story_text, comment_text)
    return _source_item(
        item_id=str(item.get("id") or f"HN{index + 1}"),
        source=source,
        title=story_title or f"HN story {index + 1}",
        body="\n".join(section for section in sections if section),
        url=str(item.get("url") or item.get("hn_url") or ""),
        author=str(item.get("author") or ""),
        container="Hacker News",
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=comment_text,
        metadata={
            "hn_url": item.get("hn_url"),
            "top_comments": comments,
            "comment_insights": item.get("comment_insights") or [],
        },
    )
|
|
|
|
|
|
def _normalize_microblog(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
    id_prefix: str,
    default_title: str,
) -> schema.SourceItem:
    """Normalize a microblog post (shared by Bluesky, Truth Social and Threads).

    Args:
        source: Source name stored on the item.
        item: Raw payload dict.
        index: Zero-based position, used for fallback ids and titles.
        from_date: Window start, used for date confidence.
        to_date: Window end, used for date confidence.
        id_prefix: Prefix of the fallback id (``<prefix><index + 1>``).
        default_title: Fallback title stem when the post has no text.
    """
    post_text = str(item.get("text") or "").strip()
    # Prefer `handle`, fall back to `author_handle`; drop any leading "@".
    handle = str(item.get("handle") or item.get("author_handle") or "").lstrip("@")
    return _source_item(
        item_id=str(item.get("id") or f"{id_prefix}{index + 1}"),
        source=source,
        title=post_text[:140] or f"{default_title} {index + 1}",
        body=post_text,
        url=str(item.get("url") or ""),
        author=handle,
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        metadata={"display_name": item.get("display_name")},
    )
|
|
|
|
|
|
def _normalize_polymarket(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one Polymarket payload into a ``schema.SourceItem``.

    Engagement is rebuilt from the payload: ``volume`` takes ``volume1mo``,
    then ``volume24hr``, then 0; ``liquidity`` defaults to 0. The body is the
    title, question and price movement (empty parts skipped), one per line;
    the price movement is also the snippet. The id falls back to
    ``PM<index + 1>``.
    """
    market_title = str(item.get("title") or "").strip()
    question = str(item.get("question") or "").strip()
    movement = str(item.get("price_movement") or "")
    volume = item.get("volume1mo") or item.get("volume24hr") or 0
    liquidity = item.get("liquidity") or 0
    return _source_item(
        item_id=str(item.get("id") or f"PM{index + 1}"),
        source=source,
        title=market_title or question or f"Polymarket event {index + 1}",
        body="\n".join(filter(None, [market_title, question, movement])),
        url=str(item.get("url") or ""),
        author=None,
        container="Polymarket",
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement={"volume": volume, "liquidity": liquidity},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=movement,
        metadata={
            "question": question,
            "end_date": item.get("end_date"),
            "outcome_prices": item.get("outcome_prices") or [],
            "outcomes_remaining": item.get("outcomes_remaining"),
        },
    )
|
|
|
|
|
|
|
|
def _normalize_github(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one GitHub payload into a ``schema.SourceItem``.

    The body is the title, snippet and joined top-comment excerpts (empty
    parts skipped), one per line. The item snippet prefers the comment
    excerpts and falls back to the first 400 characters of the payload
    snippet. ``top_comments``, ``labels``, ``state`` and ``is_pr`` are read
    from the payload's ``metadata`` dict. The id falls back to
    ``GH<index + 1>``.
    """
    title = str(item.get("title") or "").strip()
    snippet_text = str(item.get("snippet") or "").strip()

    # Resolve metadata once. `item.get("metadata", {})` returns None when the
    # key is present with a null value, which made the chained `.get` raise.
    metadata = item.get("metadata") or {}
    if not isinstance(metadata, dict):
        metadata = {}

    top_comments = metadata.get("top_comments") or []
    comment_text = _join_comment_excerpts(top_comments, "excerpt")
    body = "\n".join(part for part in [title, snippet_text, comment_text] if part)
    return _source_item(
        item_id=str(item.get("id") or f"GH{index + 1}"),
        source=source,
        title=title or f"GitHub item {index + 1}",
        body=body,
        url=str(item.get("url") or ""),
        author=str(item.get("author") or ""),
        container=str(item.get("container") or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date, default="high"),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=comment_text or snippet_text[:400],
        metadata={
            "top_comments": top_comments,
            "labels": metadata.get("labels") or [],
            "state": metadata.get("state", ""),
            "is_pr": metadata.get("is_pr", False),
        },
    )
|
|
|
|
def _normalize_grounding(
    source: str,
    item: dict[str, Any],
    index: int,
    from_date: str,
    to_date: str,
) -> schema.SourceItem:
    """Normalize one web-result payload into a ``schema.SourceItem``.

    The title falls back to the URL's domain, then to ``"Web result <n>"``.
    The container is ``source_domain`` when given, otherwise the URL's domain.
    The body is the title and snippet (empty parts skipped), one per line.
    The id falls back to ``W<index + 1>``.
    """
    page_title = str(item.get("title") or "").strip()
    excerpt = str(item.get("snippet") or "").strip()
    link = str(item.get("url") or "").strip()
    sections = (page_title, excerpt)
    return _source_item(
        item_id=str(item.get("id") or f"W{index + 1}"),
        source=source,
        # The domain is only derived when the preferred value is missing.
        title=page_title or _domain_from_url(link) or f"Web result {index + 1}",
        body="\n".join(section for section in sections if section),
        url=link,
        author=None,
        container=str(item.get("source_domain") or _domain_from_url(link) or ""),
        published_at=item.get("date"),
        date_confidence=_date_confidence(item, from_date, to_date),
        engagement=item.get("engagement") or {},
        relevance_hint=item.get("relevance", 0.5),
        why_relevant=str(item.get("why_relevant") or ""),
        snippet=excerpt,
        metadata=item.get("metadata") or {},
    )
|