精通IPFS:IPFS 保存内容之上篇
在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。
pull-stream 是一个新型的流库,数据被从源中拉取到目的中,它有两种基本类型的流:Source 源和 Sink 接收器。除此之外,有两种复合类型的流:Through 通道流(比如转换)和 Duplex 双向流。
source 流,这类流返回一个匿名函数,这个匿名函数被称为 read 函数,它被后续的 sink 流函数或 through 流函数调用,从而读取 source 流中的内容。
sink 流,这类流最终都返回内部 drain.js 中的 sink 函数。这类流主要是读取数据,并且对每一个读取到的数据进行处理,如果流已经结束,则调用用户指定结束函数进行处理。
through 流,这类流的函数会返回嵌套的匿名函数,第一层函数接收一个 source 流的 read 函数或其他 through 函数返回的第一层函数为参数,第二层函数接收最终 sink 提供的写函数或其他 through 返回的第二层函数,第二层函数内部调用 read 函数,从而直接或间接从 source 中取得数据,获取数据后直接或间接调用 sink 函数,从而把数据写入到目的地址。
在 pull-streams 中,数据在流动之前,必须有一个完整的管道,这意味着一个源、零个或多个通道、一个接收器。但是仍然可以创建一个部分化的管道,这非常有用。也就是说可以创建一个完整的管道,比如
pull(source, sink) => undefined
,也可以部分化的管道,比如
pull(through, sink) => sink
,或者
pull(through1, through2) => through
,我们在下面会大量遇到这种部分化的管道。
今天,我们看下第一个最常用的
add
命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作,
add
命令就是干这个的,闲话少数,我们来看一段代码。
const {createNode} = require('ipfs')const node = createNode({
libp2p:{
config:{
dht:{
enabled:true
}
}
}
})
node.on('ready', async () => {
const content = `我爱黑萤`;
const filesAdded = await node.add({
content: Buffer.from(content)
},{
chunkerOptions:{
maxChunkSize:1000,
avgChunkSize:1000
}
})
console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
})
这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。在程序中,通过调用 IPFS 节点的
add
方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。
add
方法位于
core/components/files-regular/add.js
文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把
core/components/files-regular
目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的
add.js
文件。下面,我们直接看这个函数的执行流程。
这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的
add
函数,后者才是主体,它的逻辑如下:
-
首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。
if (typeof options === 'function') { callback = options options = {} }
-
定义检测内容的工具函数来检测我们要上传的内容。
const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj) const isContentObject = obj => { if (typeof obj !== 'object') return false if (obj.content) return isBufferOrStream(obj.content) return Boolean(obj.path) && typeof obj.path === 'string' }
const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))
if (!ok) { return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) }
-
接下来,执行 pull-stream 类库提供的
pull
函数。我们来看pull
函数的主要内容。它的第一个参数是pull.values
函数执行的结果,这个values
函数就是一个 source 流,它返回一个称为read
的函数来读取我们提供的数据。这个read
函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。第二个参数是 IPFS 对象的
addPullStream
方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的add-pull-stream.js
文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。第三个参数是
pull-sort
中定义的函数,这是一个依赖于pull-stream
的库,根据一定规则来排序,这个函数我们不用管。最后一个参数是
pull.collect
函数执行的结果,这个collect
函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的filesAdded
之所以是一个数组就是拜这个函数所赐。上面逻辑的代码如下:
在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。pull( pull.values([data]), self.addPullStream(options), sort((a, b) => { if (a.path < b.path) return 1 if (a.path > b.path) return -1 return 0 }), pull.collect(callback) )
addPullStream
方法,这个方法是保存内容的主体,
add
方法是只开胃小菜。
addPullStream
方法执行逻辑如下:
-
调用
parseChunkerString
函数,处理内容分块相关的选项。这个函数位于相同目录下的utils.js
文件中,它检查用户指定的分块算法。如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为rabin
类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:
注意:我们也可以通过重写这个函数来增加自己的分割算法。parseChunkerString = (chunker) => { if (!chunker) { return { chunker: 'fixed' } } else if (chunker.startsWith('size-')) { const sizeStr = chunker.split('-')[1] const size = parseInt(sizeStr) if (isNaN(size)) { throw new Error('Chunker parameter size must be an integer') } return { chunker: 'fixed', chunkerOptions: { maxChunkSize: size } } } else if (chunker.startsWith('rabin')) { return { chunker: 'rabin', chunkerOptions: parseRabinString(chunker) } } else { throw new Error(
Unrecognized chunker option: ${chunker}
) } } -
合并整理选项变量。
const opts = Object.assign({}, { shardSplitThreshold: self._options.EXPERIMENTAL.sharding ? 1000 : Infinity }, options, chunkerOptions)
-
设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。
if (opts.hashAlg && opts.cidVersion !== 1) { opts.cidVersion = 1 }
-
设置进度处理函数,默认空实现。
const prog = opts.progress || noop const progress = (bytes) => { total += bytes prog(total) }
opts.progress = progress
-
用
pull
函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。-
首先调用
pull.map
方法对保存的内容进行处理。pull.map
方法是 pull-stream 流中的一个 source 流,它对数组中的每个元素使用指定的处理函数进行处理。这就是我们在add
函数中把需要保存的内容转化为数组的原因。在这里,对每个数组元素进行处理的函数是normalizeContent
。这个函数定义在同一个文件中,它首先检查保存的内容是否为数组,如果不是则转化为数组;然后,对数组中的每一个元素进行处理,具体如下:-
如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。
if (Buffer.isBuffer(data)) { data = { path: '', content: pull.values([data]) } }
-
如果保存的内容是一个 Node.js 可读流,比如文件,则把要保存的转化为路径为空字符串,内容使用 stream-to-pull-stream 类的
source
方法库把 Node.js 可读流转化为 pull-stream 的 source 流对象。if (isStream.readable(data)) { data = { path: '', content: toPull.source(data) } }
-
如果保存的内容是 pull-stream 的 source 流,则把要保存的内容转化为路径为空字符串,内容不变的对象。
if (isSource(data)) { data = { path: '', content: data } }
-
如果要保存的内容是一个对象,并且
content
属性存在,且不是函数,则进行如下处理:if (data && data.content && typeof data.content !== 'function') { if (Buffer.isBuffer(data.content)) { data.content = pull.values([data.content]) }
if (isStream.readable(data.content)) { data.content = toPull.source(data.content) } }
-
如果指定的是路径,则进行下面的处理。
if (opts.wrapWithDirectory && !data.path) { throw new Error('Must provide a path when wrapping with a directory') }
if (opts.wrapWithDirectory) { data.path = WRAPPER + data.path }
- 返回最终生成的要保存的内容。
-
如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。
-
调用
pull.flatten()
方法,把前上步生成的数组进行扁平化处理。flatten
方法是一个 through 流,主要是把多个流或数组流转换为一个流,比如把多个数组转换成一个数组,比如:
这样的数组使用这个方法处理后,最终会变成下面的数组[ [1, 2, 3], [4, 5, 6], [7, 8, 9] ]
[1, 2, 3, 4, 5, 6, 7, 8, 9]
-
调用
importer
函数来保存内容。这个函数定义在ipfs-unixfs-importer
类库中,这个类库是 IPFS 用于处理文件的布局和分块机制的 JavaScript 实现,具体如何保存内容,如何分块我们将在下篇文章中进行详细分析。 -
调用
pull.asyncMap
方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。当程序执行到这里时,我们要保存的文件或内容已经保存在本地 IPFS 仓库,已经可以用使用cat
、get
、ls
等命令来 API 来查看我们保存的内容或文件了。asyncMap
方法是一个 through 流,类似于map
流,但是有更好的性能。它会对每一个数组元素进行处理,这里处理函数为prepareFile
。这个函数定义在同一个文件中,它的处理具体如下:
-
使用已经生成文件的
multihash
内容生成 CID 对象。
CID 构造方法会检查传入的参数,如果是 CID 对象,则直接从对象中取出版本号、编码方式、多哈希等属性;如果是字符串,则又分为是否被 multibase 编码过,如果是则需要先解码,然后再分离出各种属性,如果没有经过 multibase 编码,那么肯定是 base58 字符串,则设置版本为0,编码方式为let cid = new CID(file.multihash)
dag-pb
,再从 base58 串中获取多哈希值;如果是缓冲对象,则取得第一个字节,并按十六进制转化成整数,如果第一个字节是 0或1,则生成各自属性,否则为多哈希,则设置版本为0,编码方式为dag-pb
。 -
如果用户指定 CID 版本为 1,则生成 CID 对象到版本1.
if (opts.cidVersion === 1) { cid = cid.toV1() }
-
接下来,调用
waterfall
方法,顺序处理它指定的函数。第一个函数,检查配置选项是否指定了onlyHash
,即不实际地上传文件到IFS网络,仅仅计算一下这个文件的 HASH,那么直接调用第二个函数,否则,调用 IPFS 对象的object.get
方法来获取指定文件在仓库中保存的节点信息。这个方法我们后面会专门讲解,这里略去不讲。第二个函数,生成最终返回给用户的对象,这个对象包括了:path、size、hash 等。上面代码如下,比较简单,可自己阅读。
waterfall([ (cb) => opts.onlyHash ? cb(null, file) : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb), (node, cb) => { const b58Hash = cid.toBaseEncodedString()
let size = node.size
if (Buffer.isBuffer(node)) { size = node.length }
cb(null, { path: opts.wrapWithDirectory ? file.path.substring(WRAPPER.length) : (file.path || b58Hash), hash: b58Hash, size }) }
], callback)
-
使用已经生成文件的
-
调用
pull.map
方法,把已经保存到本地的文件预加载到指定节点。map
是一个 through 流,它会对每一个数组元素进行处理,这里处理函数为preloadFile
。这个函数定义在同一个文件中,这会把已经保存的文件预加载到指定的节点,具体保存在哪些节点,可以参考《精通IPFS:系统启动之概览》篇中preload.addresses
,也可以手动指定。 -
调用
pull.asyncMap
方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。asyncMap
方法是一个 through 流,这里处理函数为pinFile
。pin 操作后面我们会详细分析,这里略过不提,读者可以自行阅读相关代码。
-
首先调用