# Pipeline Specification for Vishal
Audience: Vishal (CEO / Enterprise Software Architect) and the pipeline engineering team.
What this is: The contract — what the pipeline must produce, with what quality, freshness, and lineage. This does NOT prescribe architecture (event-driven vs batch, lakehouse vs warehouse, Spark vs Flink — Vishal's call). It defines the interface between the pipeline and the product/AI layers.
What this is not: A build plan, a tech-stack decision, or a timeline. Those live elsewhere.
## 1. Pipeline responsibilities (high level)
```mermaid
flowchart LR
    Sources[Raw Sources<br/>MahaRERA, IGR, GRs,<br/>MahaBhulekh, GIS,<br/>News, Social, Internal]
    Pipeline[Pipeline<br/>Vishal owns]
    CanonStore[Canonical Attribute Store<br/>~140 attrs with quality passport]
    DerivedEngine[Derived Engine<br/>Scores, indices, simulations]
    ProductAPI[Product API<br/>Broker, Analytix, Fractional AI]
    Sources --> Pipeline
    Pipeline --> CanonStore
    CanonStore --> DerivedEngine
    DerivedEngine --> CanonStore
    CanonStore --> ProductAPI
```
The pipeline is responsible for everything from Sources to Canonical Store. The Derived Engine may sit inside or outside the pipeline (Vishal's call), but it must consume and write to the same Canonical Store.
## 2. Input contract — what the pipeline ingests
### 2.1 Source adapters required (Day 1)
| Source | Format | Adapter type | Priority |
|---|---|---|---|
| MahaRERA project pages | HTML + PDFs | Scraper + LLM extraction | Critical |
| IGR Index-II (priority micromarkets) | Portal search + licensed feed CSV | Scraper + feed parser | Critical |
| Government Resolutions | PDF (Marathi/English) | Scraper + OCR + LLM classification | Critical |
| MahaBhulekh 7/12 | HTML / digital PDF | Per-property scraper | High |
| ASR / Ready Reckoner | Portal lookup | Scraper | High |
| PropPie Fractional DB | Internal Postgres / API | DB connector | High |
### 2.2 Source adapters required (Month 3-6)
| Source | Format | Adapter type | Priority |
|---|---|---|---|
| News feeds | RSS + HTML | Scraper + entity linker | High |
| Social listening | API / scraper | Feed parser + sentiment | Medium |
| GIS layers (MRSAC, OSM, Bhuvan) | Shapefiles / GeoJSON / WMS | Spatial ingestor | High |
| CERSAI | API | API adapter | High |
| Licensed feeds (PropEquity, CRE Matrix) | CSV / API | Feed parser | Medium |
| Macro data (RBI, MoSPI) | CSV / API | Scraper | Medium |
### 2.3 Extraction schema alignment

The pipeline already uses `real_estate_extraction_schema.json` (v2, 16 top-level objects). The canonical attribute catalogue in `data-attributes.md` maps to and extends this schema:
| Schema v2 object | Maps to attribute category | Notes |
|---|---|---|
| `project_identity` | 01 Project Identity | Direct 1:1 |
| `location` | 02 Location & Geospatial | Extends with micromarket + proximity |
| `land_and_area_metrics` | 03 Land & Area Metrics | Direct 1:1 |
| `plot_details` | 03 Land & Area Metrics | Merged |
| `land_transaction_history` | 05 Legal & Compliance | Chain of title |
| `parties` | 04 Parties & Developer | Extends with trust score |
| `legal_and_compliance` | 05 Legal & Compliance | Extends with CERSAI, clarity score |
| `rera_form_b` | 01, 09 | Project Identity + Unit & Config |
| `architectural_plans` | 03, 09 | Areas + unit mix |
| `escrow_bank_account` | 07 Financial | Direct |
| `loan_disclosure` | 07 Financial | Direct |
| `financial_details` | 07 Financial | Direct |
| `project_info` | 01, 06 | Project Identity + Approvals |
| `documents` | 06 Approvals | Document presence flags |
| `source_metadata` | Operational columns | Lineage |
| `person_or_entity` | 04 Parties | Structured party |
| `source_document` / `source_pdf` | Operational columns | Document lineage |
| `unmapped_fields` | — | Overflow; review quarterly |
| (new — not in schema v2) | 07 Financial (IGR), 08 Tenant, 10-16 | New adapters needed |
Action for Vishal: extend schema v2 to cover IGR transactions (Category 07 sale-level), L&L agreements (Category 08), and make the operational columns (`confidence`, `source_url`, `extracted_at`, etc.) mandatory on every record.
## 3. Output contract — what the pipeline must produce
### 3.1 Canonical Attribute Store
The pipeline writes to a single canonical store. Every record is an `(entity_id, attribute_id, value, quality_passport)` tuple.
Entity types:
| Entity | Primary key | Example |
|---|---|---|
| Project | `proj.id` (UUID) | A MahaRERA-registered project |
| Transaction | `txn.id` (UUID) | A single IGR-registered deed |
| Lease | `lease.id` (UUID) | A single IGR-registered L&L |
| Micromarket | `loc.micromarket_id` | A geographic cluster |
| Developer | `party.promoter_pan` | A promoter entity |
| GR | `gr.id` (UUID) | A single government resolution |
| Asset (Fractional) | `frac.spv_id` | A PropPie Fractional SPV |
### 3.2 Quality passport (mandatory per attribute instance)
Every written value must include these columns (non-negotiable):
```json
{
  "source": "maharera",
  "source_url": "https://maharera.maharashtra.gov.in/project/P52100012345",
  "source_doc_id": "doc-uuid-abc",
  "source_doc_page": 3,
  "extracted_at": "2026-05-15T14:30:00+05:30",
  "extraction_method": "llm_gpt4o",
  "extraction_model_version": "gpt-4o-2025-12-01",
  "confidence": 0.92,
  "confidence_method": "model",
  "human_verified": false,
  "conflict_resolution_applied": null,
  "last_changed_at": "2026-05-15T14:30:00+05:30"
}
```
If any field cannot be populated, set it to `null` and record a `null_reason`.
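As an illustration, a write-path check for the passport could look like the sketch below. The `validate_passport` helper and the per-field `<field>_null_reason` convention are assumptions for this sketch, not spec requirements; the spec only mandates the fields themselves.

```python
# Illustrative passport check; field names come from the example above.
REQUIRED_FIELDS = {
    "source", "source_url", "source_doc_id", "source_doc_page",
    "extracted_at", "extraction_method", "extraction_model_version",
    "confidence", "confidence_method", "human_verified",
    "conflict_resolution_applied", "last_changed_at",
}

def validate_passport(passport: dict) -> list[str]:
    """Return a list of violations; an empty list means the record is writable."""
    problems = []
    missing = REQUIRED_FIELDS - passport.keys()
    if missing:
        problems.append(f"missing fields: {sorted(missing)}")
    for field in REQUIRED_FIELDS & passport.keys():
        # conflict_resolution_applied is legitimately null until a conflict occurs;
        # every other null must be explained per the rule above.
        if passport[field] is None and field != "conflict_resolution_applied":
            if not passport.get(f"{field}_null_reason"):
                problems.append(f"{field} is null without a null_reason")
    return problems
```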
### 3.3 Change history
For attributes that are revised over time (completion dates, cost estimates, unit counts), maintain an append-only change history:
```json
{
  "change_history": [
    { "value": "2027-03-31", "source": "Form B v3", "extracted_at": "2026-01-14T00:00:00+05:30" },
    { "value": "2026-12-31", "source": "Form B v2", "extracted_at": "2025-06-20T00:00:00+05:30" }
  ]
}
```
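A minimal sketch of recording a revision, assuming the newest-first ordering shown in the example; the `record_change` helper is illustrative, not a spec API:

```python
from datetime import datetime, timezone

def record_change(history: list[dict], value, source: str) -> list[dict]:
    """Prepend a new revision (newest first, as in the example above).
    Append-only: earlier entries are never mutated or removed."""
    # No-op when the newest recorded value is unchanged, to avoid noise revisions.
    if history and history[0]["value"] == value:
        return history
    entry = {
        "value": value,
        "source": source,
        "extracted_at": datetime.now(timezone.utc).isoformat(),
    }
    return [entry] + history
```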
## 4. Freshness SLAs

The pipeline must meet these ingestion cadences (from `data-quality-framework.md`):
| Source | Cadence | Stale after | Alert |
|---|---|---|---|
| MahaRERA project pages | Monthly re-scrape | 120 days | Yes |
| IGR (priority micromarkets) | Daily | 3 days | Yes |
| IGR (other) | Weekly | 14 days | Yes |
| GRs | Daily | 48 hours | Yes |
| MahaBhulekh | On-demand | 180 days | No |
| GIS layers | Quarterly | 6 months | No |
| News | Hourly | 6 hours | Yes |
| Social | Hourly | 6 hours | No (non-critical) |
| Macro | Weekly | 14 days | Yes |
| Internal (Fractional) | Daily | 7 days | Yes |
Pipeline must produce a daily freshness report surfacing any source past its staleness threshold.
## 5. Confidence SLAs

Target confidence by attribute priority (from `data-quality-framework.md`):
| Priority | Confidence floor |
|---|---|
| critical | 0.85 |
| high | 0.70 |
| medium | 0.60 |
| low | No floor |
The pipeline must reject (never write to the canonical store) any extraction with confidence < 0.30. Values between 0.30 and their priority's floor are written but flagged for review.
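The gate logic follows directly from the floors above; a minimal sketch, with function and constant names as illustrative assumptions:

```python
# Floors from the table above; "low" has no floor, modelled here as 0.0.
CONFIDENCE_FLOOR = {"critical": 0.85, "high": 0.70, "medium": 0.60, "low": 0.0}
HARD_REJECT_BELOW = 0.30

def gate(confidence: float, priority: str) -> str:
    """Return 'reject', 'flag_for_review', or 'write' per the rules above."""
    if confidence < HARD_REJECT_BELOW:
        return "reject"            # never written to the canonical store
    if confidence < CONFIDENCE_FLOOR[priority]:
        return "flag_for_review"   # written, but surfaced for human review
    return "write"
```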
## 6. Conflict resolution rules
When the pipeline encounters a value that differs from what's already in the canonical store:
Priority hierarchy (highest to lowest):
IGR > MahaRERA > MahaBhulekh > GRs > GIS > Internal > Licensed > News > Social > Listings
Rules:
1. Higher-priority source → overwrite; log old value
2. Same-priority source → fresher extraction wins; log conflict
3. Lower-priority source → do NOT overwrite; store as supplementary
4. Same-priority + delta > 50% on numeric → flag for human review; do NOT auto-resolve
Pipeline must maintain a conflict log (see `data-quality-framework.md`, section 6.3).
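Rules 1-4 can be expressed as a pure decision function. A sketch, assuming each record carries `source`, `value`, and `extracted_at`; the priority numbers are just ordinals for the hierarchy above, and the outcome strings are illustrative:

```python
# Source priorities from the hierarchy above (higher number = higher priority).
SOURCE_PRIORITY = {
    "listings": 0, "social": 1, "news": 2, "licensed": 3, "internal": 4,
    "gis": 5, "grs": 6, "mahabhulekh": 7, "maharera": 8, "igr": 9,
}

def resolve(existing: dict, incoming: dict) -> str:
    """Apply rules 1-4 to one existing/incoming pair."""
    new_p = SOURCE_PRIORITY[incoming["source"]]
    old_p = SOURCE_PRIORITY[existing["source"]]
    if new_p > old_p:
        return "overwrite"            # rule 1: log the old value
    if new_p < old_p:
        return "store_supplementary"  # rule 3: never overwrite downward
    # Same priority. Rule 4: numeric deltas > 50% go to a human, never auto-resolved.
    old_v, new_v = existing["value"], incoming["value"]
    if isinstance(old_v, (int, float)) and old_v and \
            abs(new_v - old_v) / abs(old_v) > 0.50:
        return "human_review"
    # Rule 2: fresher extraction wins; the conflict is logged either way.
    if incoming["extracted_at"] > existing["extracted_at"]:
        return "overwrite"
    return "keep_existing"
```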
## 7. Entity resolution requirements
The hardest problem. The pipeline must resolve entities across sources:
### 7.1 Project resolution
Link a MahaRERA project to its IGR transactions:
| Signal | Weight | Notes |
|---|---|---|
| Survey / CTS number overlap | High | Same land parcel |
| Geocoded address proximity (< 200m) | High | With textual fuzzy match |
| Promoter name in IGR deed matches MahaRERA promoter | High | Buyer = promoter → primary sale |
| Project name in IGR property description | Medium | Often abbreviated or misspelled |
| Timing (sale date within project's active period) | Low | Necessary but not sufficient |
Expected match rate: 40-60% for residential; higher for commercial promoter→buyer sales.
Pipeline must emit a `resolution_confidence` per link (0-1) and a `resolution_method` (which signals matched).
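One way to produce both outputs is a weighted score over the signal table above. The weights and signal names below are illustrative assumptions; the real calibration is the pipeline team's to tune:

```python
# Illustrative weights reflecting the High/Medium/Low column above.
SIGNAL_WEIGHTS = {
    "survey_cts_overlap": 0.30,
    "geo_proximity_200m": 0.25,
    "promoter_name_match": 0.25,
    "project_name_in_deed": 0.15,
    "timing_consistent": 0.05,
}

def score_link(signals: dict[str, bool]) -> tuple[float, list[str]]:
    """Return (resolution_confidence, resolution_method) for one candidate link."""
    matched = [name for name, hit in signals.items() if hit]
    confidence = sum(SIGNAL_WEIGHTS[name] for name in matched)
    return round(confidence, 2), matched
```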
### 7.2 Developer resolution

Link a promoter identity across projects, MCA filings, and court records:
| Signal | Weight |
|---|---|
| PAN match | Definitive |
| CIN match (company) | Definitive |
| Name + address fuzzy match | High |
| Director overlap (from MCA) | High |
| Group-company linkage | Medium |
### 7.3 Micromarket assignment
Assign every project and transaction to a micromarket:
- See the `loc.micromarket_id` spec in `derived-attributes-spec.md`
- Pipeline must geocode every address to lat/lng (Google Geocoding API as fallback for MahaRERA addresses without coordinates)
- Clustering runs quarterly; new entities assigned on ingest using nearest-cluster assignment
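Nearest-cluster assignment at ingest only needs a great-circle distance against the quarterly cluster centroids. A stdlib-only sketch; the centroid IDs and coordinates in the usage below are illustrative:

```python
import math

def haversine_km(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Great-circle distance between two (lat, lng) points in kilometres."""
    lat1, lng1, lat2, lng2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(h))

def assign_micromarket(point: tuple[float, float],
                       centroids: dict[str, tuple[float, float]]) -> str:
    """Nearest-centroid assignment for a newly ingested project or transaction."""
    return min(centroids, key=lambda mm_id: haversine_km(point, centroids[mm_id]))
```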
## 8. Derived attribute computation

The pipeline (or a closely coupled engine) is responsible for computing derived attributes per the specs in `derived-attributes-spec.md`.
### 8.1 Computation responsibilities
| Derived attribute | Trigger | Latency target |
|---|---|---|
| `fin.asr_gap_pct` | New IGR transaction ingested | < 5 minutes |
| `fin.price_per_sqft_carpet` | New transaction + area available | < 5 minutes |
| `fin.cost_overrun_pct` | New Form B version | < 1 hour |
| `mkt.transaction_velocity_*` | Daily batch | Nightly |
| `mkt.median_price_per_sqft_*` | Daily batch | Nightly |
| `mkt.sector_momentum_pct` | Weekly batch | Weekly |
| `mkt.cap_rate_median`, `mkt.yield_benchmark` | Weekly batch | Weekly |
| `mkt.micromarket_lifecycle_stage` | Monthly batch | Monthly |
| `dev.trust_score` | Weekly batch | Weekly |
| `legal.title_clarity_score` | On new legal doc + monthly batch | < 1 hour (event) / nightly (batch) |
| `lease.tenant_anchor_quality_score` | On new L&L + weekly batch | < 1 hour / weekly |
| `risk.zone_risk_index` | Monthly batch | Monthly |
| `policy.tailwind_flags`, `policy.headwind_flags` | On new GR ingested | < 1 hour |
| `infra.upcoming_projects` | On new GR + weekly | Weekly |
| `loc.infra_proximity_score` | Quarterly | Quarterly |
| `ai.sentiment_score` | Daily batch | Nightly |
| `ai.*_narrative`, `ai.wealth_trajectory_*`, `ai.comparable_set`, `ai.persona_fit_score` | On-demand (per user query) | < 8 seconds |
### 8.2 Computation contract
Every derived computation must:
- Log inputs: attribute IDs + values at computation time (immutable audit record)
- Tag version: formula version string (e.g., `trust_score_v1.2`)
- Compute confidence: per propagation rules in `data-quality-framework.md`
- Be reproducible: same inputs + same version → same output (seed stochastic methods)
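One way to satisfy the input-logging, versioning, and reproducibility rules with a single wrapper; the names are illustrative, and confidence propagation is left to the framework doc:

```python
import hashlib
import json
from datetime import datetime, timezone

def run_derived(formula_version: str, compute, inputs: dict) -> dict:
    """Wrap a derived computation so every run carries its audit record:
    the inputs at computation time, the formula version, and a hash that
    makes "same inputs + same version" checkable after the fact."""
    snapshot = json.dumps(inputs, sort_keys=True, default=str)
    return {
        "value": compute(inputs),
        "formula_version": formula_version,
        "inputs_snapshot": snapshot,  # immutable audit record
        "inputs_hash": hashlib.sha256(snapshot.encode()).hexdigest(),
        "computed_at": datetime.now(timezone.utc).isoformat(),
    }
```

Because the snapshot is serialised with sorted keys, two runs over identical inputs produce identical hashes, which is the reproducibility check in practice.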
## 9. Document storage and retrieval
### 9.1 Document ingestion
- Every PDF from MahaRERA, IGR, and GRs is stored in object storage with metadata
- Metadata: `doc_id`, `doc_type`, `source`, `source_url`, `ingested_at`, `page_count`, `language`, `ocr_quality`
- Full text extracted and indexed for RAG retrieval
### 9.2 RAG-ready output
For the AI layer to cite sources:
- Every document must be chunked (page-level or semantic chunks)
- Each chunk indexed in a vector store (Vishal's choice: Qdrant, pgvector, Pinecone)
- Chunk metadata: `doc_id`, `page`, `chunk_index`, `entity_ids` (which projects/transactions this chunk relates to)
The AI layer queries: "Give me chunks relevant to [project X] from [legal documents]" → pipeline returns ranked chunks with source metadata.
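The filter half of that query can be sketched against the chunk metadata above; the `Chunk` class and `chunks_for_entity` helper are illustrative shapes, not a schema the spec mandates, and ranking by the free-text `query` would happen in the vector store:

```python
from dataclasses import dataclass, field

@dataclass
class Chunk:
    """One indexed chunk, carrying the metadata fields listed above."""
    doc_id: str
    page: int
    chunk_index: int
    text: str
    entity_ids: list[str] = field(default_factory=list)

def chunks_for_entity(chunks: list[Chunk], entity_id: str,
                      doc_type_index: dict[str, str], doc_type: str) -> list[Chunk]:
    """Pre-filter by entity and document type before similarity ranking."""
    return [c for c in chunks
            if entity_id in c.entity_ids
            and doc_type_index.get(c.doc_id) == doc_type]
```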
## 10. API surface (pipeline → product)
The pipeline exposes its data through an API (REST or GraphQL — Vishal's choice). Required endpoints/capabilities:
| Capability | Example | Latency |
|---|---|---|
| Get project by ID | `/project/{proj.id}` → all attributes + quality passports | < 200ms |
| Get transactions for micromarket | `/transactions?micromarket={id}&days=90` | < 500ms |
| Get developer profile | `/developer/{pan}` → projects + trust score + change history | < 300ms |
| Search projects | `/search?q=hinjewadi+2bhk&segment=residential` | < 500ms |
| Get comparable set | `/comparables?project={id}&k=10` | < 1s |
| Get GR feed | `/grs?micromarket={id}&days=90` | < 500ms |
| Get document chunks (RAG) | `/chunks?entity={id}&doc_type=legal&query=...` | < 1s |
| Quality dashboard metrics | `/quality/freshness`, `/quality/confidence` | < 500ms |
| Conflict log | `/quality/conflicts?attribute={id}` | < 500ms |
Authentication: internal service-to-service auth. Public API (if ever) is a separate layer.
## 11. Monitoring signals the pipeline emits
The pipeline must expose:
| Metric | Format | Consumed by |
|---|---|---|
| Extraction success/failure counts per source per hour | Time-series | Grafana/Prometheus |
| Confidence distribution per attribute category | Histogram | Quality dashboard |
| Fill rate per attribute | Gauge | Quality dashboard |
| Freshness per source (last successful extraction) | Gauge | Alerting |
| Conflict volume per attribute per day | Counter | Quality dashboard |
| Entity resolution match rate | Gauge | Quality dashboard |
| Derived computation latency | Histogram | SRE |
| API latency per endpoint | Histogram | SRE |
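As one example, the per-category confidence histogram can be bucketed at the section 5 floors before export to the dashboard. A stdlib sketch; the bucket edges, record shape, and function name are assumptions:

```python
import bisect
from collections import defaultdict

# Bucket edges aligned with the confidence floors in section 5.
BUCKETS = (0.30, 0.60, 0.70, 0.85, 1.00)

def confidence_histogram(records: list[dict]) -> dict[str, list[int]]:
    """records: [{'category': '07 Financial', 'confidence': 0.92}, ...]
    Returns per-category counts per bucket, ready for histogram export."""
    hist = defaultdict(lambda: [0] * len(BUCKETS))
    for rec in records:
        # bisect_left places a confidence into the first bucket whose edge
        # is >= the value; clamp so 1.00 lands in the last bucket.
        i = bisect.bisect_left(BUCKETS, rec["confidence"])
        hist[rec["category"]][min(i, len(BUCKETS) - 1)] += 1
    return dict(hist)
```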
## 12. Architectural questions for Vishal

These are for Vishal to decide. Document decisions in `../../docs/90-memory/MEMORY.md` when made.
| Question | Options | Implications |
|---|---|---|
| Storage paradigm | Lakehouse (Delta/Iceberg) vs data warehouse (Postgres/BigQuery) vs hybrid | Cost, query flexibility, schema evolution |
| Batch vs event-driven | Pure batch, pure event, hybrid | Freshness vs complexity trade-off |
| Orchestration | Airflow, Dagster, Prefect, Temporal | Team familiarity, observability |
| Vector store | pgvector (simple), Qdrant (dedicated), Pinecone (managed) | RAG performance, ops overhead |
| LLM provider | OpenAI, Anthropic, AWS Bedrock, self-hosted | Cost, latency, data residency |
| Geo stack | PostGIS + OSRM vs managed (Google Maps Platform) | Cost vs accuracy vs ops |
| Cloud | AWS, Azure, GCP | Team familiarity, pricing |
| Language for pipeline | Python (FastAPI/Celery), Go, Rust for hot paths | Team skills, performance needs |
| Schema evolution | Strict versioning vs loose (additionalProperties) | Stability vs agility |
| Multi-tenancy (for B2B) | Shared vs isolated per client | Cost, security, scaling |
## 13. Acceptance criteria
The pipeline is "production-ready for Broker beta" when:
- MahaRERA: all ~40K projects ingested with quality passport; fill rate ≥ 90% on critical attributes
- IGR: priority micromarkets (top 20 Pune + top 20 MMR) daily; others weekly
- GRs: last 12 months ingested and classified; daily going forward
- Entity resolution: project ↔ IGR link rate ≥ 40% in priority micromarkets
- Derived attributes: all `critical` derived attributes computed and validated on a 100-project sample
- Quality dashboard: live
- API: all required endpoints functional with documented latency
- RAG: documents chunked and queryable for priority micromarkets
## 14. What I (COO) need from the pipeline that I don't have today
This is the gap list. Every item here blocks a product feature.
| I need | For | Blocking |
|---|---|---|
| IGR transaction data at micromarket level | Transaction velocity, price benchmarks | B2C Broker beta |
| GR classification output | Policy tailwind/headwind | B2C + B2B |
| Entity-resolved project-to-IGR links | ASR Gap, comparable set | B2C + B2B |
| Developer cross-project profile | Trust score | B2C + B2B |
| Document chunks in vector store | Cited answers in Broker | B2C |
| L&L agreement parsing | Yield benchmarks, tenant intel | B2B Analytix |
| Quality dashboard | Team confidence in what we're serving | All |
See also:
- `data-attributes.md` — the attribute catalogue this spec implements
- `derived-attributes-spec.md` — the math for every derived attribute
- `data-quality-framework.md` — SLAs and monitoring
- `data-sources.md` — source catalogue with access patterns