C++ folly库解读(三)Synchronized —— 比std::lock_guard/std::unique_lock更易用、功能更强大的同步机制

2021年6月19日 16点热度 0条评论 来源: 张雅宸

目录

  • 传统同步方案的缺点
  • folly/Synchronized.h 简单使用
  • Synchronized 的模板参数
  • withLock()/withRLock()/withWLock() —— 更易用的加锁方式
  • 升级锁
  • ulock()和 withULockPtr()
  • Timed Locking
  • Synchronized 与 std::condition_variable
  • acquireLocked() —— 同时锁多个数据
  • 使用一把锁,锁多个数据
  • struct
  • std::tuple
  • Benchmark

folly/Synchronized.h 提供了一种更简单、更不容易出错的同步机制,可以用来替代传统 C++标准库中使用较复杂、较容易出错的同步机制。

传统同步方案的缺点

一般是将需要同步的数据和锁一一配对,即 —— associate mutexes with data, not code :

class RequestHandler {
  ...
  std::mutex requestMutex_;
  RequestQueue requestQueue_;

  processRequest(const Request& request);
};

void RequestHandler::processRequest(const Request& request) {
  std::lock_guard<std::mutex> lg(requestMutex_);
  requestQueue_.push_back(request);
}

然而,操作这些数据成员,开发人员必须注意,正确的获取锁、获取正确的锁。

一些常见的错误包括:

  • 操作数据之前没有获取锁。
  • 获取了不配对的锁,这个锁不是用来锁这个数据的。
  • 获取了读锁,但是试图去修改数据。
  • 获取了写锁,但是对数据只有 const access.

一般在使用时,需要提醒开发人员:“别忘了 xxxx”,那一般都会出错,比如 new 的对象别忘了 delete : )

folly/Synchronized.h 简单使用

上面的代码可以用 folly/Synchronized.h 重写为:

class RequestHandler {
  folly::Synchronized<RequestQueue> requestQueue_;

  processRequest(const Request& request);
};

void RequestHandler::processRequest(const Request& request) {
  requestQueue_.wlock()->push_back(request);
}

为什么 folly/Synchronized.h 更加有效呢?

  • 与传统使用方式不同,这里锁和数据是结合成了一个对象 —— requestQueue_。传统方案中,需要寻找锁和数据的配对关系。
  • 几乎不可能在不获取锁的情况下,去操作数据,还是因为它们被封装成了一个对象。传统方案加不加锁全靠自觉。
  • 在 push_back 后,锁立即被释放。

如果在临界区有多个操作,那么可以使用如下方法:

{
      auto lockedQueue = requestQueue_.wlock();
      lockedQueue->push_back(request1);
      lockedQueue->push_back(request2);
}

wlock 返回一个 LockedPtr 对象,这个对象可以被理解为指向数据成员的指针。只有这个对象存在,那么锁就会被锁住,所以最好为这个对象显示定义一个 scope.

更好的方式,是使用 lambdas :

void RequestHandler::processRequest(const Request& request) {
      requestQueue_.withWLock([&](auto& queue "&") {
        // withWLock() automatically holds the lock for the
        // duration of this lambda function
        queue.push_back(request);
      });
}

使用 withWLock 配合 lambdas 强制定义了一个 scope,更清晰。

Synchronized 的模板参数

Synchronized 有两个模板参数,数据类型和锁类型:

template <class T, class Mutex = SharedMutex>

如果不指定第二个模板参数,默认是 folly::SharedMutex。只要被 folly::LockTraits 支持的都可以使用,比如 std::mutex、std::recursive_mutex、std::timed_mutex,。std::recursive_timed_mutex、folly::SharedMutex、folly::RWSpinLock、folly::SpinLock.

根据锁类型的不同,Synchronized 会提供不同的 API:

  • 共享锁和升级锁:如果存在 lock_shared()成员函数,Synchronized 会提供 wlock(),rlock(),ulock()三个方法来获取不同的锁类型。其中,rlock()只提供对数据成员 const access.
  • 排他锁:lock()

withLock()/withRLock()/withWLock() —— 更易用的加锁方式

withLock()在上面提到过了,可以用来替代 lock()。在持有锁的期间,执行一个 lambda 或者 function. withRLock()/withWLock()同理可以替代 rlock()/wlock().

