import type { FastifyBaseLogger } from "fastify";
import { Prisma } from "@prisma/client";
import { consolidateCaseFacts, reprocessExistingDocumentWithAi, synthesizeClientKnowledgeBase } from "./intelligence.js";
import { createId } from "./id.js";
import { prisma } from "./prisma.js";

// Raw row shape of `repository_document_reprocess_jobs` as returned by
// prisma.$queryRaw. Count columns are typed `number | bigint | string`
// because the driver may surface them in any of those forms; callers
// normalize them with toNumber() before use. Timestamps may likewise be
// Date objects or strings, normalized via toIsoString().
type RepositoryDocumentReprocessJobRow = {
  id: string;
  law_firm_id: string;
  client_id: string;
  case_id: string | null;
  created_by_user_id: string | null;
  status_code: string;
  stage_code: string;
  total_documents: number | bigint | string;
  processed_documents: number | bigint | string;
  succeeded_documents: number | bigint | string;
  failed_documents: number | bigint | string;
  extracted_field_count: number | bigint | string;
  synced_client_field_count: number | bigint | string;
  current_document_record_id: string | null;
  current_document_title: string | null;
  synthesis_status_code: string | null;
  synthesis_repository_item_id: string | null;
  synthesized_profile_fact_count: number | bigint | string;
  synthesized_knowledge_fact_count: number | bigint | string;
  error_message: string | null;
  started_at: Date | string | null;
  completed_at: Date | string | null;
  created_at: Date | string;
  updated_at: Date | string;
};

// Raw row shape of `repository_document_reprocess_job_documents` (one row
// per document tracked by a job). Same driver-dependent unions as the job
// row: counts normalized with toNumber(), timestamps with toIsoString(),
// and `extracted_preview_json` parsed with parseExtractedPreview().
type RepositoryDocumentReprocessJobDocumentRow = {
  id: string;
  job_id: string;
  document_record_id: string;
  sort_order: number | bigint | string;
  title: string;
  document_type_code: string;
  status_code: string;
  extracted_field_count: number | bigint | string;
  synced_client_field_count: number | bigint | string;
  ai_run_id: string | null;
  extraction_id: string | null;
  summary_text: string | null;
  extracted_preview_json: string | null;
  error_message: string | null;
  started_at: Date | string | null;
  completed_at: Date | string | null;
  created_at: Date | string;
  updated_at: Date | string;
};

/**
 * Converts an arbitrary thrown value into a user-presentable error message.
 * Non-Error values fall back to a generic message, and the cryptic zip-level
 * failure emitted for corrupt DOCX archives is replaced with a clear
 * explanation.
 */
function sanitizeRepositoryReprocessErrorMessage(error: unknown): string {
  const raw =
    error instanceof Error ? error.message : "Document reprocessing failed unexpectedly";

  // presumably raised by the DOCX unzip layer when the file is not a real
  // zip archive — surface a human-readable explanation instead.
  return raw.includes("Can't find end of central directory")
    ? "This DOCX file is invalid, corrupted, or not a real DOCX archive."
    : raw;
}

// API-facing snapshot of a reprocess job: all counters normalized to plain
// numbers, timestamps serialized to ISO strings, and a derived
// `remainingDocuments` (total - processed, floored at 0).
export type RepositoryDocumentReprocessJobSummary = {
  id: string;
  clientId: string;
  caseId: string | null;
  statusCode: string;
  stageCode: string;
  totalDocuments: number;
  processedDocuments: number;
  succeededDocuments: number;
  failedDocuments: number;
  remainingDocuments: number;
  extractedFieldCount: number;
  syncedClientFieldCount: number;
  currentDocumentRecordId: string | null;
  currentDocumentTitle: string | null;
  errorMessage: string | null;
  createdAt: string;
  updatedAt: string;
  startedAt: string | null;
  completedAt: string | null;
  // Knowledge-base synthesis outcome for this job run.
  // NOTE(review): the builder currently always populates this object, so the
  // `| null` arm appears unused by this file — confirm against other callers.
  synthesis: {
    statusCode: string | null;
    repositoryItemId: string | null;
    profileFactCount: number;
    knowledgeFactCount: number;
  } | null;
};

// Job summary extended with the per-document breakdown (rows are fetched
// ordered by sort_order ascending).
export type RepositoryDocumentReprocessJobDetail = RepositoryDocumentReprocessJobSummary & {
  documents: Array<{
    id: string;
    documentRecordId: string;
    sortOrder: number;
    title: string;
    documentTypeCode: string;
    statusCode: string;
    extractedFieldCount: number;
    syncedClientFieldCount: number;
    aiRunId: string | null;
    extractionId: string | null;
    summaryText: string | null;
    // Parsed from the stored extracted_preview_json column; empty when the
    // column is null or malformed.
    extractedPreview: Array<{
      label: string;
      value: string;
    }>;
    errorMessage: string | null;
    startedAt: string | null;
    completedAt: string | null;
  }>;
};

// In-process registry of running job tasks keyed by job id, used to prevent
// starting the same job twice concurrently (single-process only; this offers
// no cross-instance locking).
const activeRepositoryDocumentReprocessJobs = new Map<string, Promise<void>>();

// Sentinel error thrown at pause checkpoints to unwind the processing loop;
// handlers detect it with `instanceof` and persist the job as paused instead
// of failed.
class RepositoryDocumentReprocessPausedError extends Error {
  constructor() {
    super("__JOB_PAUSED__");
  }
}

