// write.js 5.02 KB
'use strict'

const util = require('util')

const contentPath = require('./path')
const fixOwner = require('../util/fix-owner')
const fs = require('fs')
const moveFile = require('../util/move-file')
const Minipass = require('minipass')
const Pipeline = require('minipass-pipeline')
const Flush = require('minipass-flush')
const path = require('path')
const rimraf = util.promisify(require('rimraf'))
const ssri = require('ssri')
const uniqueFilename = require('unique-filename')
const { disposer } = require('./../util/disposer')
const fsm = require('fs-minipass')

// Promise-returning wrapper around the callback-style fs.writeFile.
const writeFile = util.promisify(fs.writeFile)

// The module itself is the one-shot `write` function.
module.exports = write

/**
 * Writes `data` into the cache in one shot: the data is hashed, written to a
 * fresh tmp file, and that file is then moved to its content path.
 *
 * @param {string} cache - cache root directory
 * @param {*} data - content to store; its `length` is used as the size
 * @param {object} [opts]
 * @param {Array} [opts.algorithms] - at most one hash algorithm
 * @param {number} [opts.size] - expected `data.length`
 * @param {*} [opts.integrity] - expected integrity, checked with ssri
 * @returns {Promise<{integrity: *, size: number}>}
 * @throws {Error} synchronously (not as a rejection) when more than one
 *   algorithm is requested. Size and integrity mismatches are reported as
 *   rejections with code EBADSIZE / EINTEGRITY instead.
 */
function write (cache, data, opts = {}) {
  const requestedAlgorithms = opts.algorithms
  const expectedSize = opts.size
  const expectedIntegrity = opts.integrity

  if (requestedAlgorithms && requestedAlgorithms.length > 1) {
    throw new Error('opts.algorithms only supports a single algorithm for now')
  }

  const actualSize = data.length
  if (typeof expectedSize === 'number' && actualSize !== expectedSize) {
    return Promise.reject(sizeError(expectedSize, actualSize))
  }

  // Only pass `algorithms` through to ssri when the caller supplied it.
  const hashOpts = requestedAlgorithms ? { algorithms: requestedAlgorithms } : {}
  const sri = ssri.fromData(data, hashOpts)
  if (expectedIntegrity && !ssri.checkData(data, expectedIntegrity, opts)) {
    return Promise.reject(checksumError(expectedIntegrity, sri))
  }

  // 'wx' makes the write fail if the tmp file already exists.
  const writeThenMove = (tmp) => {
    const written = writeFile(tmp.target, data, { flag: 'wx' })
    return written.then(() => moveToDestination(tmp, cache, sri, opts))
  }

  // NOTE(review): disposer is presumably what invokes makeTmpDisposer once
  // writeThenMove settles -- confirm in ../util/disposer.
  const stored = disposer(makeTmp(cache, opts), makeTmpDisposer, writeThenMove)
  return stored.then(() => ({ integrity: sri, size: data.length }))
}

// The streaming writer is exposed as the `stream` property of the export.
module.exports.stream = writeStream

// Every write is forwarded to an internal Minipass 'inputStream', which is
// what the content-handling Promise consumes. Completion of flush (and so
// 'end') is held back until that content has been handled.
class CacacheWriteStream extends Flush {
  constructor (cache, opts) {
    super()
    this.opts = opts
    this.cache = cache

    const input = new Minipass()
    this.inputStream = input
    // surface the inner stream's errors and backpressure on this stream
    input.on('error', (er) => this.emit('error', er))
    input.on('drain', () => this.emit('drain'))

    // assigned on the first write(); still null if nothing was ever written
    this.handleContentP = null
  }

  // Starts content handling lazily on the first chunk, then forwards the
  // chunk to the inner stream and returns that stream's write() result.
  write (chunk, encoding, cb) {
    if (!this.handleContentP) {
      const { inputStream, cache, opts } = this
      this.handleContentP = handleContent(inputStream, cache, opts)
    }
    return this.inputStream.write(chunk, encoding, cb)
  }

  // Ends the inner stream, then calls cb once content handling has settled.
  // Emits 'integrity' and 'size' before a successful cb().
  flush (cb) {
    const onInputEnded = () => {
      const pending = this.handleContentP
      if (pending) {
        pending.then(
          (res) => {
            if (res.integrity) {
              this.emit('integrity', res.integrity)
            }
            if (res.size !== null) {
              this.emit('size', res.size)
            }
            cb()
          },
          (er) => cb(er)
        )
        return
      }

      // Nothing was ever written, so there is no content to store.
      const e = new Error('Cache input stream was empty')
      e.code = 'ENODATA'
      // empty streams are probably emitting end right away.
      // defer this one tick by rejecting a promise on it.
      return Promise.reject(e).catch(cb)
    }

    this.inputStream.end(onInputEnded)
  }
}

/**
 * Creates a writable stream whose contents are stored in the cache.
 *
 * @param {string} cache - cache root directory
 * @param {object} [opts] - handed to the stream unchanged
 * @returns {CacacheWriteStream}
 */
function writeStream (cache, opts = {}) {
  const stream = new CacacheWriteStream(cache, opts)
  return stream
}

/**
 * Drains `inputStream` into a fresh tmp file and then moves that file to the
 * content path derived from the computed integrity.
 *
 * @param {object} inputStream - stream carrying the content
 * @param {string} cache - cache root directory
 * @param {object} opts - passed on to makeTmp, pipeToTmp and moveToDestination
 * @returns {Promise<{integrity: *, size: *}>} the result from pipeToTmp
 */
