This project provides a robust and scalable Change Data Capture (CDC) system for synchronizing data between two PostgreSQL databases in near real-time. It is designed to be highly configurable and easy to deploy, making it ideal for a variety of use cases, including data replication, ETL pipelines, and database migrations.
- Real-time Data Sync: Captures and applies changes as they happen.
- Declarative Configuration: Define your sync jobs using a simple YAML file.
- Schema-Aware: Automatically detects your database schema to simplify configuration.
- Extensible Architecture: The system is composed of a CLI, a Control Plane, and an Agent, which can be scaled and customized independently.
- LLM-Powered: Leverages Large Language Models for advanced features like conversational configuration edits and automatic schema drift handling.
These instructions will get you a copy of the project up and running on your local machine for development and testing purposes.
- Docker
- Go (version 1.19 or later)
- Node.js (version 14 or later)
- Python (version 3.9 or later) with `pip`
- Clone the repository:

  ```sh
  git clone git@github.com:mintunitish/repgine.git
  cd repgine
  ```

- Install Python dependencies:

  ```sh
  pip install -r cli/requirements.txt
  ```

- Install Node.js dependencies:

  ```sh
  npm install --prefix control/api
  npm install --prefix control/ui
  ```

- Set up environment variables: the `init` command requires the `SOURCE_DSN` environment variable to be set to your source PostgreSQL database connection string.

  ```sh
  export SOURCE_DSN="postgresql://user:password@host:port/dbname"
  ```
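Before pointing `init` at a database, it can help to sanity-check the DSN. Here is a minimal sketch using only Python's standard library; the `check_dsn` helper is illustrative and not part of the CLI:

```python
from urllib.parse import urlparse

def check_dsn(dsn: str) -> dict:
    """Split a postgresql:// DSN into its parts and flag an unexpected scheme."""
    parsed = urlparse(dsn)
    if parsed.scheme not in ("postgresql", "postgres"):
        raise ValueError(f"unexpected scheme: {parsed.scheme!r}")
    return {
        "user": parsed.username,
        "host": parsed.hostname,
        "port": parsed.port,
        "dbname": parsed.path.lstrip("/"),
    }

parts = check_dsn("postgresql://user:password@localhost:5432/dbname")
print(parts)  # {'user': 'user', 'host': 'localhost', 'port': 5432, 'dbname': 'dbname'}
```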
The primary way to interact with the system is through the repgine CLI.
Generate a sync.yaml file by introspecting your source database.
```sh
python cli/main.py init
```

This will create a `sync.yaml` file in your project root, pre-populated with the tables and columns from your source database.
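Conceptually, `init` maps the introspected schema into this file. The sketch below illustrates the idea with a hypothetical helper that groups `(schema, table, column)` rows, such as those a query against `information_schema.columns` would return, into the config structure; the names here are illustrative, not the CLI's actual internals:

```python
def build_config(rows, source_dsn, target_dsn):
    """Group (schema, table, column) rows into the sync.yaml structure."""
    tables = {}
    for schema, table, column in rows:
        tables.setdefault(f"{schema}.{table}", []).append(column)
    return {
        "version": "1.0",
        "source_dsn": source_dsn,
        "target_dsn": target_dsn,
        "tables": [
            {"table_name": name, "columns": cols} for name, cols in tables.items()
        ],
    }

rows = [
    ("public", "users", "id"),
    ("public", "users", "email"),
    ("public", "orders", "id"),
]
config = build_config(rows, "postgresql://u:p@h:5432/src", "postgresql://u:p@h:5432/dst")
print(config["tables"][0])  # {'table_name': 'public.users', 'columns': ['id', 'email']}
```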
Open the generated sync.yaml and customize it to your needs. You can select which tables to sync, rename columns, or apply transformations.
```yaml
version: "1.0"
source_dsn: "postgresql://user:pass@host:port/sourcedb"
target_dsn: "postgresql://user:pass@host:port/targetdb"
tables:
  - table_name: "public.users"
    columns:
      - "id"
      - "name"
      - "email"
  - table_name: "public.orders"
    # This table will be excluded
    sync: false
```

Check your `sync.yaml` file for correctness against the schema:
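A hand-rolled structural check of a config like the one above could look like the sketch below. This is illustrative only; the actual `validate` command also checks the configuration against your database schema:

```python
def validate_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means the config looks OK."""
    errors = []
    for field in ("version", "source_dsn", "target_dsn", "tables"):
        if field not in config:
            errors.append(f"missing required field: {field}")
    for i, table in enumerate(config.get("tables", [])):
        if "table_name" not in table:
            errors.append(f"tables[{i}]: missing table_name")
        # Excluded tables (sync: false) need no column list.
        if table.get("sync", True) and not table.get("columns"):
            errors.append(f"tables[{i}]: no columns selected")
    return errors

config = {
    "version": "1.0",
    "source_dsn": "postgresql://u:p@h:5432/src",
    "target_dsn": "postgresql://u:p@h:5432/dst",
    "tables": [
        {"table_name": "public.users", "columns": ["id", "name", "email"]},
        {"table_name": "public.orders", "sync": False},
    ],
}
print(validate_config(config))  # []
```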
```sh
python cli/main.py validate sync.yaml
```

Start the control plane and deploy the agent to begin the synchronization process:
```sh
# Start the control plane API
node control/api/index.js &

# Deploy the sync job
python cli/main.py deploy
```

Check the status and metrics of your running sync job:
```sh
python cli/main.py status
```

The system is composed of three core components that work together to provide a seamless data synchronization experience.
```
+-----------------+      +-----------------+      +---------------+
|       CLI       |----->|  Control Plane  |<-----|     Agent     |
|    (main.py)    |      |    (API & UI)   |      | (reader, etc) |
+-----------------+      +-----------------+      +---------------+
```
- CLI (`cli`): A Python-based command-line interface for initializing configurations, validating schemas, and deploying sync jobs.
- Control Plane (`control`): A Node.js application that provides a central API for managing and monitoring sync jobs. It also includes a minimal UI for observability.
- Agent (`agent`): A high-performance Go agent that performs the heavy lifting. It connects to the source database, reads the Write-Ahead Log (WAL) using `pgoutput`, transforms the data according to the `sync.yaml` configuration, and applies it to the target database.
The Streamer is a key part of the agent responsible for applying changes from the source database to the target database. It reads `WALMessage` events (INSERT, UPDATE, DELETE) and constructs the appropriate SQL statements to execute against the target.

The Streamer is configured via the `sync.yaml` file, which defines which tables to watch and how to handle them. The Streamer specifically uses the `tables` and `pk` fields to construct its SQL queries.
Here is an example of a sync.yaml configuration:
```yaml
tables:
  users:
    name: users
    pk: id
  products:
    name: products
    pk: product_id
```

In this example, the Streamer will apply changes to the `users` and `products` tables, using the `id` and `product_id` columns as the primary keys for UPDATE and DELETE operations, respectively.
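To make the `pk` usage concrete, here is a hedged sketch of how a change event could be turned into a parameterized statement using the configured primary key. The real Streamer is written in Go; this Python version, along with the event shape and function name, is purely illustrative:

```python
def event_to_sql(event: dict, tables: dict):
    """Build a parameterized SQL statement for an INSERT/UPDATE/DELETE event."""
    cfg = tables[event["table"]]
    name, pk = cfg["name"], cfg["pk"]
    cols = event.get("values", {})
    if event["op"] == "INSERT":
        placeholders = ", ".join(["%s"] * len(cols))
        sql = f'INSERT INTO {name} ({", ".join(cols)}) VALUES ({placeholders})'
        return sql, list(cols.values())
    if event["op"] == "UPDATE":
        assignments = ", ".join(f"{c} = %s" for c in cols)
        # The primary key from the config pins the WHERE clause.
        return f"UPDATE {name} SET {assignments} WHERE {pk} = %s", list(cols.values()) + [event["key"]]
    if event["op"] == "DELETE":
        return f"DELETE FROM {name} WHERE {pk} = %s", [event["key"]]
    raise ValueError(f"unknown op: {event['op']}")

tables = {"users": {"name": "users", "pk": "id"}}
sql, params = event_to_sql({"op": "DELETE", "table": "users", "key": 42}, tables)
print(sql)     # DELETE FROM users WHERE id = %s
print(params)  # [42]
```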
The Streamer includes both unit and integration tests.
- Unit Tests: These tests use mocking to verify the SQL generation logic without requiring a live database.

  ```sh
  go test ./agent/stream/...
  ```

- Integration Tests: These tests require live source and target database connections and verify the end-to-end data flow. To run them, set the `SOURCE_DB_DSN` and `TARGET_DB_DSN` environment variables:

  ```sh
  export SOURCE_DB_DSN="postgresql://user:pass@host:port/sourcedb"
  export TARGET_DB_DSN="postgresql://user:pass@host:port/targetdb"
  go test -v -run TestStreamer_Integration ./agent/stream
  ```
To set up a development environment, ensure you have the prerequisites installed, then follow the installation steps.
The project includes unit and integration tests for each module.
- CLI Tests:

  ```sh
  pytest tests/cli/
  ```

- Control Plane API Tests:

  ```sh
  npm test --prefix control/api
  ```

- Agent Tests:

  ```sh
  go test ./agent/...
  ```
A Dockerfile is provided to build and run the agent in a containerized environment.
```sh
docker build -t repgine-agent .
docker run -e SOURCE_DSN="..." -e TARGET_DSN="..." repgine-agent
```

Contributions are welcome! Please feel free to submit a pull request.
- Fork the Project
- Create your Feature Branch (`git checkout -b feature/AmazingFeature`)
- Commit your Changes (`git commit -m 'Add some AmazingFeature'`)
- Push to the Branch (`git push origin feature/AmazingFeature`)
- Open a Pull Request
Repgine © 2025 by Nitish Kumar is licensed under CC BY-SA 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by-sa/4.0/