iOS多線程開發:幾個容易被忽略的細節

Eternal_Love 2019-06-06 13:31:38 5617

一般情況下,iOS開發者只要會使用GCD、@synchronized、NSLock等幾個簡單的API,就可以應對大部分多線程開發了,不過這樣是否真正做到了多線程安全,又是否真正充分利用了多線程的效率優勢呢?看看以下幾個容易被忽略的細節。

讀者寫者問題(Readers-writers problem)

先看下讀者寫者問題的描述:

有讀者和寫者兩組并發線程,共享同一數據,當兩個或以上的讀線程同時訪問共享數據時不會產生副作用,但若某個寫線程和其他線程(讀線程或寫線程)同時訪問共享數據時則可能導致數據不一致的錯誤。因此要求:

  • 允許多個讀者可以同時對共享數據執行讀操作;

  • 只允許一個寫者寫共享數據;

  • 任一寫者在完成寫操作之前不允許其他讀者或寫者工作;

  • 寫者執行寫操作前,應讓已有的讀者和寫者全部退出。

從以上描述可以得知,所謂“讀者寫者問題”是指保證一個寫線程必須與其他線程互斥地訪問共享對象的同步問題,允許并發讀操作,但是寫操作必須和其他讀寫操作是互斥的。

大部分客戶端App做的事情無非就是從網絡拉取最新數據、加工數據、展現列表,這個過程中既有拿到最新數據后寫入本地的操作,也有上層業務對本地數據的讀取操作,因此會牽涉大量的多線程讀寫操作,很顯然,這些基本都屬于讀者寫者問題的范疇[1]。

然而筆者注意到,在遇到多線程讀寫問題時,多數iOS開發者都會立即想到加鎖,或者干脆避免使用多線程,但卻少有人會嘗試用讀者寫者問題的思路去進一步提升效率。

以下是實現一個簡單cache的示例代碼:

//實現一個簡單的cache
- (void)setCache:(id)cacheObject forKey:(NSString *)key {
    if (key.length == 0) {
        return;
    }
    [_cacheLock lock];
    self.cacheDic[key] = cacheObject;
    ...
    [_cacheLock unlock];
}

- (id)cacheForKey:(NSString *key) {
    if (key.length == 0) {
        return nil;
    }
    [_cacheLock lock];
    id cacheObject = self.cacheDic[key];
    ...
    [_cacheLock unlock];
    return cacheObject;
}

上述代碼用互斥鎖來實現多線程讀寫,做到了數據的安全讀寫,但是效率卻并不是最高的,因為這種情況下,雖然寫操作和其他操作之間是互斥的,但同時讀操作之間卻也是互斥的,這會浪費cpu資源,如何改良呢?不難發現,這其實是個典型的讀者寫者問題。先看下解決讀者寫者問題的偽代碼:

semaphore ReaderWriterMutex = 1;    //實現讀寫互斥
int Rcount = 0;             //讀者數量
semaphore CountMutex = 1;   //讀者修改計數互斥

writer(){
    while(true){
        P(ReaderWriterMutex);
        write;
        V(ReaderWriterMutex);   
    }
    
}

reader(){
    while(true){
        P(CountMutex);
        if(Rcount == 0)     //當第一個讀者進來時,阻塞寫者
            P(ReaderWriterMutex);
        ++Rcount;
        V(CountMutex);
        
        read;
        
        P(CountMutex);
        --Rcount;
        if(Rcount == 0)
            V(ReaderWriterMutex);   //當最后一個讀者離開后,釋放寫者
        V(CountMutex);
    }
}

在iOS中,上述代碼中的PV原語可以替換成GCD中的信號量API,dispatch_semaphore_t來實現,但是需要額外維護一個readerCount以及實現readerCount互斥訪問的信號量,手動實現比較麻煩,封裝成統一接口有一定難度。不過好在iOS開發中可以找到現成的讀者寫者鎖:

pthread_rwlock_t

這是一個古老的C語言層面的函數,用法如下:

// Initialization of lock, pthread_rwlock_t is a value type and must be declared as var in order to refer it later. Make sure not to copy it.
var lock = pthread_rwlock_t()
pthread_rwlock_init(&lock, nil)

// Protecting read section:
pthread_rwlock_rdlock(&lock)
// Read shared resource
pthread_rwlock_unlock(&lock)

// Protecting write section:
pthread_rwlock_wrlock(&lock)
// Write shared resource
pthread_rwlock_unlock(&lock)

// Clean up
pthread_rwlock_destroy(&lock)

接口簡潔但是卻不友好,需要注意pthread_rwlock_t是值類型,用=賦值會直接拷貝,不小心就會浪費內存,另外用完后還需要記得銷毀,容易出錯,有沒有更高級更易用的API呢?

