const _ = require('lodash');
const { Observable } = require('rxjs');
const { extractInfo, waitForInvocationReaction } = require('./reactions_utils');
const { handleBeeError } = require('./ConnectionService');

// Timeout handed to waitForInvocationReaction when fetching item info.
// NOTE(review): presumably milliseconds (i.e. 30 seconds) — confirm against reactions_utils.
const REACTION_TIMEOUT = 30 * 1000;

// States after which a queue item is no longer observed (see updateValue).
const END_STATES = [
  'REVOKED',
  'REMOVED',
  'EXPIRED',
  'ARRIVED',
];

/**
 * Tells whether a queue item's `state` is one of END_STATES.
 *
 * @param {object} queueItem - Item whose `state` property is inspected.
 * @returns {boolean} True when the state is listed in END_STATES.
 */
const hasEndState = ({ state }) => END_STATES.includes(state);

/**
 * Builds a handler that forwards a queue item to the subscriber stored in
 * `data`, and completes the stream when hasEndState reports the item as
 * finished.
 *
 * @param {object} data - Subscription state; `subscriber` and `queueItemId` are read.
 * @returns {function(object): void} Handler that receives the queue item.
 */
const updateValue = (data) => (queueItem) => {
  data.subscriber.next(queueItem);

  const isFinished = hasEndState(queueItem);
  if (!isFinished) {
    return;
  }

  console.log(`Observing for queueItem ${data.queueItemId} has completed!`);
  data.subscriber.complete();
};

/**
 * Invokes the `vwroom.getItemInfo` action for a queue item, waits for the
 * matching reaction, extracts the item info and caches it on `data.queueItem`.
 *
 * @param {object} data - Subscription state; `bee` is read, `queueItem` is written.
 * @param {*} queueItemId - Identifier passed to the action and to extractInfo.
 * @returns {Promise<object>} Resolves with the extracted queue item.
 */
const fetchItemInfo = (data, queueItemId) => {
  const pickMessage = (reaction) => _.get(reaction, 'details.message');

  const rememberItem = (queueItem) => {
    // TODO: check if we have changed queue, if so, then we need to:
    // - stop listening on old queue channel
    // - start listening on new queue channel
    data.queueItem = queueItem;
    return queueItem;
  };

  return data.bee.actions
    .invoke('vwroom.getItemInfo', queueItemId)
    .then(waitForInvocationReaction(data.bee, pickMessage, REACTION_TIMEOUT))
    .then(extractInfo(queueItemId, 'order'))
    .then(rememberItem);
};

/**
 * Builds the callback run when the queue's notification channel changes:
 * it re-fetches the item info and pushes the result to the subscriber.
 *
 * The callback is wrapped in a debounce by its caller, so it can still fire
 * after the subscription has been torn down. In that case it returns
 * immediately instead of issuing another remote invocation.
 *
 * @param {object} data - Subscription state; `unsubscribed` and `queueItemId` are read.
 * @param {object} queueItem - Item captured at registration time; its `id` is re-fetched.
 * @returns {function(): void} Change handler taking no arguments.
 */
const handleChanges = (data, queueItem) => () => {
  if (data.unsubscribed) {
    // Nobody is listening any more; skip the fetch.
    return;
  }

  console.log(`QueueItem ${data.queueItemId} has changed`, Date.now());
  fetchItemInfo(data, queueItem.id)
    .then(updateValue(data))
    .catch(error => {
      console.error('Could not read item info, could have been deleted', error);
    });
};

/**
 * Builds a step that registers a change callback for the notification channel
 * of the item's queue and records the callback id in `data.callbackIds`.
 *
 * When `data.unsubscribed` is already set, nothing is registered and the item
 * is returned synchronously.
 *
 * @param {object} data - Subscription state; `bee`, `unsubscribed`, `callbackIds` and `queueItemId` are used.
 * @returns {function(object): (object|Promise<object>)} Step that yields the same queue item it was given.
 */
const listenForChanges = data => queueItem => {
  if (!data.unsubscribed) {
    const registerCallback = (channel) => {
      const topic = `storage:mutate.instance.${channel.id}`;
      // Debounced by 125 ms before the change handler runs.
      const onMutation = _.debounce(handleChanges(data, queueItem), 125);
      const callbackId = data.bee.reactions.setCallback(topic, onMutation);

      console.log(`Adding callback for queueItem ${data.queueItemId}: ${callbackId}`);
      data.callbackIds.push(callbackId);
      return queueItem;
    };

    return data.bee.storage
      .executeQuery('vwroom', 'notificationChannelForQueue', [ queueItem.queue.id ])
      .then(_.first)
      .then(registerCallback);
  }

  return queueItem;
};