/**
 * Normalizes a count value coming back from a raw SQL query (number, bigint,
 * or string depending on the driver) into a finite plain number.
 *
 * Any value that cannot be represented as a finite number — NaN/Infinity,
 * unparseable or blank strings, null/undefined, or a bigint beyond the
 * double range — yields 0.
 */
function toNumber(value: number | bigint | string | null | undefined): number {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : 0;
  }

  if (typeof value === "bigint") {
    // Number(bigint) returns ±Infinity for magnitudes beyond the double
    // range; clamp to 0 for consistency with the other branches.
    const converted = Number(value);
    return Number.isFinite(converted) ? converted : 0;
  }

  if (typeof value === "string" && value.trim()) {
    const parsed = Number(value);
    return Number.isFinite(parsed) ? parsed : 0;
  }

  return 0;
}

/**
 * Serializes a DB timestamp (Date or string) to an ISO-8601 string.
 * Nullish and empty-string inputs map to null; a string that cannot be
 * parsed as a date is returned verbatim rather than dropped.
 */
function toIsoString(value: Date | string | null | undefined): string | null {
  if (!value) {
    return null;
  }

  if (value instanceof Date) {
    return value.toISOString();
  }

  const candidate = new Date(value);
  return Number.isNaN(candidate.getTime()) ? String(value) : candidate.toISOString();
}

/**
 * Parses the stored extracted_preview_json column into label/value pairs.
 * Returns [] for null/empty input, invalid JSON, non-array JSON, and skips
 * entries that are not objects or whose trimmed label/value is empty.
 */
function parseExtractedPreview(
  value: string | null | undefined,
): Array<{ label: string; value: string }> {
  if (!value) {
    return [];
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(value);
  } catch {
    return [];
  }

  if (!Array.isArray(parsed)) {
    return [];
  }

  const preview: Array<{ label: string; value: string }> = [];
  for (const entry of parsed) {
    if (!entry || typeof entry !== "object") {
      continue;
    }

    const label = String((entry as { label?: unknown }).label ?? "").trim();
    const text = String((entry as { value?: unknown }).value ?? "").trim();

    if (label && text) {
      preview.push({ label, value: text });
    }
  }

  return preview;
}

/**
 * Maps a raw job row to the API summary shape: counts are normalized to
 * plain numbers, timestamps serialized to ISO strings (created/updated fall
 * back to "now" if unparseable), and remainingDocuments derived as
 * max(0, total - processed).
 */
function buildRepositoryDocumentReprocessJobSummary(
  row: RepositoryDocumentReprocessJobRow,
): RepositoryDocumentReprocessJobSummary {
  const total = toNumber(row.total_documents);
  const processed = toNumber(row.processed_documents);
  const now = () => new Date().toISOString();

  return {
    id: row.id,
    clientId: row.client_id,
    caseId: row.case_id,
    statusCode: row.status_code,
    stageCode: row.stage_code,
    totalDocuments: total,
    processedDocuments: processed,
    succeededDocuments: toNumber(row.succeeded_documents),
    failedDocuments: toNumber(row.failed_documents),
    remainingDocuments: Math.max(0, total - processed),
    extractedFieldCount: toNumber(row.extracted_field_count),
    syncedClientFieldCount: toNumber(row.synced_client_field_count),
    currentDocumentRecordId: row.current_document_record_id,
    currentDocumentTitle: row.current_document_title,
    errorMessage: row.error_message,
    createdAt: toIsoString(row.created_at) ?? now(),
    updatedAt: toIsoString(row.updated_at) ?? now(),
    startedAt: toIsoString(row.started_at),
    completedAt: toIsoString(row.completed_at),
    synthesis: {
      statusCode: row.synthesis_status_code,
      repositoryItemId: row.synthesis_repository_item_id,
      profileFactCount: toNumber(row.synthesized_profile_fact_count),
      knowledgeFactCount: toNumber(row.synthesized_knowledge_fact_count),
    },
  };
}

function buildRepositoryDocumentReprocessJobDetail(
  row: RepositoryDocumentReprocessJobRow,
  documentRows: RepositoryDocumentReprocessJobDocumentRow[],
): RepositoryDocumentReprocessJobDetail {
  return {
    ...buildRepositoryDocumentReprocessJobSummary(row),
    documents: documentRows.map((item) => ({
      id: item.id,
      documentRecordId: item.document_record_id,
      sortOrder: toNumber(item.sort_order),
      title: item.title,
      documentTypeCode: item.document_type_code,
      statusCode: item.status_code,
      extractedFieldCount: toNumber(item.extracted_field_count),
      syncedClientFieldCount: toNumber(item.synced_client_field_count),
      aiRunId: item.ai_run_id,
      extractionId: item.extraction_id,
      summaryText: item.summary_text,
      extractedPreview: parseExtractedPreview(item.extracted_preview_json),
      errorMessage: item.error_message,
      startedAt: toIsoString(item.started_at),
      completedAt: toIsoString(item.completed_at),
    })),
  };
}

/**
 * Loads a single reprocess job row scoped to a law firm (tenant guard in the
 * WHERE clause). Returns null when the job does not exist or belongs to
 * another firm.
 */
