const _ = require('lodash');
const { Observable } = require('rxjs');
const { handleBeeError } = require('./ConnectionService');
const { waitForInvocationReaction, extractInfo } = require('./reactions_utils');

// Timeout handed to waitForInvocationReaction when fetching queue info.
// NOTE(review): presumably milliseconds (30 s) — confirm against reactions_utils.
const REACTION_TIMEOUT = 30000;

/**
 * Pushes a queue snapshot to the subscriber stored on the subscription state
 * and hands the same snapshot back, so it can sit in the middle of a promise
 * chain.
 *
 * Wrapped in `_.curry`, so it may be called as `updateSubscribers(data, queue)`
 * or partially applied as `updateSubscribers(data)(queue)`.
 */
const updateSubscribers = _.curry((subscription, snapshot) => {
  const { subscriber } = subscription;
  subscriber.next(snapshot);
  return snapshot;
});

/**
 * Requests the current queue info through the bee client and forwards the
 * result to the subscriber.
 *
 * Steps: invoke the 'vwroom.getQueueInfo' action, wait for the matching
 * reaction (bounded by REACTION_TIMEOUT), run the reaction through
 * `extractInfo` with the 'itemCount' and 'averageWaitTime' field names, then
 * push the result to the subscriber via `updateSubscribers`.
 *
 * @param {object} data - Subscription state; reads `bee`, `locationId`,
 *   `queueName` and (through updateSubscribers) `subscriber`.
 * @returns {Promise} Settles with the value produced by the last step, i.e.
 *   the queue object that was pushed to the subscriber.
 */
const fetchQueue = (data) => {
  const { bee, locationId, queueName } = data;

  // Fire the action first; the handlers below are created afterwards, in the
  // same order the original chain evaluated them.
  const invocation = bee.actions.invoke('vwroom.getQueueInfo', locationId, queueName);

  const awaitReaction = waitForInvocationReaction(
    bee,
    (reaction) => _.get(reaction, 'details.message'),
    REACTION_TIMEOUT
  );
  const toQueueInfo = extractInfo(undefined, ['itemCount','averageWaitTime']);
  const publish = updateSubscribers(data);

  return invocation
    .then(awaitReaction)
    .then(toQueueInfo)
    .then(publish);
};


/**
 * Builds a promise-chain step that registers a reaction callback for the
 * queue's notification channel, so the queue is re-fetched when it changes.
 *
 * Nothing is registered when the subscription is already flagged as
 * unsubscribed. The id returned by `setCallback` is appended to
 * `data.callbackIds` so it can be removed later.
 *
 * @param {object} data - Subscription state; reads `unsubscribed`, `bee`,
 *   `queueName` and appends to `callbackIds`.
 * @returns {function(object): object} Step that takes the fetched queue
 *   (reads `notificationChannel`) and returns that same queue unchanged.
 */
const listenForChanges = (data) => (queue) => {
  if (!data.unsubscribed) {
    const channel = `storage:mutate.instance.${queue.notificationChannel}`;

    // Invoked by the bee client when the channel fires; reloads the queue.
    const reloadQueue = () => {
      console.debug('Queue changed, reloading');
      return fetchQueue(data);
    };

    const registeredId = data.bee.reactions.setCallback(channel, reloadQueue);
    console.debug(`Adding callback for queue ${data.queueName}: ${registeredId}`);
    data.callbackIds.push(registeredId);
  }

  return queue;
};

/**
 * Unregisters every reaction callback recorded for this subscription.
 *
 * The id list on `data` is swapped for a fresh empty array before anything is
 * removed, so a repeated call finds nothing left to remove.
 *
 * @param {object} data - Subscription state; reads `bee` and `queueName`,
 *   and resets `callbackIds`.
 */
const removeCallbacks = (data) => {
  const callbackIds = data.callbackIds;
  data.callbackIds = [];
  console.debug(`Removing callbacks for queue ${data.queueName}: ${callbackIds}`);

  // Call through `reactions` so the method keeps its receiver, and pass only
  // the id: handing the bare method to forEach would drop `this` and also
  // feed it forEach's index and array arguments.
  const { reactions } = data.bee;
  callbackIds.forEach((callbackId) => {
    reactions.removeCallback(callbackId);
  });
};

/**
 * Creates an Observable that emits the info of one queue and re-emits it
 * whenever the queue's notification channel fires.
 *
 * Each subscription builds its own state object, performs an initial fetch
 * and then registers a change callback. If the initial fetch or the callback
 * registration fails, the error is logged, passed to `handleBeeError`, any
 * registered callbacks are removed and the subscriber is errored. This
 * function never calls `subscriber.complete()`.
 *
 * @param {object} bee - Client used for `actions.invoke` and `reactions` callbacks.
 * @param {*} locationId - Location identifier forwarded to the queue-info action.
 * @param {string} queueName - Queue name forwarded to the action and used in logs.
 * @returns {Observable} Observable of queue-info objects.
 */
function observe(bee, locationId, queueName) {
  // TODO: validate params

  return new Observable(subscriber => {
    console.debug(`Subscribing to queue ${queueName}...`);

    const data = {
      unsubscribed: false,
      bee,
      subscriber,
      locationId,
      queueName,
      callbackIds: [],
    };

    fetchQueue(data)
      .then(listenForChanges(data))
      .catch(error => {
        console.error('Subscribing error', error);
        handleBeeError(error);
        removeCallbacks(data);
        subscriber.error(error);
      })
      .finally(() => {
        // Covers an unsubscribe that happened while the chain was in flight.
        if (data.unsubscribed) {
          removeCallbacks(data);
        }
      })
    ;

    return () => {
      console.debug(`Unsubscribing from queue ${queueName}`, data.callbackIds);
      // Flag the subscription as dead before cleaning up: if callback removal
      // throws, a still-pending fetch must not register new callbacks.
      data.unsubscribed = true;
      removeCallbacks(data);
    };
  });
}


// Public API: only `observe` is exported; the helpers above stay module-private.
module.exports = {
  observe,
};
