Data Synchronization Guide
This guide covers how to efficiently synchronize data from the WFRMLS API, including incremental updates, change tracking, and maintaining local data consistency.
Overview
Data synchronization is crucial for applications that need to maintain up-to-date real estate information. The WFRMLS API provides several mechanisms for efficient data synchronization:
- Timestamp-based incremental updates - Track changes since last sync
- Change tracking - Monitor specific field modifications
- Deleted record tracking - Handle removed listings
- Batch processing - Efficiently process large datasets
Synchronization Strategies
Full Synchronization
Complete data refresh - useful for initial setup or periodic validation:
import json
from datetime import datetime, timedelta, timezone

from wfrmls import WFRMLSClient
# Module-level client instance; the examples in this guide call it as `client`.
client = WFRMLSClient()
def full_sync_properties():
    """Perform a complete synchronization of all active properties.

    Pages through every property whose ``StandardStatus`` is ``'Active'`` in
    batches of 100, saves a progress snapshot each time the running total is
    a multiple of 1000, writes the final result to
    ``full_sync_properties.json`` and records the run in the ``properties``
    sync metadata.

    Returns:
        list: Every property record returned by the API, in fetch order.
    """
    print("Starting full property synchronization...")
    start_time = datetime.now()
    all_properties = []
    skip = 0
    batch_size = 100
    selected_fields = [
        "ListingId", "ListPrice", "StandardStatus", "ModificationTimestamp",
        "PropertyAddress", "PropertyCity", "BedroomsTotal", "BathroomsTotalInteger",
        "LivingArea", "ListAgentKey", "OfficeKey",
    ]

    while True:
        # Get batch of properties
        batch = client.property.get_properties(
            filter_query="StandardStatus eq 'Active'",
            select=selected_fields,
            orderby="ModificationTimestamp asc",
            top=batch_size,
            skip=skip,
        )
        if not batch:
            break

        all_properties.extend(batch)
        skip += batch_size
        print(f"Processed {len(all_properties)} properties...")

        # Optional: Save progress periodically
        if len(all_properties) % 1000 == 0:
            save_sync_progress(all_properties, "full_sync_progress.json")

    end_time = datetime.now()
    duration = end_time - start_time
    print(f"Full sync completed: {len(all_properties)} properties in {duration}")

    # Save final results
    save_sync_data(all_properties, "full_sync_properties.json")
    # Record the START of the run as the watermark. Records modified while
    # the (potentially long) paging loop was running may have been fetched in
    # their old state or skipped; using the end time would hide them from the
    # next incremental sync, whereas the start time guarantees a re-fetch.
    update_sync_metadata("properties", "full", start_time, len(all_properties))
    return all_properties
def save_sync_data(data, filename):
    """Write ``data`` to ``filename`` as indented JSON.

    Values the JSON encoder cannot serialize natively are converted with
    ``str``. An existing file at ``filename`` is overwritten.
    """
    with open(filename, 'w') as out_file:
        json.dump(data, out_file, default=str, indent=2)
def save_sync_progress(data, filename):
    """Write an in-progress snapshot of ``data`` to ``filename``.

    The snapshot is a JSON object holding the current time (ISO format), the
    number of items in ``data`` and ``data`` itself. Values the JSON encoder
    cannot serialize natively are converted with ``str``.
    """
    snapshot = dict(
        timestamp=datetime.now().isoformat(),
        count=len(data),
        data=data,
    )
    with open(filename, 'w') as out_file:
        json.dump(snapshot, out_file, default=str, indent=2)
def update_sync_metadata(resource_type, sync_type, timestamp, count):
    """Record the outcome of a sync run for ``resource_type``.

    Overwrites ``sync_metadata_<resource_type>.json`` in the current working
    directory with the sync type, the ISO-formatted ``timestamp`` (stored
    under ``last_sync``) and the record count.
    """
    target = f"sync_metadata_{resource_type}.json"
    record = {
        "resource_type": resource_type,
        "sync_type": sync_type,
        "last_sync": timestamp.isoformat(),
        "record_count": count,
    }
    with open(target, 'w') as out_file:
        json.dump(record, out_file, indent=2)
