精通IPFS:IPFS 保存内容之下篇
builder/builder.js
文件中调用调用
pull
函数进行保存文件,这篇文章我们就来详细研究下这个过程。
file.content
。
chunker
流,对保存的内容进行分块。通过前面的文章,我们知道
chunker
流的默认实现为
chunker/fixed-size.js
,它是一个
pull-through
流。这个流提供了两个函数,分别称为
onData
和
onEnd
,前者在每次数据到来时调用,后者当数据发送完成时调用。
fixed-size.js
在初始化时,根据选项中指定的
maxChunkSize
属性设置每一个区块的大小。下面,我们来看下它的在它的
onData
和
onEnd
两个方法。
onData
函数处理如下:
-
每次收到数据之后就保存在
BufferList
中,同时把当前数据长度也加上读取到数据的长度。bl.append(buffer) currentLength += buffer.length -
如果当前数据长度大于等于规定的区块大小时,那么就进行下面的循环处理,直到当前数据长度小于规定的区块大小。
-
从缓冲列表中取得规定的区块大小数据到队列中。
this.queue(bl.slice(0, maxSize))
-
如果缓冲列表的长度刚好等于规定的区块大小,那么重新一个新的缓冲区列表,并将当前数据长度设置为 0;否则,生成一个新的缓冲区列表,并从老缓冲区中区块大小处把数据读取到新的缓冲区列表中(0 到区块大小处的数据已经在上面读取过),同时设置其为老的缓冲区列表,并更新当前数据长度设减去前一步读取到区块大小长度,从而更缓冲区列表及其长度。
if (maxSize === bl.length) { bl = new BufferList() currentLength = 0 } else { const newBl = new BufferList() newBl.append(bl.shallowSlice(maxSize)) bl = newBl currentLength -= maxSize }
onData
方法,接下来我们再看onEnd
函数,这个函数首先检查缓冲列表中是否有数据(少于区块大小),如果有则同样保存到队列中。if (currentLength) { this.queue(bl.slice(0, currentLength)) emitted = true }
if (!emitted) { this.queue(Buffer.alloc(0)) }
this.queue(null)
-
从缓冲列表中取得规定的区块大小数据到队列中。
paraMap
流(类型为 pull-paramap),对每一个分块进行处理。当前面的流对文件进行分块之后,每一个分区都会下一个流进行拉取,在这里就是这个函数,我们看下这个函数是如何处理每一个分块的。它的主体是一个
waterfall
函数,这个函数正如其名字所示,每一个函数都进行各自的处理,并把结果传递给下一个函数,我们看下它的几个处理函数。
首先,我们来看第一个函数,它主要用来创建
DAGNode
,并把相关信息传递给第二个函数,它的执行逻辑如下:
-
生成一个
UnixFS
对象。
UnixFS 是一种基于协议缓冲区的格式,用于描述IPFS中的文件,目录和符号链接。目前它支持:原始数据、目录、文件、原数据、符号连接、hamt-sharded-directory 等几种类型。const file = new UnixFS(options.leafType, buffer)
leafType
默认为文件,在文件初始化时通过默认选项defaultOptions
指定的。 -
调用
DAGNode.create
静态方法,创建DAGNode
节点,成功之后,把相信信息传递下一个函数。
UnixFS 的DAGNode.create(file.marshal(), [], (err, node) => { if (err) { return cb(err) }
cb(null, { size: node.size, leafSize: file.fileSize(), data: node }) })
marshal
方法主要内容是对文件内容(字节缓冲区)进行编码。这里DAGNode
引用的是 ipld-dag-pb 库中的dag-node/index.js
中定义的DAGNode
函数对象,它的create
方法,定义于同一个目录下的create.js
中,我们来看下这个方法。它的主要内容是对文件的分区数据和对其他区块的连接link
进行检查,并把两者序列后之后再创建DAGNode
对象。而后者的构造函数比较简单,仅把区块的数据及与其他区块的连接(代表与其他区块的关系)保存起来。
DAGNode
保存到系统中,并把保存的结果传递给下一个函数,它的执行逻辑如下:
-
调用
persist
方法,保存 DAG 节点。这是非常重要的一步,它不仅把区块对象保存在本地仓库,也涉及与是否把区块 CID 保存在与它最近的节点上,还涉及到把区块通过 bitswap 协义发送到那些想要它的节点中。它的执行如下:-
从选项中获取 CID 版本号、哈希算法、编码方式等。
默认情况下,版本号为0,哈希算法为 SHA256,编码方式为 dag-pb,这是一种基于 Protocol 规定的 JS 实现。let cidVersion = options.cidVersion || defaultOptions.cidVersion let hashAlg = options.hashAlg || defaultOptions.hashAlg let codec = options.codec || defaultOptions.codec
if (Buffer.isBuffer(node)) { cidVersion = 1 codec = 'raw' }
if (hashAlg !== 'sha2-256') { cidVersion = 1 }
-
如果选项中指定不保存而仅仅是计算哈希值,那么调用 ipld-dag-pb 库中的
util.js
中的cid
函数,获取 DAG 节点的 CID,然后直接返回。if (options.onlyHash) { return cid(node, { version: cidVersion, hashAlg: hashAlg }, (err, cid) => { callback(err, { cid, node }) }) }
-
如果不是只计算哈希,那么调用 IPLD 对象的
put
来保存 DAG 节点。
IPLD 对象定义于 ipld 库中。IPLD 在 IPFS 中具有非常重要的作用,它是 InterPlanetary Linked-Data 的缩写,代表了 IPFS 的野心与希望,把一切东西连结起来的愿望,目前可以边结比特币、以太坊、Zcash、git 等。它持有 ipfs-block-service,后者又持有 ipfs 仓库对象和 bitswap 对象,这几个对象构成了 ipfs 的核心。ipld.put(node, { version: cidVersion, hashAlg: hashAlg, format: codec }, (error, cid) => { callback(error, { cid, node }) })
下面我们来看
put
方法,看它是怎么来保存 DAG 对象的。它的主体是调用内部方法获取当前 DAG 对象编码用的格式,然后使用与这种格式相匹配的cid
方法来取得对象的 CID 对象,然后调用内部的_put
来保存数据。this._getFormat(options.format, (err, format) => { if (err) return callback(err)
format.util.cid(node, options, (err, cid) => { if (err) { return callback(err) }
if (options.onlyHash) { return callback(null, cid) }
this._put(cid, node, callback)
接下来,我们来看这个内部}) })
_put
方法,这个方法主体是一个waterfall
函数,它内部的几个函数分别根据 CID 对象获得对应的编码格式,然后使用编码格式对应的方法序列化 DAG 节点对象,最后生成区块Block
对象,并调用区块服务对象的put
方法来保存区块。区块服务对象定义于 ipfs-block-service 库,它的
put
方法,根据是否有 bitswap 对象(初始化是这个对象为空)来决定是调用仓库对象来保存区块,还是调用 bitswap 来保存区块。对于我们的例子来说,它会调用 bitswap 来保存区块。bitswap 对象的
put
方法,不仅会把区块保存在底层的 blockstore 中,还会把它发送给那些需要它的节点。它的主体是一个waterfall
函数,其中第一个函数检查本地区块存储是否有这个区块,第二个根据本地是否有这个区块来确定是否忽略调用,还是真正来保存区块。waterfall([ (cb) => this.blockstore.has(block.cid, cb), (has, cb) => { if (has) { return nextTick(cb) }
this._putBlock(block, cb)
bitswap 对象的} ], callback)
_putBlock
方法调用区块存储对象的put
方法在本地仓库中保存区块对象,并在成功之后触发一个收到区块的事件,同时通过网络对象的provide
方法,从而把 CID 保存在最近的节点中,然后调用引擎对象的receivedBlocks
方法,把接收到的区块对象发送到所有想要这个区块的所有节点中。
bitswap 对象中有两个重要的对象,一个是网络对象,一个是引擎对象。this.blockstore.put(block, (err) => { if (err) { return callback(err) }
this.notifications.hasBlock(block) this.network.provide(block.cid, (err) => { if (err) { this._log.error('Failed to provide: %s', err.message) } })
this.engine.receivedBlocks([block.cid]) callback() })
网络对象的
provide
方法直接调用 libp2p 对象的内容路由的同名方法来处理区块的 CID。libp2p 对象的内容路由中保存所有具体的路由方法,默认情况下,是空的,即没有任何路由方法,而我们通过在配置文件中,指定libp2p.config.dht.enabled
为真,为内容路由指定了 DHT 路由,所以最终区块的 CID 会被保存在最合适的节点中。网络对象在初始方法中,指定了自身的两个方法作为 libp2p 对象的节点连接与断开事件的处理器,从而在连接与断开时获得相应的通知,并且还调用了 libp2p 对象的
handle
方法,从而使自己成为 libp2p 对象/ipfs/bitswap/1.0.0
和/ipfs/bitswap/1.1.0
这两种协义的处理对象,从而当 libp2p 收到这两种消息时,会调用网络对象对象的相应方法进行处理。网络对象处理 bitswap 协义是通过
pull
函数处理的,大致流程如下:从连接对象中获取消息,然后反序列化成为消息对象,然后通过连接对象获取它的节点信息对象,再然后调用 bitswap 对象的内部方法_receiveMessage
处理传递进来的消息,而这个方法又会调用引擎对象的messageReceived
方法来处理接收到的消息。引擎对象的
messageReceived
方法的大致流程如下:1)调用内部方法
_findOrCreate
,找到或创建远程对等节点的总账本对象 Ledger,如果是新创建的总账本对象,还要放入内部映射集合中,key 为远程对等节点的 Base58 字符串;2)如果这个消息是完全的消息,则生成一个新的想要请求列表。
3)调用内部方法
_processBlocks
,处理消息中的区块对象。4)如果消息中的想要列表为空的,则退出方法。
5)遍历消息中的想要列表,如果当前想要的实体被取消,则从对应的节点的总账本中去掉对应项,同时保存在取消项列表中;否则,把当前项保存在对应节点的总账本中,同时保存在想要列表中。
6)调用内部方法
_cancelWants
,把任务中已经取消的过滤掉,即删除任务中已经取消的任务。7)调用内部方法
_addWants
,处理远程对等节点所有想要的列表。调用区块存储对象判断想要的项本地仓库中是否已经有,如果已经有,则生成相应的任务。引擎对象的
receivedBlocks
方法在收到具体区块时,检查所有已连接的远程节点(总账本对象),看它们是否想要这个区块,如果是则生成一个任务,在后台进行处理。
-
从选项中获取 CID 版本号、哈希算法、编码方式等。
pullThrough
流(类型为 pull-through 流),对收到的每个数据进行处理。这个过程比较简单,这里不细讲。
reducer
流,把所有生成的分块进行归一处理。在默认情况下,
reducer
流是在
balanced/index.js
中通过调用
balanced/balanced-reducer.js
中的
balancedReduceToRoot
的函数生成的。我们看下这个函数的执行过程:
-
生成
pull-pair
对象和pull-pushable
对象。const pair = pullPair() const source = pair.source
const result = pushable()
-
调用
reduceToParents
函数,建立内部pull
流。函数的主体就是一个pull
函数建立起来的流,它的几个函数如下:-
第一个函数是前面建立的
source
流。 - 第二个函数是一个 pull-batch 类库定义的流,这是一个 pull-through 流,它实现了自己的 writer、ender 两个函数,它把每次获取到的数据保存在内部数组中,达到一定程序之后才会保存到 pull-through 流的队列中。
-
第三个函数是 pull-stream 类库的
async-map
流,这是一个through
流,与map
流相似,但有更好的性能。它的归一处理函数reduce
默认情况下为builder/reduce.js
中返回的reducefile
函数。它的流程如下:1)如果当前叶子节点数量是1,并且其single
标志为真,并且选项中有配置把单独叶子归一到自身,那么直接调用回调对象;否则,执行下面的流。
2)创建父节点,并添加它的所有叶子节点。当文件比较大的时候,IPFS 会进行分块,每一个分块就构成了这里的叶子节点,最终这些叶子按照它们分块的顺序,生成对应的 DAGLink ,然后依次添加到父 DAGNode 中,这时候父 DAGNode 保存的不是文件内容,而是这些叶子节点的 DAGLink,从而构成文件的完整内容。if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) { const leaf = leaves[0]
return callback(null, { size: leaf.size, leafSize: leaf.leafSize, multihash: leaf.multihash, path: file.path, name: leaf.name }) }
3)调用const f = new UnixFS('file')
const links = leaves.map((leaf) => { f.addBlockSize(leaf.leafSize)
return new DAGLink(leaf.name, leaf.size, leaf.multihash) })
waterfall
函数,顺序处理父节点。这个地方和处理单个分块类似,就是创建 DAGNode 对象、调用persist
函数进行持久化处理。注意:这里的区别是父节点有叶子节点,即links
不空。
4)上面waterfall([ (cb) => DAGNode.create(f.marshal(), links, cb), (node, cb) => persist(node, ipld, options, cb) ], (error, result) => { if (error) { return callback(error) }
callback(null, { size: result.node.size, leafSize: f.fileSize(), multihash: result.cid.buffer, path: file.path, name: '' }) })
waterfall
函数处理完成后,调用回调函数进行继续处理。归一处理函数
reduce
中的回调函数是下面collect
流即sink
流中的读取回调函数,当归一函数读取到数据之后,调用这个回调函数,从而数据 pull 到collect
流,进而进入reduced
函数中进行处理。 -
第四个函数是 pull-stream 类库的
collect
流,这是一个sink
流。它的处理函数reduced
流程如下:1)如果前面的流有错误,则直接调用reduceToParents
函数的回调函数进行处理;2)否则,如果当前收到的数据长度大于1,即前面归一处理之后,还是有多个根 DAGNode,则调用
reduceToParents
函数继续进行归一处理;3)否则,调用
reduceToParents
函数的回调函数进行处理。reduceToParents
函数的回调函数,这是一个很关键的函数,在这个函数内部把读取到的数据写入result
表示的 pull-pushable 流,以便在它后面的外部流流获取数据。
-
第一个函数是前面建立的
-
返回双向流对象。这里返回的双向流对象为
其中{ sink: pair.sink, source: result }
sink
是pull-pair
类库中定义的sink
流,它被外部的pull
函数调用用来从前面一个流中读取数据;source
是pull-pushable
类库中的流,在reduceToParents
函数的回调函数中被push
数据,从而外部的pull
函数中相关的流可以从它中读取函数。
collect
流,在这个流的处理函数中,把保存文件的结果传递到外部函数中。
collect((err, roots) => {
if (err) {
callback(err)
} else {
callback(null, roots[0])
}
})
这里的
callback
是调用
createAndStoreFile
函数时传递进来的,而它的调用是在
builer/builder.js
文件中,简单回顾一下调用代码:
createAndStoreFile(item, (err, node) => {
if (err) {
return cb(err)
}
if (node) {
source.push(node)
}
cb()
})
这里的匿名回调函数即是上面的
callback
,在回调函数中,通过保存文件的结果写入
source
流中,从而把数据传递到更外层的
pull
流中。
点击回顾: