sync_engine
SyncEngine — checkpoint/resume orchestration for connector syncs.
Wraps IngestionPipeline with a lightweight SQLite state database so that
long-running syncs can be interrupted and resumed from the last saved cursor.
Typical usage::
    store = KnowledgeStore(db_path=":memory:")
    pipeline = IngestionPipeline(store)
    engine = SyncEngine(pipeline)
    items = engine.sync(connector)  # first run
    items = engine.sync(connector)  # resumes from saved cursor
    cp = engine.get_checkpoint(connector.connector_id)
Classes
SyncEngine
SyncEngine(pipeline: IngestionPipeline, *, state_db: str = '')
Orchestrate connector syncs with checkpoint/resume tracking.
| PARAMETER | DESCRIPTION |
|---|---|
| pipeline | The … TYPE: `IngestionPipeline` |
| state_db | Path to the SQLite database used for checkpoint state. If empty, defaults to … TYPE: `str` DEFAULT: `''` |
Source code in src/openjarvis/connectors/sync_engine.py
Functions
sync
sync(connector: BaseConnector) -> int
Run a full sync for connector and return the number of items ingested.
Resumes from the last saved cursor if one exists. Documents are batched in groups of 100 before being handed to the pipeline; a checkpoint is saved after every batch and once more at the end.
On error, the checkpoint is updated with the error message and the exception is re-raised so that callers can handle it.
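The batching and checkpointing behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the actual `SyncEngine` implementation: the function `sync_sketch`, the helper `batched`, the `checkpoints` table, and its columns are all hypothetical names chosen for illustration, and the real state schema may differ. Only the batch size of 100, the checkpoint after every batch plus one at the end, and the record-then-re-raise error handling come from the description.

```python
import sqlite3
from itertools import islice
from typing import Callable, Iterable, Iterator, List

BATCH_SIZE = 100  # batch size stated in the description above


def batched(docs: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` documents."""
    it = iter(docs)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def sync_sketch(
    conn: sqlite3.Connection,
    connector_id: str,
    docs: Iterable[str],
    ingest: Callable[[List[str]], None],
) -> int:
    """Ingest `docs` in batches, checkpointing progress in SQLite.

    Hypothetical stand-in for a sync loop; table and column names are
    assumptions, not the library's real schema.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "connector_id TEXT PRIMARY KEY, items INTEGER, error TEXT)"
    )

    def save(items: int, error=None) -> None:
        # One row per connector; each save overwrites the previous one.
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
            (connector_id, items, error),
        )
        conn.commit()

    total = 0
    try:
        for batch in batched(docs, BATCH_SIZE):
            ingest(batch)
            total += len(batch)
            save(total)  # checkpoint after every batch
        save(total)  # final checkpoint at the end of the run
    except Exception as exc:
        save(total, str(exc))  # record the error message, then re-raise
        raise
    return total
```

In this sketch a failure in the second batch leaves a checkpoint that counts only the first 100 items together with the error message, so a later run has enough information to decide where to resume.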
get_checkpoint
Return the last saved checkpoint for the given connector, or None if that connector has never been synced.
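The "checkpoint or None" contract can be sketched as a plain SQLite lookup. This is an assumption-laden illustration rather than the library's code: `get_checkpoint_sketch`, the `checkpoints` table, and the `last_cursor` and `items` columns are hypothetical, and the real checkpoint object may carry different fields.

```python
import sqlite3
from typing import Optional


def get_checkpoint_sketch(
    conn: sqlite3.Connection, connector_id: str
) -> Optional[dict]:
    """Return the stored checkpoint as a dict, or None when no row exists."""
    # Hypothetical schema; created here only so the sketch is self-contained.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("
        "connector_id TEXT PRIMARY KEY, last_cursor TEXT, items INTEGER)"
    )
    row = conn.execute(
        "SELECT last_cursor, items FROM checkpoints WHERE connector_id = ?",
        (connector_id,),
    ).fetchone()
    if row is None:
        return None  # this connector has never been synced
    return {"last_cursor": row[0], "items": row[1]}
```

Callers would typically test the result against `None` before reading any fields, since a connector that has never been synced has no checkpoint to inspect.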