Incremental Synchronization
Update only changed records since last sync:
def incremental_sync_properties(last_sync_time=None):
    """Perform incremental synchronization of properties.

    Args:
        last_sync_time: ``datetime`` to sync from. When omitted, the value
            stored by the previous run is read via ``get_last_sync_time``.
            Naive values are interpreted as local time.

    Returns:
        dict: The categorized changes produced by
        ``process_property_changes``. When no previous sync exists, a full
        sync is performed instead and its result (a list of properties) is
        returned.
    """
    if last_sync_time is None:
        last_sync_time = get_last_sync_time("properties")
    if last_sync_time is None:
        print("No previous sync found, performing full sync...")
        return full_sync_properties()

    print(f"Starting incremental sync since {last_sync_time}")
    start_time = datetime.now()

    # The filter literal is suffixed with "Z" (UTC), so the value must really
    # be UTC. Stored sync times come from datetime.now() (naive local time);
    # astimezone() treats naive values as local and converts them, and it
    # also normalizes aware values instead of yielding "...+00:00Z".
    since_utc = (
        last_sync_time.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    # Get properties modified since last sync
    modified_properties = client.property.get_properties(
        filter_query=f"ModificationTimestamp gt {since_utc}",
        select=[
            "ListingId", "ListPrice", "StandardStatus", "ModificationTimestamp",
            "PropertyAddress", "PropertyCity", "BedroomsTotal", "BathroomsTotalInteger",
            "LivingArea", "ListAgentKey", "OfficeKey",
        ],
        orderby="ModificationTimestamp asc",
    )

    # Process changes
    changes = process_property_changes(modified_properties, last_sync_time)

    end_time = datetime.now()
    duration = end_time - start_time
    print(f"Incremental sync completed: {len(modified_properties)} changes in {duration}")

    # Save results
    save_sync_data(changes, f"incremental_sync_{start_time.strftime('%Y%m%d_%H%M%S')}.json")
    # Use the time the run STARTED as the next watermark: anything modified
    # after the query was issued but before end_time would otherwise fall
    # between two sync windows and never be fetched.
    update_sync_metadata("properties", "incremental", start_time, len(modified_properties))
    return changes
def get_last_sync_time(resource_type):
    """Return the ``last_sync`` time recorded for ``resource_type``.

    Reads ``sync_metadata_<resource_type>.json`` from the current working
    directory and parses its ``last_sync`` entry with
    ``datetime.fromisoformat``. Returns ``None`` when the file does not exist.
    """
    metadata_path = f"sync_metadata_{resource_type}.json"
    try:
        with open(metadata_path, 'r') as meta_file:
            stored = json.load(meta_file)
    except FileNotFoundError:
        # No metadata file means no sync has been recorded yet.
        return None
    return datetime.fromisoformat(stored["last_sync"])
def process_property_changes(properties, last_sync_time):
    """Sort fetched properties into new, updated, status-changed and price-changed.

    Each property is compared with the locally stored copy returned by
    ``load_existing_properties``. A property whose ``ListingId`` is unknown
    locally is a new listing; every other property is an updated listing and
    is additionally reported under ``status_changes`` / ``price_changes``
    when ``StandardStatus`` / ``ListPrice`` differ from the stored values.

    ``last_sync_time`` is accepted but not used by this function.

    Returns:
        dict: Lists keyed by ``new_listings``, ``updated_listings``,
        ``status_changes`` and ``price_changes``.
    """
    # Load existing data for comparison
    known = load_existing_properties()

    new_listings = []
    updated_listings = []
    status_changes = []
    price_changes = []

    for record in properties:
        key = record["ListingId"]

        if key not in known:
            new_listings.append(record)
            continue

        stored = known[key]

        old_status = stored.get("StandardStatus")
        if record["StandardStatus"] != old_status:
            status_changes.append({
                "listing_id": key,
                "old_status": old_status,
                "new_status": record["StandardStatus"],
                "property": record,
            })

        old_price = stored.get("ListPrice")
        if record["ListPrice"] != old_price:
            price_changes.append({
                "listing_id": key,
                "old_price": old_price,
                "new_price": record["ListPrice"],
                "property": record,
            })

        updated_listings.append(record)

    return {
        "new_listings": new_listings,
        "updated_listings": updated_listings,
        "status_changes": status_changes,
        "price_changes": price_changes,
    }
def load_existing_properties():
    """Return the locally stored properties indexed by ``ListingId``.

    Reads ``full_sync_properties.json`` from the current working directory.
    Returns an empty dict when that file does not exist.
    """
    try:
        with open("full_sync_properties.json", 'r') as stored_file:
            records = json.load(stored_file)
    except FileNotFoundError:
        return {}

    by_listing_id = {}
    for record in records:
        by_listing_id[record["ListingId"]] = record
    return by_listing_id
Change Tracking
Field-Level Change Detection
Track specific field changes for detailed monitoring:
class PropertyChangeTracker:
    """Detect field-level changes by comparing API data with saved snapshots.

    Snapshots are stored as ``property_data/<listing_id>.json`` relative to
    the current working directory.
    """

    def __init__(self, client):
        self.client = client
        # Only these fields are compared between snapshots.
        self.tracked_fields = [
            "ListPrice", "StandardStatus", "DaysOnMarket", "ListAgentKey",
            "PropertyAddress", "BedroomsTotal", "BathroomsTotalInteger",
        ]

    def track_changes(self, listing_ids, since_timestamp):
        """Compare each listing with its saved snapshot and refresh the snapshot.

        ``since_timestamp`` is accepted but not used by this method.

        Returns:
            dict: ``listing_id`` -> ``{"changes", "current_data",
            "previous_data"}`` for every listing with at least one changed
            tracked field. Listings that raise are reported via ``print``
            and skipped.
        """
        changes_by_property = {}

        for listing_id in listing_ids:
            try:
                # The result is used as a dict-like record (.get() is called
                # on it during comparison).
                latest = self.client.property.get_property(listing_id)

                snapshot = self.load_previous_property_data(listing_id)
                if snapshot:
                    field_changes = self.detect_field_changes(snapshot, latest)
                    if field_changes:
                        changes_by_property[listing_id] = {
                            "changes": field_changes,
                            "current_data": latest,
                            "previous_data": snapshot,
                        }

                # Persist the latest data so the next run compares against it.
                self.save_property_data(listing_id, latest)
            except Exception as e:
                print(f"Error tracking changes for {listing_id}: {e}")

        return changes_by_property

    def detect_field_changes(self, previous, current):
        """Return one change record per tracked field whose value differs."""
        return [
            {
                "field": name,
                "previous_value": previous.get(name),
                "current_value": current.get(name),
                "change_timestamp": current.get("ModificationTimestamp"),
            }
            for name in self.tracked_fields
            if previous.get(name) != current.get(name)
        ]

    def load_previous_property_data(self, listing_id):
        """Return the saved snapshot for ``listing_id``, or ``None`` if absent."""
        snapshot_path = f"property_data/{listing_id}.json"
        try:
            with open(snapshot_path, 'r') as snapshot_file:
                return json.load(snapshot_file)
        except FileNotFoundError:
            return None

    def save_property_data(self, listing_id, property_data):
        """Write ``property_data`` as the new snapshot for ``listing_id``."""
        import os

        os.makedirs("property_data", exist_ok=True)
        with open(f"property_data/{listing_id}.json", 'w') as snapshot_file:
            json.dump(property_data, snapshot_file, default=str, indent=2)
# Usage: wrap the shared client in a tracker
tracker = PropertyChangeTracker(client)

# Listings to inspect, plus the look-back window handed to the tracker
listing_ids = ["12345678", "87654321", "11223344"]
since_time = datetime.now() - timedelta(hours=24)
changes = tracker.track_changes(listing_ids, since_time)

# Report every changed field, grouped by listing
for listing_id, change_data in changes.items():
    print(f"\nChanges for {listing_id}:")
    for change in change_data["changes"]:
        print(f"  {change['field']}: {change['previous_value']} → {change['current_value']}")
Status Change Monitoring
Monitor specific status transitions:
def monitor_status_changes():
    """Monitor property status changes.

    Fetches properties modified since the last ``status_monitoring`` run (or
    within the last hour when no run is recorded), compares each one's
    ``StandardStatus`` with the snapshot saved under ``property_data/`` and
    reports the transitions listed in ``monitored_transitions``. Properties
    without a saved snapshot are skipped.

    Returns:
        list: One dict per detected transition (listing id, address,
        previous/current status, change timestamp and a display string).
    """
    # Define status transitions to monitor
    monitored_transitions = [
        ("Active", "Pending"),
        ("Pending", "Closed"),
        ("Active", "Withdrawn"),
        ("Active", "Expired"),
    ]

    last_check = get_last_sync_time("status_monitoring")
    if not last_check:
        last_check = datetime.now() - timedelta(hours=1)

    # Captured before the query so modifications made while this run is in
    # progress fall inside the next run's window rather than between runs.
    check_started = datetime.now()

    # The filter literal ends in "Z" (UTC). Stored check times are naive
    # local times; astimezone() treats naive values as local and converts.
    since_utc = (
        last_check.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    # Get recently modified properties
    recent_properties = client.property.get_properties(
        filter_query=f"ModificationTimestamp gt {since_utc}",
        select=["ListingId", "StandardStatus", "ModificationTimestamp", "PropertyAddress"],
        orderby="ModificationTimestamp asc",
    )

    # Snapshot loading is a PropertyChangeTracker method; there is no
    # module-level load_previous_property_data function to call.
    snapshot_reader = PropertyChangeTracker(client)

    status_changes = []
    for prop in recent_properties:
        listing_id = prop["ListingId"]
        current_status = prop["StandardStatus"]

        # Get previous status
        previous_data = snapshot_reader.load_previous_property_data(listing_id)
        if not previous_data:
            continue
        previous_status = previous_data.get("StandardStatus")

        # Check if this is a monitored transition
        if (previous_status, current_status) in monitored_transitions:
            status_changes.append({
                "listing_id": listing_id,
                "address": prop["PropertyAddress"],
                "previous_status": previous_status,
                "current_status": current_status,
                "change_timestamp": prop["ModificationTimestamp"],
                "transition": f"{previous_status} → {current_status}",
            })

    # Save status changes
    if status_changes:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        save_sync_data(status_changes, f"status_changes_{timestamp}.json")
        print(f"Detected {len(status_changes)} status changes:")
        for change in status_changes:
            print(f"  {change['listing_id']}: {change['transition']}")

    update_sync_metadata("status_monitoring", "incremental", check_started, len(status_changes))
    return status_changes
Deleted Record Handling
Tracking Deleted Listings
Handle properties that have been removed from the MLS:
def sync_deleted_properties():
    """Synchronize deleted property records.

    Fetches deletions recorded since the last ``deleted_properties`` run (or
    within the last 7 days when no run is recorded), hands them to
    ``process_deletions`` and updates the sync metadata.

    Returns:
        dict: The deletion summary produced by ``process_deletions``.
    """
    last_sync = get_last_sync_time("deleted_properties")
    if not last_sync:
        last_sync = datetime.now() - timedelta(days=7)

    # Captured before the query so deletions recorded while this run is in
    # progress are picked up by the next run instead of being skipped.
    sync_started = datetime.now()

    # The filter literal ends in "Z" (UTC). Stored sync times are naive local
    # times; astimezone() treats naive values as local and converts them.
    since_utc = (
        last_sync.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
    )

    # Get deleted properties since last sync
    deleted_properties = client.deleted.get_deleted_properties(
        filter_query=f"DeletedDate gt {since_utc}",
        select=["ListingId", "DeletedDate", "DeletedReason"],
        orderby="DeletedDate asc",
    )
    print(f"Found {len(deleted_properties)} deleted properties since {last_sync}")

    # Process deletions
    deletion_summary = process_deletions(deleted_properties)

    # Update sync metadata
    update_sync_metadata("deleted_properties", "incremental", sync_started, len(deleted_properties))
    return deletion_summary
def process_deletions(deleted_properties):
    """Remove local data for each deleted listing and summarize the result.

    For every record the deletion reason is tallied (``"Unknown"`` when the
    record has no ``DeletedReason`` key), local files are removed through
    ``remove_local_property_data`` and an entry is appended to the summary.
    The finished summary is written to ``deletions_<timestamp>.json``.

    Returns:
        dict: ``total_deleted``, per-reason counts under ``by_reason`` and
        the handled records under ``processed_deletions``.
    """
    reason_counts = {}
    handled = []
    deletion_summary = {
        "total_deleted": len(deleted_properties),
        "by_reason": reason_counts,
        "processed_deletions": handled,
    }

    for record in deleted_properties:
        listing_id = record["ListingId"]
        reason = record.get("DeletedReason", "Unknown")

        # Count by reason
        if reason in reason_counts:
            reason_counts[reason] += 1
        else:
            reason_counts[reason] = 1

        # Remove from local storage
        remove_local_property_data(listing_id)

        handled.append({
            "listing_id": listing_id,
            "deleted_date": record["DeletedDate"],
            "reason": reason,
        })
        print(f"Processed deletion: {listing_id} ({reason})")

    # Save deletion summary
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_sync_data(deletion_summary, f"deletions_{stamp}.json")
    return deletion_summary
def remove_local_property_data(listing_id):
    """Delete the locally stored files for ``listing_id``.

    Removes the listing's JSON snapshot, its image directory and its cache
    file (all relative to the current working directory). Paths that do not
    exist are ignored; any error while removing a path is printed and the
    remaining paths are still processed.
    """
    import os
    import shutil

    targets = (
        f"property_data/{listing_id}.json",
        f"property_images/{listing_id}/",
        f"property_cache/{listing_id}.json",
    )

    for target in targets:
        try:
            if os.path.isfile(target):
                os.remove(target)
                continue
            if os.path.isdir(target):
                shutil.rmtree(target)
        except Exception as e:
            print(f"Error removing {target}: {e}")
Batch Processing
Efficient Bulk Operations
Process large datasets efficiently:
class BatchProcessor:
    """Page through properties in fixed-size batches and sync their images."""

    def __init__(self, client, batch_size=100, download_timeout=30):
        """
        Args:
            client: API client exposing ``property.get_properties`` and
                ``media.get_media``.
            batch_size: Number of properties requested per page.
            download_timeout: Seconds passed to ``requests.get`` as the
                timeout for each image download.
        """
        self.client = client
        self.batch_size = batch_size
        self.download_timeout = download_timeout

    def process_all_properties(self, processor_func, filter_query="StandardStatus eq 'Active'"):
        """Process all properties in batches.

        Calls ``processor_func(batch)`` for each page matching
        ``filter_query``. A batch whose processing raises is reported via
        ``print`` and skipped; paging then continues with the next batch.

        Returns:
            int: Number of properties in the batches processed without error.
        """
        total_processed = 0
        skip = 0

        while True:
            # Get batch
            batch = self.client.property.get_properties(
                filter_query=filter_query,
                top=self.batch_size,
                skip=skip,
                orderby="ModificationTimestamp asc",
            )
            if not batch:
                break

            # Process batch
            try:
                processor_func(batch)
            except Exception as e:
                print(f"Error processing batch at skip {skip}: {e}")
            else:
                total_processed += len(batch)
                print(f"Processed batch: {len(batch)} properties (total: {total_processed})")

            # Advance whether or not the batch succeeded so one bad batch
            # cannot stall the run.
            skip += self.batch_size

        return total_processed

    def sync_property_images(self, properties):
        """Sync property images for a batch of properties.

        Errors for an individual listing are printed and do not stop the
        remaining listings from being processed.
        """
        for prop in properties:
            listing_id = prop["ListingId"]
            try:
                # Get property media
                media = self.client.media.get_media(
                    filter_query=f"ResourceRecordKey eq '{listing_id}'",
                    select=["MediaKey", "MediaURL", "MediaType", "MediaModificationTimestamp"],
                )
                # Process media files
                self.process_property_media(listing_id, media)
            except Exception as e:
                print(f"Error syncing images for {listing_id}: {e}")

    def process_property_media(self, listing_id, media):
        """Download the ``Photo`` items in ``media`` that are not yet on disk.

        Images are stored as ``property_images/<listing_id>/<MediaKey>.jpg``.
        A failed download is printed and skipped.
        """
        import os
        import requests

        media_dir = f"property_images/{listing_id}"
        os.makedirs(media_dir, exist_ok=True)

        for media_item in media:
            if media_item["MediaType"] != "Photo":
                continue

            media_url = media_item["MediaURL"]
            media_key = media_item["MediaKey"]

            # Download image if not already cached
            image_path = f"{media_dir}/{media_key}.jpg"
            if os.path.exists(image_path):
                continue

            try:
                # requests applies no timeout by default; without one a
                # single unresponsive media host would block the whole sync.
                response = requests.get(media_url, timeout=self.download_timeout)
                response.raise_for_status()
                with open(image_path, 'wb') as f:
                    f.write(response.content)
                print(f"Downloaded image: {media_key}")
            except Exception as e:
                print(f"Error downloading image {media_key}: {e}")
# Usage: a processor that pages 50 properties at a time
processor = BatchProcessor(client, batch_size=50)


def sync_images_batch(properties):
    """Batch callback that syncs images for one page of properties."""
    processor.sync_property_images(properties)


# Walk all active properties, syncing images batch by batch
total_processed = processor.process_all_properties(sync_images_batch)
print(f"Completed image sync for {total_processed} properties")
Parallel Processing
Use threading for improved performance:
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
class ParallelSyncProcessor:
    """Sync individual properties concurrently on a thread pool."""

    def __init__(self, client, max_workers=5):
        self.client = client
        self.max_workers = max_workers
        self.results_queue = Queue()

    def parallel_property_sync(self, listing_ids):
        """Sync properties in parallel.

        Returns:
            list: One result dict per listing, in completion order (not
            input order).
        """
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # Submit all tasks, remembering which listing each future is for
            pending = {}
            for listing_id in listing_ids:
                pending[pool.submit(self.sync_single_property, listing_id)] = listing_id

            # Collect results as they finish
            for finished in as_completed(pending):
                listing_id = pending[finished]
                try:
                    results.append(finished.result())
                    print(f"Completed sync for {listing_id}")
                except Exception as e:
                    print(f"Error syncing {listing_id}: {e}")

        return results

    def sync_single_property(self, listing_id):
        """Sync a single property with full details.

        Fetches the property, its media and its open houses, then saves
        them together. Any exception is captured in the returned dict
        rather than raised.

        Returns:
            dict: ``status`` is ``"success"`` (with media and open-house
            counts) or ``"error"`` (with the error message).
        """
        try:
            details = self.client.property.get_property(listing_id)
            media_items = self.client.media.get_media(
                filter_query=f"ResourceRecordKey eq '{listing_id}'"
            )
            open_house_items = self.client.openhouse.get_openhouses(
                filter_query=f"PropertyKey eq '{listing_id}'"
            )

            # Combine all data and save to file
            bundle = {
                "property": details,
                "media": media_items,
                "open_houses": open_house_items,
                "sync_timestamp": datetime.now().isoformat(),
            }
            self.save_complete_property_data(listing_id, bundle)

            return {
                "listing_id": listing_id,
                "status": "success",
                "media_count": len(media_items),
                "open_house_count": len(open_house_items),
            }
        except Exception as e:
            return {
                "listing_id": listing_id,
                "status": "error",
                "error": str(e),
            }

    def save_complete_property_data(self, listing_id, data):
        """Write ``data`` to ``complete_property_data/<listing_id>.json``."""
        import os

        os.makedirs("complete_property_data", exist_ok=True)
        with open(f"complete_property_data/{listing_id}.json", 'w') as out_file:
            json.dump(data, out_file, default=str, indent=2)
# Usage: three worker threads sharing the module-level client
parallel_processor = ParallelSyncProcessor(client, max_workers=3)

# Collect the ids of up to 100 properties to sync
recent_properties = client.property.get_properties(
    filter_query="ModificationTimestamp gt 2024-01-01T00:00:00Z",
    select=["ListingId"],
    top=100,
)
listing_ids = [entry["ListingId"] for entry in recent_properties]

# Sync in parallel
results = parallel_processor.parallel_property_sync(listing_ids)

# Split the outcomes by status and report the totals
successful = [outcome for outcome in results if outcome["status"] == "success"]
failed = [outcome for outcome in results if outcome["status"] == "error"]
print(f"Parallel sync completed: {len(successful)} successful, {len(failed)} failed")
Synchronization Scheduling
Automated Sync Jobs
Set up automated synchronization schedules:
import schedule
import time
from datetime import datetime, timedelta
class SyncScheduler:
    """Register the guide's sync jobs with the ``schedule`` library and run them.

    Every ``run_*`` method catches and prints any exception so that one
    failing job does not stop the scheduler loop.
    """

    def __init__(self, client):
        self.client = client
        self.setup_schedules()

    def setup_schedules(self):
        """Set up synchronization schedules."""
        # Full sync weekly on Sunday at 2 AM
        schedule.every().sunday.at("02:00").do(self.run_full_sync)
        # Incremental sync every hour
        schedule.every().hour.do(self.run_incremental_sync)
        # Status monitoring every 15 minutes
        schedule.every(15).minutes.do(self.run_status_monitoring)
        # Deleted records sync daily at 3 AM
        schedule.every().day.at("03:00").do(self.run_deleted_sync)

    @staticmethod
    def _count_updates(result):
        """Return the number of updates represented by an incremental-sync result.

        ``incremental_sync_properties`` normally returns a dict of change
        lists, but returns the plain list produced by the full sync when no
        previous sync exists. Both shapes are handled here.
        """
        if isinstance(result, dict):
            return len(result.get('updated_listings', []))
        return len(result)

    def run_full_sync(self):
        """Run full synchronization."""
        try:
            print(f"Starting full sync at {datetime.now()}")
            result = full_sync_properties()
            print(f"Full sync completed: {len(result)} properties")
        except Exception as e:
            print(f"Full sync failed: {e}")

    def run_incremental_sync(self):
        """Run incremental synchronization."""
        try:
            print(f"Starting incremental sync at {datetime.now()}")
            result = incremental_sync_properties()
            # Calling .get() directly would raise on the list returned by the
            # full-sync fallback and mis-report a successful run as failed.
            print(f"Incremental sync completed: {self._count_updates(result)} updates")
        except Exception as e:
            print(f"Incremental sync failed: {e}")

    def run_status_monitoring(self):
        """Run status change monitoring."""
        try:
            result = monitor_status_changes()
            if result:
                print(f"Status monitoring: {len(result)} changes detected")
        except Exception as e:
            print(f"Status monitoring failed: {e}")

    def run_deleted_sync(self):
        """Run deleted records synchronization."""
        try:
            result = sync_deleted_properties()
            print(f"Deleted sync completed: {result['total_deleted']} deletions")
        except Exception as e:
            print(f"Deleted sync failed: {e}")

    def start_scheduler(self):
        """Start the synchronization scheduler. Blocks indefinitely."""
        print("Starting sync scheduler...")
        while True:
            schedule.run_pending()
            time.sleep(60)  # Check every minute
# Usage: constructing the scheduler registers all jobs (via setup_schedules)
scheduler = SyncScheduler(client)
# Run scheduler (this loops forever, checking for due jobs once a minute)
# scheduler.start_scheduler()
# Or run a specific sync operation manually, without starting the loop
scheduler.run_incremental_sync()
Error Handling and Recovery
Robust Sync Operations
Handle errors gracefully and provide recovery mechanisms:
class RobustSyncManager:
    """Run sync operations with retries and resumable checkpoints."""

    def __init__(self, client):
        self.client = client
        self.max_retries = 3
        self.retry_delay = 5  # seconds

    def robust_sync_with_retry(self, sync_function, *args, **kwargs):
        """Execute sync function with retry logic.

        Calls ``sync_function(*args, **kwargs)`` up to ``max_retries`` times,
        sleeping ``retry_delay`` seconds between attempts.

        Returns:
            Whatever ``sync_function`` returns on its first successful call.

        Raises:
            Exception: The error from the final attempt, once every attempt
            has failed.
        """
        for attempt in range(self.max_retries):
            try:
                return sync_function(*args, **kwargs)
            except Exception as e:
                print(f"Sync attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    print(f"Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                else:
                    print("All retry attempts failed")
                    raise

    def sync_with_checkpoint(self, sync_function, checkpoint_interval=100):
        """Sync with periodic checkpoints for recovery.

        Pages through active properties starting at the offset stored in
        ``sync_checkpoint.json`` (0 when absent) and calls
        ``sync_function(batch)`` for each page. Progress is checkpointed at
        least every ``checkpoint_interval`` processed records. On failure the
        checkpoint is saved (with the error message) and the exception is
        re-raised, so the next call resumes where this one stopped. On
        success the checkpoint file is removed.
        """
        checkpoint_file = "sync_checkpoint.json"

        # Load previous checkpoint
        checkpoint = self.load_checkpoint(checkpoint_file)
        skip = checkpoint.get("last_processed", 0)
        last_saved = skip
        batch_size = 100

        try:
            while True:
                # Get batch
                batch = self.client.property.get_properties(
                    filter_query="StandardStatus eq 'Active'",
                    top=batch_size,
                    skip=skip,
                    orderby="ModificationTimestamp asc",
                )
                if not batch:
                    break

                # Process batch
                sync_function(batch)
                skip += len(batch)

                # Save checkpoint. Measuring progress since the last save
                # (rather than testing skip % interval) still checkpoints
                # when batch sizes do not divide the interval evenly.
                if skip - last_saved >= checkpoint_interval:
                    self.save_checkpoint(checkpoint_file, {
                        "last_processed": skip,
                        "timestamp": datetime.now().isoformat(),
                    })
                    last_saved = skip
                    print(f"Checkpoint saved at {skip} records")
        except Exception as e:
            print(f"Sync failed at record {skip}: {e}")
            self.save_checkpoint(checkpoint_file, {
                "last_processed": skip,
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
            })
            raise
        else:
            # Clean up only after a successful run. A ``finally`` clause here
            # would also run on failure and delete the recovery checkpoint
            # that was just written.
            self.clear_checkpoint(checkpoint_file)

    def load_checkpoint(self, filename):
        """Load sync checkpoint; returns an empty dict when the file is absent."""
        try:
            with open(filename, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def save_checkpoint(self, filename, checkpoint_data):
        """Save sync checkpoint as indented JSON."""
        with open(filename, 'w') as f:
            json.dump(checkpoint_data, f, indent=2)

    def clear_checkpoint(self, filename):
        """Clear sync checkpoint; a missing file is not an error."""
        import os

        try:
            os.remove(filename)
        except FileNotFoundError:
            pass
# Usage: one manager instance serves both demonstrations below
robust_sync = RobustSyncManager(client)


# Sync with retry logic
def my_sync_function():
    """Zero-argument wrapper so the retry helper can call the incremental sync."""
    return incremental_sync_properties()


result = robust_sync.robust_sync_with_retry(my_sync_function)


# Sync with checkpoints
def process_batch(properties):
    """Per-batch callback: handle each property in the page."""
    for prop in properties:
        # Process individual property
        print(f"Processing {prop['ListingId']}")


robust_sync.sync_with_checkpoint(process_batch)
Performance Monitoring
Sync Performance Metrics
Monitor synchronization performance:
class SyncPerformanceMonitor:
    """Collect timing and memory metrics for sync operations."""

    def __init__(self):
        # operation name -> metrics dict recorded by time_sync_operation
        self.metrics = {}

    def time_sync_operation(self, operation_name, sync_function, *args, **kwargs):
        """Run ``sync_function`` and record how long it took.

        On success the metrics include duration, memory delta and record
        counts (a list result counts its items; any other result counts as
        one record with a rate of 0). On failure the duration and error
        message are recorded and the exception is re-raised.

        Returns:
            Whatever ``sync_function`` returns.
        """
        started = datetime.now()
        memory_before = self.get_memory_usage()

        try:
            result = sync_function(*args, **kwargs)

            finished = datetime.now()
            memory_after = self.get_memory_usage()
            elapsed = (finished - started).total_seconds()

            returned_list = isinstance(result, list)
            record_count = len(result) if returned_list else 1
            if returned_list and elapsed > 0:
                rate = len(result) / elapsed
            else:
                rate = 0

            # Record metrics
            self.metrics[operation_name] = {
                "start_time": started.isoformat(),
                "end_time": finished.isoformat(),
                "duration_seconds": elapsed,
                "memory_usage_mb": memory_after - memory_before,
                "records_processed": record_count,
                "records_per_second": rate,
                "status": "success",
            }
            print(f"{operation_name} completed in {elapsed:.2f}s ({record_count} records)")
            return result
        except Exception as e:
            finished = datetime.now()
            elapsed = (finished - started).total_seconds()
            self.metrics[operation_name] = {
                "start_time": started.isoformat(),
                "end_time": finished.isoformat(),
                "duration_seconds": elapsed,
                "status": "error",
                "error": str(e),
            }
            raise e

    def get_memory_usage(self):
        """Get current memory usage (resident set size) in MB via ``psutil``."""
        import psutil
        import os

        current_process = psutil.Process(os.getpid())
        rss_bytes = current_process.memory_info().rss
        return rss_bytes / 1024 / 1024

    def generate_performance_report(self):
        """Build a summary of all recorded operations and save it as JSON.

        The report is written to ``sync_performance_report_<timestamp>.json``
        in the current working directory.

        Returns:
            dict: Report timestamp, the per-operation metrics and a summary
            with operation counts plus total and average duration.
        """
        report_timestamp = datetime.now().isoformat()
        entries = list(self.metrics.values())
        succeeded = [m for m in entries if m["status"] == "success"]
        errored = [m for m in entries if m["status"] == "error"]
        total_duration = sum(m["duration_seconds"] for m in entries)

        if self.metrics:
            average_duration = total_duration / len(self.metrics)
        else:
            average_duration = 0

        report = {
            "report_timestamp": report_timestamp,
            "operations": self.metrics,
            "summary": {
                "total_operations": len(self.metrics),
                "successful_operations": len(succeeded),
                "failed_operations": len(errored),
                "total_duration": total_duration,
                "average_duration": average_duration,
            },
        }

        # Save report
        file_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        with open(f"sync_performance_report_{file_stamp}.json", 'w') as out_file:
            json.dump(report, out_file, indent=2)
        return report
# Usage: one monitor instance accumulates metrics for every timed operation
monitor = SyncPerformanceMonitor()
# Monitor sync operations; each call returns the wrapped function's own result
properties = monitor.time_sync_operation("incremental_sync", incremental_sync_properties)
deleted = monitor.time_sync_operation("deleted_sync", sync_deleted_properties)
# Generate performance report (also written to a timestamped JSON file)
report = monitor.generate_performance_report()
print(f"Performance report generated: {report['summary']}")
Related Resources
- Property API - Property data access methods
- Analytics API - Market data synchronization
- Error Handling Guide - Robust error management
- Rate Limits Guide - API usage optimization