async function getRepositoryDocumentReprocessJobRow(lawFirmId: string, jobId: string) {
  const [row] = await prisma.$queryRaw<Array<RepositoryDocumentReprocessJobRow>>`
    SELECT
      id,
      law_firm_id,
      client_id,
      case_id,
      created_by_user_id,
      status_code,
      stage_code,
      total_documents,
      processed_documents,
      succeeded_documents,
      failed_documents,
      extracted_field_count,
      synced_client_field_count,
      current_document_record_id,
      current_document_title,
      synthesis_status_code,
      synthesis_repository_item_id,
      synthesized_profile_fact_count,
      synthesized_knowledge_fact_count,
      error_message,
      started_at,
      completed_at,
      created_at,
      updated_at
    FROM repository_document_reprocess_jobs
    WHERE id = ${jobId}
      AND law_firm_id = ${lawFirmId}
    LIMIT 1
  `;

  return row ?? null;
}

/**
 * Loads a reprocess job row by id alone — no tenant guard. Used internally
 * by the job runner, which already holds a trusted job id. Returns null when
 * the job does not exist.
 */
async function getRepositoryDocumentReprocessJobRowById(jobId: string) {
  const [row] = await prisma.$queryRaw<Array<RepositoryDocumentReprocessJobRow>>`
    SELECT
      id,
      law_firm_id,
      client_id,
      case_id,
      created_by_user_id,
      status_code,
      stage_code,
      total_documents,
      processed_documents,
      succeeded_documents,
      failed_documents,
      extracted_field_count,
      synced_client_field_count,
      current_document_record_id,
      current_document_title,
      synthesis_status_code,
      synthesis_repository_item_id,
      synthesized_profile_fact_count,
      synthesized_knowledge_fact_count,
      error_message,
      started_at,
      completed_at,
      created_at,
      updated_at
    FROM repository_document_reprocess_jobs
    WHERE id = ${jobId}
    LIMIT 1
  `;

  return row ?? null;
}

/**
 * Pause/existence checkpoint for the job runner. Returns the fresh job row
 * when processing may continue, or null when the job is missing or a pause
 * was requested (in which case the paused stage is persisted first).
 */
async function ensureRepositoryDocumentReprocessJobCanRun(jobId: string) {
  const job = await getRepositoryDocumentReprocessJobRowById(jobId);
  if (!job) {
    return null;
  }

  if (job.status_code !== "paused") {
    return job;
  }

  // A pause was requested elsewhere: record the paused stage so pollers see
  // the job parked, and signal the caller (null) to stop processing.
  await updateRepositoryDocumentReprocessJob({
    jobId: job.id,
    stageCode: "paused",
    errorMessage: null,
    markStarted: true,
  });
  return null;
}

/**
 * Loads every per-document tracking row for a job, ordered by sort_order
 * (the processing order assigned at job creation).
 */
async function getRepositoryDocumentReprocessJobDocumentRows(jobId: string) {
  return prisma.$queryRaw<Array<RepositoryDocumentReprocessJobDocumentRow>>`
    SELECT
      id,
      job_id,
      document_record_id,
      sort_order,
      title,
      document_type_code,
      status_code,
      extracted_field_count,
      synced_client_field_count,
      ai_run_id,
      extraction_id,
      summary_text,
      extracted_preview_json,
      error_message,
      started_at,
      completed_at,
      created_at,
      updated_at
    FROM repository_document_reprocess_job_documents
    WHERE job_id = ${jobId}
    ORDER BY sort_order ASC
  `;
}

/**
 * Applies a partial UPDATE to a reprocess job row. Only fields the caller
 * explicitly supplied (non-undefined) are written — `null` is a deliberate
 * value and clears the column. `updated_at` is always touched; numeric
 * counters are clamped to >= 0 before persisting.
 */
async function updateRepositoryDocumentReprocessJob(input: {
  jobId: string;
  statusCode?: string;
  stageCode?: string;
  totalDocuments?: number;
  processedDocuments?: number;
  succeededDocuments?: number;
  failedDocuments?: number;
  extractedFieldCount?: number;
  syncedClientFieldCount?: number;
  currentDocumentRecordId?: string | null;
  currentDocumentTitle?: string | null;
  synthesisStatusCode?: string | null;
  synthesisRepositoryItemId?: string | null;
  synthesizedProfileFactCount?: number;
  synthesizedKnowledgeFactCount?: number;
  errorMessage?: string | null;
  markStarted?: boolean;
  markCompleted?: boolean;
}) {
  const assignments: Prisma.Sql[] = [Prisma.sql`updated_at = CURRENT_TIMESTAMP`];

  // Queue an assignment only when the caller supplied a value.
  function set<T>(value: T | undefined, toSql: (value: T) => Prisma.Sql): void {
    if (value !== undefined) {
      assignments.push(toSql(value));
    }
  }
  const clamp = (value: number) => Math.max(0, value);

  set(input.statusCode, (v) => Prisma.sql`status_code = ${v}`);
  set(input.stageCode, (v) => Prisma.sql`stage_code = ${v}`);
  set(input.totalDocuments, (v) => Prisma.sql`total_documents = ${clamp(v)}`);
  set(input.processedDocuments, (v) => Prisma.sql`processed_documents = ${clamp(v)}`);
  set(input.succeededDocuments, (v) => Prisma.sql`succeeded_documents = ${clamp(v)}`);
  set(input.failedDocuments, (v) => Prisma.sql`failed_documents = ${clamp(v)}`);
  set(input.extractedFieldCount, (v) => Prisma.sql`extracted_field_count = ${clamp(v)}`);
  set(input.syncedClientFieldCount, (v) => Prisma.sql`synced_client_field_count = ${clamp(v)}`);
  set(input.currentDocumentRecordId, (v) => Prisma.sql`current_document_record_id = ${v}`);
  set(input.currentDocumentTitle, (v) => Prisma.sql`current_document_title = ${v}`);
  set(input.synthesisStatusCode, (v) => Prisma.sql`synthesis_status_code = ${v}`);
  set(input.synthesisRepositoryItemId, (v) => Prisma.sql`synthesis_repository_item_id = ${v}`);
  set(
    input.synthesizedProfileFactCount,
    (v) => Prisma.sql`synthesized_profile_fact_count = ${clamp(v)}`,
  );
  set(
    input.synthesizedKnowledgeFactCount,
    (v) => Prisma.sql`synthesized_knowledge_fact_count = ${clamp(v)}`,
  );
  set(input.errorMessage, (v) => Prisma.sql`error_message = ${v}`);

  if (input.markStarted) {
    // COALESCE keeps the first start time across pause/resume cycles.
    assignments.push(Prisma.sql`started_at = COALESCE(started_at, CURRENT_TIMESTAMP)`);
  }
  if (input.markCompleted) {
    assignments.push(Prisma.sql`completed_at = CURRENT_TIMESTAMP`);
  }

  await prisma.$executeRaw(
    Prisma.sql`
      UPDATE repository_document_reprocess_jobs
      SET ${Prisma.join(assignments, ", ")}
      WHERE id = ${input.jobId}
    `,
  );
}

