Eneza Notification Worker
The Notification Worker is a Node.js microservice that handles all asynchronous notification delivery via Google Pub/Sub subscriptions.
Architecture
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ ENEZA API │────▶│ GOOGLE PUBSUB │────▶│ NOTIFICATION │
│ │ │ │ │ WORKER │
│ • Publish msg │ │ Topic per type │ │ │
│ │ │ │ │ • Email (SMTP) │
│ │ │ │ │ • Push (FCM) │
│ │ │ │ │ • WhatsApp │
└─────────────────┘ └─────────────────┘ └─────────────────┘Technology Stack
| Component | Technology |
|---|---|
| Runtime | Node.js 20 |
| Message Broker | Google Pub/Sub |
| Gmail SMTP (nodemailer) | |
| Push | Firebase Cloud Messaging |
| WhatsApp Business API | |
| Hosting | Cloud Run |
Project Structure
notification-worker/
├── src/
│ ├── index.ts # Entry point, subscription setup
│ ├── config.ts # Configuration
│ ├── logger.ts # Logging
│ ├── handlers/ # Topic handlers
│ │ ├── adLive.ts
│ │ ├── adRejected.ts
│ │ ├── advertiserDripEmail.ts
│ │ ├── emailVerification.ts
│ │ ├── invoiceEmail.ts
│ │ ├── passwordReset.ts
│ │ ├── paymentReceipt.ts
│ │ ├── paymentReminder.ts
│ │ ├── pushNotification.ts
│ │ └── whatsapp.ts
│ └── services/
│ ├── emailService.ts # SMTP client
│ └── pushService.ts # FCM client
└── package.jsonSupported Topics
| Topic | Handler | Channel |
|---|---|---|
invoice-email | handleInvoiceEmail | |
email-verification | handleEmailVerification | |
password-reset | handlePasswordReset | |
payment-reminder | handlePaymentReminder | |
payment-receipt | handlePaymentReceipt | |
ad-live | handleAdLive | |
ad-rejected | handleAdRejected | |
push-notification | handlePushNotification | Push (FCM) |
whatsapp-message | handleWhatsApp | |
advertiser-drip-email | handleAdvertiserDripEmail |
Entry Point
typescript
// index.ts
import { PubSub, Message } from '@google-cloud/pubsub';
import { handleInvoiceEmail } from './handlers/invoiceEmail';
import { handleEmailVerification } from './handlers/emailVerification';
import { handlePasswordReset } from './handlers/passwordReset';
// ... other imports
// Topic to handler mapping
const topicHandlers: Record<string, (data: any) => Promise<void>> = {
'invoice-email': handleInvoiceEmail,
'email-verification': handleEmailVerification,
'password-reset': handlePasswordReset,
'payment-reminder': handlePaymentReminder,
'payment-receipt': handlePaymentReceipt,
'ad-live': handleAdLive,
'ad-rejected': handleAdRejected,
'whatsapp-message': handleWhatsApp,
'push-notification': handlePushNotification,
'advertiser-drip-email': handleAdvertiserDripEmail,
};
async function main() {
// Start health check server
startHealthServer();
const pubsub = new PubSub({ projectId: config.gcpProjectId });
// Subscribe to each topic
for (const [topicBase, handler] of Object.entries(topicHandlers)) {
const subscriptionName = getSubscriptionName(topicBase);
const subscription = pubsub.subscription(subscriptionName);
const [exists] = await subscription.exists();
if (!exists) {
logger.warn(`Subscription ${subscriptionName} does not exist, skipping...`);
continue;
}
// Message handler
subscription.on('message', async (message: Message) => {
const startTime = Date.now();
try {
const data = JSON.parse(message.data.toString());
logger.info(`Received message on ${topicBase}`, {
messageId: message.id,
publishTime: message.publishTime,
});
await handler(data);
message.ack();
logger.info(`Message processed successfully`, {
topic: topicBase,
duration: Date.now() - startTime,
});
} catch (error) {
logger.error(`Error processing message on ${topicBase}`, {
messageId: message.id,
error: error.message,
});
message.nack(); // Retry
}
});
logger.info(`Subscribed to: ${subscriptionName}`);
}
logger.info('Notification Worker Ready');
}Handler Examples
Email Verification Handler
typescript
// handlers/emailVerification.ts
export async function handleEmailVerification(data: {
email: string;
verificationCode: string;
fullName?: string;
}) {
const { email, verificationCode, fullName } = data;
await emailService.send({
to: email,
subject: 'Verify Your Eneza Account',
html: `
<h1>Welcome to Eneza${fullName ? `, ${fullName}` : ''}!</h1>
<p>Your verification code is:</p>
<h2 style="font-size: 32px; letter-spacing: 8px;">${verificationCode}</h2>
<p>This code expires in 10 minutes.</p>
`,
});
logger.info('Verification email sent', { email });
}Push Notification Handler
typescript
// handlers/pushNotification.ts
export interface PushNotificationPayload {
deviceToken: string;
title: string;
body: string;
data?: {
type: string;
ticketId?: string;
subscriptionId?: string;
[key: string]: string | undefined;
};
imageUrl?: string;
}
export async function handlePushNotification(data: PushNotificationPayload) {
if (!data.deviceToken || !data.title || !data.body) {
throw new Error('Missing required fields');
}
// Convert data to string values (FCM requirement)
const stringData: Record<string, string> = {};
if (data.data) {
for (const [key, value] of Object.entries(data.data)) {
if (value !== undefined) {
stringData[key] = String(value);
}
}
}
const result = await pushService.sendPushNotification({
deviceToken: data.deviceToken,
title: data.title,
body: data.body,
data: stringData,
imageUrl: data.imageUrl,
});
if (!result.success) {
if (result.error?.includes('Invalid or expired')) {
// Don't retry invalid tokens
logger.warn('Device token is invalid', { deviceToken: data.deviceToken.slice(-10) });
return;
}
throw new Error(result.error || 'Push notification failed');
}
logger.info('Push notification sent', { messageId: result.messageId });
}Invoice Email Handler
typescript
// handlers/invoiceEmail.ts
export async function handleInvoiceEmail(data: {
advertiserId: string;
invoiceId: string;
pdfUrl?: string;
}) {
const { advertiserId, invoiceId } = data;
// Get invoice details
const invoice = await prisma.invoice.findUnique({
where: { id: invoiceId },
include: { advertiser: true }
});
// Generate PDF if not provided
let pdfBuffer: Buffer;
if (data.pdfUrl) {
pdfBuffer = await downloadPdf(data.pdfUrl);
} else {
pdfBuffer = await pdfService.generateInvoice(invoice);
}
await emailService.send({
to: invoice.advertiser.emailAddress,
subject: `Invoice ${invoice.invoiceNumber} - Eneza`,
html: `
<h1>Invoice ${invoice.invoiceNumber}</h1>
<p>Thank you for advertising with Eneza!</p>
<p><strong>Amount:</strong> ${formatCurrency(invoice.amountLocal, invoice.currency)}</p>
<p><strong>Campaign:</strong> ${invoice.estimatedViews.toLocaleString()} estimated views</p>
<p>Please find your invoice attached.</p>
`,
attachments: [{
filename: `${invoice.invoiceNumber}.pdf`,
content: pdfBuffer,
contentType: 'application/pdf'
}]
});
// Update invoice
await prisma.invoice.update({
where: { id: invoiceId },
data: {
sentViaEmail: true,
emailSentAt: new Date()
}
});
logger.info('Invoice email sent', { invoiceId, email: invoice.advertiser.emailAddress });
}Ad Live Notification
typescript
// handlers/adLive.ts
export async function handleAdLive(data: {
advertiserId: string;
adId: string;
adTitle: string;
}) {
const { advertiserId, adId, adTitle } = data;
const advertiser = await prisma.advertiser.findUnique({
where: { id: advertiserId }
});
await emailService.send({
to: advertiser.emailAddress,
subject: `🚀 Your Campaign is Live: ${adTitle}`,
html: `
<h1>Your Campaign is Now Live!</h1>
<p>Great news! Your ad "${adTitle}" has been approved and is now live.</p>
<p>Posters across our network are now viewing and sharing your content.</p>
<p><a href="https://grow.eneza.app/campaigns/${adId}">View Campaign Dashboard</a></p>
`
});
logger.info('Ad live notification sent', { adId, email: advertiser.emailAddress });
}Advertiser Drip Campaign
typescript
// handlers/advertiserDripEmail.ts
const DRIP_EMAILS = {
welcome: {
subject: 'Welcome to Eneza! 🎉',
delayHours: 0,
template: 'drip/welcome'
},
gettingStarted: {
subject: 'Getting Started with Your First Campaign',
delayHours: 24,
template: 'drip/getting-started'
},
tips: {
subject: '5 Tips for Successful Campaigns',
delayHours: 72,
template: 'drip/tips'
},
inactive: {
subject: "We miss you! Here's 20% off your next campaign",
delayHours: 168, // 7 days
template: 'drip/inactive'
}
};
export async function handleAdvertiserDripEmail(data: {
advertiserId: string;
emailType: keyof typeof DRIP_EMAILS;
}) {
const { advertiserId, emailType } = data;
const emailConfig = DRIP_EMAILS[emailType];
const advertiser = await prisma.advertiser.findUnique({
where: { id: advertiserId }
});
// Check email preferences
if (!advertiser.emailPreferences?.marketing) {
logger.info('Skipping drip email - marketing disabled', { advertiserId });
return;
}
const html = await renderTemplate(emailConfig.template, { advertiser });
await emailService.send({
to: advertiser.emailAddress,
subject: emailConfig.subject,
html
});
// Update drip state
await prisma.advertiser.update({
where: { id: advertiserId },
data: {
dripCampaignState: {
...advertiser.dripCampaignState,
[emailType]: new Date().toISOString()
},
lastEmailSentAt: new Date()
}
});
logger.info('Drip email sent', { advertiserId, emailType });
}Services
Email Service
typescript
// services/emailService.ts
import nodemailer from 'nodemailer';
class EmailService {
private transporter: nodemailer.Transporter;
constructor() {
this.transporter = nodemailer.createTransport({
service: 'gmail',
auth: {
user: process.env.GMAIL_USER,
pass: process.env.GMAIL_APP_PASSWORD
}
});
}
async send(options: {
to: string;
subject: string;
html: string;
attachments?: Array<{
filename: string;
content: Buffer;
contentType: string;
}>;
}) {
await this.transporter.sendMail({
from: `"Eneza" <${process.env.GMAIL_USER}>`,
to: options.to,
subject: options.subject,
html: options.html,
attachments: options.attachments
});
}
}
export const emailService = new EmailService();Push Service (FCM)
typescript
// services/pushService.ts
import * as admin from 'firebase-admin';
class PushService {
constructor() {
if (!admin.apps.length) {
admin.initializeApp({
credential: admin.credential.cert(
JSON.parse(process.env.FIREBASE_SERVICE_ACCOUNT)
)
});
}
}
async sendPushNotification(payload: {
deviceToken: string;
title: string;
body: string;
data?: Record<string, string>;
imageUrl?: string;
}): Promise<{ success: boolean; messageId?: string; error?: string }> {
try {
const message: admin.messaging.Message = {
token: payload.deviceToken,
notification: {
title: payload.title,
body: payload.body,
...(payload.imageUrl && { imageUrl: payload.imageUrl })
},
data: payload.data,
android: {
priority: 'high',
notification: {
sound: 'default',
clickAction: 'FLUTTER_NOTIFICATION_CLICK'
}
},
apns: {
payload: {
aps: { sound: 'default' }
}
}
};
const messageId = await admin.messaging().send(message);
return { success: true, messageId };
} catch (error) {
return { success: false, error: error.message };
}
}
}
export const pushService = new PushService();Environment Variables
bash
# GCP
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod
# Email
GMAIL_USER=notifications@eneza.app
GMAIL_APP_PASSWORD=...
# Firebase
FIREBASE_SERVICE_ACCOUNT={"type":"service_account",...}
# Cloud Run
PORT=8080Health Check
typescript
function startHealthServer() {
const port = parseInt(process.env.PORT || '8080', 10);
const server = http.createServer((req, res) => {
if (req.url === '/health' || req.url === '/') {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'healthy', service: 'notification-worker' }));
} else {
res.writeHead(404);
res.end();
}
});
server.listen(port, '0.0.0.0', () => {
logger.info(`Health server listening on port ${port}`);
});
}Graceful Shutdown
typescript
process.on('SIGTERM', () => {
logger.info('Received SIGTERM, shutting down...');
// Pub/Sub subscriptions will stop receiving messages
// In-flight messages will be processed before exit
process.exit(0);
});