Tags: swift, multithreading, grand-central-dispatch, data-synchronization, barrier

How to implement a Thread Safe HashTable (PhoneBook) Data Structure in Swift?


I am trying to implement a thread-safe PhoneBook object. The phone book should be able to add a person, and look up a person by either their name or their phone number. From an implementation perspective, this simply involves two hash tables: one mapping name -> Person and another mapping phone number -> Person.

The caveat is that I want this object to be thread-safe. This means I would like to support concurrent lookups in the PhoneBook while ensuring that only one thread can add a Person to the PhoneBook at a time. This is the basic readers-writers problem, and I am trying to solve it using Grand Central Dispatch and dispatch barriers. I am struggling with this, though, as I am running into issues. Below is my Swift playground code:

//: Playground - noun: a place where people can play

import UIKit
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person: CustomStringConvertible {
    public var description: String {
        get {
            return "Person: \(name), \(phoneNumber)"
        }
    }

    public var name: String
    public var phoneNumber: String
    private var readLock = ReaderWriterLock()

    public init(name: String, phoneNumber: String) {
        self.name = name
        self.phoneNumber = phoneNumber
    }


    public func uniquePerson() -> Person {
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    }
}

public enum Qos {
    case threadSafe, none
}

public class PhoneBook {

    private var qualityOfService: Qos = .none
    public var nameToPersonMap = [String: Person]()
    public var phoneNumberToPersonMap = [String: Person]()
    private var readWriteLock = ReaderWriterLock()


    public init(_ qos: Qos) {
        self.qualityOfService = qos
    }

    public func personByName(_ name: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.nameToPersonMap[name]
            }
        } else {
            person = nameToPersonMap[name]
        }

        return person
    }

    public func personByPhoneNumber( _ phoneNumber: String) -> Person? {
        var person: Person? = nil
        if qualityOfService == .threadSafe {
            readWriteLock.concurrentlyRead { [weak self] in
                guard let strongSelf = self else { return }
                person = strongSelf.phoneNumberToPersonMap[phoneNumber]
            }
        } else {
            person = phoneNumberToPersonMap[phoneNumber]
        }

        return person
    }

    public func addPerson(_ person: Person) {
        if qualityOfService == .threadSafe {
            readWriteLock.exclusivelyWrite { [weak self] in
                guard let strongSelf = self else { return }
                strongSelf.nameToPersonMap[person.name] = person
                strongSelf.phoneNumberToPersonMap[person.phoneNumber] = person
            }
        } else {
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }

}


// A ReaderWriterLock implemented using GCD and OS Barriers.
public class ReaderWriterLock {

    private let concurrentQueue = DispatchQueue(label: "com.ReaderWriterLock.Queue", attributes: DispatchQueue.Attributes.concurrent)
    private var writeClosure: (() -> Void)!

    public func concurrentlyRead(_ readClosure: (() -> Void)) {
        concurrentQueue.sync {
            readClosure()
        }
    }

    public func exclusivelyWrite(_ writeClosure: @escaping (() -> Void)) {
        self.writeClosure = writeClosure
        concurrentQueue.async(flags: .barrier) { [weak self] in
            guard let strongSelf = self else { return }
            strongSelf.writeClosure()
        }
    }

}

// MARK: Testing the synchronization and thread-safety

for _ in 0..<5 {
    let iterations = 1000
    let phoneBook = PhoneBook(.none)

    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: DispatchQueue.Attributes.concurrent)
    for _ in 0..<iterations {
        let person = Person(name: "", phoneNumber: "").uniquePerson()
        concurrentTestQueue.async {
            phoneBook.addPerson(person)
        }
    }

    sleep(10)
    print(phoneBook.nameToPersonMap.count)
}

To test my code, I dispatch 1000 blocks onto a concurrent queue, each of which simply adds a new Person to the PhoneBook. Each Person is unique, so after all 1000 blocks complete I expect the PhoneBook to contain 1000 entries. Every time I perform a write, I use a dispatch barrier, update the hash tables, and return. To my knowledge, this is all we need to do; however, after repeated runs of the 1000 blocks, the number of entries in the PhoneBook is inconsistent and all over the place:

Phone Book Entries: 856
Phone Book Entries: 901
Phone Book Entries: 876
Phone Book Entries: 902
Phone Book Entries: 912

Can anyone please help me figure out what is going on? Is there something wrong with my locking code or, even worse, something wrong with how my test is constructed? I am very new to this multithreaded problem space, thanks!


Solution

  • The problem is your ReaderWriterLock. You are saving the writeClosure as a property, and then asynchronously dispatching a closure that calls that saved property. But if another exclusivelyWrite call comes in during the intervening period, your writeClosure property is replaced with the new closure before the earlier barrier block has run.

    In this case, it means that you can end up adding the same Person multiple times (while the Person whose closure was overwritten never gets added at all). And because you're using a dictionary, those duplicates have the same key, which is why you don't see all 1000 entries.
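To see the race deterministically, here is a minimal sketch. BuggyReaderWriterLock is a hypothetical, trimmed copy of the question's lock (the closure property is a plain optional rather than an implicitly unwrapped one). Suspending the queue guarantees that both exclusivelyWrite calls happen before either barrier block runs, and both blocks then execute whichever closure was stored last:

```swift
import Foundation

// Hypothetical trimmed copy of the question's lock: the pending write is kept
// in a shared property instead of being captured by the dispatched block.
final class BuggyReaderWriterLock {
    let queue = DispatchQueue(label: "com.example.buggyLock", attributes: .concurrent)
    private var writeClosure: (() -> Void)?

    func exclusivelyWrite(_ writeClosure: @escaping () -> Void) {
        self.writeClosure = writeClosure       // overwrites any closure that has not run yet
        queue.async(flags: .barrier) { [weak self] in
            self?.writeClosure?()              // runs whatever closure is stored *now*
        }
    }
}

let buggyLock = BuggyReaderWriterLock()
var writes = [String]()                        // only mutated inside barrier blocks, so access is serialized

buggyLock.queue.suspend()                      // hold both blocks until both calls have been made
buggyLock.exclusivelyWrite { writes.append("A") }
buggyLock.exclusivelyWrite { writes.append("B") }
buggyLock.queue.resume()

buggyLock.queue.sync(flags: .barrier) {}       // wait for the two pending barrier blocks
print(writes)                                  // ["B", "B"]: "A" was lost, "B" was written twice
```

With real concurrent callers the overwrite only happens when writes arrive close together, so the number of lost entries varies from run to run.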

    You can actually simplify ReaderWriterLock, eliminating that property completely. I'd also make concurrentlyRead generic, returning the value (just like sync does) and rethrowing any errors the closure throws.

    public class ReaderWriterLock {
        private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
        
        public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
            return try queue.sync {
                try block()
            }
        }
        
        public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
            queue.async(flags: .barrier) {
                block()
            }
        }
    }
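Here is a short usage sketch of the simplified lock (the `scores` dictionary and `LookupError` type are hypothetical, for illustration only). It shows a read handing its value straight back, a `sync` read issued after an `async` barrier write waiting for that write, and, because of `rethrows`, an error thrown inside a read propagating to the caller:

```swift
import Foundation

public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)

    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync {
            try block()
        }
    }

    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) {
            block()
        }
    }
}

enum LookupError: Error { case missing(String) }   // hypothetical error type

let rwLock = ReaderWriterLock()
var scores = ["alice": 3]                           // hypothetical shared state guarded by `rwLock`

// A read hands back whatever its closure returns (here an Int).
let aliceScore = rwLock.concurrentlyRead { scores["alice"] ?? 0 }

// The write is an async barrier; the later sync read waits until it has finished.
rwLock.exclusivelyWrite { scores["bob"] = 5 }
let bobScore = rwLock.concurrentlyRead { scores["bob"] ?? 0 }

// Thanks to `rethrows`, only a throwing read needs `try`, and its error reaches the caller.
var caughtMissing = false
do {
    _ = try rwLock.concurrentlyRead { () throws -> Int in
        guard let score = scores["carol"] else { throw LookupError.missing("carol") }
        return score
    }
} catch LookupError.missing(let name) {
    caughtMissing = (name == "carol")
} catch {
    print("unexpected error: \(error)")
}

print(aliceScore, bobScore, caughtMissing)          // 3 5 true
```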
    

    A few other, unrelated observations:

    1. By the way, this simplified ReaderWriterLock happens to solve another concern. That writeClosure property, which we've now removed, could easily have introduced a strong reference cycle.

      Yes, you were scrupulous about using [weak self], so there wasn't any strong reference cycle, but it was possible. Wherever you employ a closure property, I'd advise setting it to nil when you're done with it, so any strong references that closure may have accidentally entailed are released. That way, a persistent strong reference cycle is never possible. (Plus, the closure itself, along with any local variables or other external references it captured, will be released.)

    2. You're sleeping for 10 seconds. That should be more than enough, but I'd advise against just adding arbitrary sleep calls (because you can never be 100% sure). Fortunately, you have a custom concurrent queue, so you can add a barrier block to it:

      concurrentTestQueue.async(flags: .barrier) { 
          print(phoneBook.count) 
      }
      

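      Because of that barrier, it will wait until everything you previously put on that queue is done. (Strictly speaking, that only guarantees that every addPerson call has returned; because exclusivelyWrite dispatches asynchronously, some writes may still be pending on the lock's queue. But count does a sync read on that same queue, which waits behind those pending barrier writes, so it still sees all of them.)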

    3. Note that I did not just print nameToPersonMap.count. That dictionary is carefully synchronized within PhoneBook, so you can't let arbitrary external code access it directly without synchronization.

      Whenever you have a property that you're synchronizing internally, it should be private, and you should provide a thread-safe function or computed property to retrieve whatever you need:

      public class PhoneBook {
      
          private var nameToPersonMap = [String: Person]()
          private var phoneNumberToPersonMap = [String: Person]()
      
          ...
      
          var count: Int {
              return readWriteLock.concurrentlyRead {
                  nameToPersonMap.count
              }
          }
      }
      
    4. You say you're testing thread safety, but then you created PhoneBook with the .none option (which provides no thread safety at all). In that scenario, I'd expect problems. You have to create your PhoneBook with the .threadSafe option.

    5. You have a number of strongSelf patterns. That's rather unswifty, and generally not needed in Swift, as you can use [weak self] and then just use optional chaining.
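As a small side-by-side sketch (the `Notebook` type is hypothetical), both methods below do the same thing; the second simply lets optional chaining skip the statement if `self` has already been deallocated. One nuance worth knowing: every `self?` re-reads the weak reference, so a closure with several `self?.` statements (like the two dictionary updates in `addPerson`) checks `self` separately for each one, whereas `guard let` binds a strong reference once for the whole closure.

```swift
import Foundation

// Hypothetical class whose state is confined to a private serial queue.
final class Notebook {
    private let queue = DispatchQueue(label: "com.example.notebook")
    private var entries = [String]()

    // The question's style: rebind a strong reference before using it.
    func addUsingStrongSelf(_ entry: String) {
        queue.async { [weak self] in
            guard let strongSelf = self else { return }
            strongSelf.entries.append(entry)
        }
    }

    // Optional chaining: the append is simply skipped if `self` is gone.
    func addUsingOptionalChaining(_ entry: String) {
        queue.async { [weak self] in
            self?.entries.append(entry)
        }
    }

    // Synchronous read on the same serial queue, so it runs after the earlier async blocks.
    var snapshot: [String] {
        return queue.sync { entries }
    }
}

let notebook = Notebook()
notebook.addUsingStrongSelf("first")
notebook.addUsingOptionalChaining("second")
print(notebook.snapshot)   // ["first", "second"]
```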

    Pulling all of this together, here is my final playground:

    import Foundation
    import PlaygroundSupport
    
    PlaygroundPage.current.needsIndefiniteExecution = true
    
    public class Person {
        public let name: String
        public let phoneNumber: String
        
        public init(name: String, phoneNumber: String) {
            self.name = name
            self.phoneNumber = phoneNumber
        }
        
        public static func uniquePerson() -> Person {
            let randomID = UUID().uuidString
            return Person(name: randomID, phoneNumber: randomID)
        }
    }
    
    extension Person: CustomStringConvertible {
        public var description: String {
            return "Person: \(name), \(phoneNumber)"
        }
    }
    
    public enum ThreadSafety { // Changed the name from Qos, because this has nothing to do with quality of service, but is just a question of thread safety
        case threadSafe, none
    }
    
    public class PhoneBook {
        
        private var threadSafety: ThreadSafety
        private var nameToPersonMap = [String: Person]()        // if you're synchronizing these, you really shouldn't expose them to the public
        private var phoneNumberToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
        private var readWriteLock = ReaderWriterLock()
        
        public init(_ threadSafety: ThreadSafety) {
            self.threadSafety = threadSafety
        }
        
        public func personByName(_ name: String) -> Person? {
            if threadSafety == .threadSafe {
                return readWriteLock.concurrentlyRead { [weak self] in
                    self?.nameToPersonMap[name]
                }
            } else {
                return nameToPersonMap[name]
            }
        }
        
        public func personByPhoneNumber(_ phoneNumber: String) -> Person? {
            if threadSafety == .threadSafe {
                return readWriteLock.concurrentlyRead { [weak self] in
                    self?.phoneNumberToPersonMap[phoneNumber]
                }
            } else {
                return phoneNumberToPersonMap[phoneNumber]
            }
        }
        
        public func addPerson(_ person: Person) {
            if threadSafety == .threadSafe {
                readWriteLock.exclusivelyWrite { [weak self] in
                    self?.nameToPersonMap[person.name] = person
                    self?.phoneNumberToPersonMap[person.phoneNumber] = person
                }
            } else {
                nameToPersonMap[person.name] = person
                phoneNumberToPersonMap[person.phoneNumber] = person
            }
        }
        
        var count: Int {
            return readWriteLock.concurrentlyRead {
                nameToPersonMap.count
            }
        }
    }
    
    // A ReaderWriterLock implemented using GCD concurrent queue and barriers.
    
    public class ReaderWriterLock {
        private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
        
        public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
            return try queue.sync {
                try block()
            }
        }
        
        public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
            queue.async(flags: .barrier) {
                block()
            }
        }
    }
    
    
    for _ in 0 ..< 5 {
        let iterations = 1000
        let phoneBook = PhoneBook(.threadSafe)
        
        let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: .concurrent)
        for _ in 0..<iterations {
            let person = Person.uniquePerson()
            concurrentTestQueue.async {
                phoneBook.addPerson(person)
            }
        }
        
        concurrentTestQueue.async(flags: .barrier) {
            print(phoneBook.count)
        }
    }
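The playground relies on `needsIndefiniteExecution` to stay alive while the barrier block is pending. If you want the same check somewhere without a playground page, such as a command-line target, one option (an assumption on my part, not part of the playground above) is to have the barrier block signal a `DispatchSemaphore` and wait on it. The sketch below trims `PhoneBook` to its thread-safe path, makes `Person` a plain struct for brevity, and repeats the five runs, collecting each count:

```swift
import Foundation

// Same GCD reader-writer lock as in the playground.
public class ReaderWriterLock {
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)

    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T {
        return try queue.sync { try block() }
    }

    public func exclusivelyWrite(_ block: @escaping (() -> Void)) {
        queue.async(flags: .barrier) { block() }
    }
}

public struct Person {
    public let name: String
    public let phoneNumber: String
}

// PhoneBook trimmed to the always-thread-safe path.
public class PhoneBook {
    private var nameToPersonMap = [String: Person]()
    private var phoneNumberToPersonMap = [String: Person]()
    private let readWriteLock = ReaderWriterLock()

    public func addPerson(_ person: Person) {
        readWriteLock.exclusivelyWrite { [weak self] in
            self?.nameToPersonMap[person.name] = person
            self?.phoneNumberToPersonMap[person.phoneNumber] = person
        }
    }

    public var count: Int {
        return readWriteLock.concurrentlyRead { nameToPersonMap.count }
    }
}

var counts = [Int]()
for run in 0..<5 {
    let phoneBook = PhoneBook()
    let testQueue = DispatchQueue(label: "com.PhoneBookTest.Queue.\(run)", attributes: .concurrent)
    for _ in 0..<1000 {
        let id = UUID().uuidString
        testQueue.async { phoneBook.addPerson(Person(name: id, phoneNumber: id)) }
    }

    // The barrier runs only after all 1000 addPerson calls above have returned.
    let finished = DispatchSemaphore(value: 0)
    testQueue.async(flags: .barrier) { finished.signal() }
    finished.wait()                    // block this thread instead of sleeping for a guessed interval

    // The sync read queues behind any writes still pending on the lock's queue.
    counts.append(phoneBook.count)
}
print(counts)   // [1000, 1000, 1000, 1000, 1000]
```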
    

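    Personally, I'd be inclined to take it a step further: make Person a value type (a struct), and move the reader-writer synchronization into a generic Synchronized class that PhoneBook can use.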

    For example:

    public struct Person {
        public let name: String
        public let phoneNumber: String
        
        public static func uniquePerson() -> Person {
            return Person(name: UUID().uuidString, phoneNumber: UUID().uuidString)
        }
    }
    
    public struct PhoneBook {
        
        private var synchronizedPeople = Synchronized([Person]())
        
        public func people(name: String? = nil, phone: String? = nil) -> [Person]? {
            return synchronizedPeople.value.filter {
                (name == nil || $0.name == name) && (phone == nil || $0.phoneNumber == phone)
            }
        }
        
        public func append(_ person: Person) {
            synchronizedPeople.writer { people in
                people.append(person)
            }
        }
        
        public var count: Int {
            return synchronizedPeople.reader { $0.count }
        }
    }
    
    /// A class to provide thread-safe access to some underlying value using the reader-writer pattern.
    
    public class Synchronized<T> {
        /// Private value. Use `public` `value` computed property (or `reader` and `writer` methods)
        /// for safe, thread-safe access to this underlying value.
        
        private var _value: T
        
        /// Private reader-writer synchronization queue
        
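        // `bundleIdentifier` can be nil (e.g. in a command-line tool), so don't force-unwrap it
        private let queue = DispatchQueue(label: (Bundle.main.bundleIdentifier ?? "com.domain.app") + ".synchronized", qos: .default, attributes: .concurrent)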
        
        /// Create `Synchronized` object
        ///
        /// - Parameter value: The initial value to be synchronized.
        
        public init(_ value: T) {
            _value = value
        }
        
        /// A thread-safe variable to set and get the underlying object, as a convenience when higher-level synchronization is not needed
        
        public var value: T {
            get { reader { $0 } }
            set { writer { $0 = newValue } }
        }
        
        /// A "reader" method to allow thread-safe, read-only concurrent access to the underlying object.
        ///
        /// - Warning: If the underlying object is a reference type, you are responsible for making sure you
        ///            do not mutate anything. If you stick with value types (`struct` or primitive types),
        ///            this will be enforced for you.
        
        public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
            return try queue.sync { try block(_value) }
        }
        
        /// A "writer" method to allow thread-safe, exclusive (barrier) writes to the underlying object
        
        public func writer(_ block: @escaping (inout T) -> Void) {
            queue.async(flags: .barrier) {
                block(&self._value)
            }
        }
    }
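One consequence of this design is worth spelling out: each access through `value` is individually synchronized, but `counter.value += 1` is a separate `get` followed by a separate `set`, so concurrent increments written that way can be lost. Doing the whole read-modify-write inside a single `writer` block keeps it atomic. A minimal sketch, assuming the `Synchronized` class above (repeated here, slightly trimmed, so the block is self-contained; the hard-coded queue label is a hypothetical stand-in for the bundle identifier):

```swift
import Foundation

public class Synchronized<T> {
    private var _value: T
    private let queue = DispatchQueue(label: "com.domain.app.synchronized", attributes: .concurrent)

    public init(_ value: T) { _value = value }

    // Each get and each set is synchronized on its own, but not the pair together.
    public var value: T {
        get { reader { $0 } }
        set { writer { $0 = newValue } }
    }

    public func reader<U>(_ block: (T) throws -> U) rethrows -> U {
        return try queue.sync { try block(_value) }
    }

    public func writer(_ block: @escaping (inout T) -> Void) {
        queue.async(flags: .barrier) { block(&self._value) }
    }
}

let counter = Synchronized(0)

// 1,000 concurrent increments, each performed entirely inside one barrier block.
// (Writing `counter.value += 1` here instead could lose updates.)
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
    counter.writer { $0 += 1 }
}

let total = counter.reader { $0 }   // sync read waits for every barrier write queued before it
print(total)                        // 1000
```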