/**
 * Applies a partial UPDATE to one per-document tracking row. Only fields the
 * caller explicitly supplied (non-undefined) are written; `updated_at` is
 * always touched, counters are clamped to >= 0, and the extracted preview is
 * persisted as JSON text.
 */
async function updateRepositoryDocumentReprocessJobDocument(input: {
  jobDocumentId: string;
  statusCode?: string;
  extractedFieldCount?: number;
  syncedClientFieldCount?: number;
  aiRunId?: string | null;
  extractionId?: string | null;
  summaryText?: string | null;
  extractedPreview?: Array<{ label: string; value: string }>;
  errorMessage?: string | null;
  markStarted?: boolean;
  markCompleted?: boolean;
}) {
  const assignments: Prisma.Sql[] = [Prisma.sql`updated_at = CURRENT_TIMESTAMP`];

  // Queue an assignment only when the caller supplied a value.
  function set<T>(value: T | undefined, toSql: (value: T) => Prisma.Sql): void {
    if (value !== undefined) {
      assignments.push(toSql(value));
    }
  }

  set(input.statusCode, (v) => Prisma.sql`status_code = ${v}`);
  set(input.extractedFieldCount, (v) => Prisma.sql`extracted_field_count = ${Math.max(0, v)}`);
  set(
    input.syncedClientFieldCount,
    (v) => Prisma.sql`synced_client_field_count = ${Math.max(0, v)}`,
  );
  set(input.aiRunId, (v) => Prisma.sql`ai_run_id = ${v}`);
  set(input.extractionId, (v) => Prisma.sql`extraction_id = ${v}`);
  set(input.summaryText, (v) => Prisma.sql`summary_text = ${v}`);
  set(input.extractedPreview, (v) => Prisma.sql`extracted_preview_json = ${JSON.stringify(v)}`);
  set(input.errorMessage, (v) => Prisma.sql`error_message = ${v}`);

  if (input.markStarted) {
    // COALESCE keeps the first start time across pause/resume cycles.
    assignments.push(Prisma.sql`started_at = COALESCE(started_at, CURRENT_TIMESTAMP)`);
  }
  if (input.markCompleted) {
    assignments.push(Prisma.sql`completed_at = CURRENT_TIMESTAMP`);
  }

  await prisma.$executeRaw(
    Prisma.sql`
      UPDATE repository_document_reprocess_job_documents
      SET ${Prisma.join(assignments, ", ")}
      WHERE id = ${input.jobDocumentId}
    `,
  );
}

/**
 * Drives one reprocess job end to end:
 *   1. re-runs AI extraction for every document tracked by the job,
 *   2. synthesizes the client knowledge base from the fresh extractions,
 *   3. consolidates case facts when the job is scoped to a case,
 * persisting progress counters after each step so pollers can render a live
 * view. A concurrent pause request (status_code = 'paused', observed via
 * ensureRepositoryDocumentReprocessJobCanRun) unwinds through
 * RepositoryDocumentReprocessPausedError at every checkpoint and is recorded
 * as a paused job, not a failure.
 *
 * Fixes vs previous revision:
 *  - Pause errors raised during synthesis/consolidation are rethrown instead
 *    of being swallowed by the stage catch and misreported as stage failures.
 *  - The dead `progressPercent` computation and the duplicate per-document
 *    UPDATEs (identical values written up to three times) were removed; one
 *    progress write per document result remains.
 */
