const _ = require('lodash');
const { Observable } = require('rxjs');
const { waitForInvocationReaction } = require('./reactions_utils');
const { handleBeeError } = require('./ConnectionService');
const moment = require('moment');

const REACTION_TIMEOUT = 30000;

const fetchStats = (data) => {
  const now = moment();
  const dayStart = now.startOf('day').toISOString();
  const weekStart = now.startOf('week').toISOString();

  return data.bee.actions
    .invoke('vwroom.getStats', data.queueId, dayStart, weekStart)
    .then(waitForInvocationReaction(data.bee, r => _.get(r, 'details.message'), REACTION_TIMEOUT))
    .then((stats) => data.subscriber.next(stats));
};

const handleChanges = (data) => () => {
  fetchStats(data).catch((error) => {
    console.error('Could not read item info, could have been deleted', error);
  });
};

const listenForChanges = (data) => () => {
  if (data.unsubscribed) {
    return;
  }

  const callbackId = data.bee.reactions.setCallback(
    `storage:mutate.instance.${data.queueId}`,
    handleChanges(data)
  );

  console.debug(`Adding callback for queue stats ${data.queueId}: ${callbackId}`);
  data.callbackIds.push(callbackId);
};

const removeCallbacks = (data) => {
  const callbackIds = data.callbackIds;
  data.callbackIds = [];
  console.debug(`Removing callbacks for queue stats ${data.queueId}:`, callbackIds);
  callbackIds.forEach(data.bee.reactions.removeCallback);
};

const setupObservable = _.curry((data, queueId) =>
  fetchStats(data, queueId)
    .then(listenForChanges(data))
    .catch((error) => {
      console.error('Subscribing error', error);
      handleBeeError(error);
      data.subscriber.error(error);
      removeCallbacks(data);
    })
    .finally(() => {
      if (data.unsubscribed) {
        removeCallbacks(data);
      }
    })
);

const tearDown = (data) => () => {
  console.debug(`Unsubscribing from queue stats ${data.queueId}...`);

  removeCallbacks(data);
  data.unsubscribed = true;

  console.debug(`Unsubscribed from queue stats ${data.queueId}.`);
};

function observe(bee, queueId) {
  // TODO: validate params

  return new Observable((subscriber) => {
    console.debug(`Subscribing to queue stats ${queueId}`);

    const data = {
      unsubscribed: false,
      bee,
      subscriber,
      queueId,
      callbackIds: [],
    };

    setupObservable(data, queueId);

    return tearDown(data);
  });
}

module.exports = {
  observe,
};
