Schedule-Based Sync Integrations
New to building integrations? Start with our step-by-step contributor guide for a quick overview.
When to Use Schedule-Based Sync
Choose schedule-based integrations when:- The service doesn’t provide webhook events
- You want to batch process multiple API calls
- The service has rate limits that require careful timing
- Real-time sync isn’t critical (5-15 minute delays acceptable)
- You need to poll multiple endpoints and aggregate data
Architecture Overview
Cron Schedule (every 5-15 min)
↓
SYNC Event Triggered
↓
Fetch New Activities
↓
Compare with Last Sync Time
↓
Create Activity Messages
↓
Update State with New Sync Time
↓
Knowledge Graph
Key Concepts
State Management
Schedule-based integrations maintain state between runs:interface GitHubSettings {
lastSyncTime?: string; // Last time we synced notifications
lastUserEventTime?: string; // Last time we synced user events
username?: string; // GitHub username
}
state message:
return [
// ... activity messages
{
type: 'state',
data: {
lastSyncTime: new Date().toISOString(),
lastUserEventTime: new Date().toISOString(),
username: 'octocat'
}
}
];
Pagination
Handle large datasets by paginating through results:let page = 1;
let hasMorePages = true;
while (hasMorePages) {
const data = await fetchData(page);
if (!data || data.length === 0) {
hasMorePages = false;
break;
}
// Process data
for (const item of data) {
// Create activities
}
// Check if more pages exist
if (data.length < perPage) {
hasMorePages = false;
} else {
page++;
}
}
Incremental Sync
Only fetch new data since last sync:const lastSyncTime = state.lastSyncTime || getDefaultSyncTime(); // 24 hours ago
// Use lastSyncTime to filter API requests
const notifications = await fetch(
`https://api.github.com/notifications?since=${lastSyncTime}`
);
Full Example: GitHub Integration
spec.json
{
"name": "GitHub extension",
"key": "github",
"description": "Plan, track, and manage your projects in GitHub",
"icon": "github",
"schedule": {
"frequency": "*/5 * * * *" // Run every 5 minutes
},
"auth": {
"OAuth2": {
"token_url": "https://github.com/login/oauth/access_token",
"authorization_url": "https://github.com/login/oauth/authorize",
"scopes": [
"user",
"public_repo",
"repo",
"notifications",
"read:org"
],
"scope_separator": ","
}
},
"mcp": {
"type": "http",
"url": "https://api.githubcopilot.com/mcp/",
"headers": {
"Authorization": "Bearer ${config:access_token}",
"Content-Type": "application/json"
}
}
}
index.ts
import { handleSchedule } from './schedule';
import { integrationCreate } from './account-create';
import {
IntegrationCLI,
IntegrationEventPayload,
IntegrationEventType,
Spec,
} from '@redplanethq/sdk';
export async function run(eventPayload: IntegrationEventPayload) {
switch (eventPayload.event) {
case IntegrationEventType.SETUP:
return await integrationCreate(eventPayload.eventBody);
case IntegrationEventType.SYNC:
// Handle scheduled sync
return await handleSchedule(eventPayload.config, eventPayload.state);
default:
return { message: `Unknown event type: ${eventPayload.event}` };
}
}
class GitHubCLI extends IntegrationCLI {
constructor() {
super('github', '1.0.0');
}
protected async handleEvent(eventPayload: IntegrationEventPayload): Promise<any> {
return await run(eventPayload);
}
protected async getSpec(): Promise<Spec> {
return {
name: 'GitHub extension',
key: 'github',
// ... rest of spec
};
}
}
function main() {
const githubCLI = new GitHubCLI();
githubCLI.parse();
}
main();
schedule.ts
import { getUserEvents, getGithubData } from './utils';
interface GitHubActivityCreateParams {
text: string;
sourceURL: string;
}
interface GitHubSettings {
lastSyncTime?: string;
lastUserEventTime?: string;
username?: string;
}
function createActivityMessage(params: GitHubActivityCreateParams) {
return {
type: 'activity',
data: {
text: params.text,
sourceURL: params.sourceURL,
},
};
}
function getDefaultSyncTime(): string {
// Default to 24 hours ago
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
async function processNotifications(
accessToken: string,
lastSyncTime: string,
username: string
): Promise<any[]> {
const activities = [];
// Filter for relevant notification reasons
const allowedReasons = [
'assign',
'review_requested',
'mention',
'state_change',
'author',
'comment',
'team_mention',
];
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
try {
// Fetch notifications with pagination
const notifications = await getGithubData(
`https://api.github.com/notifications?page=${page}&per_page=50&all=true&since=${lastSyncTime}`,
accessToken
);
if (!notifications || notifications.length === 0) {
hasMorePages = false;
break;
}
// Check if more pages exist
if (notifications.length < 50) {
hasMorePages = false;
} else {
page++;
}
// Process each notification
for (const notification of notifications) {
try {
// Filter by allowed reasons
if (!allowedReasons.includes(notification.reason)) {
continue;
}
const repository = notification.repository;
const subject = notification.subject;
// Fetch detailed data for the notification
let githubData: any = {};
if (subject.url) {
try {
githubData = await getGithubData(subject.url, accessToken);
} catch (error) {
continue; // Skip if we can't fetch details
}
}
const url = githubData.html_url || notification.subject.url || '';
const isIssue = subject.type === 'Issue';
const isPR = subject.type === 'PullRequest';
let title = '';
// Build contextual activity based on notification reason
switch (notification.reason) {
case 'assign':
title = `${isIssue ? 'Issue' : 'PR'} #${githubData.number} assigned to ${username} in ${repository.full_name}: ${githubData.title}`;
break;
case 'review_requested':
title = `${username} requested to review PR #${githubData.number} in ${repository.full_name}: ${githubData.title}`;
break;
case 'mention':
title = `${githubData.user?.login} mentioned ${username} in ${repository.full_name} ${isIssue ? 'issue' : 'PR'} #${githubData.number}: ${githubData.body}`;
break;
case 'comment':
title = `${githubData.user?.login} commented on ${isIssue ? 'issue' : 'PR'} #${githubData.number} in ${repository.full_name}: ${githubData.body}`;
break;
case 'state_change':
const stateInfo = githubData.state ? `to ${githubData.state}` : '';
title = `${githubData.user?.login} changed ${isIssue ? 'issue' : 'PR'} #${githubData.number} state ${stateInfo} in ${repository.full_name}: ${githubData.title}`;
break;
default:
title = `GitHub notification for ${username} in ${repository.full_name}`;
break;
}
if (title && url) {
activities.push(createActivityMessage({
text: title,
sourceURL: url,
}));
}
} catch (error) {
// Silently skip errors for individual notifications
continue;
}
}
} catch (error) {
hasMorePages = false;
}
}
return activities;
}
async function processUserEvents(
username: string,
accessToken: string,
lastUserEventTime: string
): Promise<any[]> {
const activities = [];
let page = 1;
let hasMorePages = true;
while (hasMorePages) {
try {
// Fetch user events (PRs, issues they created)
const userEvents = await getUserEvents(username, page, accessToken, lastUserEventTime);
if (!userEvents || userEvents.length === 0) {
hasMorePages = false;
break;
}
if (userEvents.length < 30) {
hasMorePages = false;
} else {
page++;
}
for (const event of userEvents) {
try {
let title = '';
const sourceURL = event.html_url || '';
switch (event.type) {
case 'pr':
title = `${username} created PR #${event.number}: ${event.title}`;
break;
case 'issue':
title = `${username} created issue #${event.number}: ${event.title}`;
break;
case 'pr_comment':
title = `${username} commented on PR #${event.number}: ${event.title}`;
break;
case 'issue_comment':
title = `${username} commented on issue #${event.number}: ${event.title}`;
break;
default:
title = `GitHub activity: ${event.title || 'Unknown'}`;
break;
}
if (title && sourceURL) {
activities.push(createActivityMessage({
text: title,
sourceURL: sourceURL,
}));
}
} catch (error) {
continue;
}
}
} catch (error) {
hasMorePages = false;
}
}
return activities;
}
export async function handleSchedule(config: any, state: any) {
try {
const integrationConfiguration = config;
// Check for valid access token
if (!integrationConfiguration?.access_token) {
return [];
}
// Get settings or initialize
let settings = (state || {}) as GitHubSettings;
// Default to 24 hours ago if no last sync times
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
const lastUserEventTime = settings.lastUserEventTime || getDefaultSyncTime();
// Fetch user info to get username
let user;
try {
user = await getGithubData('https://api.github.com/user', integrationConfiguration.access_token);
} catch (error) {
return [];
}
if (!user) {
return [];
}
// Update username in settings
if (!settings.username && user.login) {
settings.username = user.login;
}
// Collect all messages
const messages = [];
// Process notifications
try {
const notificationActivities = await processNotifications(
integrationConfiguration.access_token,
lastSyncTime,
settings.username || 'user'
);
messages.push(...notificationActivities);
} catch (error) {
// Continue even if notifications fail
}
// Process user events
if (settings.username) {
try {
const userEventActivities = await processUserEvents(
settings.username,
integrationConfiguration.access_token,
lastUserEventTime
);
messages.push(...userEventActivities);
} catch (error) {
// Continue even if user events fail
}
}
// Update last sync times
const newSyncTime = new Date().toISOString();
// Add state message to persist settings
messages.push({
type: 'state',
data: {
...settings,
lastSyncTime: newSyncTime,
lastUserEventTime: newSyncTime,
},
});
return messages;
} catch (error) {
return [];
}
}
Best Practices
1. State Management
Always persist state to track sync progress:// Load state at start
let settings = (state || {}) as Settings;
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
// ... fetch and process data
// Save state at end
messages.push({
type: 'state',
data: {
...settings,
lastSyncTime: new Date().toISOString(),
}
});
2. Default Sync Window
On first sync, fetch last 24 hours of data:function getDefaultSyncTime(): string {
return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
3. Efficient Pagination
Stop paginating when no more data:while (hasMorePages) {
const data = await fetchData(page, perPage);
// No data left
if (!data || data.length === 0) {
hasMorePages = false;
break;
}
// Process data...
// Less than full page means no more data
if (data.length < perPage) {
hasMorePages = false;
} else {
page++;
}
}
4. Error Resilience
Continue processing even if individual items fail:for (const item of items) {
try {
// Process item
const activity = await processItem(item);
activities.push(activity);
} catch (error) {
// Log but continue
console.error('Failed to process item:', error);
continue; // Don't let one failure stop everything
}
}
5. Rate Limit Handling
Respect API rate limits:async function fetchWithRetry(url: string, accessToken: string, retries = 3) {
for (let i = 0; i < retries; i++) {
try {
const response = await fetch(url, {
headers: { Authorization: `Bearer ${accessToken}` }
});
if (response.status === 429) {
// Rate limited - wait and retry
const retryAfter = response.headers.get('Retry-After') || 60;
await sleep(parseInt(retryAfter) * 1000);
continue;
}
return await response.json();
} catch (error) {
if (i === retries - 1) throw error;
await sleep(1000 * (i + 1)); // Exponential backoff
}
}
}
6. Multiple Data Sources
Fetch from multiple endpoints and aggregate:const messages = [];
// Fetch from multiple sources in parallel
const [notifications, pullRequests, issues] = await Promise.all([
processNotifications(accessToken, lastSyncTime),
processPullRequests(accessToken, lastSyncTime),
processIssues(accessToken, lastSyncTime)
]);
// Aggregate all activities
messages.push(...notifications, ...pullRequests, ...issues);
// Add state
messages.push({ type: 'state', data: newState });
return messages;
7. Schedule Configuration
Choose appropriate sync frequency:{
"schedule": {
"frequency": "*/5 * * * *" // Every 5 minutes - real-time feel
"frequency": "*/15 * * * *" // Every 15 minutes - balance
"frequency": "0 * * * *" // Every hour - low volume
}
}
Handling Account Setup
Set initial sync schedule during OAuth:// account-create.ts
export async function integrationCreate(data: any) {
const { oauthResponse } = data;
const user = await getGithubData(
'https://api.github.com/user',
oauthResponse.access_token
);
return [{
type: 'account',
data: {
accountId: user.id.toString(),
config: {
access_token: oauthResponse.access_token,
refresh_token: oauthResponse.refresh_token,
mcp: { tokens: { access_token: oauthResponse.access_token } }
},
settings: {
username: user.login,
schedule: {
frequency: '*/15 * * * *' // Can customize per user
}
}
}
}];
}
Testing Schedule-Based Integrations
- Manual Trigger: Call SYNC event handler directly with test state
- Time Travel: Mock date/time to simulate different sync scenarios
- State Testing: Test with various lastSyncTime values
- Pagination: Test with large datasets requiring multiple pages
- Error Scenarios: Test with network failures, rate limits, invalid tokens
Performance Optimization
Batch API Calls
// Bad: Sequential calls
for (const id of ids) {
const data = await fetchData(id);
process(data);
}
// Good: Parallel calls with limit
const chunks = chunkArray(ids, 10); // Process 10 at a time
for (const chunk of chunks) {
const results = await Promise.all(chunk.map(id => fetchData(id)));
results.forEach(process);
}
Cache Frequently Used Data
const userCache = new Map();
async function getUserDetails(userId: string, accessToken: string) {
if (userCache.has(userId)) {
return userCache.get(userId);
}
const user = await fetch(`https://api.github.com/users/${userId}`, {
headers: { Authorization: `Bearer ${accessToken}` }
});
userCache.set(userId, user);
return user;
}
