交易队列的输入
交易队列的输入有两个,分别是接收到其他节点的广播交易和自身节点提交的交易。
分别来看这两种输入方式:
-
接收广播交易
在前面区块链同步章节中提到过,接收到交易后会通过调用TransactionQueue::enqueue()
来将新交易放入交易队列中,这个函数代码非常简单:void TransactionQueue::enqueue(RLP const& _data, h512 const& _nodeId) { bool queued = false; { Guard l(x_queue); unsigned itemCount = _data.itemCount(); for (unsigned i = 0; i < itemCount; ++i) { if (m_unverified.size() >= c_maxVerificationQueueSize) { LOG(m_logger) << "Transaction verification queue is full. Dropping " << itemCount - i << " transactions"; break; } m_unverified.emplace_back(UnverifiedTransaction(_data[i].data(), _nodeId)); queued = true; } } if (queued) m_queueReady.notify_all(); }
只是将交易放入m_unverified
中,然后通知校验线程来校验。
再来看校验线程:
void TransactionQueue::verifierBody()
{
while (!m_aborting)
{
UnverifiedTransaction work;
{
unique_lock<Mutex> l(x_queue);
m_queueReady.wait(l, [&](){ return !m_unverified.empty() || m_aborting; });
if (m_aborting)
return;
work = move(m_unverified.front());
m_unverified.pop_front();
}
try
{
Transaction t(work.transaction, CheckTransaction::Cheap); //Signature will be checked later
ImportResult ir = import(t);
m_onImport(ir, t.sha3(), work.nodeId);
}
catch (...)
{
// should not happen as exceptions are handled in import.
cwarn << "Bad transaction:" << boost::current_exception_diagnostic_information();
}
}
}
这里是一个简单的生产消费者队列,先将UnverifiedTransaction
取出,然后调用import()
函数进行校验。由于使用了线程,校验过程是异步的。
-
自身提交交易
节点自身提交的交易与上面的交易不同,是同步的,也就是直接会调用import()
函数来进行校验。ImportResult TransactionQueue::import(bytesConstRef _transactionRLP, IfDropped _ik) { try { Transaction t = Transaction(_transactionRLP, CheckTransaction::Everything); return import(t, _ik); } catch (Exception const&) { return ImportResult::Malformed; } }
交易的校验
我们来看这个校验的过程,也就是TransactionQueue::import()
函数。
ImportResult TransactionQueue::import(Transaction const& _transaction, IfDropped _ik)
{
if (_transaction.hasZeroSignature())
return ImportResult::ZeroSignature;
// Check if we already know this transaction.
h256 h = _transaction.sha3(WithSignature);
ImportResult ret;
{
UpgradableGuard l(m_lock);
auto ir = check_WITH_LOCK(h, _ik);
if (ir != ImportResult::Success)
return ir;
{
_transaction.safeSender(); // Perform EC recovery outside of the write lock
UpgradeGuard ul(l);
ret = manageImport_WITH_LOCK(h, _transaction);
}
}
return ret;
}
可以看到先是调用TransactionQueue::check_WITH_LOCK()
函数来做一个简单检查。检查的过程是看这个交易是否是已经校验过的,是否是之前被删除的。
接着调用_transaction.safeSender()
,这个函数是通过签名反推sender
,在交易那一节已经说过。最后调用manageImport_WITH_LOCK()
函数来处理交易。
manageImport_WITH_LOCK()
函数过程稍稍复杂,我们可以一步一步来分析。
第一步:
auto cs = m_currentByAddressAndNonce.find(_transaction.from());
if (cs != m_currentByAddressAndNonce.end())
{
auto t = cs->second.find(_transaction.nonce());
if (t != cs->second.end())
{
if (_transaction.gasPrice() < (*t->second).transaction.gasPrice())
return ImportResult::OverbidGasPrice;
else
{
h256 dropped = (*t->second).transaction.sha3();
remove_WITH_LOCK(dropped);
m_onReplaced(dropped);
}
}
}
这一步是检查m_currentByAddressAndNonce
中是否有重复项,判断标准是sender
和nonce
,如果存在重复的则检查gasPrice
,如果新的交易gasPrice
更低,则校验失败,否则将现有的交易删除,替换为gasPrice
更高的交易。
第二步是检查m_future
,检查过程与第一步类似。
第三步:
// If valid, append to transactions.
insertCurrent_WITH_LOCK(make_pair(_h, _transaction));
LOG(m_loggerDetail) << "Queued vaguely legit-looking transaction " << _h;
while (m_current.size() > m_limit)
{
LOG(m_loggerDetail) << "Dropping out of bounds transaction " << _h;
remove_WITH_LOCK(m_current.rbegin()->transaction.sha3());
}
m_onReady();
这里调用insertCurrent_WITH_LOCK()
插入队列,然后将超出容量m_limit
的部分删除,并调用m_onReady()
表示目前队列有数据了,可以来取了。
我们再来看insertCurrent_WITH_LOCK()
函数是怎么将交易插入队列的。
void TransactionQueue::insertCurrent_WITH_LOCK(std::pair<h256, Transaction> const& _p)
{
if (m_currentByHash.count(_p.first))
{
cwarn << "Transaction hash" << _p.first << "already in current?!";
return;
}
Transaction const& t = _p.second;
// Insert into current
auto inserted = m_currentByAddressAndNonce[t.from()].insert(std::make_pair(t.nonce(), PriorityQueue::iterator()));
PriorityQueue::iterator handle = m_current.emplace(VerifiedTransaction(t));
inserted.first->second = handle;
m_currentByHash[_p.first] = handle;
// Move following transactions from future to current
makeCurrent_WITH_LOCK(t);
m_known.insert(_p.first);
}
先还是检查是否有重复项,然后把交易插入m_current
里,并记下插入的位置(迭代器),再分别加入m_currentByAddressAndNonce
和m_currentByHash
中。
注意到末尾还有个makeCurrent_WITH_LOCK()
的调用,这个是看情况将m_future
里的交易移到m_current
中。
交易队列的输出
交易队列输出只有一个,那就是区块block。在Block::sync()
中会调用TransactionQueue::topTransactions()
来取队列的前N项数据,再加入block
中。