import { inject, Injectable } from '@angular/core';
import * as Sentry from '@sentry/browser';
import { KeycloakService } from 'keycloak-angular';
import { RewriteStreamResult } from 'rio-models';
import { from, map, Observable, reduce, switchMap } from 'rxjs';
import {
  DoneStreamResponse,
  ErrorStreamResponse,
  StreamResponse,
  StreamTextChainBodyRequest,
  StreamTextPromptCreationBodyRequest,
  StreamTextUserViewBodyRequest,
} from '../models/stream';
import { API_PREFIX } from '../utils/injection-tokens';
import { RESOURCES } from './resources';

@Injectable({ providedIn: 'root' })
export class RewriteStreamService {
  API_PREFIX = inject(API_PREFIX);
  keycloak = inject(KeycloakService);

  generateOutputStream(
    inputText: string,
    promptId: string,
    organizationId: string,
    textLength: number = undefined,
  ): Observable<RewriteStreamResult> {
    return this.createStream(organizationId, {
      text: inputText,
      prompt_id: promptId,
      // undefined is removed from request payload
      text_length: textLength == null ? undefined : textLength,
    });
  }

  generateOutputStreamForPromptCreation(
    inputText: string,
    promptText: string,
    organizationId: string,
    textLength: number,
    quotePreservation: boolean,
  ): Observable<RewriteStreamResult> {
    return this.createStream(organizationId, {
      text: inputText,
      prompt_text: promptText,
      // undefined is removed from request payload
      text_length: textLength == null ? undefined : textLength,
      quote_preservation:
        quotePreservation == null ? undefined : quotePreservation,
    });
  }

  generateOutputChainStream(
    inputText: string,
    promptId: string,
    organizationId: string,
  ): Observable<string> {
    return this.createChainStream(organizationId, {
      text: inputText,
      prompt_id: promptId,
    });
  }

  private createStream(
    organizationId: string,
    body: StreamTextPromptCreationBodyRequest | StreamTextUserViewBodyRequest,
  ): Observable<RewriteStreamResult> {
    const url = `${this.API_PREFIX}${RESOURCES.REWRITE.rewriteStream(
      organizationId,
    )}`;

    return from(this.keycloak.getToken()).pipe(
      switchMap(token => {
        const requestOptions = this.createRequestOptions(token, body);

        return this.fetchStream(url, requestOptions);
      }),
    );
  }

  private createChainStream(
    organizationId: string,
    body: StreamTextChainBodyRequest,
  ): Observable<string> {
    const url = `${this.API_PREFIX}${RESOURCES.REWRITE.rewriteStream(
      organizationId,
    )}`;

    return from(this.keycloak.getToken()).pipe(
      switchMap(token => {
        const requestOptions = this.createRequestOptions(token, body);

        return this.fetchStream(url, requestOptions).pipe(
          reduce((acc, item) => {
            if (item.event === 'message') {
              acc = item.data.fullText;
            }

            if (item.event === 'error') {
              console.error(item.data.fullText);
              return 'error';
            }

            return acc;
          }, ''),
        );
      }),
    );
  }

  private createRequestOptions(
    token: string,
    body:
      | StreamTextChainBodyRequest
      | StreamTextPromptCreationBodyRequest
      | StreamTextUserViewBodyRequest,
  ): RequestInit {
    return {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${token}`,
      },
      body: JSON.stringify(body),
    };
  }

  private fetchStream(url: string, requestOptions: RequestInit) {
    return from(fetch(url, requestOptions)).pipe(
      switchMap(response => {
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }

        return streamToObservable(response).pipe(
          map(item => {
            if (item.event === 'error') {
              return {
                data: {
                  fullText: item.data.text,
                },
                event: item.event,
              };
            }

            if (item.event === 'done') {
              return {
                event: item.event,
              };
            }

            return {
              data: {
                fullText: item.data.full_text,
              },
              event: item.event,
            };
          }),
        );
      }),
    );
  }
}

// https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/read#example_2_-_handling_text_line_by_line
export async function* makeStreamTextIterator(response: Response) {
  const utf8Decoder = new TextDecoder('utf-8');
  // let response = await fetch(fileURL);
  const reader = response.body.getReader();
  let { value, done: readerDone } = await reader.read();
  //we need this tertiary operator because the last chunk value is undefined
  let chunk = value ? utf8Decoder.decode(value, { stream: true }) : '';

  const re = /\r\n|\n|\r/gm;
  let startIndex = 0;

  for (;;) {
    const result = re.exec(chunk);
    if (!result) {
      if (readerDone) {
        break;
      }
      const remainder = chunk.substr(startIndex);
      ({ value, done: readerDone } = await reader.read());
      chunk =
        remainder + (value ? utf8Decoder.decode(value, { stream: true }) : '');
      startIndex = re.lastIndex = 0;
      continue;
    }
    yield chunk.substring(startIndex, result.index);
    startIndex = re.lastIndex;
  }
  if (startIndex < chunk.length) {
    // last line didn't end in a newline char
    yield chunk.substr(startIndex);
  }
}

export function streamToObservable(
  response: Response,
): Observable<StreamResponse | DoneStreamResponse | ErrorStreamResponse> {
  return new Observable<StreamResponse>(observer => {
    (async () => {
      try {
        for await (const line of makeStreamTextIterator(response)) {
          if (line.trim() !== '') {
            const obj = JSON.parse(line);
            observer.next(obj);
            if (obj.event === 'done') {
              observer.complete();
            }

            if (obj.event === 'error') {
              observer.error(obj);
            }
          }
        }
      } catch (error) {
        Sentry.captureMessage('Stream Error', Sentry.Severity.Warning);
        observer.error(error);
      }
    })();
  });
}
