Skip to main content

Sync Configuration Guide

This guide covers configuring data synchronization between Supernal Coding and external systems, including real-time sync, batch processing, and conflict resolution strategies.

Overview

Data synchronization ensures consistency between Supernal Coding and your existing systems. This guide covers:

  • Sync configuration and setup
  • Real-time vs batch synchronization
  • Conflict resolution strategies
  • Performance optimization
  • Monitoring and troubleshooting

Sync Configuration

Basic Configuration

interface SyncConfig {
source: {
type: 'database' | 'api' | 'file' | 'webhook';
connection: ConnectionConfig;
authentication: AuthConfig;
};
target: {
endpoint: string;
authentication: AuthConfig;
};
sync: {
mode: 'real-time' | 'batch' | 'hybrid';
frequency?: string; // For batch mode
batchSize?: number;
retryPolicy: RetryConfig;
};
mapping: FieldMapping[];
filters: FilterConfig[];
conflictResolution: ConflictResolutionConfig;
}

const syncConfig: SyncConfig = {
source: {
type: 'database',
connection: {
host: 'localhost',
port: 5432,
database: 'compliance_db',
ssl: true,
},
authentication: {
type: 'credentials',
username: process.env.DB_USERNAME,
password: process.env.DB_PASSWORD,
},
},
target: {
endpoint: 'https://api.supernal-coding.com/v1/sync',
authentication: {
type: 'api-key',
apiKey: process.env.SUPERNAL_API_KEY,
},
},
sync: {
mode: 'hybrid',
frequency: '*/15 * * * *', // Every 15 minutes
batchSize: 100,
retryPolicy: {
maxRetries: 3,
backoffStrategy: 'exponential',
initialDelay: 1000,
},
},
mapping: [
{
source: 'requirement_id',
target: 'id',
transform: (value) => `req-${value}`,
},
{
source: 'title',
target: 'title',
},
{
source: 'compliance_framework',
target: 'framework',
transform: (value) => value.toLowerCase(),
},
],
filters: [
{
field: 'status',
operator: 'in',
values: ['active', 'pending'],
},
],
conflictResolution: {
strategy: 'last-writer-wins',
customRules: [],
},
};

Advanced Configuration

class SyncConfigurationManager {
private configs: Map<string, SyncConfig> = new Map();
private validators: Map<string, ConfigValidator> = new Map();

addSyncConfiguration(name: string, config: SyncConfig): void {
// Validate configuration
const validator = this.getValidator(config.source.type);
const validation = validator.validate(config);

if (!validation.isValid) {
throw new Error(`Invalid configuration: ${validation.errors.join(', ')}`);
}

this.configs.set(name, config);
}

getSyncConfiguration(name: string): SyncConfig | undefined {
return this.configs.get(name);
}

updateSyncConfiguration(name: string, updates: Partial<SyncConfig>): void {
const existing = this.configs.get(name);
if (!existing) {
throw new Error(`Configuration '${name}' not found`);
}

const updated = { ...existing, ...updates };
this.addSyncConfiguration(name, updated);
}

listConfigurations(): string[] {
return Array.from(this.configs.keys());
}

private getValidator(sourceType: string): ConfigValidator {
const validator = this.validators.get(sourceType);
if (!validator) {
throw new Error(`No validator found for source type: ${sourceType}`);
}
return validator;
}
}

Synchronization Modes

Real-Time Synchronization

class RealTimeSyncManager {
private connections: Map<string, SyncConnection> = new Map();
private eventBus: EventBus;

constructor(eventBus: EventBus) {
this.eventBus = eventBus;
}

async startRealTimeSync(
configName: string,
config: SyncConfig
): Promise<void> {
const connection = await this.createConnection(config);

// Set up change detection
connection.onDataChange(async (change: DataChange) => {
try {
await this.processChange(change, config);
this.eventBus.emit('sync.change_processed', { configName, change });
} catch (error) {
this.eventBus.emit('sync.change_error', { configName, change, error });
await this.handleSyncError(error, change, config);
}
});

await connection.start();
this.connections.set(configName, connection);
}

async stopRealTimeSync(configName: string): Promise<void> {
const connection = this.connections.get(configName);
if (connection) {
await connection.stop();
this.connections.delete(configName);
}
}

private async processChange(
change: DataChange,
config: SyncConfig
): Promise<void> {
// Apply filters
if (!this.passesFilters(change.data, config.filters)) {
return;
}

// Transform data
const transformedData = this.transformData(change.data, config.mapping);

// Send to target
await this.sendToTarget(transformedData, change.operation, config.target);
}

private passesFilters(data: any, filters: FilterConfig[]): boolean {
return filters.every((filter) => {
const value = data[filter.field];

switch (filter.operator) {
case 'equals':
return value === filter.value;
case 'in':
return filter.values?.includes(value);
case 'not_null':
return value != null;
case 'greater_than':
return value > filter.value;
default:
return true;
}
});
}
}