GCD barrier

dispatch_barrier_async / dispatch_barrier_sync并不是專門用來解決讀者寫者問題的,barrier主要用于以下場景:當執行某一任務A時,需要該隊列上之前添加的所有操作都執行完,而之后添加進來的任務,需要等待任務A執行完畢才可以執行,從而達到將任務A隔離的目的,具體過程如下圖所示:

image.png

如果將barrier任務之前和之后的并發任務換為讀操作,barrier任務本身換為寫操作,就可以將dispatch_barrier_async / dispatch_barrier_sync當做讀者寫者鎖來使用了,下面把文初的使用普通鎖實現的cache代碼,用dispatch_barrier_async重寫,做下對比:

//實現一個簡單的cache(使用普通鎖)
- (void)setCache:(id)cacheObject forKey:(NSString *)key {
    if (key.length == 0) {
        return;
    }
    [_cacheLock lock];
    self.cacheDic[key] = cacheObject;
    ...
    [_cacheLock unlock];
}

- (id)cacheForKey:(NSString *key) {
    if (key.length == 0) {
        return nil;
    }
    [_cacheLock lock];
    id cacheObject = self.cacheDic[key];
    ...
    [_cacheLock unlock];
    return cacheObject;
}
//實現一個簡單的cache(使用讀者寫者鎖)
static dispatch_queue_t queue = dispatch_queue_create("com.gfzq.testQueue", DISPATCH_QUEUE_CONCURRENT);

- (void)setCache:(id)cacheObject forKey:(NSString *)key {
    if (key.length == 0) {
        return;
    }
    dispatch_barrier_async(queue, ^{
        self.cacheDic[key] = cacheObject;
        ...
    });
}

- (id)cacheForKey:(NSString *key) {
    if (key.length == 0) {
        return nil;
    }
    __block id cacheObject = nil;
    dispatch_sync(queue, ^{
        cacheObject = self.cacheDic[key];
        ...
    });
    return cacheObject;
}

這樣實現的cache就可以并發執行讀操作,同時又有效地隔離了寫操作,兼顧了安全和效率。

對于聲明為atomic而且又自己手動實現getter或者setter的屬性,也可以用barrier來改進:

@property (atomic, copy) NSString *someString;

- (NSString *)someString {
    __block NSString *tempString;
    dispatch_sync(_syncQueue, ^{
        tempString = _someString;
    });
    return tempString;
}