我们再详细说一下这种方式的好处。下面的函数将 vector 里的所有元素都 double:

auto locked = vec.lock();
for (int& n : *locked) {
    n *= 2;
}

使用 lock()/wlock()/rlock()的一个重要注意事项:一个指向数据的指针或者引用,它的生命周期一定不要比 LockedPtr 对象长(lock()/wlock()/rlock()的返回值类型)。 如果我们将上面的例子这样写就会出问题:

// No. NO. NO!
for (int& n : *vec.wlock()) {
      n *= 2;
}

vec.wlock()返回的 LockPtr 对象在 range iterators 建立后就销毁了(详细解释见 Range-based for loop Temporary range expression 小节),range iterators 指向了 vector data,但此时锁已经被释放。想想如果要 debug 这种问题,会用多少时间

这时 withLock()/withRLock()/withWLock()的好处就体现出来了,锁会在 for loop 期间一直持有:

vec.withLock([](auto& data "") {
    for (int& n : data) {
        n *= 2;
    }
});

withLock 定义为(withRLock/withWLock 类似):

  /**
   * Invoke a function while holding the lock.
   *
   * A reference to the datum will be passed into the function as its only
   * argument.
   *
   * This can be used with a lambda argument for easily defining small critical
   * sections in the code.  For example:
   *
   *   auto value = obj.withLock([](auto& data "") {
   *     data.doStuff();
   *     return data.getValue();
   *   });
   */
  template <class Function>
  auto withLock(Function&& function) {
    return function(*lock());
  }

  template <class Function>
  auto withLock(Function&& function) const {
    return function(*lock());
  }

升级锁

ulock()和 withULockPtr()

Synchronized 还支持升级锁。升级锁与共享锁可以共存,但是与排它锁互斥。

/**
 * An enum to describe the "level" of a mutex.  The supported levels are
 *  Unique - a normal mutex that supports only exclusive locking
 *  Shared - a shared mutex which has shared locking and unlocking functions;
 *  Upgrade - a mutex that has all the methods of the two above along with
 *            support for upgradable locking
 */
enum class MutexLevel { UNIQUE, SHARED, UPGRADE };

升级锁解决的问题是:先对数据进行读操作,然后根据一定的条件会进行写操作。

升级锁可以通过 uclock()或者 withULockPtr()获得:

{
    // only const access allowed to the underlying object when an upgrade lock
    // is acquired
    auto ulock = vec.ulock();
    auto newSize = ulock->size();
}

auto newSize = vec.withULockPtr([](auto ulock "") {
    // only const access allowed to the underlying object when an upgrade lock
    // is acquired
    return ulock->size();
});

通过下面的函数可以进行升级或者降级:

  • moveFromUpgradeToWrite()
  • moveFromWriteToUpgrade()
  • moveFromWriteToRead() // withWLockPtr()获得的 wlock 可以调用此函数降级为 rlock
  • moveFromUpgradeToRead()

调用这些函数的 LockedPtr 会被设置为 invalid null state,并返回另一个锁住特定锁的 LockedPtr。这些操作都是原子性的,中间不会出现 unlocked 状态。

比如现在有一个 cache,数据结构为 unordered_map,需求是先检查对应的 key 是否在 unordered_map 中,如果在则返回对应的 value,不在则初始化 value 为 0:

folly::Synchronized<std::unordered_map<int64_t, int64_t>> cache;

int64_t res = cache.withULockPtr([key,value](auto ulock "key,value") {
    int64_t cache_value;
    auto iter = ulock->find(key);
    if (iter != ulock->end()) {
        cache_value = iter->second;
    } else {
        cache_value = 0;

        // ulock is now null
        auto wlock = ulock.moveFromUpgradeToWrite();
        (*wlock)[key] = cache_value;
    }

    return cache_value;
});

Timed Locking

如果初始化 Synchronized 的锁类型支持时间,lock()/wlock()/rlock()可以传入一个类型为 std::chrono::duration 的参数:

void fun(Synchronized<vector<string>>& vec) {
      {
        auto locked = vec.lock(10ms);
        if (!locked) {
          throw std::runtime_error("failed to acquire lock");
        }
        locked->push_back("hello");
        locked->push_back("world");
      }
      LOG(INFO) << "successfully added greeting";
}

Synchronized 与 std::condition_variable

