import { delay } from 'redux-saga';
import { call, put, takeLatest, all } from 'redux-saga/effects';

import { dagState } from 'constantsBase';
import { Airflow } from 'utils/copilotAPI';
import {
  FETCH_REPORT,
  AF_NUM_RETRIES,
  AF_POLL_DELAY_TIME,
  AF_RETRY_DELAY_TIME,
} from './constants';

import { fetchDagStatusRunning } from './actions';

export function* pollDagStatus(
  dagId,
  runId,
  taskId,
  actionOnFinish,
  actionOnDisconnect,
  actionOnRecovery,
  actionOnFail,
  ...passthrough
) {
  let numFailures = 0;

  for (;;) {
    const hasFailed = numFailures > 0;
    try {
      yield call(delay, AF_POLL_DELAY_TIME);
      const status = yield call(Airflow.getDagStatus, {
        dag_id: dagId,
        run_id: runId,
        task_id: taskId,
      });

      if (hasFailed) {
        yield put(actionOnRecovery());
      }

      switch (status.data.state) {
        case dagState.success: {
          yield put(actionOnFinish(dagId, runId, status.data.state, passthrough));
          return;
        }
        case dagState.queued:
        case dagState.running:
          yield put(fetchDagStatusRunning(status));
          break;
        default:
          yield put(actionOnFail());
          return;
      }
    } catch (e) {
      yield delay(AF_RETRY_DELAY_TIME);
      yield put(actionOnDisconnect(AF_NUM_RETRIES - numFailures));
      numFailures += 1;
      if (numFailures >= AF_NUM_RETRIES) {
        yield put(actionOnFail());
        break;
      }
    }
  }
}

export function* attemptToTriggerDag(action) {
  const apiMethod = action.payload.triggerFn ? action.payload.triggerFn : Airflow.triggerStrat;
  for (let i = 0; i < AF_NUM_RETRIES; i += 1) {
    try {
      const dag = yield call(apiMethod, action.payload.request);
      if (i > 0) {
        yield put(action.payload.actionOnRecovery());
      }
      return dag;
    } catch (e) {
      yield put(action.payload.actionOnDisconnect(AF_NUM_RETRIES - i));
      yield call(delay, AF_RETRY_DELAY_TIME);
    }
  }
  throw new Error(`Trigger failed after ${AF_NUM_RETRIES} tries`);
}

export function* triggerDagByStrat(action) {
  const {
    actionOnFinish, actionOnFail, actionOnRecovery, actionOnDisconnect, taskId, ...passthrough
  } = action.payload;
  try {
    const dag = yield call(attemptToTriggerDag, action);
    const dagId = dag.data.dag_id;
    const runId = dag.data.run_id;

    yield call(
      pollDagStatus,
      dagId,
      runId,
      taskId,
      actionOnFinish,
      actionOnDisconnect,
      actionOnRecovery,
      actionOnFail,
      passthrough,
    );
  } catch (error) {
    yield put(actionOnFail());
  }
}

export function* watchSagas() {
  yield all([
    takeLatest(FETCH_REPORT, triggerDagByStrat),
  ]);
}