async function processRepositoryDocumentReprocessJobInternal(input: {
  jobId: string;
  logger?: FastifyBaseLogger | null;
}) {
  const row = await ensureRepositoryDocumentReprocessJobCanRun(input.jobId);
  if (!row) {
    // Job missing or paused before it started — nothing to run.
    return;
  }

  if (!row.created_by_user_id) {
    throw new Error("Repository reprocess job is missing the actor user");
  }

  const documents = await getRepositoryDocumentReprocessJobDocumentRows(row.id);

  // Reset every counter so a restarted job reports progress from scratch.
  await updateRepositoryDocumentReprocessJob({
    jobId: row.id,
    statusCode: "processing",
    stageCode: "processing_documents",
    totalDocuments: documents.length,
    processedDocuments: 0,
    succeededDocuments: 0,
    failedDocuments: 0,
    extractedFieldCount: 0,
    syncedClientFieldCount: 0,
    currentDocumentRecordId: null,
    currentDocumentTitle: null,
    synthesisStatusCode: null,
    synthesisRepositoryItemId: null,
    synthesizedProfileFactCount: 0,
    synthesizedKnowledgeFactCount: 0,
    errorMessage: null,
    markStarted: true,
  });

  let processedDocuments = 0;
  let succeededDocuments = 0;
  let failedDocuments = 0;
  let extractedFieldCount = 0;
  let syncedClientFieldCount = 0;
  // Per-document and per-stage failure messages; the latest is surfaced
  // while running, the full list on completion.
  const fileErrors: string[] = [];

  try {
    for (const item of documents) {
      // Honour a pause requested between documents.
      const refreshed = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
      if (!refreshed) {
        throw new RepositoryDocumentReprocessPausedError();
      }

      // Publish which document is being processed before starting it.
      await updateRepositoryDocumentReprocessJob({
        jobId: row.id,
        statusCode: "processing",
        stageCode: "processing_documents",
        processedDocuments,
        succeededDocuments,
        failedDocuments,
        extractedFieldCount,
        syncedClientFieldCount,
        currentDocumentRecordId: item.document_record_id,
        currentDocumentTitle: item.title,
        errorMessage: fileErrors.length ? fileErrors.slice(-1)[0] : null,
        markStarted: true,
      });

      await updateRepositoryDocumentReprocessJobDocument({
        jobDocumentId: item.id,
        statusCode: "processing",
        errorMessage: null,
        markStarted: true,
      });

      try {
        const result = await reprocessExistingDocumentWithAi({
          lawFirmId: row.law_firm_id,
          clientId: row.client_id,
          caseId: row.case_id,
          actorUserId: row.created_by_user_id,
          documentRecordId: item.document_record_id,
        });

        // Preview shows extracted fields first, then important facts,
        // capped at 12 entries.
        const preview = [
          ...result.extractedPreview,
          ...result.importantFacts.map((fact) => ({
            label: fact.label,
            value: fact.value,
          })),
        ].slice(0, 12);

        processedDocuments += 1;
        succeededDocuments += 1;
        extractedFieldCount += Object.keys(result.extractedData).length;
        syncedClientFieldCount += result.syncedClientFieldKeys.length;

        await updateRepositoryDocumentReprocessJobDocument({
          jobDocumentId: item.id,
          statusCode: "completed",
          extractedFieldCount: Object.keys(result.extractedData).length,
          syncedClientFieldCount: result.syncedClientFieldKeys.length,
          aiRunId: result.aiRunId,
          extractionId: result.extractionId,
          summaryText: result.summaryText,
          extractedPreview: preview,
          errorMessage: null,
          markStarted: true,
          markCompleted: true,
        });
      } catch (error) {
        const message = sanitizeRepositoryReprocessErrorMessage(error);

        // A single document failure does not abort the job; record it and
        // move on to the next document.
        processedDocuments += 1;
        failedDocuments += 1;
        fileErrors.push(`${item.title}: ${message}`);

        await updateRepositoryDocumentReprocessJobDocument({
          jobDocumentId: item.id,
          statusCode: "failed",
          extractedFieldCount: 0,
          syncedClientFieldCount: 0,
          errorMessage: message,
          markStarted: true,
          markCompleted: true,
        });
      }

      // Single post-document progress write (previously duplicated).
      await updateRepositoryDocumentReprocessJob({
        jobId: row.id,
        statusCode: "processing",
        stageCode: "processing_documents",
        processedDocuments,
        succeededDocuments,
        failedDocuments,
        extractedFieldCount,
        syncedClientFieldCount,
        currentDocumentRecordId: item.document_record_id,
        currentDocumentTitle: item.title,
        errorMessage: fileErrors.length ? fileErrors.slice(-1)[0] : null,
        markStarted: true,
      });

      // Honour a pause requested while this document was processing.
      const refreshedAfterDocument = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
      if (!refreshedAfterDocument) {
        throw new RepositoryDocumentReprocessPausedError();
      }
    }

    const refreshedBeforeSynthesis = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
    if (!refreshedBeforeSynthesis) {
      throw new RepositoryDocumentReprocessPausedError();
    }

    await updateRepositoryDocumentReprocessJob({
      jobId: row.id,
      statusCode: "processing",
      stageCode: "synthesizing_client_knowledge",
      processedDocuments,
      succeededDocuments,
      failedDocuments,
      extractedFieldCount,
      syncedClientFieldCount,
      currentDocumentRecordId: null,
      currentDocumentTitle: null,
      synthesisStatusCode: "processing",
      errorMessage: fileErrors.length ? fileErrors.slice(-1)[0] : null,
      markStarted: true,
    });

    let synthesizedProfileFactCount = 0;
    let synthesizedKnowledgeFactCount = 0;
    let synthesisRepositoryItemId: string | null = null;

    try {
      const refreshedBeforeSynthesisCall = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
      if (!refreshedBeforeSynthesisCall) {
        throw new RepositoryDocumentReprocessPausedError();
      }

      const synthesized = await synthesizeClientKnowledgeBase({
        lawFirmId: row.law_firm_id,
        clientId: row.client_id,
        caseId: row.case_id,
        actorUserId: row.created_by_user_id,
        formName: "repository document reprocessing",
      });

      const refreshedAfterSynthesis = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
      if (!refreshedAfterSynthesis) {
        throw new RepositoryDocumentReprocessPausedError();
      }

      synthesizedProfileFactCount = synthesized?.profileFacts.length ?? 0;
      synthesizedKnowledgeFactCount = synthesized?.knowledgeFacts.length ?? 0;
      synthesisRepositoryItemId = synthesized?.repositoryItemId ?? null;

      await updateRepositoryDocumentReprocessJob({
        jobId: row.id,
        statusCode: "processing",
        stageCode: row.case_id ? "consolidating_case_facts" : "finalizing",
        processedDocuments,
        succeededDocuments,
        failedDocuments,
        extractedFieldCount,
        syncedClientFieldCount,
        currentDocumentRecordId: null,
        currentDocumentTitle: null,
        synthesisStatusCode: "completed",
        synthesisRepositoryItemId,
        synthesizedProfileFactCount,
        synthesizedKnowledgeFactCount,
        errorMessage: fileErrors.length ? fileErrors.slice(-1)[0] : null,
        markStarted: true,
      });
    } catch (error) {
      // Bugfix: a pause signal must unwind to the outer handler; previously
      // it was swallowed here and misreported as a synthesis failure.
      if (error instanceof RepositoryDocumentReprocessPausedError) {
        throw error;
      }

      // Synthesis failure is non-fatal: record it and finish the job.
      fileErrors.push(
        `Client knowledge synthesis: ${
          error instanceof Error ? error.message : "Unexpected synthesis error"
        }`,
      );

      await updateRepositoryDocumentReprocessJob({
        jobId: row.id,
        statusCode: "processing",
        stageCode: row.case_id ? "consolidating_case_facts" : "finalizing",
        processedDocuments,
        succeededDocuments,
        failedDocuments,
        extractedFieldCount,
        syncedClientFieldCount,
        currentDocumentRecordId: null,
        currentDocumentTitle: null,
        synthesisStatusCode: "failed",
        synthesisRepositoryItemId: null,
        synthesizedProfileFactCount: 0,
        synthesizedKnowledgeFactCount: 0,
        errorMessage: fileErrors.slice(-1)[0] ?? null,
        markStarted: true,
      });
    }

    if (row.case_id) {
      try {
        const refreshedBeforeConsolidation = await ensureRepositoryDocumentReprocessJobCanRun(row.id);
        if (!refreshedBeforeConsolidation) {
          throw new RepositoryDocumentReprocessPausedError();
        }

        await consolidateCaseFacts({
          lawFirmId: row.law_firm_id,
          caseId: row.case_id,
          actorUserId: row.created_by_user_id,
        });
      } catch (error) {
        // Bugfix: propagate a pause signal instead of logging it as a
        // consolidation error.
        if (error instanceof RepositoryDocumentReprocessPausedError) {
          throw error;
        }

        // Consolidation failure is non-fatal: record it and finish the job.
        fileErrors.push(
          `Case fact consolidation: ${
            error instanceof Error ? error.message : "Unexpected consolidation error"
          }`,
        );
      }
    }

    const completedStatus = fileErrors.length ? "completed_with_errors" : "completed";

    await updateRepositoryDocumentReprocessJob({
      jobId: row.id,
      statusCode: completedStatus,
      stageCode: "completed",
      processedDocuments,
      succeededDocuments,
      failedDocuments,
      extractedFieldCount,
      syncedClientFieldCount,
      currentDocumentRecordId: null,
      currentDocumentTitle: null,
      synthesizedProfileFactCount,
      synthesizedKnowledgeFactCount,
      synthesisRepositoryItemId,
      errorMessage: fileErrors.length ? fileErrors.join("\n") : null,
      markStarted: true,
      markCompleted: true,
    });

    input.logger?.info(
      { jobId: row.id, clientId: row.client_id, processedDocuments, failedDocuments },
      "Repository document AI reprocess job completed",
    );
  } catch (error) {
    if (error instanceof RepositoryDocumentReprocessPausedError) {
      // Persist the paused snapshot; a later resume restarts the job.
      await updateRepositoryDocumentReprocessJob({
        jobId: row.id,
        statusCode: "paused",
        stageCode: "paused",
        processedDocuments,
        succeededDocuments,
        failedDocuments,
        extractedFieldCount,
        syncedClientFieldCount,
        currentDocumentRecordId: null,
        currentDocumentTitle: null,
        errorMessage: null,
        markStarted: true,
      });

      input.logger?.info(
        { jobId: row.id, clientId: row.client_id },
        "Repository document AI reprocess job paused",
      );
      return;
    }

    const message = error instanceof Error ? error.message : "Unexpected error";

    await updateRepositoryDocumentReprocessJob({
      jobId: row.id,
      statusCode: "failed",
      stageCode: "failed",
      processedDocuments,
      succeededDocuments,
      failedDocuments,
      extractedFieldCount,
      syncedClientFieldCount,
      currentDocumentRecordId: null,
      currentDocumentTitle: null,
      errorMessage: message,
      markStarted: true,
      markCompleted: true,
    });

    input.logger?.error(
      { error, jobId: row.id, clientId: row.client_id },
      "Repository document AI reprocess job failed",
    );
  }
}

