
Fabric wraps blockchain data in a Ledger; the underlying data storage is RocksDB.

DB

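The wrapper for the data storage interface lives in fabric/core/db/db.go.

The location of the RocksDB database is configured with peer.fileSystemPath. There are five main ColumnFamilies:

  1. BlockchainCF, which stores the blockchain
  2. StateCF, which stores the state
  3. StateDeltaCF, which stores temporary state (the per-block state deltas)
  4. IndexesCF, which stores the blockchain indexes
  5. PersistCF, which stores consensus and other status information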
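
To make the role of the column families concrete, here is a minimal sketch that models each one as an independent key space. It is not the code in fabric/core/db/db.go: toyDB, newToyDB, Put and Get are hypothetical names, an in-memory map stands in for RocksDB, and the constant values simply reuse the labels from the list above (the actual string constants in Fabric may differ).

```go
package main

import "fmt"

// Column family labels copied from the list above. The real constants in
// fabric/core/db/db.go are not shown in this article and may differ.
const (
	BlockchainCF = "BlockchainCF"
	StateCF      = "StateCF"
	StateDeltaCF = "StateDeltaCF"
	IndexesCF    = "IndexesCF"
	PersistCF    = "PersistCF"
)

// toyDB is a hypothetical in-memory stand-in for the RocksDB wrapper:
// one independent key space per column family.
type toyDB struct {
	cfs map[string]map[string][]byte
}

// newToyDB creates the five column families, all empty.
func newToyDB() *toyDB {
	db := &toyDB{cfs: map[string]map[string][]byte{}}
	for _, cf := range []string{BlockchainCF, StateCF, StateDeltaCF, IndexesCF, PersistCF} {
		db.cfs[cf] = map[string][]byte{}
	}
	return db
}

// Put stores value under key inside the given column family.
func (db *toyDB) Put(cf, key string, value []byte) error {
	m, ok := db.cfs[cf]
	if !ok {
		return fmt.Errorf("unknown column family %q", cf)
	}
	m[key] = value
	return nil
}

// Get reads key from the given column family; a missing key yields nil.
func (db *toyDB) Get(cf, key string) ([]byte, error) {
	m, ok := db.cfs[cf]
	if !ok {
		return nil, fmt.Errorf("unknown column family %q", cf)
	}
	return m[key], nil
}

func main() {
	db := newToyDB()
	// The same key in two column families does not collide.
	_ = db.Put(BlockchainCF, "1", []byte("block bytes"))
	_ = db.Put(StateDeltaCF, "1", []byte("serialized delta"))
	b, _ := db.Get(BlockchainCF, "1")
	d, _ := db.Get(StateDeltaCF, "1")
	fmt.Printf("%s / %s\n", b, d)
}
```

The point of the sketch is only that a block number can serve as a key in both BlockchainCF and StateDeltaCF without the two entries interfering.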

Ledger

The blockchain interfaces are wrapped in the fabric/core/ledger package.

Main data structures:


/////////////////////////////////////////
//Ledger, a singleton
type Ledger struct {
	blockchain *blockchain
	state      *state.State
	currentID  interface{}
}

/////////////////////////////////////////
//Blockchain
type blockchain struct {
	size               uint64
	previousBlockHash  []byte
	// Index, i.e. reads and writes IndexesCF
	indexer            blockchainIndexer
	// The last processed block
	lastProcessedBlock *lastProcessedBlock
}

/////////////////////////////////////////
//State
type State struct {
	// buckettree by default
	stateImpl             statemgmt.HashableState
	stateDelta            *statemgmt.StateDelta
	currentTxStateDelta   *statemgmt.StateDelta
	currentTxID           string
	txStateDeltaHash      map[string][]byte
	updateStateImpl       bool
	historyStateDeltaSize uint64
}

Main interfaces:

// BeginTxBatch - gets invoked when next round of transaction-batch execution begins
// Whether a batch is already in progress is determined by whether ledger.currentID is nil
func (ledger *Ledger) BeginTxBatch(id interface{}) error 
// CommitTxBatch - gets invoked when the current transaction-batch needs to be committed
// This function returns successfully iff the transactions details and state changes (that
// may have happened during execution of this transaction-batch) have been committed to permanent storage
func (ledger *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Transaction, transactionResults []*protos.TransactionResult, metadata []byte) error 

//Calls blockchain.addPersistenceChangesForNewBlock to write to the database
//BlockchainCF blockNumber: blockBytes
//BlockchainCF blockCountKey("blockCount"): blockNumber+1

//Calls blockchain.indexer.createIndexes to write to the database
//IndexesCF blockHash: blockNumber
//IndexesCF tx.Txid: (blockNumber, txIndex)
//IndexesCF (address, blockNumber): txsIndexes

//Calls state.AddChangesForPersistence to write to the database
//StateCF dataKey: value
//StateCF bucketKey: marshal
//StateDeltaCF blockNumber: serializedStateDelta

// RollbackTxBatch - Discards all the state changes that may have taken place during the execution of
// current transaction-batch
func (ledger *Ledger) RollbackTxBatch(id interface{}) error 

// The transaction status is tracked through state.currentTxID
// TxBegin - Marks the begin of a new transaction in the ongoing batch
func (ledger *Ledger) TxBegin(txID string) 

// TxFinished - Marks the finish of the on-going transaction.
// If txSuccessful is false, the state changes made by the transaction are discarded
func (ledger *Ledger) TxFinished(txID string, txSuccessful bool) 

//On success, calls state.stateDelta.ApplyChanges to update stateDelta

// GetState get state for chaincodeID and key. If committed is false, this first looks in memory
// and if missing, pulls from db.  If committed is true, this pulls from the db only.
func (ledger *Ledger) GetState(chaincodeID string, key string, committed bool) ([]byte, error) 
// Queries the database: StateCF (bucketNumber, chaincodeID, key)

// SetState sets state to given value for chaincodeID and key. Does not immediately write to DB
func (ledger *Ledger) SetState(chaincodeID string, key string, value []byte) error 

// DeleteState tracks the deletion of state for chaincodeID and key. Does not immediately write to DB
func (ledger *Ledger) DeleteState(chaincodeID string, key string) error 

// ApplyStateDelta applies a state delta to the current state. This is an
// in memory change only. You must call ledger.CommitStateDelta to persist
// the change to the DB.
// This should only be used as part of state synchronization. State deltas
// can be retrieved from another peer through the Ledger.GetStateDelta function
// or by creating state deltas with keys retrieved from
// Ledger.GetStateSnapshot(). For an example, see TestSetRawState in
// ledger_test.go
// Note that there is no order checking in this function and it is up to
// the caller to ensure that deltas are applied in the correct order.
// For example, if you are currently at block 8 and call this function
// with a delta retrieved from Ledger.GetStateDelta(10), you would now
// be in a bad state because you did not apply the delta for block 9.
// It's possible to roll the state forwards or backwards using
// stateDelta.RollBackwards. By default, a delta retrieved for block 3 can
// be used to roll forwards from state at block 2 to state at block 3. If
// stateDelta.RollBackwards=true, the delta retrieved for block 3 can be
// used to roll backwards from the state at block 3 to the state at block 2.
func (ledger *Ledger) ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error 

// CommitStateDelta will commit the state delta passed to ledger.ApplyStateDelta
// to the DB
func (ledger *Ledger) CommitStateDelta(id interface{}) error 

//StateCF (bucketNumber, chaincodeID, key): value
//StateCF bucketKey: bucketNode

// VerifyChain will verify the integrity of the blockchain. This is accomplished
// by ensuring that the previous block hash stored in each block matches
// the actual hash of the previous block in the chain. The return value is the
// block number of lowest block in the range which can be verified as valid.
// The first block is assumed to be valid, and an error is only returned if the
// first block does not exist, or some other sort of irrecoverable ledger error
// such as the first block failing to hash is encountered.
// For example, if VerifyChain(99, 0) is called and previous hash values stored
// in blocks 8, 32, and 42 do not match the actual hashes of respective previous
// blocks, 42 would be the return value from this function.
// highBlock is the high block in the chain to include in verification. If you
// wish to verify the entire chain, use ledger.GetBlockchainSize() - 1.
// lowBlock is the low block in the chain to include in verification. If
// you wish to verify the entire chain, use 0 for the genesis block.
func (ledger *Ledger) VerifyChain(highBlock, lowBlock uint64) (uint64, error) 
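
The VerifyChain contract described in the comment above can be illustrated with a small sketch. It is not Fabric's implementation: toyBlock, buildChain and verifyChain are hypothetical names, the chain is an in-memory slice rather than BlockchainCF, and SHA-256 from the Go standard library is swapped in for whatever block hash Fabric actually uses. The sketch walks from highBlock down to lowBlock and stops at the first block whose stored previous hash does not match the real hash of its predecessor.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"fmt"
)

// toyBlock is a hypothetical, heavily reduced block.
type toyBlock struct {
	previousBlockHash []byte
	payload           []byte
}

// hash covers the previous hash and the payload (SHA-256 is an assumption).
func (b *toyBlock) hash() []byte {
	h := sha256.New()
	h.Write(b.previousBlockHash)
	h.Write(b.payload)
	return h.Sum(nil)
}

// buildChain creates n correctly linked blocks.
func buildChain(n int) []*toyBlock {
	chain := make([]*toyBlock, 0, n)
	var prev []byte
	for i := 0; i < n; i++ {
		b := &toyBlock{previousBlockHash: prev, payload: []byte(fmt.Sprintf("block-%d", i))}
		chain = append(chain, b)
		prev = b.hash()
	}
	return chain
}

// verifyChain returns the lowest block number in [lowBlock, highBlock]
// from which the chain is intact up to highBlock.
func verifyChain(chain []*toyBlock, highBlock, lowBlock uint64) (uint64, error) {
	if highBlock >= uint64(len(chain)) || lowBlock > highBlock {
		return 0, fmt.Errorf("invalid range [%d, %d] for chain of size %d",
			lowBlock, highBlock, len(chain))
	}
	for i := highBlock; i > lowBlock; i-- {
		if !bytes.Equal(chain[i].previousBlockHash, chain[i-1].hash()) {
			// Block i no longer links to block i-1, so i is the lowest valid block.
			return i, nil
		}
	}
	return lowBlock, nil
}

func main() {
	chain := buildChain(100)
	fmt.Println(verifyChain(chain, 99, 0))

	// Tampering with blocks 7, 31 and 41 breaks the links stored in 8, 32 and 42.
	for _, n := range []int{7, 31, 41} {
		chain[n].payload = []byte("tampered")
	}
	fmt.Println(verifyChain(chain, 99, 0))
}
```

With the payloads of blocks 7, 31 and 41 altered, the links stored in blocks 8, 32 and 42 are broken, and the walk from block 99 downward stops at 42, which matches the example in the comment.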

blockchain interfaces

Three interfaces relate directly to the blockchain:

  1. BeginTxBatch
  2. CommitTxBatch
  3. RollbackTxBatch

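Once the consensus module has finished its agreement logic, it calls these interfaces to write a block. consensus/helper/helper.go wraps these interfaces once more.

The currentID field of the Ledger struct identifies the block-writing batch that is currently in progress.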

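CommitTxBatch mainly performs the following steps:

  1. ledger.state.GetHash() computes the hash of the state. state.updateStateImpl flags whether state.stateDelta still needs to be applied to state.stateImpl.
  2. ledger.blockchain.addPersistenceChangesForNewBlock builds the block and writes it to the database through a writeBatch.
  3. ledger.state.AddChangesForPersistence writes StateCF and StateDeltaCF.
  4. If the database write succeeds, currentID is set to nil, the StateDelta is cleared, and blockchain.size is incremented.
  5. Various events are sent.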
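
The way currentID guards the batch lifecycle can be sketched as follows. This is a simplified model rather than the real Ledger: toyLedger and errBatch are hypothetical, transactions are plain strings, and hashing, the writeBatch and events are omitted. Only the currentID bookkeeping and the size increment from the steps above are modelled.

```go
package main

import (
	"errors"
	"fmt"
)

// errBatch is a hypothetical error for any batch ID problem.
var errBatch = errors.New("no matching transaction batch in progress")

// toyLedger is a hypothetical stand-in for Ledger.
type toyLedger struct {
	currentID interface{} // nil means no batch is in progress
	size      uint64      // number of committed blocks
	blocks    [][]string  // stand-in for BlockchainCF
}

// BeginTxBatch opens a batch unless one is already open.
func (l *toyLedger) BeginTxBatch(id interface{}) error {
	if l.currentID != nil {
		return errBatch
	}
	l.currentID = id
	return nil
}

// checkID ensures that id names the batch currently in progress.
func (l *toyLedger) checkID(id interface{}) error {
	if l.currentID == nil || l.currentID != id {
		return errBatch
	}
	return nil
}

// CommitTxBatch "persists" the block, then resets currentID and bumps size.
func (l *toyLedger) CommitTxBatch(id interface{}, txs []string) error {
	if err := l.checkID(id); err != nil {
		return err
	}
	l.blocks = append(l.blocks, txs) // the real code writes through a writeBatch
	l.currentID = nil
	l.size++
	return nil
}

// RollbackTxBatch discards the batch without writing a block.
func (l *toyLedger) RollbackTxBatch(id interface{}) error {
	if err := l.checkID(id); err != nil {
		return err
	}
	l.currentID = nil
	return nil
}

func main() {
	l := &toyLedger{}
	fmt.Println("begin 1:", l.BeginTxBatch(1))
	fmt.Println("begin 2 while 1 is open:", l.BeginTxBatch(2))
	fmt.Println("commit 1:", l.CommitTxBatch(1, []string{"tx-a"}))
	fmt.Println("size:", l.size)
}
```

Note that the IDs are compared with `!=` on interface{} values, so this sketch assumes comparable ID types such as integers or strings.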

state interfaces

Five interfaces relate to the state:

  1. TxBegin
  2. TxFinished
  3. GetState
  4. SetState
  5. DeleteState

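These interfaces are called to modify the state while a transaction is executed in exectransaction.go.

The currentTxID field of the State struct identifies the transaction that is currently in progress.

If execution succeeds, TxFinished applies currentTxStateDelta to stateDelta.

The create, read, update, and delete methods on the state are mainly called by chaincode through shim.ChaincodeStubInterface.

GetState looks in state.currentTxStateDelta first, then in state.stateDelta, and finally queries the database through state.stateImpl.Get.

SetState assigns the key-value pair to currentTxStateDelta rather than writing directly to the database.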
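
The layered lookup and the merge performed in TxFinished can be sketched as below. This is a simplification under stated assumptions: toyState and its methods are hypothetical, plain maps replace statemgmt.StateDelta and the database-backed stateImpl, keys are not scoped by chaincodeID, and deletions are not modelled.

```go
package main

import "fmt"

// toyState is a hypothetical, map-based simplification of State.
type toyState struct {
	currentTxID         string
	currentTxStateDelta map[string][]byte // writes of the running transaction
	stateDelta          map[string][]byte // writes of finished transactions in the batch
	db                  map[string][]byte // stand-in for StateCF
}

func newToyState() *toyState {
	return &toyState{
		currentTxStateDelta: map[string][]byte{},
		stateDelta:          map[string][]byte{},
		db:                  map[string][]byte{},
	}
}

// TxBegin marks the start of a transaction.
func (s *toyState) TxBegin(txID string) {
	s.currentTxID = txID
}

// Set records the write in the per-transaction delta only.
func (s *toyState) Set(key string, value []byte) {
	s.currentTxStateDelta[key] = value
}

// Get checks the transaction delta, then the batch delta, then the database.
// With committed=true only the database is consulted.
func (s *toyState) Get(key string, committed bool) []byte {
	if !committed {
		if v, ok := s.currentTxStateDelta[key]; ok {
			return v
		}
		if v, ok := s.stateDelta[key]; ok {
			return v
		}
	}
	return s.db[key]
}

// TxFinished merges the transaction delta into the batch delta on success
// and discards it otherwise.
func (s *toyState) TxFinished(txID string, txSuccessful bool) {
	if txSuccessful {
		for k, v := range s.currentTxStateDelta {
			s.stateDelta[k] = v
		}
	}
	s.currentTxStateDelta = map[string][]byte{}
	s.currentTxID = ""
}

func main() {
	s := newToyState()
	s.db["a"] = []byte("old")

	s.TxBegin("tx1")
	s.Set("a", []byte("new"))
	fmt.Printf("uncommitted=%s committed=%s\n", s.Get("a", false), s.Get("a", true))

	s.TxFinished("tx1", false) // failed transaction: its writes disappear
	fmt.Printf("after failed tx: %s\n", s.Get("a", false))
}
```

Keeping two in-memory layers means a failed transaction can be dropped without touching the changes of earlier successful transactions in the same batch, and nothing reaches the database until the batch is committed.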