- (void)setSomeString :(NSString *)someString {
    dispatch_barrier_async(_syncQueue, ^{
        _someString = someString
        ...
    }
}

在做到atomic的同時,getter之間還可以并發執行,比直接把setter和getter都放到串行隊列或者加普通鎖要更高效。

讀者寫者鎖能提升多少效率?

使用讀者寫者鎖一定比所有讀寫都加鎖以及使用串行隊列要快,但是到底能快多少呢?Dmytro Anokhin在[3]中做了實驗對比,測出了分別使用NSLock、GCD barrier和pthread_rwlock時獲取鎖所需要的平均時間,實驗樣本數在100到1000之間,去掉最高和最低的10%,結果如下列圖表所示:

image.png

3 writers / 10 readers

image.png

1 writer / 10 readers

image.png

5 writers / 5 readers

image.png

10 writers / 1 reader


分析可知:

(1)使用讀者寫者鎖(GCD barrier、pthread_rwlock),相比單純使用普通鎖(NSLock),效率有顯著提升;

(2)讀者數量越多,寫者數量越少,使用讀者寫者鎖的效率優勢越明顯;

(3)使用GCD barrier和使用pthread_rwlock的效率差異不大。

由于pthread_rwlock不易使用且容易出錯,而且GCD barrier和pthread_rwlock對比性能相當,建議使用GCD barrier來解決iOS開發中遇到的讀者寫者問題。另外,使用GCD還有個潛在優勢:GCD面向隊列而非線程,dispatch至某一隊列的任務,可能在任一線程上執行,這些對開發者是透明的,這樣設計的好處顯而易見,GCD可以根據實際情況從自己管理的線程池中挑選出開銷最小的線程來執行任務,最大程度減小context切換次數。

image.png

何時使用讀者寫者鎖

需要注意的是,并非所有的多線程讀寫場景都一定是讀者寫者問題,使用時要注意辨別。例如以下YYCache的代碼:

//讀cache
- (id)objectForKey:(id)key {
    if (!key) return nil;
    pthread_mutex_lock(&_lock);
    _YYLinkedMapNode *node = CFDictionaryGetValue(_lru->_dic, (__bridge const void *)(key));
    if (node) {
        node->_time = CACurrentMediaTime();
        [_lru bringNodeToHead:node];
    }
    pthread_mutex_unlock(&_lock);
    return node ? node->_value : nil;
}
//寫cache
- (void)setObject:(id)object forKey:(id)key withCost:(NSUInteger)cost {
    if (!key) return;
    if (!object) {
        [self removeObjectForKey:key];
        return;
    }
    pthread_mutex_lock(&_lock);
    _YYLinkedMapNode *node = CFDictionaryGetValue(_lru->_dic, (__bridge const void *)(key));
    NSTimeInterval now = CACurrentMediaTime();
    if (node) {
        _lru->_totalCost -= node->_cost;
        _lru->_totalCost += cost;
        node->_cost = cost;
        node->_time = now;
        node->_value = object;
        [_lru bringNodeToHead:node];
    } else {
        node = [_YYLinkedMapNode new];
        node->_cost = cost;
        node->_time = now;
        node->_key = key;
        node->_value = object;
        [_lru insertNodeAtHead:node];
    }
    if (_lru->_totalCost > _costLimit) {
        dispatch_async(_queue, ^{
            [self trimToCost:_costLimit];
        });
    }
    if (_lru->_totalCount > _countLimit) {
        _YYLinkedMapNode *node = [_lru removeTailNode];
        if (_lru->_releaseAsynchronously) {
            dispatch_queue_t queue = _lru->_releaseOnMainThread ? dispatch_get_main_queue() : YYMemoryCacheGetReleaseQueue();
            dispatch_async(queue, ^{
                [node class]; //hold and release in queue
            });
        } else if (_lru->_releaseOnMainThread && !pthread_main_np()) {
            dispatch_async(dispatch_get_main_queue(), ^{
                [node class]; //hold and release in queue
            });
        }
    }
    pthread_mutex_unlock(&_lock);
}

這里的cache由于使用了LRU淘汰策略,每次在讀cache的同時,會將本次的cache放到數據結構的最前面,從而延緩最近使用的cache被淘汰的時機,因為每次讀操作的同時也會發生寫操作,所以這里直接使用pthread_mutex互斥鎖,而沒有使用讀者寫者鎖。

綜上所述,如果你所遇到的多線程讀寫場景符合:

(1)存在單純的讀操作(即讀任務里沒有同時包含寫操作);

(2)讀者數量較多,而寫者數量較少。

都應該考慮使用讀者寫者鎖來進一步提升并發率。

注意:

(1)讀者寫者問題包含“讀者優先”和“寫者優先”兩類:前者表示讀線程只要看到有其他讀線程正在訪問文件,就可以繼續作讀訪問,寫線程必須等待所有讀線程都不訪問時才能寫文件,即使寫線程可能比一些讀線程更早提出申請;而寫者優先表示寫線程只要提出申請,再后來的讀線程就必須等待該寫線程完成。GCD的barrier屬于寫者優先的實現。具體請參考文檔[2]。

(2)串行隊列上沒必要使用GCD barrier,應該使用dispatch_queue_create建立的并發隊列;dispatch_get_global_queue由于是全局共享隊列,使用barrier達不到隔離當前任務的效果,會自動降級為dispatch_sync / dispatch_async。[5]

鎖的粒度(Granularity)

首先看兩段代碼:

代碼段1

@property (atomic, copy) NSString *atomicStr;

//thread A
atomicSr = @"am on thread A";
NSLog(@"%@", atomicStr);

//thread B
atomicSr = @"am on thread B";
NSLog(@"%@", atomicStr);

代碼段2

- (void)synchronizedAMethod {
    @synchronized (self) {
        ...
    }
}

- (void)synchronizedBMethod {
    @synchronized (self) {
        ...
    }
}

- (void)synchronizedCMethod {
    @synchronized (self) {
        ...
    }
}

粒度過小

執行代碼段1,在線程A上打印出來的字符串卻可能是“am on thread B”,原因是雖然atomicStr是原子操作,但是取出atomicStr之后,在執行NSLog之前,atomicStr仍然可能會被線程B修改。因此atomic聲明的屬性,只能保證屬性的get和set是完整的,但是卻不能保證get和set完之后的關于該屬性的操作是多線程安全的,這就是aomic聲明的屬性不一定能保證多線程安全的原因。

同樣的,不僅僅是atomic聲明的屬性,在開發中自己加的鎖如果粒度太小,也不能保證線程安全,代碼段1其實和下面代碼效果一致:

@property (nonatomic, strong) NSLock *lock;
@property (nonatomic, copy) NSString *atomicStr;

//thread A
[_lock lock];
atomicSr = @"am on thread A";
[_lock unlock];
NSLog(@"%@", atomicStr);

//thread B
[_lock lock];
atomicSr = @"am on thread B";
[_lock unlock];
NSLog(@"%@", atomicStr);

如果想讓程序按照我們的初衷,設置完atomicStr后打印出來的就是設置的值,就需要加大鎖的范圍,將NSLog也包括在臨界區內:

//thread A
[_lock lock];
atomicSr = @"am on thread A";
NSLog(@"%@", atomicStr);
[_lock unlock];

//thread B
[_lock lock];
atomicSr = @"am on thread B";
NSLog(@"%@", atomicStr);
[_lock unlock];

示例代碼很簡單,很容易看出問題所在,但是在實際開發中遇到更復雜些的代碼塊時,一不小心就可能踏入坑里。因此在設計多線程代碼時,要特別注意代碼之間的邏輯關系,若后續代碼依賴于加鎖部分的代碼,那這些后續代碼也應該一并加入鎖中。

粒度過大

@synchronized關鍵字會自動根據傳入對象創建一個與之關聯的鎖,在代碼塊開始時自動加鎖,并在代碼塊結束后自動解鎖,語法簡單明了,很方便使用,但是這也導致部分開發者過渡依賴于@synchronized關鍵字,濫用@synchronized(self)。如上述代碼段2中的寫法,在一整個類文件里,所有加鎖的地方用的都是@synchronized(self),這就可能會導致不相關的線程執行時都要互相等待,原本可以并發執行的任務不得不串行執行。另外使用@synchronized(self)還可能導致死鎖:

//class A
@synchronized (self) {
    [_sharedLock lock];
    NSLog(@"code in class A");
    [_sharedLock unlock];
}

//class B
[_sharedLock lock];
@synchronized (objectA) {
    NSLog(@"code in class B");
}
[_sharedLock unlock];

原因是因為self很可能會被外部對象訪問,被用作key來生成一鎖,類似上述代碼中的@synchronized (objectA)。兩個公共鎖交替使用的場景就容易出現死鎖。所以正確的做法是傳入一個類內部維護的NSObject對象,而且這個對象是對外不可見的[2]。

因此,不相關的多線程代碼,要設置不同的鎖,一個鎖只管一個臨界區。除此之外,還有種常見的錯誤做法會導致并發效率下降:

//thread A
[_lock lock];
atomicSr = @"am on thread A";
NSLog(@"%@", atomicStr);
//do some other tasks which are none of business with atomicStr;
for (int i = 0; i < 100000; i ++) {
    sleep(5);
}
[_lock unlock];

//thread B
[_lock lock];
atomicSr = @"am on thread B";
NSLog(@"%@", atomicStr);
//do some other tasks which are none of business with atomicStr;
for (int i = 0; i < 100000; i ++) {
    sleep(5);
}
[_lock unlock];

即在臨界區內包含了與當前加鎖對象無關的任務,實際應用中,需要我們尤其注意臨界區內的每一個函數,因為其內部實現可能調用了耗時且無關的任務。

遞歸鎖(Recursive lock)

相比較上述提到的@synchronized(self),下面這種情形引起的死鎖更加常見:

@property (nonatomic,strong) NSLock *lock;

_lock = [[NSLock alloc] init];

- (void)synchronizedAMethod {
    [_lock lock];
    //do some tasks
    [self synchronizedBMethod];
    [_lock unlock];
}

- (void)synchronizedBMethod {
    [_lock lock];
    //do some tasks
    [_lock unlock];
}

A方法已獲取鎖后,再調用B方法,就會觸發死鎖,B方法在等待A方法執行完成釋放鎖后才能繼續執行,而A方法執行完成的前提是執行完B方法。實際開發中,可能發生死鎖的情形往往隱蔽在方法的層層調用中。因此建議在不能確定是否會產生死鎖時,最好使用遞歸鎖。更保守一點的做法是不論何時都使用遞歸鎖,因為很難保證以后的代碼會不會在同一線程上多次加鎖。

遞歸鎖允許同一個線程在未釋放其擁有的鎖時反復對該鎖進行加鎖操作,內部通過一個計數器來實現。除了NSRecursiveLock,也可以使用性能更佳的pthread_mutex_lock,初始化時參數設置為PTHREAD_MUTEX_RECURSIVE即可:

pthread_mutexattr_t attr;
pthread_mutexattr_init (&attr);
pthread_mutexattr_settype (&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init (&_lock, &attr);
pthread_mutexattr_destroy (&attr);

值得注意的是,@synchronized內部使用的也是遞歸鎖:

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        assert(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }
    return result;
}

總結

想寫出高效、安全的多線程代碼,只是熟悉GCD、@synchronized、NSLock這幾個API是不夠的,還需要了解更多API背后的知識,深刻理解臨界區的概念、理清各個任務之間的時序關系是必要條件。

參考文檔

作者:Eternal_Love

鏈接:https://www.jianshu.com/p/b053b3c3c95c

广东26选5开奖结果查