/**
 * Creates a reprocess job for every document record of a client, with one
 * tracking row per document in creation order. If the client already has a
 * queued or processing job, that job's id is returned instead of creating a
 * duplicate. Throws when the client has no documents.
 *
 * NOTE(review): the existing-job check and the INSERT are not in a
 * transaction, so two concurrent calls could both pass the check and create
 * two jobs — confirm whether callers serialize this.
 */
export async function createRepositoryDocumentReprocessJob(input: {
  lawFirmId: string;
  clientId: string;
  caseId?: string | null;
  actorUserId: string;
}) {
  // Reuse the most recent still-active job for this client, if any.
  const [existing] = await prisma.$queryRaw<Array<{ id: string }>>`
    SELECT id
    FROM repository_document_reprocess_jobs
    WHERE law_firm_id = ${input.lawFirmId}
      AND client_id = ${input.clientId}
      AND status_code IN ('queued', 'processing')
    ORDER BY created_at DESC
    LIMIT 1
  `;

  if (existing?.id) {
    return existing.id;
  }

  // Snapshot the client's documents in stable (created_at, id) order; this
  // order becomes each tracking row's sort_order.
  const documents = await prisma.$queryRaw<
    Array<{
      id: string;
      title: string;
      document_type_code: string;
      created_at: Date;
    }>
  >`
    SELECT id, title, document_type_code, created_at
    FROM document_records
    WHERE law_firm_id = ${input.lawFirmId}
      AND client_id = ${input.clientId}
    ORDER BY created_at ASC, id ASC
  `;

  if (!documents.length) {
    throw new Error("This client has no repository documents available for AI reprocessing");
  }

  const jobId = createId();
  await prisma.$executeRaw`
    INSERT INTO repository_document_reprocess_jobs (
      id,
      law_firm_id,
      client_id,
      case_id,
      created_by_user_id,
      status_code,
      stage_code,
      total_documents,
      processed_documents,
      succeeded_documents,
      failed_documents,
      extracted_field_count,
      synced_client_field_count,
      created_at,
      updated_at
    ) VALUES (
      ${jobId},
      ${input.lawFirmId},
      ${input.clientId},
      ${input.caseId ?? null},
      ${input.actorUserId},
      'queued',
      'queued',
      ${documents.length},
      0,
      0,
      0,
      0,
      0,
      CURRENT_TIMESTAMP,
      CURRENT_TIMESTAMP
    )
  `;

  // One tracking row per document; inserted sequentially to keep sort_order
  // assignment simple and deterministic.
  for (const [index, document] of documents.entries()) {
    await prisma.$executeRaw`
      INSERT INTO repository_document_reprocess_job_documents (
        id,
        job_id,
        document_record_id,
        sort_order,
        title,
        document_type_code,
        status_code,
        created_at,
        updated_at
      ) VALUES (
        ${createId()},
        ${jobId},
        ${document.id},
        ${index},
        ${document.title},
        ${document.document_type_code},
        'queued',
        CURRENT_TIMESTAMP,
        CURRENT_TIMESTAMP
      )
    `;
  }

  return jobId;
}