/**
 * Unregisters every callback id recorded in `data.callbackIds` and leaves the
 * list empty.
 *
 * @param {object} data - Subscription state; `bee`, `callbackIds` and `queueItemId` are used.
 * @returns {void}
 */
const removeCallbacks = (data) => {
  // Swap the list out first so a repeated call has nothing left to remove.
  const callbackIds = data.callbackIds;
  data.callbackIds = [];
  console.log(`Removing callbacks for queueItem ${data.queueItemId}:`, callbackIds);

  // Call removeCallback as a method so its `this` stays bound to
  // bee.reactions, and pass only the id (forEach would otherwise also hand
  // over the index and the array).
  callbackIds.forEach((callbackId) => {
    data.bee.reactions.removeCallback(callbackId);
  });
};

/**
 * Fetches the item, starts listening for changes and emits the first value.
 * On failure the error is logged, passed to handleBeeError and to the
 * subscriber, and any registered callbacks are removed. Once the chain has
 * settled, callbacks are removed again if the subscription was torn down in
 * the meantime.
 *
 * Curried via lodash: may be called as (data, itemId) or (data)(itemId).
 *
 * @param {object} data - Subscription state; `subscriber` and `unsubscribed` are read here.
 * @param {*} itemId - Identifier of the queue item to observe.
 * @returns {Promise<void>} Settles when the initial setup has finished.
 */
const setupObservable = _.curry((data, itemId) => {
  const failSubscription = (error) => {
    console.error('Subscribing error', error);
    handleBeeError(error);
    data.subscriber.error(error);
    removeCallbacks(data);
  };

  const releaseIfUnsubscribed = () => {
    if (!data.unsubscribed) {
      return;
    }
    removeCallbacks(data);
  };

  return fetchItemInfo(data, itemId)
    .then(listenForChanges(data))
    .then(updateValue(data))
    .catch(failSubscription)
    .finally(releaseIfUnsubscribed);
});

/**
 * Builds the teardown function for a subscription: it marks the subscription
 * as unsubscribed and removes all registered callbacks.
 *
 * @param {object} data - Subscription state; `unsubscribed` is written, `queueItemId` is read.
 * @returns {function(): void} Teardown function taking no arguments.
 */
const tearDown = data => () => {
  console.debug(`Unsubscribing from queueItem ${data.queueItemId}...`);

  // Set the flag before touching the callbacks: if removal throws, the
  // subscription is still marked as unsubscribed, so setup that is still in
  // flight will not keep (or will clean up) its callbacks.
  data.unsubscribed = true;
  removeCallbacks(data);

  console.debug(`Unsubscribed from queueItem ${data.queueItemId}.`);
};

/**
 * Creates the mutable state object shared by all helpers of one subscription.
 *
 * @param {object} bee - Client object stored as `bee`.
 * @param {object} subscriber - Subscriber stored as `subscriber`.
 * @param {object} [extras={}] - Extra properties assigned on top of the defaults via `_.assignIn`.
 * @returns {object} State with `unsubscribed`, `bee`, `subscriber`, `callbackIds` plus the extras.
 */
const prepareData = (bee, subscriber, extras = {}) => {
  const defaults = {
    unsubscribed: false,
    bee,
    subscriber,
    callbackIds: [],
  };

  return _.assignIn(defaults, extras);
};

/**
 * Creates an Observable for a queue item. On every subscription a fresh state
 * object is prepared, setup is started via setupObservable, and the function
 * built by tearDown is returned as the subscription's teardown logic.
 *
 * @param {object} bee - Client object handed to the helpers through the state object.
 * @param {*} itemId - Identifier of the queue item to observe.
 * @returns {Observable} Observable wrapping the subscription logic.
 */
function observeItem(bee, itemId) {
  // TODO: validate params

  const onSubscribe = (subscriber) => {
    console.debug(`Subscribing to queueItem ${itemId}`);

    const data = prepareData(bee, subscriber, { queueItemId: itemId });
    setupObservable(data, itemId);

    return tearDown(data);
  };

  return new Observable(onSubscribe);
}

// Public API of this module: only observeItem is exported.
module.exports = {
  observeItem,
};