Batch Synchronization

class BatchSyncManager {
private scheduler: CronScheduler;
private batchProcessor: BatchProcessor;

constructor() {
this.scheduler = new CronScheduler();
this.batchProcessor = new BatchProcessor();
}

scheduleBatchSync(configName: string, config: SyncConfig): void {
if (config.sync.mode !== 'batch' && config.sync.mode !== 'hybrid') {
throw new Error('Batch sync requires batch or hybrid mode');
}

const job = this.scheduler.schedule(config.sync.frequency!, async () => {
await this.runBatchSync(configName, config);
});

console.log(
`Scheduled batch sync '${configName}' with frequency: ${config.sync.frequency}`
);
}

async runBatchSync(
configName: string,
config: SyncConfig
): Promise<BatchSyncResult> {
const startTime = Date.now();
const result: BatchSyncResult = {
configName,
startTime: new Date(startTime),
endTime: new Date(),
recordsProcessed: 0,
recordsSucceeded: 0,
recordsFailed: 0,
errors: [],
};

try {
// Fetch data from source
const sourceData = await this.fetchSourceData(config);
result.recordsProcessed = sourceData.length;

// Process in batches
const batches = this.createBatches(
sourceData,
config.sync.batchSize || 100
);

for (const batch of batches) {
try {
await this.processBatch(batch, config);
result.recordsSucceeded += batch.length;
} catch (error) {
result.recordsFailed += batch.length;
result.errors.push({
batch: batch.map((item) => item.id),
error: error.message,
});
}
}
} catch (error) {
result.errors.push({
batch: [],
error: `Batch sync failed: ${error.message}`,
});
}

result.endTime = new Date();
return result;
}

private async fetchSourceData(config: SyncConfig): Promise<any[]> {
const connection = await this.createSourceConnection(config);

try {
const query = this.buildQuery(config.filters);
return await connection.query(query);
} finally {
await connection.close();
}
}

private createBatches<T>(data: T[], batchSize: number): T[][] {
const batches: T[][] = [];

for (let i = 0; i < data.length; i += batchSize) {
batches.push(data.slice(i, i + batchSize));
}

return batches;
}

private async processBatch(batch: any[], config: SyncConfig): Promise<void> {
const transformedBatch = batch.map((item) =>
this.transformData(item, config.mapping)
);

await this.sendBatchToTarget(transformedBatch, config.target);
}
}

Hybrid Synchronization

class HybridSyncManager {
private realTimeManager: RealTimeSyncManager;
private batchManager: BatchSyncManager;
private changeTracker: ChangeTracker;

constructor() {
this.realTimeManager = new RealTimeSyncManager(new EventBus());
this.batchManager = new BatchSyncManager();
this.changeTracker = new ChangeTracker();
}

async startHybridSync(configName: string, config: SyncConfig): Promise<void> {
// Start real-time sync for immediate changes
await this.realTimeManager.startRealTimeSync(configName, config);

// Schedule batch sync for missed changes
this.batchManager.scheduleBatchSync(`${configName}-batch`, {
...config,
sync: {
...config.sync,
mode: 'batch',
},
});

// Set up change tracking for conflict detection
this.changeTracker.startTracking(configName, config);
}

async stopHybridSync(configName: string): Promise<void> {
await this.realTimeManager.stopRealTimeSync(configName);
this.batchManager.cancelBatchSync(`${configName}-batch`);
this.changeTracker.stopTracking(configName);
}
}

Data Transformation

Field Mapping

interface FieldMapping {
source: string;
target: string;
transform?: (value: any, record: any) => any;
validate?: (value: any) => boolean;
required?: boolean;
defaultValue?: any;
}