/**
 * Launches the job runner for a job id as a fire-and-forget task, registered
 * in the in-process map so the same job is never started twice concurrently.
 * Returns immediately; any unhandled runner error marks the job failed and
 * is logged rather than propagated.
 */
export async function startRepositoryDocumentReprocessJob(input: {
  jobId: string;
  logger?: FastifyBaseLogger | null;
}) {
  // A runner already in flight for this job keeps going; never double-start.
  if (activeRepositoryDocumentReprocessJobs.has(input.jobId)) {
    return;
  }

  const recordFailure = async (error: unknown) => {
    const message = error instanceof Error ? error.message : "Unexpected error";
    await updateRepositoryDocumentReprocessJob({
      jobId: input.jobId,
      statusCode: "failed",
      stageCode: "failed",
      errorMessage: message,
      markStarted: true,
      markCompleted: true,
    });
    input.logger?.error(
      { error, jobId: input.jobId },
      "Unable to process repository document AI reprocess job",
    );
  };

  const task = processRepositoryDocumentReprocessJobInternal(input)
    .catch(recordFailure)
    .finally(() => {
      // Deregister regardless of outcome so the job can be started again.
      activeRepositoryDocumentReprocessJobs.delete(input.jobId);
    });

  activeRepositoryDocumentReprocessJobs.set(input.jobId, task);
}

/**
 * Marks a reprocess job as paused and returns its refreshed detail view.
 *
 * Terminal jobs (completed/failed) are returned as-is without being touched.
 * Returns null when the job does not exist for the given firm, or disappears
 * between the status update and the re-read.
 */
export async function pauseRepositoryDocumentReprocessJob(input: {
  lawFirmId: string;
  jobId: string;
}) {
  const job = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (job == null) {
    return null;
  }

  const isTerminal = job.status_code === "completed" || job.status_code === "failed";
  if (isTerminal) {
    // Nothing to pause — just report the current state.
    const docs = await getRepositoryDocumentReprocessJobDocumentRows(job.id);
    return buildRepositoryDocumentReprocessJobDetail(job, docs);
  }

  // NOTE(review): markStarted is set even when pausing a job that never ran —
  // confirm that stamping started_at here is intended.
  await updateRepositoryDocumentReprocessJob({
    jobId: job.id,
    statusCode: "paused",
    stageCode: "paused",
    errorMessage: null,
    markStarted: true,
  });

  const current = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (current == null) {
    return null;
  }
  const docs = await getRepositoryDocumentReprocessJobDocumentRows(current.id);
  return buildRepositoryDocumentReprocessJobDetail(current, docs);
}