如果 Synchronized 的锁类型是 std::mutex,那么可以和 std::condition_variable 配合使用。

Synchronized<vector<string>, std::mutex> vec;
std::condition_variable emptySignal;

// Assuming some other thread will put data on vec and signal
// emptySignal, we can then wait on it as follows:
auto locked = vec.lock();
emptySignal.wait(locked.getUniqueLock(),
                    [&] { return !locked->empty(); });

getUniqueLock()返回一个 std::unique_lockstd::mutex的引用。但是不推荐这么使用,因为这绕过了 Synchronized 的 API,可以直接操作对应的锁:

 /**
   * Get a reference to the std::unique_lock.
   *
   * This is provided so that callers can use Synchronized<T, std::mutex>
   * with a std::condition_variable.
   *
   * While this API could be used to bypass the normal Synchronized APIs and
   * manually interact with the underlying unique_lock, this is strongly
   * discouraged.
   */
  std::unique_lock<std::mutex>& getUniqueLock() { return lock_; }

acquireLocked() —— 同时锁多个数据

假如需要将一个 vector 的数据拷贝到另一个 vector,wlock()可能会实现需求:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
    auto lockedA = a.wlock();
    auto lockedB = b.wlock();
    ... use lockedA and lockedB ...
}

但是如果一个线程调用 fun(x,y),另一个线程调用 func(y,x),就很有可能出现死锁。经典的解决方式是,所有的线程以同样的顺序获取锁。许多库的实现是通过比较锁地址的大小来决定加锁顺序:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
    auto ret = folly::acquireLocked(a, b);
    auto& lockedA = std::get<0>(ret);
    auto& lockedB = std::get<1>(ret);
    ... use lockedA and lockedB ...
}

// 实现:通过比较锁地址的大小
/**
 * Acquire locks for multiple Synchronized<T> objects, in a deadlock-safe
 * manner.
 *
 * The locks are acquired in order from lowest address to highest address.
 * (Note that this is not necessarily the same algorithm used by std::lock().)
 * For parameters that are const and support shared locks, a read lock is
 * acquired.  Otherwise an exclusive lock is acquired.
 *
 * use lock() with folly::wlock(), folly::rlock() and folly::ulock() for
 * arbitrary locking without causing a deadlock (as much as possible), with the
 * same effects as std::lock()
 */
template <class Sync1, class Sync2>
std::tuple<detail::LockedPtrType<Sync1>, detail::LockedPtrType<Sync2>>
acquireLocked(Sync1& l1, Sync2& l2) {
  if (static_cast<const void*>(&l1) < static_cast<const void*>(&l2)) {
    auto p1 = l1.contextualLock();
    auto p2 = l2.contextualLock();
    return std::make_tuple(std::move(p1), std::move(p2));
  } else {
    auto p2 = l2.contextualLock();
    auto p1 = l1.contextualLock();
    return std::make_tuple(std::move(p1), std::move(p2));
  }
}

C++17 引入了 structured binding syntax,可以使代码更简单:

void fun(Synchronized<vector<int>>& a, Synchronized<vector<int>>& b) {
     auto [lockedA, lockedB] = folly::acquireLocked(a, b);
     ... use lockedA and lockedB ...
}

acquireLockedPair()返回 std::pair,在不支持 C++17 的编译器情况下,使用也很方便。

使用一把锁,锁多个数据

比如一个 bidirectional map,需要同时操作。一般有两个方案:

Struct

class Server {
    struct BiMap {
    map<int, string> direct;
    map<string, int> inverse;
    };
    Synchronized<BiMap> bimap_;
    ...
};

...
bimap_.withLock([](auto& locked "") {
    locked.direct[0] = "zero";
    locked.inverse["zero"] = 0;
});

std::tuple

class Server {
    Synchronized<tuple<map<int, string>, map<string, int>>> bimap_;
    ...
};

...
bimap_.withLock([](auto& locked "") {
    get<0>(locked)[0] = "zero";
    get<1>(locked)["zero"] = 0;
});

Benchmark

SynchronizedBenchmark.cpp

下篇文章写一下 Synchronized 的基本实现 :)

参考资料:

(完)

朋友们可以关注下我的公众号,获得最及时的更新:

    原文作者:张雅宸
    原文地址: https://www.cnblogs.com/zhangyachen/p/14900963.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。