class DataTransformer {
transform(data: any, mappings: FieldMapping[]): any {
const result: any = {};
const errors: string[] = [];

for (const mapping of mappings) {
try {
let value = this.getNestedValue(data, mapping.source);

// Apply default value if needed
if (value === undefined && mapping.defaultValue !== undefined) {
value = mapping.defaultValue;
}

// Check required fields
if (mapping.required && (value === undefined || value === null)) {
errors.push(`Required field '${mapping.target}' is missing`);
continue;
}

// Apply transformation
if (value !== undefined && mapping.transform) {
value = mapping.transform(value, data);
}

// Validate transformed value
if (
value !== undefined &&
mapping.validate &&
!mapping.validate(value)
) {
errors.push(`Validation failed for field '${mapping.target}'`);
continue;
}

// Set transformed value
if (value !== undefined) {
this.setNestedValue(result, mapping.target, value);
}
} catch (error) {
errors.push(
`Error transforming field '${mapping.source}': ${error.message}`
);
}
}

if (errors.length > 0) {
throw new Error(`Transformation errors: ${errors.join(', ')}`);
}

return result;
}

private getNestedValue(obj: any, path: string): any {
return path.split('.').reduce((current, key) => current?.[key], obj);
}

private setNestedValue(obj: any, path: string, value: any): void {
const keys = path.split('.');
const lastKey = keys.pop()!;
const target = keys.reduce((current, key) => {
if (!current[key]) current[key] = {};
return current[key];
}, obj);

target[lastKey] = value;
}
}

Custom Transformations

class CustomTransformations {
static dateToISO(value: any): string {
if (!value) return '';
return new Date(value).toISOString();
}

static normalizeFramework(value: string): string {
const frameworkMap: Record<string, string> = {
'iso-13485': 'iso13485',
ISO_13485: 'iso13485',
'fda-21-cfr-11': 'fda21cfr11',
FDA_21_CFR_11: 'fda21cfr11',
gdpr: 'gdpr',
GDPR: 'gdpr',
'soc-2': 'soc2',
SOC_2: 'soc2',
};

return frameworkMap[value] || value.toLowerCase();
}

static parseJSON(value: string): any {
try {
return JSON.parse(value);
} catch {
return null;
}
}

static generateId(value: any, record: any): string {
return `${record.type}-${value}`;
}

static concatenateFields(fields: string[]) {
return (value: any, record: any) => {
return fields
.map((field) => record[field])
.filter(Boolean)
.join(' ');
};
}
}

Conflict Resolution

Conflict Detection

class ConflictDetector {
detectConflicts(localData: any, remoteData: any): DataConflict[] {
const conflicts: DataConflict[] = [];

// Compare each field
for (const [key, localValue] of Object.entries(localData)) {
const remoteValue = remoteData[key];

if (remoteValue !== undefined && localValue !== remoteValue) {
conflicts.push({
field: key,
localValue,
remoteValue,
type: this.getConflictType(localValue, remoteValue),
});
}
}

return conflicts;
}

private getConflictType(localValue: any, remoteValue: any): ConflictType {
if (typeof localValue !== typeof remoteValue) {
return 'type_mismatch';
}

if (Array.isArray(localValue) && Array.isArray(remoteValue)) {
return 'array_conflict';
}

if (typeof localValue === 'object' && localValue !== null) {
return 'object_conflict';
}

return 'value_conflict';
}
}

Resolution Strategies

interface ConflictResolutionStrategy {
resolve(conflict: DataConflict, context: ResolutionContext): any;
}

class LastWriterWinsStrategy implements ConflictResolutionStrategy {
resolve(conflict: DataConflict, context: ResolutionContext): any {
return context.remoteTimestamp > context.localTimestamp
? conflict.remoteValue
: conflict.localValue;
}
}

class MergeArrayStrategy implements ConflictResolutionStrategy {
resolve(conflict: DataConflict, context: ResolutionContext): any {
if (
!Array.isArray(conflict.localValue) ||
!Array.isArray(conflict.remoteValue)
) {
throw new Error('Merge array strategy requires array values');
}

// Merge arrays and remove duplicates
const merged = [...conflict.localValue, ...conflict.remoteValue];
return Array.from(new Set(merged));
}
}