/**
 * Re-queues a non-terminal reprocess job, restarts its in-process worker, and
 * returns the refreshed detail view.
 *
 * Terminal jobs (completed/failed) are returned unchanged. Returns null when
 * the job cannot be found for the given firm.
 */
export async function resumeRepositoryDocumentReprocessJob(input: {
  lawFirmId: string;
  jobId: string;
  logger?: FastifyBaseLogger | null;
}) {
  const job = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (job == null) {
    return null;
  }

  const isTerminal = job.status_code === "completed" || job.status_code === "failed";
  if (isTerminal) {
    // Finished jobs cannot be resumed — report the current state.
    const docs = await getRepositoryDocumentReprocessJobDocumentRows(job.id);
    return buildRepositoryDocumentReprocessJobDetail(job, docs);
  }

  // Put the job back into the queue state before kicking the worker.
  await updateRepositoryDocumentReprocessJob({
    jobId: job.id,
    statusCode: "queued",
    stageCode: "queued",
    errorMessage: null,
    markStarted: true,
  });

  await startRepositoryDocumentReprocessJob({
    jobId: job.id,
    logger: input.logger,
  });

  const current = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (current == null) {
    return null;
  }
  const docs = await getRepositoryDocumentReprocessJobDocumentRows(current.id);
  return buildRepositoryDocumentReprocessJobDetail(current, docs);
}

/**
 * Loads the detail view for a single reprocess job.
 *
 * If the job is queued/processing in the database but has no in-process worker
 * (e.g. after a restart), the worker is (re)started before the job row is
 * re-read, so the returned detail reflects the restarted state.
 */
export async function getRepositoryDocumentReprocessJobDetail(input: {
  lawFirmId: string;
  jobId: string;
  logger?: FastifyBaseLogger | null;
}) {
  const row = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (row == null) {
    return null;
  }

  const runnable = row.status_code === "queued" || row.status_code === "processing";
  const idle = !activeRepositoryDocumentReprocessJobs.has(row.id);
  if (runnable && idle) {
    await startRepositoryDocumentReprocessJob({
      jobId: row.id,
      logger: input.logger,
    });
  }

  // Re-read so the response reflects any state change made by the restart.
  const current = await getRepositoryDocumentReprocessJobRow(input.lawFirmId, input.jobId);
  if (current == null) {
    return null;
  }

  const docs = await getRepositoryDocumentReprocessJobDocumentRows(current.id);
  return buildRepositoryDocumentReprocessJobDetail(current, docs);
}

/**
 * Lists recent reprocess jobs for a firm, optionally filtered by client/case.
 *
 * The limit is clamped to [1, 12] (default 6). Any listed job that is
 * queued/processing in the database but has no in-process worker is restarted
 * as a side effect; summaries are built from the rows as originally fetched.
 */
export async function listRepositoryDocumentReprocessJobs(input: {
  lawFirmId: string;
  clientId?: string | null;
  caseId?: string | null;
  limit?: number;
  logger?: FastifyBaseLogger | null;
}) {
  const requested = input.limit ?? 6;
  const limit = Math.min(Math.max(requested, 1), 12);
  const rows = await prisma.$queryRaw<Array<RepositoryDocumentReprocessJobRow>>`
    SELECT
      id,
      law_firm_id,
      client_id,
      case_id,
      created_by_user_id,
      status_code,
      stage_code,
      total_documents,
      processed_documents,
      succeeded_documents,
      failed_documents,
      extracted_field_count,
      synced_client_field_count,
      current_document_record_id,
      current_document_title,
      synthesis_status_code,
      synthesis_repository_item_id,
      synthesized_profile_fact_count,
      synthesized_knowledge_fact_count,
      error_message,
      started_at,
      completed_at,
      created_at,
      updated_at
    FROM repository_document_reprocess_jobs
    WHERE law_firm_id = ${input.lawFirmId}
      AND (${input.clientId ?? null} IS NULL OR client_id = ${input.clientId ?? null})
      AND (${input.caseId ?? null} IS NULL OR case_id = ${input.caseId ?? null})
    ORDER BY created_at DESC
    LIMIT ${limit}
  `;

  const summaries: RepositoryDocumentReprocessJobSummary[] = [];
  for (const row of rows) {
    const needsRestart =
      (row.status_code === "queued" || row.status_code === "processing") &&
      !activeRepositoryDocumentReprocessJobs.has(row.id);
    if (needsRestart) {
      await startRepositoryDocumentReprocessJob({
        jobId: row.id,
        logger: input.logger,
      });
    }

    summaries.push(buildRepositoryDocumentReprocessJobSummary(row));
  }

  return summaries;
}

/**
 * Restarts workers for every queued/processing job in the database that has no
 * in-process worker. Intended for recovery after a process restart — presumably
 * called during server boot (TODO confirm against caller).
 */
export async function resumePendingRepositoryDocumentReprocessJobs(
  logger?: FastifyBaseLogger | null,
) {
  const pending = await prisma.$queryRaw<Array<{ id: string }>>`
    SELECT id
    FROM repository_document_reprocess_jobs
    WHERE status_code IN ('queued', 'processing')
  `;

  for (const { id } of pending) {
    if (activeRepositoryDocumentReprocessJobs.has(id)) {
      continue;
    }
    await startRepositoryDocumentReprocessJob({
      jobId: id,
      logger,
    });
  }
}
