Schedule-Based Sync Integrations

New to building integrations? Start with our step-by-step contributor guide for a quick overview.
Schedule-based integrations run on a cron schedule to poll external APIs and fetch new activities. This reference guide uses GitHub as the detailed example.

When to Use Schedule-Based Sync

Choose schedule-based integrations when:
  • The service doesn’t provide webhook events
  • You want to batch process multiple API calls
  • The service has rate limits that require careful timing
  • Real-time sync isn’t critical (delays of 5-15 minutes are acceptable)
  • You need to poll multiple endpoints and aggregate data

Architecture Overview

Cron Schedule (every 5-15 min)
    ↓
SYNC Event Triggered
    ↓
Fetch New Activities
    ↓
Compare with Last Sync Time
    ↓
Create Activity Messages
    ↓
Update State with New Sync Time
    ↓
Knowledge Graph

Key Concepts

State Management

Schedule-based integrations maintain state between runs:
interface GitHubSettings {
  lastSyncTime?: string;           // Last time we synced notifications
  lastUserEventTime?: string;      // Last time we synced user events
  username?: string;               // GitHub username
}
State is persisted by returning a state message:
return [
  // ... activity messages
  {
    type: 'state',
    data: {
      lastSyncTime: new Date().toISOString(),
      lastUserEventTime: new Date().toISOString(),
      username: 'octocat'
    }
  }
];
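
The snippet above leaves open which timestamp to store. The following is a minimal sketch, assuming the goal is to avoid missing items that arrive while a sync is still running: capture the time before fetching and persist that value. `SyncState`, `StateMessage`, and `buildStateMessage` are hypothetical names used only for this illustration.

```typescript
// Hypothetical state shape for this sketch (mirrors GitHubSettings above).
interface SyncState {
  lastSyncTime?: string;
  lastUserEventTime?: string;
  username?: string;
}

interface StateMessage {
  type: 'state';
  data: SyncState;
}

// Build the state message from the previous state and the moment the sync started.
// Storing the start time (not the end time) means items created while the sync
// was running fall inside the next run's window instead of being skipped.
function buildStateMessage(previous: SyncState, syncStartedAt: Date): StateMessage {
  const cursor = syncStartedAt.toISOString();
  return {
    type: 'state',
    data: { ...previous, lastSyncTime: cursor, lastUserEventTime: cursor },
  };
}
```

The likely trade-off is that an item can occasionally be fetched in two consecutive runs, so whatever consumes the activities should tolerate duplicates.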

Pagination

Handle large datasets by paginating through results:
const perPage = 50; // Page size requested from the API
let page = 1;
let hasMorePages = true;

while (hasMorePages) {
  const data = await fetchData(page, perPage);

  // An empty page means there is nothing left to fetch
  if (!data || data.length === 0) {
    hasMorePages = false;
    break;
  }

  // Process data
  for (const item of data) {
    // Create activities
  }

  // A page shorter than perPage is the last one
  if (data.length < perPage) {
    hasMorePages = false;
  } else {
    page++;
  }
}
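
The loop above can also be wrapped in a reusable helper. This is a sketch under stated assumptions: `fetchAllPages`, `FetchPage`, and the `maxPages` safety cap are hypothetical and not part of the SDK. The cap guards against an API that keeps returning full pages indefinitely.

```typescript
// Signature assumed for this sketch: returns the items on one 1-based page.
type FetchPage<T> = (page: number, perPage: number) => Promise<T[]>;

// Collect pages until an empty or short page is seen, or maxPages is reached.
async function fetchAllPages<T>(
  fetchPage: FetchPage<T>,
  perPage: number,
  maxPages = 20
): Promise<T[]> {
  const all: T[] = [];
  for (let page = 1; page <= maxPages; page++) {
    const items = await fetchPage(page, perPage);
    if (!items || items.length === 0) break; // No data left
    all.push(...items);
    if (items.length < perPage) break; // Short page: this was the last one
  }
  return all;
}
```

Because the page fetcher is passed in, the same helper can be exercised with an in-memory fake during tests.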

Incremental Sync

Only fetch new data since last sync:
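const lastSyncTime = state.lastSyncTime || getDefaultSyncTime(); // Defaults to 24 hours ago

// Use lastSyncTime to filter API requests
// (accessToken comes from the integration config)
const notifications = await fetch(
  `https://api.github.com/notifications?since=${encodeURIComponent(lastSyncTime)}`,
  { headers: { Authorization: `Bearer ${accessToken}` } }
);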
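
A stored cursor can be missing or malformed, so it may help to validate it before building the request. The sketch below assumes the 24-hour default described above; `resolveSince` and `buildNotificationsUrl` are hypothetical helper names, and only the `since`, `page`, and `per_page` query parameters already used in this guide appear.

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;

// Return the stored cursor if it parses as a date; otherwise fall back to
// 24 hours before `now`.
function resolveSince(lastSyncTime: string | undefined, now: Date): string {
  if (lastSyncTime && !isNaN(Date.parse(lastSyncTime))) {
    return lastSyncTime;
  }
  return new Date(now.getTime() - DAY_MS).toISOString();
}

// Build the notifications URL with the cursor URL-encoded.
function buildNotificationsUrl(since: string, page: number, perPage: number): string {
  const query = [
    `since=${encodeURIComponent(since)}`,
    `page=${page}`,
    `per_page=${perPage}`,
  ].join('&');
  return `https://api.github.com/notifications?${query}`;
}
```

Passing `now` in as a parameter, rather than calling `new Date()` inside the helper, keeps the default window deterministic in tests.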

Full Example: GitHub Integration

spec.json

The schedule.frequency cron expression below runs the sync every 5 minutes.

{
  "name": "GitHub extension",
  "key": "github",
  "description": "Plan, track, and manage your projects in GitHub",
  "icon": "github",

  "schedule": {
    "frequency": "*/5 * * * *"
  },

  "auth": {
    "OAuth2": {
      "token_url": "https://github.com/login/oauth/access_token",
      "authorization_url": "https://github.com/login/oauth/authorize",
      "scopes": [
        "user",
        "public_repo",
        "repo",
        "notifications",
        "read:org"
      ],
      "scope_separator": ","
    }
  },

  "mcp": {
    "type": "http",
    "url": "https://api.githubcopilot.com/mcp/",
    "headers": {
      "Authorization": "Bearer ${config:access_token}",
      "Content-Type": "application/json"
    }
  }
}

index.ts

import { handleSchedule } from './schedule';
import { integrationCreate } from './account-create';
import {
  IntegrationCLI,
  IntegrationEventPayload,
  IntegrationEventType,
  Spec,
} from '@redplanethq/sdk';

export async function run(eventPayload: IntegrationEventPayload) {
  switch (eventPayload.event) {
    case IntegrationEventType.SETUP:
      return await integrationCreate(eventPayload.eventBody);

    case IntegrationEventType.SYNC:
      // Handle scheduled sync
      return await handleSchedule(eventPayload.config, eventPayload.state);

    default:
      return { message: `Unknown event type: ${eventPayload.event}` };
  }
}

class GitHubCLI extends IntegrationCLI {
  constructor() {
    super('github', '1.0.0');
  }

  protected async handleEvent(eventPayload: IntegrationEventPayload): Promise<any> {
    return await run(eventPayload);
  }

  protected async getSpec(): Promise<Spec> {
    return {
      name: 'GitHub extension',
      key: 'github',
      // ... rest of spec
    };
  }
}

function main() {
  const githubCLI = new GitHubCLI();
  githubCLI.parse();
}

main();

schedule.ts

import { getUserEvents, getGithubData } from './utils';

interface GitHubActivityCreateParams {
  text: string;
  sourceURL: string;
}

interface GitHubSettings {
  lastSyncTime?: string;
  lastUserEventTime?: string;
  username?: string;
}

function createActivityMessage(params: GitHubActivityCreateParams) {
  return {
    type: 'activity',
    data: {
      text: params.text,
      sourceURL: params.sourceURL,
    },
  };
}

function getDefaultSyncTime(): string {
  // Default to 24 hours ago
  return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}

async function processNotifications(
  accessToken: string,
  lastSyncTime: string,
  username: string
): Promise<any[]> {
  const activities = [];

  // Filter for relevant notification reasons
  const allowedReasons = [
    'assign',
    'review_requested',
    'mention',
    'state_change',
    'author',
    'comment',
    'team_mention',
  ];

  let page = 1;
  let hasMorePages = true;

  while (hasMorePages) {
    try {
      // Fetch notifications with pagination
      const notifications = await getGithubData(
        `https://api.github.com/notifications?page=${page}&per_page=50&all=true&since=${lastSyncTime}`,
        accessToken
      );

      if (!notifications || notifications.length === 0) {
        hasMorePages = false;
        break;
      }

      // Check if more pages exist
      if (notifications.length < 50) {
        hasMorePages = false;
      } else {
        page++;
      }

      // Process each notification
      for (const notification of notifications) {
        try {
          // Filter by allowed reasons
          if (!allowedReasons.includes(notification.reason)) {
            continue;
          }

          const repository = notification.repository;
          const subject = notification.subject;

          // Fetch detailed data for the notification
          let githubData: any = {};
          if (subject.url) {
            try {
              githubData = await getGithubData(subject.url, accessToken);
            } catch (error) {
              continue; // Skip if we can't fetch details
            }
          }

          const url = githubData.html_url || notification.subject.url || '';
          const isIssue = subject.type === 'Issue';
          const isPR = subject.type === 'PullRequest';

          let title = '';

          // Build contextual activity based on notification reason
          switch (notification.reason) {
            case 'assign':
              title = `${isIssue ? 'Issue' : 'PR'} #${githubData.number} assigned to ${username} in ${repository.full_name}: ${githubData.title}`;
              break;

            case 'review_requested':
              title = `${username} was requested to review PR #${githubData.number} in ${repository.full_name}: ${githubData.title}`;
              break;

            case 'mention':
              title = `${githubData.user?.login} mentioned ${username} in ${repository.full_name} ${isIssue ? 'issue' : 'PR'} #${githubData.number}: ${githubData.body}`;
              break;

            case 'comment':
              title = `${githubData.user?.login} commented on ${isIssue ? 'issue' : 'PR'} #${githubData.number} in ${repository.full_name}: ${githubData.body}`;
              break;

            case 'state_change': {
              // Braces scope the const to this case; the leading space avoids a double space when state is missing
              const stateInfo = githubData.state ? ` to ${githubData.state}` : '';
              title = `${githubData.user?.login} changed ${isIssue ? 'issue' : 'PR'} #${githubData.number} state${stateInfo} in ${repository.full_name}: ${githubData.title}`;
              break;
            }

            default:
              title = `GitHub notification for ${username} in ${repository.full_name}`;
              break;
          }

          if (title && url) {
            activities.push(createActivityMessage({
              text: title,
              sourceURL: url,
            }));
          }
        } catch (error) {
          // Silently skip errors for individual notifications
          continue;
        }
      }
    } catch (error) {
      hasMorePages = false;
    }
  }

  return activities;
}

async function processUserEvents(
  username: string,
  accessToken: string,
  lastUserEventTime: string
): Promise<any[]> {
  const activities = [];
  let page = 1;
  let hasMorePages = true;

  while (hasMorePages) {
    try {
      // Fetch user events (PRs, issues they created)
      const userEvents = await getUserEvents(username, page, accessToken, lastUserEventTime);

      if (!userEvents || userEvents.length === 0) {
        hasMorePages = false;
        break;
      }

      if (userEvents.length < 30) {
        hasMorePages = false;
      } else {
        page++;
      }

      for (const event of userEvents) {
        try {
          let title = '';
          const sourceURL = event.html_url || '';

          switch (event.type) {
            case 'pr':
              title = `${username} created PR #${event.number}: ${event.title}`;
              break;
            case 'issue':
              title = `${username} created issue #${event.number}: ${event.title}`;
              break;
            case 'pr_comment':
              title = `${username} commented on PR #${event.number}: ${event.title}`;
              break;
            case 'issue_comment':
              title = `${username} commented on issue #${event.number}: ${event.title}`;
              break;
            default:
              title = `GitHub activity: ${event.title || 'Unknown'}`;
              break;
          }

          if (title && sourceURL) {
            activities.push(createActivityMessage({
              text: title,
              sourceURL: sourceURL,
            }));
          }
        } catch (error) {
          continue;
        }
      }
    } catch (error) {
      hasMorePages = false;
    }
  }

  return activities;
}

export async function handleSchedule(config: any, state: any) {
  try {
    const integrationConfiguration = config;

    // Check for valid access token
    if (!integrationConfiguration?.access_token) {
      return [];
    }

    // Get settings or initialize
    let settings = (state || {}) as GitHubSettings;

    // Default to 24 hours ago if no last sync times
    const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();
    const lastUserEventTime = settings.lastUserEventTime || getDefaultSyncTime();

    // Fetch user info to get username
    let user;
    try {
      user = await getGithubData('https://api.github.com/user', integrationConfiguration.access_token);
    } catch (error) {
      return [];
    }

    if (!user) {
      return [];
    }

    // Update username in settings
    if (!settings.username && user.login) {
      settings.username = user.login;
    }

    // Collect all messages
    const messages = [];

    // Process notifications
    try {
      const notificationActivities = await processNotifications(
        integrationConfiguration.access_token,
        lastSyncTime,
        settings.username || 'user'
      );
      messages.push(...notificationActivities);
    } catch (error) {
      // Continue even if notifications fail
    }

    // Process user events
    if (settings.username) {
      try {
        const userEventActivities = await processUserEvents(
          settings.username,
          integrationConfiguration.access_token,
          lastUserEventTime
        );
        messages.push(...userEventActivities);
      } catch (error) {
        // Continue even if user events fail
      }
    }

    // Update last sync times
    const newSyncTime = new Date().toISOString();

    // Add state message to persist settings
    messages.push({
      type: 'state',
      data: {
        ...settings,
        lastSyncTime: newSyncTime,
        lastUserEventTime: newSyncTime,
      },
    });

    return messages;
  } catch (error) {
    return [];
  }
}
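
The `./utils` module imported by schedule.ts is not shown in this guide. As a rough sketch of what `getGithubData` might look like, assuming it performs an authenticated GET and returns the parsed JSON body: `createGithubClient`, `FetchLike`, and `MinimalResponse` are hypothetical names introduced here, and the real helper may differ.

```typescript
// Subset of the Fetch API Response that this sketch relies on.
interface MinimalResponse {
  ok: boolean;
  status: number;
  json(): Promise<unknown>;
}

// Any fetch-compatible function; injected so the helper can be tested offline.
type FetchLike = (
  url: string,
  init: { headers: Record<string, string> }
) => Promise<MinimalResponse>;

// Returns a getGithubData(url, accessToken) function bound to the given fetch.
function createGithubClient(fetchImpl: FetchLike) {
  return async function getGithubData(url: string, accessToken: string): Promise<any> {
    const response = await fetchImpl(url, {
      headers: {
        Authorization: `Bearer ${accessToken}`,
        Accept: 'application/vnd.github+json',
      },
    });
    // Throw on non-2xx so callers' try/catch blocks can skip the item.
    if (!response.ok) {
      throw new Error(`GitHub API request failed with status ${response.status}`);
    }
    return response.json();
  };
}
```

Passing the global `fetch` (available in Node 18+ and browsers), as in `createGithubClient(fetch)`, would yield a function with the same call shape that schedule.ts uses.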

Best Practices

1. State Management

Always persist state to track sync progress:
// Load state at start
let settings = (state || {}) as Settings;
const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();

// ... fetch and process data

// Save state at end
messages.push({
  type: 'state',
  data: {
    ...settings,
    lastSyncTime: new Date().toISOString(),
  }
});

2. Default Sync Window

On the first sync, fetch the last 24 hours of data:
function getDefaultSyncTime(): string {
  return new Date(Date.now() - 24 * 60 * 60 * 1000).toISOString();
}

const lastSyncTime = settings.lastSyncTime || getDefaultSyncTime();

3. Efficient Pagination

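Stop paginating as soon as a page comes back empty or only partially filled:
const perPage = 50;
let page = 1;
let hasMorePages = true;

while (hasMorePages) {
  const data = await fetchData(page, perPage);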

  // No data left
  if (!data || data.length === 0) {
    hasMorePages = false;
    break;
  }

  // Process data...

  // Less than full page means no more data
  if (data.length < perPage) {
    hasMorePages = false;
  } else {
    page++;
  }
}

4. Error Resilience

Continue processing even if individual items fail:
for (const item of items) {
  try {
    // Process item
    const activity = await processItem(item);
    activities.push(activity);
  } catch (error) {
    // Log but continue
    console.error('Failed to process item:', error);
    continue; // Don't let one failure stop everything
  }
}

5. Rate Limit Handling

Respect API rate limits:
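// Small helper: resolve after the given number of milliseconds
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function fetchWithRetry(url: string, accessToken: string, retries = 3) {
  for (let i = 0; i < retries; i++) {
    try {
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${accessToken}` }
      });

      if (response.status === 429) {
        // Rate limited - wait for Retry-After seconds (default 60), then retry
        const retryAfter = parseInt(response.headers.get('Retry-After') ?? '', 10);
        await sleep((Number.isNaN(retryAfter) ? 60 : retryAfter) * 1000);
        continue;
      }

      return await response.json();
    } catch (error) {
      if (i === retries - 1) throw error;
      await sleep(1000 * 2 ** i); // Exponential backoff: 1s, 2s, 4s, ...
    }
  }

  // Every attempt was rate limited
  throw new Error(`Rate limit retries exhausted for ${url}`);
}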
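
GitHub's REST API documentation describes rate-limit responses that use status 403 or 429 together with the `retry-after`, `x-ratelimit-remaining`, and `x-ratelimit-reset` headers. The sketch below shows one way to turn those into a wait time. `rateLimitDelayMs` and `HeaderLookup` are hypothetical names, and the 60-second fallback simply mirrors the default used above.

```typescript
// Looks up a response header by lower-case name; returns null when absent.
type HeaderLookup = (name: string) => string | null;

// Returns how long to wait (in ms) before retrying, or null if the response
// does not look like a rate-limit response.
function rateLimitDelayMs(
  status: number,
  getHeader: HeaderLookup,
  nowMs: number
): number | null {
  if (status !== 403 && status !== 429) return null;

  // Prefer an explicit Retry-After value given in seconds.
  const retryAfter = getHeader('retry-after');
  if (retryAfter !== null && retryAfter.trim() !== '') {
    const seconds = Number(retryAfter);
    if (isFinite(seconds) && seconds >= 0) return seconds * 1000;
  }

  // Otherwise wait until the reset time (epoch seconds) if the quota is used up.
  const reset = Number(getHeader('x-ratelimit-reset'));
  if (getHeader('x-ratelimit-remaining') === '0' && isFinite(reset) && reset > 0) {
    return Math.max(0, reset * 1000 - nowMs);
  }

  // A 429 without usable headers falls back to 60 seconds; a plain 403 is
  // treated as a permissions error rather than a rate limit.
  return status === 429 ? 60000 : null;
}
```

With a Fetch API response this could be called as `rateLimitDelayMs(response.status, (name) => response.headers.get(name), Date.now())`, since `Headers.get` is case-insensitive.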

6. Multiple Data Sources

Fetch from multiple endpoints and aggregate:
const messages = [];

// Fetch from multiple sources in parallel
const [notifications, pullRequests, issues] = await Promise.all([
  processNotifications(accessToken, lastSyncTime, username),
  processPullRequests(accessToken, lastSyncTime),
  processIssues(accessToken, lastSyncTime)
]);

// Aggregate all activities
messages.push(...notifications, ...pullRequests, ...issues);

// Add state
messages.push({ type: 'state', data: newState });

return messages;
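
One caveat with `Promise.all` is that a single rejected source rejects the whole batch, which works against the error-resilience practice above. The following sketch isolates failures per source; `gatherActivities`, `NamedSource`, and `GatherResult` are hypothetical names used only for this illustration.

```typescript
interface NamedSource<T> {
  name: string;
  run: () => Promise<T[]>;
}

interface GatherResult<T> {
  activities: T[];
  failedSources: string[];
}

// Run every source in parallel. A rejected source contributes no activities
// and is recorded by name instead of rejecting the whole batch.
async function gatherActivities<T>(sources: NamedSource<T>[]): Promise<GatherResult<T>> {
  const settled = await Promise.all(
    sources.map(async (source) => {
      try {
        return { name: source.name, items: await source.run(), failed: false };
      } catch {
        return { name: source.name, items: [] as T[], failed: true };
      }
    })
  );

  const result: GatherResult<T> = { activities: [], failedSources: [] };
  for (const entry of settled) {
    result.activities.push(...entry.items);
    if (entry.failed) result.failedSources.push(entry.name);
  }
  return result;
}
```

Knowing which sources failed makes it possible to leave their sync timestamps unchanged, so the next run retries the same window.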

7. Schedule Configuration

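Choose a sync frequency that matches how fresh the data needs to be. Typical cron expressions:
  • */5 * * * * runs every 5 minutes (real-time feel)
  • */15 * * * * runs every 15 minutes (a balance between freshness and API usage)
  • 0 * * * * runs every hour (low-volume sources)

Set exactly one frequency value in the spec:
{
  "schedule": {
    "frequency": "*/15 * * * *"
  }
}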

Handling Account Setup

Set the initial sync schedule when the account is created after OAuth:
// account-create.ts
import { getGithubData } from './utils';

export async function integrationCreate(data: any) {
  const { oauthResponse } = data;

  const user = await getGithubData(
    'https://api.github.com/user',
    oauthResponse.access_token
  );

  return [{
    type: 'account',
    data: {
      accountId: user.id.toString(),
      config: {
        access_token: oauthResponse.access_token,
        refresh_token: oauthResponse.refresh_token,
        mcp: { tokens: { access_token: oauthResponse.access_token } }
      },
      settings: {
        username: user.login,
        schedule: {
          frequency: '*/15 * * * *' // Can customize per user
        }
      }
    }
  }];
}

Testing Schedule-Based Integrations

  1. Manual Trigger: Call SYNC event handler directly with test state
  2. Time Travel: Mock date/time to simulate different sync scenarios
  3. State Testing: Test with various lastSyncTime values
  4. Pagination: Test with large datasets requiring multiple pages
  5. Error Scenarios: Test with network failures, rate limits, invalid tokens
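
The first three items can be combined by injecting the clock and the API call into the handler. This is a sketch, not the SDK's test harness: `handleSync`, `SyncDeps`, and `testFirstSync` are hypothetical, and `handleSync` stands in for a handler such as `handleSchedule` restructured to accept its dependencies.

```typescript
interface SyncState { lastSyncTime?: string }

interface SyncDeps {
  now: () => Date;                                  // Injected clock ("time travel")
  fetchSince: (since: string) => Promise<string[]>; // Injected API call
}

type Message =
  | { type: 'activity'; data: { text: string } }
  | { type: 'state'; data: SyncState };

// Hypothetical handler written for testability.
async function handleSync(state: SyncState, deps: SyncDeps): Promise<Message[]> {
  const startedAt = deps.now();
  const since =
    state.lastSyncTime ||
    new Date(startedAt.getTime() - 24 * 60 * 60 * 1000).toISOString();
  const items = await deps.fetchSince(since);
  const messages: Message[] = items.map(
    (text): Message => ({ type: 'activity', data: { text } })
  );
  messages.push({ type: 'state', data: { ...state, lastSyncTime: startedAt.toISOString() } });
  return messages;
}

// Manual trigger with empty state and a frozen clock.
async function testFirstSync(): Promise<void> {
  const requested: string[] = [];
  const messages = await handleSync({}, {
    now: () => new Date('2024-01-02T00:00:00.000Z'),
    fetchSince: async (since) => {
      requested.push(since);
      return ['PR #1 opened'];
    },
  });
  if (requested[0] !== '2024-01-01T00:00:00.000Z') {
    throw new Error('expected a 24-hour default window');
  }
  const last = messages[messages.length - 1];
  if (last.type !== 'state' || last.data.lastSyncTime !== '2024-01-02T00:00:00.000Z') {
    throw new Error('expected the state message to carry the frozen time');
  }
}
```

The same pattern extends to the error scenarios: a `fetchSince` stub that throws or returns many pages exercises those paths without network access.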

Performance Optimization

Batch API Calls

// Bad: Sequential calls
for (const id of ids) {
  const data = await fetchData(id);
  process(data);
}

// Good: Parallel calls with limit
const chunks = chunkArray(ids, 10); // Process 10 at a time
for (const chunk of chunks) {
  const results = await Promise.all(chunk.map(id => fetchData(id)));
  results.forEach(process);
}
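
The snippet above calls `chunkArray` without defining it. A minimal sketch of such a helper, assuming it simply splits an array into consecutive groups of a fixed size:

```typescript
// Split `items` into consecutive chunks of at most `size` elements.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (size < 1 || Math.floor(size) !== size) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}
```

With a chunk size of 10, at most 10 requests are in flight at once, which keeps parallelism bounded while still avoiding fully sequential calls.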

Cache Frequently Used Data

const userCache = new Map();

async function getUserDetails(userId: string, accessToken: string) {
  if (userCache.has(userId)) {
    return userCache.get(userId);
  }

  const response = await fetch(`https://api.github.com/users/${userId}`, {
    headers: { Authorization: `Bearer ${accessToken}` }
  });
  const user = await response.json(); // Cache the parsed body, not the Response object

  userCache.set(userId, user);
  return user;
}

Next Steps