class CustomRuleStrategy implements ConflictResolutionStrategy {
private rules: ConflictRule[];

constructor(rules: ConflictRule[]) {
this.rules = rules;
}

resolve(conflict: DataConflict, context: ResolutionContext): any {
for (const rule of this.rules) {
if (this.matchesRule(conflict, context, rule)) {
return rule.resolution(conflict, context);
}
}

// Default to last writer wins
return new LastWriterWinsStrategy().resolve(conflict, context);
}

private matchesRule(
conflict: DataConflict,
context: ResolutionContext,
rule: ConflictRule
): boolean {
return rule.condition(conflict, context);
}
}

Performance Optimization

Batch Processing Optimization

class OptimizedBatchProcessor {
private connectionPool: ConnectionPool;
private compressionEnabled: boolean;
private parallelBatches: number;

constructor(config: BatchProcessorConfig) {
this.connectionPool = new ConnectionPool(config.connectionPool);
this.compressionEnabled = config.compression || false;
this.parallelBatches = config.parallelBatches || 3;
}

async processBatches(
batches: any[][],
config: SyncConfig
): Promise<BatchResult[]> {
const results: BatchResult[] = [];

// Process batches in parallel with controlled concurrency
const semaphore = new Semaphore(this.parallelBatches);

const promises = batches.map(async (batch, index) => {
await semaphore.acquire();

try {
const result = await this.processSingleBatch(batch, config, index);
results[index] = result;
} finally {
semaphore.release();
}
});

await Promise.all(promises);
return results;
}

private async processSingleBatch(
batch: any[],
config: SyncConfig,
batchIndex: number
): Promise<BatchResult> {
const connection = await this.connectionPool.acquire();

try {
// Compress batch data if enabled
const data = this.compressionEnabled
? await this.compressData(batch)
: batch;

// Send batch with retry logic
const result = await this.sendWithRetry(data, config, batchIndex);

return {
batchIndex,
recordCount: batch.length,
success: true,
processingTime: result.processingTime,
};
} catch (error) {
return {
batchIndex,
recordCount: batch.length,
success: false,
error: error.message,
};
} finally {
this.connectionPool.release(connection);
}
}

private async sendWithRetry(
data: any,
config: SyncConfig,
batchIndex: number
): Promise<SendResult> {
const retryPolicy = config.sync.retryPolicy;
let lastError: Error;

for (let attempt = 0; attempt <= retryPolicy.maxRetries; attempt++) {
try {
const startTime = Date.now();
await this.sendToTarget(data, config.target);

return {
success: true,
processingTime: Date.now() - startTime,
};
} catch (error) {
lastError = error as Error;

if (attempt < retryPolicy.maxRetries) {
const delay = this.calculateDelay(attempt, retryPolicy);
await this.sleep(delay);
}
}
}

throw lastError!;
}

private calculateDelay(attempt: number, retryPolicy: RetryConfig): number {
switch (retryPolicy.backoffStrategy) {
case 'exponential':
return retryPolicy.initialDelay * Math.pow(2, attempt);
case 'linear':
return retryPolicy.initialDelay * (attempt + 1);
case 'fixed':
default:
return retryPolicy.initialDelay;
}
}
}

Memory Management

class MemoryEfficientSyncProcessor {
private maxMemoryUsage: number;
private currentMemoryUsage: number = 0;

constructor(maxMemoryMB: number) {
this.maxMemoryUsage = maxMemoryMB * 1024 * 1024; // Convert to bytes
}

async processLargeDataset(
dataSource: DataSource,
config: SyncConfig
): Promise<ProcessingResult> {
const stream = dataSource.createStream();
const processor = new StreamProcessor(config);

let processedCount = 0;
let errorCount = 0;

return new Promise((resolve, reject) => {
stream.on('data', async (chunk: any[]) => {
try {
// Check memory usage
if (this.currentMemoryUsage > this.maxMemoryUsage) {
await this.waitForMemoryRelease();
}

await processor.processChunk(chunk);
processedCount += chunk.length;
} catch (error) {
errorCount++;
console.error('Chunk processing error:', error);
}
});

stream.on('end', () => {
resolve({
processedCount,
errorCount,
success: errorCount === 0,
});
});

stream.on('error', reject);
});
}

private async waitForMemoryRelease(): Promise<void> {
// Force garbage collection if available
if (global.gc) {
global.gc();
}

// Wait for memory to be released
while (this.currentMemoryUsage > this.maxMemoryUsage * 0.8) {
await this.sleep(100);
this.updateMemoryUsage();
}
}

private updateMemoryUsage(): void {
const usage = process.memoryUsage();
this.currentMemoryUsage = usage.heapUsed;
}
}

