Eneza Notification Worker

The Notification Worker is a Node.js microservice that consumes Google Pub/Sub subscriptions and delivers notifications asynchronously over email, push (FCM), and WhatsApp.

Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   ENEZA API     │────▶│  GOOGLE PUBSUB  │────▶│  NOTIFICATION   │
│                 │     │                 │     │     WORKER      │
│ • Publish msg   │     │ Topic per type  │     │                 │
│                 │     │                 │     │ • Email (SMTP)  │
│                 │     │                 │     │ • Push (FCM)    │
│                 │     │                 │     │ • WhatsApp      │
└─────────────────┘     └─────────────────┘     └─────────────────┘
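
The diagram implies a simple contract between the API and the worker: the payload travels as JSON in the Pub/Sub message body, and the worker recovers it with `JSON.parse(message.data.toString())`. A minimal sketch of that round trip follows. The helper names `encodePayload` and `decodePayload` are hypothetical and do not come from the codebase.

```typescript
// Hypothetical helpers illustrating the JSON-over-Pub/Sub contract.
// The payload shape mirrors the email verification handler documented below.
interface EmailVerificationPayload {
  email: string;
  verificationCode: string;
  fullName?: string;
}

// Publisher side: Pub/Sub message bodies are raw bytes, so serialize to a Buffer.
function encodePayload<T>(payload: T): Buffer {
  return Buffer.from(JSON.stringify(payload), 'utf8');
}

// Worker side: the inverse of encodePayload, equivalent to
// JSON.parse(message.data.toString()) in the entry point.
function decodePayload<T>(data: Buffer): T {
  return JSON.parse(data.toString('utf8')) as T;
}

const wire = encodePayload<EmailVerificationPayload>({
  email: 'user@example.com',
  verificationCode: '123456',
});
const received = decodePayload<EmailVerificationPayload>(wire);
```

With the real client, the API side would presumably hand the buffer to the topic, for example through `topic.publishMessage({ data })`; how the Eneza API actually publishes is not shown in this document.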

Technology Stack

| Component | Technology |
| --- | --- |
| Runtime | Node.js 20 |
| Message Broker | Google Pub/Sub |
| Email | Gmail SMTP (nodemailer) |
| Push | Firebase Cloud Messaging |
| WhatsApp | WhatsApp Business API |
| Hosting | Cloud Run |

Project Structure

notification-worker/
├── src/
│   ├── index.ts              # Entry point, subscription setup
│   ├── config.ts             # Configuration
│   ├── logger.ts             # Logging
│   ├── handlers/             # Topic handlers
│   │   ├── adLive.ts
│   │   ├── adRejected.ts
│   │   ├── advertiserDripEmail.ts
│   │   ├── emailVerification.ts
│   │   ├── invoiceEmail.ts
│   │   ├── passwordReset.ts
│   │   ├── paymentReceipt.ts
│   │   ├── paymentReminder.ts
│   │   ├── pushNotification.ts
│   │   └── whatsapp.ts
│   └── services/
│       ├── emailService.ts   # SMTP client
│       └── pushService.ts    # FCM client
└── package.json

Supported Topics

| Topic | Handler | Channel |
| --- | --- | --- |
| invoice-email | handleInvoiceEmail | Email |
| email-verification | handleEmailVerification | Email |
| password-reset | handlePasswordReset | Email |
| payment-reminder | handlePaymentReminder | Email |
| payment-receipt | handlePaymentReceipt | Email |
| ad-live | handleAdLive | Email |
| ad-rejected | handleAdRejected | Email |
| push-notification | handlePushNotification | Push (FCM) |
| whatsapp-message | handleWhatsApp | WhatsApp |
| advertiser-drip-email | handleAdvertiserDripEmail | Email |
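
Each topic is consumed through its own subscription. The entry point derives the subscription name with `getSubscriptionName`, whose implementation is not shown in this document. The sketch below is one possible shape. The `<topic>-<suffix>-sub` naming scheme and the `dev` fallback are assumptions for illustration only; the suffix is taken from `PUBSUB_TOPIC_SUFFIX`, which does appear in the environment variables.

```typescript
// Topic base names copied from the Supported Topics table.
const TOPICS = [
  'invoice-email',
  'email-verification',
  'password-reset',
  'payment-reminder',
  'payment-receipt',
  'ad-live',
  'ad-rejected',
  'push-notification',
  'whatsapp-message',
  'advertiser-drip-email',
] as const;

type TopicBase = (typeof TOPICS)[number];

// ASSUMPTION: subscriptions are named "<topic>-<suffix>-sub".
// The real convention used by the worker may differ.
function getSubscriptionName(
  topicBase: TopicBase,
  suffix: string = process.env['PUBSUB_TOPIC_SUFFIX'] ?? 'dev',
): string {
  return `${topicBase}-${suffix}-sub`;
}
```

Typing the argument as `TopicBase` instead of `string` lets the compiler reject a misspelled topic name before it turns into a "subscription does not exist" warning at startup.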

Entry Point

typescript
// index.ts
import { PubSub, Message } from '@google-cloud/pubsub';
import { handleInvoiceEmail } from './handlers/invoiceEmail';
import { handleEmailVerification } from './handlers/emailVerification';
import { handlePasswordReset } from './handlers/passwordReset';
// ... other imports

// Topic to handler mapping
const topicHandlers: Record<string, (data: any) => Promise<void>> = {
  'invoice-email': handleInvoiceEmail,
  'email-verification': handleEmailVerification,
  'password-reset': handlePasswordReset,
  'payment-reminder': handlePaymentReminder,
  'payment-receipt': handlePaymentReceipt,
  'ad-live': handleAdLive,
  'ad-rejected': handleAdRejected,
  'whatsapp-message': handleWhatsApp,
  'push-notification': handlePushNotification,
  'advertiser-drip-email': handleAdvertiserDripEmail,
};

async function main() {
  // Start health check server
  startHealthServer();

  const pubsub = new PubSub({ projectId: config.gcpProjectId });

  // Subscribe to each topic
  for (const [topicBase, handler] of Object.entries(topicHandlers)) {
    const subscriptionName = getSubscriptionName(topicBase);
    const subscription = pubsub.subscription(subscriptionName);

    const [exists] = await subscription.exists();
    if (!exists) {
      logger.warn(`Subscription ${subscriptionName} does not exist, skipping...`);
      continue;
    }

    // Message handler
    subscription.on('message', async (message: Message) => {
      const startTime = Date.now();

      try {
        const data = JSON.parse(message.data.toString());
        logger.info(`Received message on ${topicBase}`, {
          messageId: message.id,
          publishTime: message.publishTime,
        });

        await handler(data);
        message.ack();

        logger.info(`Message processed successfully`, {
          topic: topicBase,
          duration: Date.now() - startTime,
        });
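      } catch (error) {
        // `error` is `unknown` under strict TypeScript, so narrow before reading .message
        const errorMessage = error instanceof Error ? error.message : String(error);
        logger.error(`Error processing message on ${topicBase}`, {
          messageId: message.id,
          error: errorMessage,
        });
        message.nack(); // Negative-acknowledge so Pub/Sub redelivers the message
      }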
    });

    logger.info(`Subscribed to: ${subscriptionName}`);
  }

  logger.info('Notification Worker Ready');
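}

// Start the worker and exit non-zero if startup fails
main().catch((error) => {
  logger.error('Worker failed to start', { error: String(error) });
  process.exit(1);
});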
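
One consequence of the message callback above is that a body that is not valid JSON throws inside the `try` block and is nacked, so Pub/Sub keeps redelivering a message that can never succeed. A minimal sketch of one way to separate unparseable messages from transient failures follows. `parseMessageData` is a hypothetical helper, not part of the codebase.

```typescript
// Result type that makes the caller handle the malformed-JSON case explicitly.
type ParseResult =
  | { ok: true; data: unknown }
  | { ok: false; reason: string };

// Hypothetical helper: parse a Pub/Sub message body without throwing.
function parseMessageData(raw: Buffer): ParseResult {
  try {
    return { ok: true, data: JSON.parse(raw.toString('utf8')) };
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    return { ok: false, reason };
  }
}

// Intended use inside the message callback (sketch only):
//
//   const parsed = parseMessageData(message.data);
//   if (!parsed.ok) {
//     logger.error('Dropping unparseable message', { messageId: message.id, reason: parsed.reason });
//     message.ack();   // retrying cannot fix malformed JSON
//     return;
//   }
//   await handler(parsed.data);
```

Acking drops the message for good, so this only suits payloads that are safe to lose. Configuring a dead-letter topic on the subscription is an alternative that keeps the bad message available for inspection.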

Handler Examples

Email Verification Handler

typescript
// handlers/emailVerification.ts
export async function handleEmailVerification(data: {
  email: string;
  verificationCode: string;
  fullName?: string;
}) {
  const { email, verificationCode, fullName } = data;

  await emailService.send({
    to: email,
    subject: 'Verify Your Eneza Account',
    html: `
      <h1>Welcome to Eneza${fullName ? `, ${fullName}` : ''}!</h1>
      <p>Your verification code is:</p>
      <h2 style="font-size: 32px; letter-spacing: 8px;">${verificationCode}</h2>
      <p>This code expires in 10 minutes.</p>
    `,
  });

  logger.info('Verification email sent', { email });
}
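
The template above interpolates `fullName` straight into HTML. Because that value originates from user input, it may be worth escaping before it reaches the email body. A minimal sketch follows; `escapeHtml` and `greeting` are hypothetical helpers written for this example.

```typescript
// Replace the five characters that are significant in HTML text and attributes.
// The ampersand is handled first so later replacements are not double-escaped.
function escapeHtml(value: string): string {
  return value
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');
}

// Same greeting as the handler's template, with the name escaped.
function greeting(fullName?: string): string {
  return `<h1>Welcome to Eneza${fullName ? `, ${escapeHtml(fullName)}` : ''}!</h1>`;
}
```

For example, `greeting('<b>Al</b>')` yields `<h1>Welcome to Eneza, &lt;b&gt;Al&lt;/b&gt;!</h1>`, so markup in a name is displayed as text instead of being rendered.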

Push Notification Handler

typescript
// handlers/pushNotification.ts
export interface PushNotificationPayload {
  deviceToken: string;
  title: string;
  body: string;
  data?: {
    type: string;
    ticketId?: string;
    subscriptionId?: string;
    [key: string]: string | undefined;
  };
  imageUrl?: string;
}

export async function handlePushNotification(data: PushNotificationPayload) {
  if (!data.deviceToken || !data.title || !data.body) {
    throw new Error('Missing required fields');
  }

  // Convert data to string values (FCM requirement)
  const stringData: Record<string, string> = {};
  if (data.data) {
    for (const [key, value] of Object.entries(data.data)) {
      if (value !== undefined) {
        stringData[key] = String(value);
      }
    }
  }

  const result = await pushService.sendPushNotification({
    deviceToken: data.deviceToken,
    title: data.title,
    body: data.body,
    data: stringData,
    imageUrl: data.imageUrl,
  });

  if (!result.success) {
    if (result.error?.includes('Invalid or expired')) {
      // Don't retry invalid tokens
      logger.warn('Device token is invalid', { deviceToken: data.deviceToken.slice(-10) });
      return;
    }
    throw new Error(result.error || 'Push notification failed');
  }

  logger.info('Push notification sent', { messageId: result.messageId });
}
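
The handler decides whether to retry by matching on error text (`'Invalid or expired'`), which depends on whatever string the push service returns. Errors thrown by `firebase-admin` also carry a `code` property, and the SDK documents `messaging/registration-token-not-registered` and `messaging/invalid-registration-token` for tokens that will not become valid again. The sketch below classifies on that code instead. It assumes the push service would be changed to expose the original error or its code, which the version documented here does not do; `isPermanentTokenError` is a hypothetical helper.

```typescript
// Error codes for device tokens that retrying cannot fix.
const PERMANENT_TOKEN_ERRORS = new Set<string>([
  'messaging/registration-token-not-registered',
  'messaging/invalid-registration-token',
]);

// Hypothetical helper: accepts any thrown value and reports whether
// the failure is a permanent token problem (ack and move on) or not (retry).
function isPermanentTokenError(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false;
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' && PERMANENT_TOKEN_ERRORS.has(code);
}
```

Matching on a code is less brittle than matching on a message substring, since message wording can change between SDK releases.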

Invoice Email Handler

typescript
// handlers/invoiceEmail.ts
export async function handleInvoiceEmail(data: {
  advertiserId: string;
  invoiceId: string;
  pdfUrl?: string;
}) {
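  const { invoiceId } = data;

  // Get invoice details
  const invoice = await prisma.invoice.findUnique({
    where: { id: invoiceId },
    include: { advertiser: true }
  });

  // findUnique resolves to null when no row matches; fail with a clear error
  // instead of a TypeError on invoice.advertiser
  if (!invoice) {
    throw new Error(`Invoice ${invoiceId} not found`);
  }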

  // Generate PDF if not provided
  let pdfBuffer: Buffer;
  if (data.pdfUrl) {
    pdfBuffer = await downloadPdf(data.pdfUrl);
  } else {
    pdfBuffer = await pdfService.generateInvoice(invoice);
  }

  await emailService.send({
    to: invoice.advertiser.emailAddress,
    subject: `Invoice ${invoice.invoiceNumber} - Eneza`,
    html: `
      <h1>Invoice ${invoice.invoiceNumber}</h1>
      <p>Thank you for advertising with Eneza!</p>
      <p><strong>Amount:</strong> ${formatCurrency(invoice.amountLocal, invoice.currency)}</p>
      <p><strong>Campaign:</strong> ${invoice.estimatedViews.toLocaleString()} estimated views</p>
      <p>Please find your invoice attached.</p>
    `,
    attachments: [{
      filename: `${invoice.invoiceNumber}.pdf`,
      content: pdfBuffer,
      contentType: 'application/pdf'
    }]
  });

  // Update invoice
  await prisma.invoice.update({
    where: { id: invoiceId },
    data: {
      sentViaEmail: true,
      emailSentAt: new Date()
    }
  });

  logger.info('Invoice email sent', { invoiceId, email: invoice.advertiser.emailAddress });
}
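
The invoice template calls `formatCurrency(invoice.amountLocal, invoice.currency)`, which is not defined in this document. A minimal sketch built on the standard `Intl.NumberFormat` API follows. The `en-US` default locale is an assumption; the worker may format amounts differently.

```typescript
// Sketch of the formatCurrency helper used by the invoice template.
// `currency` must be an ISO 4217 code such as 'USD' or 'KES';
// Intl.NumberFormat throws a RangeError for malformed codes.
function formatCurrency(amount: number, currency: string, locale: string = 'en-US'): string {
  return new Intl.NumberFormat(locale, {
    style: 'currency',
    currency,
  }).format(amount);
}

const usd = formatCurrency(1500, 'USD'); // "$1,500.00"
const kes = formatCurrency(1500, 'KES'); // amount rendered as 1,500.00 with a KES prefix
```

Letting `Intl.NumberFormat` pick the symbol and decimal places avoids hard-coding per-currency rules in the handler.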

Ad Live Notification

typescript
// handlers/adLive.ts
export async function handleAdLive(data: {
  advertiserId: string;
  adId: string;
  adTitle: string;
}) {
  const { advertiserId, adId, adTitle } = data;

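  const advertiser = await prisma.advertiser.findUnique({
    where: { id: advertiserId }
  });

  // findUnique resolves to null when the advertiser does not exist
  if (!advertiser) {
    throw new Error(`Advertiser ${advertiserId} not found`);
  }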

  await emailService.send({
    to: advertiser.emailAddress,
    subject: `🚀 Your Campaign is Live: ${adTitle}`,
    html: `
      <h1>Your Campaign is Now Live!</h1>
      <p>Great news! Your ad "${adTitle}" has been approved and is now live.</p>
      <p>Posters across our network are now viewing and sharing your content.</p>
      <p><a href="https://grow.eneza.app/campaigns/${adId}">View Campaign Dashboard</a></p>
    `
  });

  logger.info('Ad live notification sent', { adId, email: advertiser.emailAddress });
}

Advertiser Drip Campaign

typescript
// handlers/advertiserDripEmail.ts
const DRIP_EMAILS = {
  welcome: {
    subject: 'Welcome to Eneza! 🎉',
    delayHours: 0,
    template: 'drip/welcome'
  },
  gettingStarted: {
    subject: 'Getting Started with Your First Campaign',
    delayHours: 24,
    template: 'drip/getting-started'
  },
  tips: {
    subject: '5 Tips for Successful Campaigns',
    delayHours: 72,
    template: 'drip/tips'
  },
  inactive: {
    subject: "We miss you! Here's 20% off your next campaign",
    delayHours: 168, // 7 days
    template: 'drip/inactive'
  }
};

export async function handleAdvertiserDripEmail(data: {
  advertiserId: string;
  emailType: keyof typeof DRIP_EMAILS;
}) {
  const { advertiserId, emailType } = data;
  const emailConfig = DRIP_EMAILS[emailType];

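  const advertiser = await prisma.advertiser.findUnique({
    where: { id: advertiserId }
  });

  // findUnique resolves to null when the advertiser does not exist
  if (!advertiser) {
    throw new Error(`Advertiser ${advertiserId} not found`);
  }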

  // Check email preferences
  if (!advertiser.emailPreferences?.marketing) {
    logger.info('Skipping drip email - marketing disabled', { advertiserId });
    return;
  }

  const html = await renderTemplate(emailConfig.template, { advertiser });

  await emailService.send({
    to: advertiser.emailAddress,
    subject: emailConfig.subject,
    html
  });

  // Update drip state
  await prisma.advertiser.update({
    where: { id: advertiserId },
    data: {
      dripCampaignState: {
        ...advertiser.dripCampaignState,
        [emailType]: new Date().toISOString()
      },
      lastEmailSentAt: new Date()
    }
  });

  logger.info('Drip email sent', { advertiserId, emailType });
}
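
The handler sends one drip email per message and records a timestamp in `dripCampaignState`, but the document does not show how `delayHours` is turned into a schedule. The sketch below shows one way a scheduler could decide which emails are due. It assumes delays are measured from the advertiser's signup time, which the source does not state (the `inactive` email in particular may depend on activity instead). `dripDueAt` and `dueDripEmails` are hypothetical helpers.

```typescript
// Delay values copied from DRIP_EMAILS in the handler.
const DRIP_DELAY_HOURS = {
  welcome: 0,
  gettingStarted: 24,
  tips: 72,
  inactive: 168,
} as const;

type DripEmailType = keyof typeof DRIP_DELAY_HOURS;

const MS_PER_HOUR = 60 * 60 * 1000;

// ASSUMPTION: each delay is counted from the signup timestamp.
function dripDueAt(signedUpAt: Date, emailType: DripEmailType): Date {
  return new Date(signedUpAt.getTime() + DRIP_DELAY_HOURS[emailType] * MS_PER_HOUR);
}

// Return the email types that are due at `now` and not yet recorded as sent.
// `alreadySent` has the same shape as dripCampaignState: type -> ISO timestamp.
function dueDripEmails(
  signedUpAt: Date,
  alreadySent: Partial<Record<DripEmailType, string>>,
  now: Date,
): DripEmailType[] {
  return (Object.keys(DRIP_DELAY_HOURS) as DripEmailType[]).filter(
    (type) => !alreadySent[type] && dripDueAt(signedUpAt, type).getTime() <= now.getTime(),
  );
}
```

Checking `alreadySent` before publishing also keeps a redelivered scheduler run from queueing the same email twice.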

Services

Email Service

typescript
// services/emailService.ts
import nodemailer from 'nodemailer';

class EmailService {
  private transporter: nodemailer.Transporter;

  constructor() {
    this.transporter = nodemailer.createTransport({
      service: 'gmail',
      auth: {
        user: process.env.GMAIL_USER,
        pass: process.env.GMAIL_APP_PASSWORD
      }
    });
  }

  async send(options: {
    to: string;
    subject: string;
    html: string;
    attachments?: Array<{
      filename: string;
      content: Buffer;
      contentType: string;
    }>;
  }) {
    await this.transporter.sendMail({
      from: `"Eneza" <${process.env.GMAIL_USER}>`,
      to: options.to,
      subject: options.subject,
      html: options.html,
      attachments: options.attachments
    });
  }
}

export const emailService = new EmailService();

Push Service (FCM)

typescript
// services/pushService.ts
import * as admin from 'firebase-admin';

class PushService {
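  constructor() {
    if (!admin.apps.length) {
      // process.env values are `string | undefined`; fail fast with a clear message
      const serviceAccount = process.env.FIREBASE_SERVICE_ACCOUNT;
      if (!serviceAccount) {
        throw new Error('FIREBASE_SERVICE_ACCOUNT is not set');
      }
      admin.initializeApp({
        credential: admin.credential.cert(JSON.parse(serviceAccount))
      });
    }
  }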

  async sendPushNotification(payload: {
    deviceToken: string;
    title: string;
    body: string;
    data?: Record<string, string>;
    imageUrl?: string;
  }): Promise<{ success: boolean; messageId?: string; error?: string }> {
    try {
      const message: admin.messaging.Message = {
        token: payload.deviceToken,
        notification: {
          title: payload.title,
          body: payload.body,
          ...(payload.imageUrl && { imageUrl: payload.imageUrl })
        },
        data: payload.data,
        android: {
          priority: 'high',
          notification: {
            sound: 'default',
            clickAction: 'FLUTTER_NOTIFICATION_CLICK'
          }
        },
        apns: {
          payload: {
            aps: { sound: 'default' }
          }
        }
      };

      const messageId = await admin.messaging().send(message);
      return { success: true, messageId };
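    } catch (error) {
      // `error` is `unknown` under strict TypeScript; narrow before reading .message
      return { success: false, error: error instanceof Error ? error.message : String(error) };
    }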
  }
}

export const pushService = new PushService();

Environment Variables

bash
# GCP
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod

# Email
GMAIL_USER=notifications@eneza.app
GMAIL_APP_PASSWORD=...

# Firebase
FIREBASE_SERVICE_ACCOUNT={"type":"service_account",...}

# Cloud Run
PORT=8080
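
The project structure lists a `config.ts`, and the entry point reads `config.gcpProjectId`, but the module itself is not shown. The sketch below is one way such a module could validate these variables at startup. Only `gcpProjectId` is confirmed by the source; the other field names and the `loadConfig` function are assumptions.

```typescript
// Hypothetical config shape. Only gcpProjectId appears in the documented code.
interface WorkerConfig {
  gcpProjectId: string;
  topicSuffix: string;
  gmailUser: string;
  gmailAppPassword: string;
  firebaseServiceAccount: string;
  port: number;
}

// Build the config from an env-like object, throwing on the first missing variable
// so a misconfigured deployment fails at startup and not on the first message.
function loadConfig(env: Record<string, string | undefined>): WorkerConfig {
  const required = (name: string): string => {
    const value = env[name];
    if (!value) {
      throw new Error(`Missing required environment variable: ${name}`);
    }
    return value;
  };

  return {
    gcpProjectId: required('GCP_PROJECT_ID'),
    topicSuffix: required('PUBSUB_TOPIC_SUFFIX'),
    gmailUser: required('GMAIL_USER'),
    gmailAppPassword: required('GMAIL_APP_PASSWORD'),
    firebaseServiceAccount: required('FIREBASE_SERVICE_ACCOUNT'),
    // PORT falls back to 8080, matching the health check server's default.
    port: parseInt(env['PORT'] ?? '8080', 10),
  };
}
```

In the worker this would be called once as `loadConfig(process.env)`; taking the environment as a parameter keeps the function easy to test.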

Health Check

typescript
import http from 'node:http';

function startHealthServer() {
  const port = parseInt(process.env.PORT || '8080', 10);
  const server = http.createServer((req, res) => {
    if (req.url === '/health' || req.url === '/') {
      res.writeHead(200, { 'Content-Type': 'application/json' });
      res.end(JSON.stringify({ status: 'healthy', service: 'notification-worker' }));
    } else {
      res.writeHead(404);
      res.end();
    }
  });

  server.listen(port, '0.0.0.0', () => {
    logger.info(`Health server listening on port ${port}`);
  });
}

Graceful Shutdown

typescript
process.on('SIGTERM', () => {
  logger.info('Received SIGTERM, shutting down...');
  // Note: process.exit(0) terminates immediately and does not wait for in-flight handlers.
  // Messages that were not acked are redelivered by Pub/Sub once their ack deadline expires.
  process.exit(0);
});
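
To let in-flight handlers finish before the process exits, the worker needs to know which handlers are still running. A minimal sketch of a tracker that could back a drain-then-exit shutdown follows. `InFlightTracker` is a hypothetical class; wiring it into the message callback and calling `subscription.close()` on each open subscription are assumptions about how the worker would be changed.

```typescript
// Tracks handler promises so shutdown can wait for them to settle.
class InFlightTracker {
  private readonly pending = new Set<Promise<unknown>>();

  // Register a handler promise; it removes itself once it resolves or rejects.
  track<T>(work: Promise<T>): Promise<T> {
    this.pending.add(work);
    const forget = (): void => {
      this.pending.delete(work);
    };
    work.then(forget, forget);
    return work;
  }

  // Number of handlers that have not settled yet.
  get size(): number {
    return this.pending.size;
  }

  // Resolve when everything tracked so far has settled, or after timeoutMs,
  // whichever comes first. Rejections are swallowed here; the message
  // callback is still responsible for nacking failed messages.
  async drain(timeoutMs: number): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<void>((resolve) => {
      timer = setTimeout(resolve, timeoutMs);
    });
    const settled = Promise.all(
      Array.from(this.pending).map((p) => p.then(() => undefined, () => undefined)),
    );
    await Promise.race([settled, timeout]);
    if (timer !== undefined) clearTimeout(timer);
  }
}

// Intended wiring (sketch only, names outside this block are assumptions):
//
//   const tracker = new InFlightTracker();
//   subscription.on('message', (message) => tracker.track(processMessage(message)));
//
//   process.on('SIGTERM', async () => {
//     await Promise.all(openSubscriptions.map((s) => s.close())); // stop new deliveries
//     await tracker.drain(8000);                                  // bounded wait
//     process.exit(0);
//   });
```

The timeout matters because the hosting platform only allows a limited grace period after SIGTERM; anything still running when it expires is redelivered by Pub/Sub later.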
