csa-backend-test/app/services/notification.services.js

640 lines
21 KiB
JavaScript

// ENVIRONMENTS
require("dotenv").config();
// DATABASE
const { PrismaClient: CMSClient } = require("../../prisma/clients/cms");
// SERVICES
const logger = require("./logger.services");
const { sendNotification } = require("./firebase.services");
const { getAIResponse } = require("./ai.services");
const { localTime } = require("./time.services");
const { createScheduledNotification } = require("./notification-scheduler.services");
// INSTANCES
const prisma = new CMSClient();
async function analyzeUserActivitiesForNotification(userID, timeRangeMinutes = 60) {
const startTime = Date.now();
let aiNotificationRecord = null;
try {
const userToken = await prisma.usersToken.findUnique({
where: { UserID_UT: userID },
select: { UUID_UT: true, Token_UT: true }
});
if (!userToken || !userToken.Token_UT) {
logger.warn(`No valid token found for user ${userID}`);
return {
shouldNotify: false,
reason: "No valid FCM token"
};
}
const timeRangeStart = new Date(Date.now() - timeRangeMinutes * 60 * 1000);
const activities = await prisma.usersActivity.findMany({
where: {
UUID_UT: userToken.UUID_UT,
CreatedAt_UA: {
gte: timeRangeStart
},
Processed_UA: false
},
select: {
UUID_UA: true,
ActivityType_UA: true,
Params_UA: true,
CreatedAt_UA: true
},
orderBy: {
CreatedAt_UA: 'desc'
}
});
if (activities.length === 0) {
logger.info(`No activities found for user ${userID} in the last ${timeRangeMinutes} minutes`);
return {
shouldNotify: false,
reason: "No activities found"
};
}
const activityTypes = [...new Set(activities.map(act => act.ActivityType_UA))];
const activitySummary = {
userID,
totalActivities: activities.length,
timeRange: `${timeRangeMinutes} minutes`,
activities: activities.map(act => ({
type: act.ActivityType_UA,
params: act.Params_UA,
timestamp: act.CreatedAt_UA
}))
};
logger.info(`Analyzing ${activities.length} activities for user ${userID}`);
const aiStartTime = Date.now();
const aiAnalysis = await getAIResponse(activitySummary);
const aiResponseTime = Date.now() - aiStartTime;
aiNotificationRecord = await prisma.aINotification.create({
data: {
UserID_AIN: userID,
AnalyzedActivities_AIN: activities.length,
ActivityTypes_AIN: JSON.stringify(activityTypes),
GeneratedTitle_AIN: aiAnalysis?.title || "No title generated",
GeneratedDesc_AIN: aiAnalysis?.description || "No description generated",
ResponseTime_AIN: aiResponseTime,
ActivityTimeRange_AIN: timeRangeMinutes,
ProcessingTime_AIN: Date.now() - startTime
}
});
if (activities.length > 0) {
await prisma.usersActivity.updateMany({
where: {
UUID_UA: { in: activities.map(act => act.UUID_UA) }
},
data: {
Processed_UA: true
}
});
}
if (aiAnalysis && aiAnalysis.title && aiAnalysis.description) {
return {
shouldNotify: true,
notification: {
title: aiAnalysis.title,
description: aiAnalysis.description,
data: {
source: 'activity-analyzer',
activityCount: activities.length.toString(),
lastActivityType: activities[0].ActivityType_UA,
aiNotificationId: aiNotificationRecord.UUID_AIN
}
},
reason: "AI recommendation",
activityCount: activities.length,
aiNotificationId: aiNotificationRecord.UUID_AIN,
recentActivities: activities,
processingTime: Date.now() - startTime
};
}
await prisma.aINotification.update({
where: { UUID_AIN: aiNotificationRecord.UUID_AIN },
data: {
SentStatus_AIN: "failed",
ErrorMessage_AIN: "AI did not recommend notification",
ProcessingTime_AIN: Date.now() - startTime
}
});
return {
shouldNotify: false,
reason: "AI did not recommend notification"
};
} catch (error) {
logger.error(`Error analyzing activities for user ${userID}:`, { error });
if (aiNotificationRecord) {
await prisma.aINotification.update({
where: { UUID_AIN: aiNotificationRecord.UUID_AIN },
data: {
SentStatus_AIN: "failed",
ErrorMessage_AIN: error.message,
ProcessingTime_AIN: Date.now() - startTime
}
}).catch(() => {});
}
return {
shouldNotify: false,
reason: `Error during analysis: ${error.message}`
};
}
}
async function sendNotificationToUser(userID, notification) {
try {
const { title, description, data = {} } = notification;
if (!title || !description) {
throw new Error("Notification must have title and description");
}
const userToken = await prisma.usersToken.findUnique({
where: { UserID_UT: userID }
});
if (!userToken || !userToken.Token_UT) {
logger.warn(`No valid token found for user ${userID}`);
if (data.aiNotificationId) {
await prisma.aINotification.update({
where: { UUID_AIN: data.aiNotificationId },
data: {
SentStatus_AIN: "failed",
ErrorMessage_AIN: "No valid FCM token",
FailedAt_AIN: localTime(new Date())
}
}).catch(() => {});
}
return {
success: false,
userID,
message: "No valid FCM token"
};
}
const messageId = await sendNotification(
userToken.Token_UT,
title,
description,
data
);
if (data.aiNotificationId) {
await prisma.aINotification.update({
where: { UUID_AIN: data.aiNotificationId },
data: {
SentStatus_AIN: "sent",
SentAt_AIN: localTime(new Date()),
FCMMessageId_AIN: messageId
}
}).catch(() => {});
}
logger.info(`Notification sent to user ${userID}`, { messageId });
return {
success: true,
userID,
messageId,
notification: { title, description }
};
} catch (error) {
logger.error(`Error sending notification to user ${userID}:`, { error });
if (notification?.data?.aiNotificationId) {
await prisma.aINotification.update({
where: { UUID_AIN: notification.data.aiNotificationId },
data: {
SentStatus_AIN: "failed",
ErrorMessage_AIN: error.message,
FailedAt_AIN: localTime(new Date())
}
}).catch(() => {});
}
return {
success: false,
userID,
message: error.message
};
}
}
async function processActivitiesAndSendNotifications(options = {}) {
const { timeRangeMinutes = 60, minActivityCount = 5 } = options;
try {
const startTime = Date.now();
logger.info(`Starting activity analysis and notification process (timeRange: ${timeRangeMinutes}min, minCount: ${minActivityCount})`);
const timeRangeStart = new Date(Date.now() - timeRangeMinutes * 60 * 1000);
const usersWithActivities = await prisma.usersToken.findMany({
where: {
UsersActivity: {
some: {
CreatedAt_UA: {
gte: timeRangeStart
}
}
}
},
select: {
UserID_UT: true,
Token_UT: true,
_count: {
select: { UsersActivity: true }
}
},
distinct: ['UserID_UT']
});
logger.info(`Found ${usersWithActivities.length} users with recent activities`);
const results = {
totalUsersProcessed: 0,
notificationsSent: 0,
notificationsSkipped: 0,
errors: 0,
processedUsers: [],
errorDetails: []
};
for (const user of usersWithActivities) {
try {
if (!user.Token_UT) {
logger.warn(`User ${user.UserID_UT} has no FCM token`);
results.notificationsSkipped++;
continue;
}
const analysis = await analyzeUserActivitiesForNotification(
user.UserID_UT,
timeRangeMinutes
);
results.totalUsersProcessed++;
if (analysis.shouldNotify) {
try {
const scheduledResult = await createScheduledNotification({
userID: user.UserID_UT,
recentActivities: analysis.recentActivities || [],
notificationContent: {
title: analysis.notification.title,
description: analysis.notification.description
},
analyzedActivityCount: analysis.activityCount || 0,
activityTypes: analysis.notification.data?.lastActivityType || 'unknown',
activityTimeRange: timeRangeMinutes,
aiModel: 'gemini-1.5-pro',
processingTime: analysis.processingTime || 0
});
results.notificationsSent++; // Count as scheduled
results.processedUsers.push({
userID: user.UserID_UT,
status: 'scheduled',
notification: analysis.notification,
activityCount: analysis.activityCount,
scheduledFor: scheduledResult.scheduledFor,
delayMinutes: scheduledResult.delayMinutes,
confidence: scheduledResult.confidence,
reasoning: scheduledResult.reasoning,
pattern: scheduledResult.pattern
});
logger.info(`✅ Notification scheduled for ${user.UserID_UT} at ${scheduledResult.scheduledFor}`);
} catch (scheduleError) {
logger.error(`Failed to schedule notification for ${user.UserID_UT}:`, scheduleError);
results.errors++;
results.errorDetails.push({
userID: user.UserID_UT,
error: scheduleError.message
});
}
} else {
results.notificationsSkipped++;
logger.info(`Notification skipped for user ${user.UserID_UT}: ${analysis.reason}`);
}
} catch (error) {
results.errors++;
results.errorDetails.push({
userID: user.UserID_UT,
error: error.message
});
logger.error(`Error processing user ${user.UserID_UT}:`, { error });
}
}
const duration = Date.now() - startTime;
results.processingTimeMs = duration;
results.timestamp = new Date();
logger.info(`Activity analysis completed`, {
totalUsersProcessed: results.totalUsersProcessed,
notificationsSent: results.notificationsSent,
notificationsSkipped: results.notificationsSkipped,
errors: results.errors,
durationMs: duration
});
return results;
} catch (error) {
logger.error("Error in processActivitiesAndSendNotifications:", { error });
throw error;
}
}
async function manualTriggerNotification(userID, timeRangeMinutes = 60) {
try {
logger.info(`Manual trigger for user ${userID}`);
const analysis = await analyzeUserActivitiesForNotification(userID, timeRangeMinutes);
if (!analysis.shouldNotify) {
return {
success: false,
userID,
reason: analysis.reason
};
}
const result = await sendNotificationToUser(userID, analysis.notification);
return {
success: result.success,
...result
};
} catch (error) {
logger.error(`Error in manual trigger for user ${userID}:`, { error });
return {
success: false,
userID,
error: error.message
};
}
}
async function updateDailyAnalytics(date = new Date()) {
try {
const targetDate = new Date(date);
targetDate.setUTCHours(0, 0, 0, 0);
const nextDate = new Date(targetDate);
nextDate.setDate(nextDate.getDate() + 1);
const dailyStats = await prisma.aINotification.groupBy({
by: ['SentStatus_AIN'],
where: {
CreatedAt_AIN: {
gte: targetDate,
lt: nextDate
}
},
_count: {
SentStatus_AIN: true
},
_avg: {
ResponseTime_AIN: true,
ProcessingTime_AIN: true
}
});
const activityAnalysis = await prisma.aINotification.findMany({
where: {
CreatedAt_AIN: {
gte: targetDate,
lt: nextDate
},
ActivityTypes_AIN: {
not: null
}
},
select: {
ActivityTypes_AIN: true,
GeneratedTitle_AIN: true
}
});
let totalAnalyzed = 0;
let totalSent = 0;
let totalDelivered = 0;
let totalFailed = 0;
let avgResponseTime = 0;
let avgProcessingTime = 0;
const statusCounts = {};
dailyStats.forEach(stat => {
const status = stat.SentStatus_AIN;
const count = stat._count.SentStatus_AIN;
statusCounts[status] = count;
totalAnalyzed += count;
if (status === 'sent') totalSent += count;
if (status === 'delivered') totalDelivered += count;
if (status === 'failed') totalFailed += count;
if (stat._avg.ResponseTime_AIN) {
avgResponseTime += stat._avg.ResponseTime_AIN * count;
}
if (stat._avg.ProcessingTime_AIN) {
avgProcessingTime += stat._avg.ProcessingTime_AIN * count;
}
});
if (totalAnalyzed > 0) {
avgResponseTime = avgResponseTime / totalAnalyzed;
avgProcessingTime = avgProcessingTime / totalAnalyzed;
}
const activityTypeCount = {};
const titleCount = {};
activityAnalysis.forEach(record => {
try {
const activityTypes = JSON.parse(record.ActivityTypes_AIN || '[]');
activityTypes.forEach(type => {
activityTypeCount[type] = (activityTypeCount[type] || 0) + 1;
});
const title = record.GeneratedTitle_AIN;
if (title) {
titleCount[title] = (titleCount[title] || 0) + 1;
}
} catch (e) {
}
});
const topActivityTypes = Object.entries(activityTypeCount)
.sort(([,a], [,b]) => b - a)
.slice(0, 10)
.map(([type, count]) => ({ type, count }));
const popularTitles = Object.entries(titleCount)
.sort(([,a], [,b]) => b - a)
.slice(0, 10)
.map(([title, count]) => ({ title, count }));
const deliveryRate = totalSent > 0 ? (totalDelivered / totalSent) * 100 : 0;
await prisma.aINotificationAnalytics.upsert({
where: {
Date_ANA: targetDate
},
update: {
TotalAnalyzed_ANA: totalAnalyzed,
TotalGenerated_ANA: totalAnalyzed,
TotalSent_ANA: totalSent,
TotalDelivered_ANA: totalDelivered,
TotalFailed_ANA: totalFailed,
DeliveryRate_ANA: deliveryRate,
AvgResponseTime_ANA: avgResponseTime,
AvgProcessingTime_ANA: avgProcessingTime,
TopActivityTypes_ANA: topActivityTypes,
PopularTitles_ANA: popularTitles,
ErrorBreakdown_ANA: statusCounts,
UpdatedAt_ANA: localTime(new Date())
},
create: {
Date_ANA: targetDate,
TotalAnalyzed_ANA: totalAnalyzed,
TotalGenerated_ANA: totalAnalyzed,
TotalSent_ANA: totalSent,
TotalDelivered_ANA: totalDelivered,
TotalFailed_ANA: totalFailed,
DeliveryRate_ANA: deliveryRate,
AvgResponseTime_ANA: avgResponseTime,
AvgProcessingTime_ANA: avgProcessingTime,
TopActivityTypes_ANA: topActivityTypes,
PopularTitles_ANA: popularTitles,
ErrorBreakdown_ANA: statusCounts
}
});
logger.info(`AI Notification analytics updated for ${targetDate.toISOString().split('T')[0]}`, {
totalAnalyzed,
totalSent,
deliveryRate: `${deliveryRate.toFixed(2)}%`
});
return {
success: true,
date: targetDate,
stats: {
totalAnalyzed,
totalSent,
totalDelivered,
totalFailed,
deliveryRate,
avgResponseTime,
avgProcessingTime
}
};
} catch (error) {
logger.error('Error updating daily analytics:', { error });
throw error;
}
}
async function getAINotificationAnalytics(options = {}) {
try {
const {
startDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000),
endDate = new Date(),
limit = 30
} = options;
const analytics = await prisma.aINotificationAnalytics.findMany({
where: {
Date_ANA: {
gte: startDate,
lte: endDate
}
},
orderBy: {
Date_ANA: 'desc'
},
take: limit
});
const recentNotifications = await prisma.aINotification.findMany({
where: {
CreatedAt_AIN: {
gte: startDate,
lte: endDate
}
},
orderBy: {
CreatedAt_AIN: 'desc'
},
take: 50
});
const totalAnalyzed = analytics.reduce((sum, a) => sum + a.TotalAnalyzed_ANA, 0);
const totalSent = analytics.reduce((sum, a) => sum + a.TotalSent_ANA, 0);
const totalDelivered = analytics.reduce((sum, a) => sum + a.TotalDelivered_ANA, 0);
const totalFailed = analytics.reduce((sum, a) => sum + a.TotalFailed_ANA, 0);
const avgDeliveryRate = analytics.length > 0
? analytics.reduce((sum, a) => sum + (a.DeliveryRate_ANA || 0), 0) / analytics.length
: 0;
const avgResponseTime = analytics.length > 0
? analytics.reduce((sum, a) => sum + (a.AvgResponseTime_ANA || 0), 0) / analytics.length
: 0;
return {
summary: {
totalAnalyzed,
totalSent,
totalDelivered,
totalFailed,
avgDeliveryRate: `${avgDeliveryRate.toFixed(2)}%`,
avgResponseTime: `${avgResponseTime.toFixed(0)}ms`,
successRate: totalAnalyzed > 0 ? `${((totalSent / totalAnalyzed) * 100).toFixed(2)}%` : '0%'
},
analytics,
recentNotifications: recentNotifications.slice(0, 20)
};
} catch (error) {
logger.error('Error getting AI notification analytics:', { error });
throw error;
}
}
module.exports = {
analyzeUserActivitiesForNotification,
sendNotificationToUser,
processActivitiesAndSendNotifications,
manualTriggerNotification,
updateDailyAnalytics,
getAINotificationAnalytics
};