Monitoring and Troubleshooting

Sync Monitoring

class SyncMonitor {
private metrics: MetricsCollector;
private alertManager: AlertManager;

constructor() {
this.metrics = new MetricsCollector();
this.alertManager = new AlertManager();
}

async monitorSyncHealth(configName: string): Promise<SyncHealthReport> {
const metrics = await this.collectSyncMetrics(configName);
const health = this.assessHealth(metrics);

if (health.status !== 'healthy') {
await this.alertManager.sendAlert({
type: 'sync_health_issue',
configName,
status: health.status,
issues: health.issues,
});
}

return health;
}

private async collectSyncMetrics(configName: string): Promise<SyncMetrics> {
const now = Date.now();
const oneHourAgo = now - 60 * 60 * 1000;

return {
successRate: await this.metrics.getSuccessRate(
configName,
oneHourAgo,
now
),
averageLatency: await this.metrics.getAverageLatency(
configName,
oneHourAgo,
now
),
errorRate: await this.metrics.getErrorRate(configName, oneHourAgo, now),
throughput: await this.metrics.getThroughput(configName, oneHourAgo, now),
lastSyncTime: await this.metrics.getLastSyncTime(configName),
};
}

private assessHealth(metrics: SyncMetrics): SyncHealthReport {
const issues: string[] = [];
let status: 'healthy' | 'warning' | 'critical' = 'healthy';

// Check success rate
if (metrics.successRate < 0.95) {
issues.push(
`Low success rate: ${(metrics.successRate * 100).toFixed(1)}%`
);
status = metrics.successRate < 0.8 ? 'critical' : 'warning';
}

// Check latency
if (metrics.averageLatency > 5000) {
issues.push(`High latency: ${metrics.averageLatency}ms`);
status = metrics.averageLatency > 10000 ? 'critical' : 'warning';
}

// Check if sync is stale
const stalenessThreshold = 30 * 60 * 1000; // 30 minutes
if (Date.now() - metrics.lastSyncTime > stalenessThreshold) {
issues.push('Sync appears to be stale');
status = 'critical';
}

return {
status,
issues,
metrics,
timestamp: new Date(),
};
}
}

Error Handling and Recovery

class SyncErrorHandler {
private deadLetterQueue: DeadLetterQueue;
private retryScheduler: RetryScheduler;

constructor() {
this.deadLetterQueue = new DeadLetterQueue();
this.retryScheduler = new RetryScheduler();
}

async handleSyncError(
error: SyncError,
data: any,
config: SyncConfig
): Promise<void> {
// Log error details
console.error('Sync error:', {
configName: config.name,
error: error.message,
dataId: data.id,
timestamp: new Date(),
});

// Determine if error is retryable
if (this.isRetryableError(error)) {
await this.scheduleRetry(data, config, error.retryCount || 0);
} else {
await this.sendToDeadLetterQueue(data, config, error);
}
}

private isRetryableError(error: SyncError): boolean {
const retryableErrors = [
'NETWORK_ERROR',
'TIMEOUT_ERROR',
'RATE_LIMIT_ERROR',
'TEMPORARY_SERVER_ERROR',
];

return retryableErrors.includes(error.type);
}

private async scheduleRetry(
data: any,
config: SyncConfig,
retryCount: number
): Promise<void> {
const maxRetries = config.sync.retryPolicy.maxRetries;

if (retryCount >= maxRetries) {
await this.sendToDeadLetterQueue(
data,
config,
new Error('Max retries exceeded')
);
return;
}

const delay = this.calculateRetryDelay(retryCount, config.sync.retryPolicy);

await this.retryScheduler.schedule({
data,
config,
retryCount: retryCount + 1,
scheduledTime: Date.now() + delay,
});
}

private async sendToDeadLetterQueue(
data: any,
config: SyncConfig,
error: Error
): Promise<void> {
await this.deadLetterQueue.add({
data,
configName: config.name,
error: error.message,
timestamp: new Date(),
requiresManualIntervention: true,
});
}
}

Proper sync configuration ensures reliable data consistency between Supernal Coding and your existing systems while maintaining performance and data integrity.