function handleContent (inputStream, cache, opts) {
  const streamThenMove = (tmp) => {
    const piped = pipeToTmp(inputStream, cache, tmp.target, opts)
    return piped.then((result) => {
      const moved = moveToDestination(tmp, cache, result.integrity, opts)
      // resolve with the pipe result, not with whatever the move resolves to
      return moved.then(() => result)
    })
  }

  // NOTE(review): disposer is presumably what invokes makeTmpDisposer once
  // streamThenMove settles -- confirm in ../util/disposer.
  return disposer(makeTmp(cache, opts), makeTmpDisposer, streamThenMove)
}

/**
 * Pipes `inputStream` through an ssri integrity stream into a new file at
 * `tmpTarget`.
 *
 * @param {object} inputStream - stream carrying the content
 * @param {string} cache - cache root directory (not used in this function)
 * @param {string} tmpTarget - path of the tmp file to create
 * @param {object} opts - `integrity`, `algorithms` and `size` are forwarded
 *   to ssri.integrityStream
 * @returns {Promise<{integrity: *, size: *}>} values reported by the hash
 *   stream's 'integrity' and 'size' events. On failure the tmp file is
 *   removed and the original error is rethrown.
 */
function pipeToTmp (inputStream, cache, tmpTarget, opts) {
  // both are filled in by events from the hash stream
  let computedIntegrity
  let byteCount

  const { integrity, algorithms, size } = opts
  const hasher = ssri.integrityStream({ integrity, algorithms, size })
  hasher.on('integrity', (i) => { computedIntegrity = i })
  hasher.on('size', (s) => { byteCount = s })

  // 'wx' makes opening fail if the tmp file already exists.
  const fileSink = new fsm.WriteStream(tmpTarget, { flags: 'wx' })

  // NB: this can throw if the hashStream has a problem with
  // it, and the data is fully written.  but pipeToTmp is only
  // called in promisory contexts where that is handled.
  const pipeline = new Pipeline(inputStream, hasher, fileSink)

  // remove the partial tmp file first, then surface the original error
  const cleanupAndRethrow = (er) => rimraf(tmpTarget).then(() => { throw er })

  return pipeline.promise()
    .then(() => ({ integrity: computedIntegrity, size: byteCount }))
    .catch(cleanupAndRethrow)
}

/**
 * Picks a unique file name under `<cache>/tmp` and makes sure its parent
 * directory exists (via fixOwner.mkdirfix).
 *
 * @param {string} cache - cache root directory
 * @param {object} opts - `opts.tmpPrefix` is passed to unique-filename
 * @returns {Promise<{target: string, moved: boolean}>} tmp record, with
 *   `moved` initially false
 */
function makeTmp (cache, opts) {
  const tmpRoot = path.join(cache, 'tmp')
  const tmpTarget = uniqueFilename(tmpRoot, opts.tmpPrefix)
  const tmpDir = path.dirname(tmpTarget)

  return fixOwner.mkdirfix(cache, tmpDir).then(() => {
    return { target: tmpTarget, moved: false }
  })
}

/**
 * Cleanup step for a tmp record produced by makeTmp.
 *
 * @param {{target: string, moved: boolean}} tmp
 * @returns {Promise} resolves immediately when the file was already moved to
 *   its destination; otherwise the result of removing `tmp.target`
 */
function makeTmpDisposer (tmp) {
  // a moved file no longer lives at tmp.target, so there is nothing to remove
  return tmp.moved ? Promise.resolve() : rimraf(tmp.target)
}

/**
 * Moves a finished tmp file to the content path for `sri`.
 *
 * @param {{target: string, moved: boolean}} tmp - tmp record; `moved` is set
 *   to true once the move has succeeded
 * @param {string} cache - cache root directory
 * @param {*} sri - integrity value used to derive the content path
 * @param {object} opts - not used in this function
 * @returns {Promise} result of fixOwner.chownr on the destination
 */
function moveToDestination (tmp, cache, sri, opts) {
  const destination = contentPath(cache, sri)

  const dirReady = fixOwner.mkdirfix(cache, path.dirname(destination))
  const fileMoved = dirReady.then(() => moveFile(tmp.target, destination))

  return fileMoved.then(() => {
    // flag it before chownr so the tmp disposer will not try to remove it
    tmp.moved = true
    return fixOwner.chownr(cache, destination)
  })
}

/**
 * Builds the error reported when the data length differs from the expected
 * size.
 *
 * @param {number} expected - size the caller asked for
 * @param {number} found - size actually seen
 * @returns {Error} with `expected`, `found` and `code` 'EBADSIZE'
 */
function sizeError (expected, found) {
  return Object.assign(
    new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`),
    { expected, found, code: 'EBADSIZE' }
  )
}

/**
 * Builds the error reported when the data does not match the expected
 * integrity.
 *
 * @param {*} expected - integrity the caller asked for
 * @param {*} found - integrity actually computed
 * @returns {Error} with `code` 'EINTEGRITY', `expected` and `found`
 */
function checksumError (expected, found) {
  // the message spans three lines; its leading spaces align the two labels
  const message = `Integrity check failed:
  Wanted: ${expected}
   Found: ${found}`
  return Object.assign(new Error(message), {
    code: 'EINTEGRITY',
    expected,
    found
  })
}