Net二十八线程编制程序,8天玩转并行开发

By admin in 4858.com on 2019年3月25日

难题抽象:当某一财富一样时刻同意一定数额的线程使用的时候,供给有个机制来阻塞多余的线程,直到能源再度变得可用。
线程同步方案:Semaphore、SemaphoreSlim、CountdownEvent
方案个性:限量供应;除全体者外,别的人无条件等待;先到先得,并未先后顺序

当八个职责或线程并行运维时,难以制止的对有些有限的能源拓展并发的拜访。能够考虑采用信号量来进展那方面包车型大巴主宰(System.Threading.Semaphore)是象征叁个Windows内核的信号量对象。要是预测等待的岁月较短,能够设想动用SemaphoreSlim,它则带来的支出更小。.NetFrameWork中的信号量通过跟踪进入和距离的职分或线程来协调对能源的拜会。信号量须求知道能源的最大数额,当二个职务进入时,能源计数器会被减1,当计数器为0时,要是有职分访问能源,它会被堵塞,直到有职务离开截止。
比方须求有跨进程或AppDomain的一路时,能够设想使用Semaphore。Semaphore是获得的Windows
内核的信号量,所以在全方位种类中是有效的。它最首要的接口是Release和WaitOne,使用的法门和SemaphoreSlim是均等的。
信号量Semaphore是别的三个CLLX570中的内核同步对象。在.net中,类Semaphore封装了这些指标。与专业的排他锁对象(Monitor,Mutex,SpinLock)不相同的是,它不是三个排他的锁对象,它与SemaphoreSlim,ReaderWriteLock等同样允许四个少于的线程同时访问共享内部存款和储蓄器财富。

   

1.简介

1、Semaphore类
     
用于控制线程的造访数量,暗许的构造函数为initialCount和maximumCount,表示暗许设置的信号量个数和最大信号量个数。当您WaitOne的时候,信号量自减,当Release的时候,信号量自增,不过当信号量为0的时候,后续的线程就无法得到WaitOne了,所以必须等待先前的线程通过Release来释放。

Semaphore就恍如一个栅栏,有必然的容积,当个中的线程数量到达安装的最大值时候,就没无线程能够进入。然后,若是2个线程工作成功之后出来了,那下一个线程就可以进去了。Semaphore的WaitOne或Release等操作分别将自动地递减可能递增信号量的脚下计数值。当线程试图对计数值已经为0的信号量执行WaitOne操作时,线程将卡住直到计数值大于0。在结构Semaphore时,最少供给三个参数。信号量的始发体量和最大的体量。

   
 承接上一篇,大家后续说下.net4.0中的同步机制,是的,当出现了并行总括的时候,轻量级别的一道机制应运而生,在信号量这一块

新的轻量级同步原语:Barrier,Countdown伊夫nt,马努alReset伊芙ntSlim,SemaphoreSlim,SpinLock,SpinWait。轻量级同步原语只好用在三个经过内。而相应的那么些重量级版本帮忙跨进度的联手。

4858.com 14858.com 2

Semaphore的WaitOne恐怕Release方法的调用大致会消耗1阿秒的系统时间,而优化后的SemaphoreSlim则要求大概四分一阿秒。在测算中山大学量再三使用它的时候塞马phoreSlim依旧优势明显,加上SemaphoreSlim还抬高了重重接口,特别方便大家进行支配,所以在4.0过后的二十四线程开发中,推荐使用SemaphoreSlim。SemaphoreSlim的落成如下:

并发了一漫山遍野的轻量级,今日一连介绍下边包车型大巴三个信号量 Countdown伊夫nt,SemaphoreSlim,马努alReset伊芙ntSlim。

2.Barrier 

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            Thread t1 = new Thread(Run1);
            t1.Start();
            Thread t2 = new Thread(Run2);
            t2.Start();
            Thread t3 = new Thread(Run3);
            t3.Start();
            Console.ReadKey();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的Release才能得到信号
        static Semaphore sem = new Semaphore(2, 10);

        static void Run1()
        {
            sem.WaitOne();
            Console.WriteLine("大家好,我是Run1;" + DateTime.Now.ToString("mm:ss"));

            //两秒后
            Thread.Sleep(2000);
            sem.Release();
        }

        static void Run2()
        {
            sem.WaitOne();
            Console.WriteLine("大家好,我是Run2;" + DateTime.Now.ToString("mm:ss"));

            //两秒后
            Thread.Sleep(2000);
            sem.Release();
        }

        static void Run3()
        {
            sem.WaitOne();
            Console.WriteLine("大家好,我是Run3;" + DateTime.Now.ToString("mm:ss"));

            //两秒后
            Thread.Sleep(2000);
            sem.Release();
        }
    }
}
public class SemaphoreSlim : IDisposable
    {  
        private volatile int m_currentCount; //可用数的资源数,<=0开始阻塞
        private readonly int m_maxCount;
        private volatile int m_waitCount; //阻塞的线程数
        private object m_lockObj;
        private volatile ManualResetEvent m_waitHandle;
        private const int NO_MAXIMUM = Int32.MaxValue;
        //Head of list representing asynchronous waits on the semaphore.
        private TaskNode m_asyncHead;
        // Tail of list representing asynchronous waits on the semaphore.
        private TaskNode m_asyncTail;
         // A pre-completed task with Result==true
        private readonly static Task<bool> s_trueTask =
            new Task<bool>(false, true, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, default(CancellationToken));

        public SemaphoreSlim(int initialCount) : this(initialCount, NO_MAXIMUM){ }        
        public SemaphoreSlim(int initialCount, int maxCount)
        {
            if (initialCount < 0 || initialCount > maxCount)
            {
                throw new ArgumentOutOfRangeException("initialCount", initialCount, GetResourceString("SemaphoreSlim_ctor_InitialCountWrong"));
            }
            if (maxCount <= 0)
            {
                throw new ArgumentOutOfRangeException("maxCount", maxCount, GetResourceString("SemaphoreSlim_ctor_MaxCountWrong"));
            }
            m_maxCount = maxCount;
            m_lockObj = new object();
            m_currentCount = initialCount;
        }
        public void Wait(){Wait(Timeout.Infinite, new CancellationToken());}
        public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDispose();
            if (millisecondsTimeout < -1)
            {
                throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
            }
            cancellationToken.ThrowIfCancellationRequested();
            uint startTime = 0;
            if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout > 0)
            {
                startTime = TimeoutHelper.GetTime();
            }

            bool waitSuccessful = false;
            Task<bool> asyncWaitTask = null;
            bool lockTaken = false;

            CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this);
            try
            {
                SpinWait spin = new SpinWait();
                while (m_currentCount == 0 && !spin.NextSpinWillYield)
                {
                    spin.SpinOnce();
                }
                try { }
                finally
                {
                    Monitor.Enter(m_lockObj, ref lockTaken);
                    if (lockTaken)
                    {
                        m_waitCount++;
                    }
                }

                // If there are any async waiters, for fairness we'll get in line behind
                if (m_asyncHead != null)
                {
                    Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't");
                    asyncWaitTask = WaitAsync(millisecondsTimeout, cancellationToken);
                }
                // There are no async waiters, so we can proceed with normal synchronous waiting.
                else
                {
                    // If the count > 0 we are good to move on.
                    // If not, then wait if we were given allowed some wait duration
                    OperationCanceledException oce = null;
                    if (m_currentCount == 0)
                    {
                        if (millisecondsTimeout == 0)
                        {
                            return false;
                        }
                        // Prepare for the main wait...
                        // wait until the count become greater than zero or the timeout is expired
                        try
                        {
                            waitSuccessful = WaitUntilCountOrTimeout(millisecondsTimeout, startTime, cancellationToken);
                        }
                        catch (OperationCanceledException e) { oce = e; }
                    }

                    Contract.Assert(!waitSuccessful || m_currentCount > 0, "If the wait was successful, there should be count available.");
                    if (m_currentCount > 0)
                    {
                        waitSuccessful = true;
                        m_currentCount--;
                    }
                    else if (oce != null)
                    {
                        throw oce;
                    }
                    if (m_waitHandle != null && m_currentCount == 0)
                    {
                        m_waitHandle.Reset();
                    }
                }
            }
            finally
            {
                // Release the lock
                if (lockTaken)
                {
                    m_waitCount--;
                    Monitor.Exit(m_lockObj);
                }

                // Unregister the cancellation callback.
                cancellationTokenRegistration.Dispose();
            }
            return (asyncWaitTask != null) ? asyncWaitTask.GetAwaiter().GetResult() : waitSuccessful;
        }

        private bool WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, CancellationToken cancellationToken)
        {
            int remainingWaitMilliseconds = Timeout.Infinite;
            //Wait on the monitor as long as the count is zero
            while (m_currentCount == 0)
            {
                // If cancelled, we throw. Trying to wait could lead to deadlock.
                cancellationToken.ThrowIfCancellationRequested();
                if (millisecondsTimeout != Timeout.Infinite)
                {
                    remainingWaitMilliseconds = TimeoutHelper.UpdateTimeOut(startTime, millisecondsTimeout);
                    if (remainingWaitMilliseconds <= 0)
                    {
                        // The thread has expires its timeout
                        return false;
                    }
                }
                // ** the actual wait **
                if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))
                {
                    return false;
                }
            }
            return true;
        }
        public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDispose();
            // Validate input
            if (millisecondsTimeout < -1)
            {
                throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
            }
            // Bail early for cancellation
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCancellation<bool>(cancellationToken);

            lock (m_lockObj)
            {
                // If there are counts available, allow this waiter to succeed.
                if (m_currentCount > 0)
                {
                    --m_currentCount;
                    if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset();
                    return s_trueTask;
                }
                    // If there aren't, create and return a task to the caller.
                    // The task will be completed either when they've successfully acquired
                    // the semaphore or when the timeout expired or cancellation was requested.
                else
                {
                    Contract.Assert(m_currentCount == 0, "m_currentCount should never be negative");
                    var asyncWaiter = CreateAndAddAsyncWaiter();
                    return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
                        asyncWaiter :
                        WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
                }
            }
        }

        /// <summary>Creates a new task and stores it into the async waiters list.</summary>
        /// <returns>The created task.</returns>
        private TaskNode CreateAndAddAsyncWaiter()
        {
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
            // Create the task
            var task = new TaskNode();
            // Add it to the linked list
            if (m_asyncHead == null)
            {
                Contract.Assert(m_asyncTail == null, "If head is null, so too should be tail");
                m_asyncHead = task;
                m_asyncTail = task;
            }
            else
            {
                Contract.Assert(m_asyncTail != null, "If head is not null, neither should be tail");
                m_asyncTail.Next = task;
                task.Prev = m_asyncTail;
                m_asyncTail = task;
            }
            // Hand it back
            return task;
        }

        private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            Contract.Assert(asyncWaiter != null, "Waiter should have been constructed");
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
            using (var cts = cancellationToken.CanBeCanceled ?
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
                new CancellationTokenSource())
            {
                var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
                if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
                {
                    cts.Cancel(); // ensure that the Task.Delay task is cleaned up
                    return true; // successfully acquired
                }
            }

            // If we get here, the wait has timed out or been canceled.

            // If the await completed synchronously, we still hold the lock.  If it didn't,
            // we no longer hold the lock.  As such, acquire it.
            lock (m_lockObj)
            {
                // Remove the task from the list.  If we're successful in doing so,
                // we know that no one else has tried to complete this waiter yet,
                // so we can safely cancel or timeout.
                if (RemoveAsyncWaiter(asyncWaiter))
                {
                    cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
                    return false; // timeout occurred
                }
            }

            // The waiter had already been removed, which means it's already completed or is about to
            // complete, so let it, and don't return until it does.
            return await asyncWaiter.ConfigureAwait(false) await asyncWaiter.ConfigureAwait(false);
        }
        public int Release(){ return Release(1);}

        public int Release(int releaseCount)
        {
            CheckDispose();

            // Validate input
            if (releaseCount < 1)
            {
                throw new ArgumentOutOfRangeException( "releaseCount", releaseCount, GetResourceString("SemaphoreSlim_Release_CountWrong"));
            }
            int returnCount;

            lock (m_lockObj)
            {
                // Read the m_currentCount into a local variable to avoid unnecessary volatile accesses inside the lock.
                int currentCount = m_currentCount;
                returnCount = currentCount;

                // If the release count would result exceeding the maximum count, throw SemaphoreFullException.
                if (m_maxCount - currentCount < releaseCount)
                {
                    throw new SemaphoreFullException();
                }

                // Increment the count by the actual release count
                currentCount += releaseCount;

                // Signal to any synchronous waiters
                int waitCount = m_waitCount;
                if (currentCount == 1 || waitCount == 1)
                {
                    Monitor.Pulse(m_lockObj);
                }
                else if (waitCount > 1)
                {
                    Monitor.PulseAll(m_lockObj);
                }

                // Now signal to any asynchronous waiters, if there are any.  While we've already
                // signaled the synchronous waiters, we still hold the lock, and thus
                // they won't have had an opportunity to acquire this yet.  So, when releasing
                // asynchronous waiters, we assume that all synchronous waiters will eventually
                // acquire the semaphore.  That could be a faulty assumption if those synchronous
                // waits are canceled, but the wait code path will handle that.
                if (m_asyncHead != null)
                {
                    Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't null");
                    int maxAsyncToRelease = currentCount - waitCount;
                    while (maxAsyncToRelease > 0 && m_asyncHead != null)
                    {
                        --currentCount;
                        --maxAsyncToRelease;

                        // Get the next async waiter to release and queue it to be completed
                        var waiterTask = m_asyncHead;
                        RemoveAsyncWaiter(waiterTask); // ensures waiterTask.Next/Prev are null
                        QueueWaiterTask(waiterTask);
                    }
                }
                m_currentCount = currentCount;

                // Exposing wait handle if it is not null
                if (m_waitHandle != null && returnCount == 0 && currentCount > 0)
                {
                    m_waitHandle.Set();
                }
            }

            // And return the count
            return returnCount;
        }

        ///Removes the waiter task from the linked list.</summary>
        private bool RemoveAsyncWaiter(TaskNode task)
        {
            Contract.Requires(task != null, "Expected non-null task");
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");

            // Is the task in the list?  To be in the list, either it's the head or it has a predecessor that's in the list.
            bool wasInList = m_asyncHead == task || task.Prev != null;

            // Remove it from the linked list
            if (task.Next != null) task.Next.Prev = task.Prev;
            if (task.Prev != null) task.Prev.Next = task.Next;
            if (m_asyncHead == task) m_asyncHead = task.Next;
            if (m_asyncTail == task) m_asyncTail = task.Prev;
            Contract.Assert((m_asyncHead == null) == (m_asyncTail == null), "Head is null iff tail is null");

            // Make sure not to leak
            task.Next = task.Prev = null;

            // Return whether the task was in the list
            return wasInList;
        }
        private static void QueueWaiterTask(TaskNode waiterTask)
        {
            ThreadPool.UnsafeQueueCustomWorkItem(waiterTask, forceGlobal: false);
        }
        public int CurrentCount
        {
            get { return m_currentCount; }
        }
        public WaitHandle AvailableWaitHandle
        {
            get
            {
                CheckDispose();
                if (m_waitHandle != null)
                    return m_waitHandle;
                lock (m_lockObj)
                {
                    if (m_waitHandle == null)
                    {
                        m_waitHandle = new ManualResetEvent(m_currentCount != 0);
                    }
                }
                return m_waitHandle;
            }
        }
        private sealed class TaskNode : Task<bool>, IThreadPoolWorkItem
        {
            internal TaskNode Prev, Next;
            internal TaskNode() : base() {}

            [SecurityCritical]
            void IThreadPoolWorkItem.ExecuteWorkItem()
            {
                bool setSuccessfully = TrySetResult(true);
                Contract.Assert(setSuccessfully, "Should have been able to complete task");
            }

            [SecurityCritical]
            void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae) { /* nop */ }
        }
    }

 

主要成员

Program

SemaphoreSlim类有多少个私有字段很重大,m_currentCount表示可用能源,假设m_currentCount>0老是调用Wait都会减1,当m_currentCount<=0时重新调用Wait方法就会堵塞。每便调用Release方法m_currentCount都会加1.m_maxCount表示最大可用能源数,是在构造函数中内定的。m_waitCount代表方今不通的线程数。TaskNode
m_asyncHead,m_asyncTail那贰个变量主要用于异步方法。

Net二十八线程编制程序,8天玩转并行开发。一:CountdownEvent

1)public Barrier(int participantCount, Action<巴里r>
postPhaseAction);构造 函数,participantCount:参加的线程个数(参加者的个数),
postPhaseAction每一个阶段后举行的操作。

在上述的章程中Release()方法也就是自增2个信号量,Release(5)自增四个信号量。可是,Release()到构造函数的第一个参数maximumCount的值就无法再自增了。

咱俩先是来看看Wait方法,那里还有它的异步版本WaitAsync。在Wait方法中率先检查m_currentCount是不是为0,若是是大家用SpinWait自旋12次;任意三回Wait都亟需锁住m_lockObj对象,m_asyncHead
!=
null代表近年来早就存在异步的靶子,所以大家调用WaitAsync方法,假使没有那么大家调用WaitUntilCountOrTimeout方法,该措施在m_currentCount==0会阻塞到到m_currentCount不为0或然逾期;看到WaitUntilCountOrTimeout方法中【if
(!Monitor.Wait(m_lockObj,
remainingWaitMilliseconds))】,就很明了Wait方法中【CancellationTokenRegistration
cancellationTokenRegistration =
cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceled伊芙ntHandler,
this)】存在的缘由了,确实很抢眼【那里和马努alReset伊夫ntSlim相似】。以往大家重返WaitAsync方法,该方法也是率先检查m_currentCount是不是大于0,大于直接回到。否者调用CreateAndAddAsyncWaiter创造3个Task<bool>【Task<bool>是三个链表结构】,若是没有收回且超时超越-1,那么就调用WaitUntilCountOrTimeoutAsync方法,该办法首先包装一个Task【var
waitCompleted = Task.WhenAny(asyncWaiter,
Task.Delay(millisecondsTimeout, cts.Token))】然后等待线程【await
waitCompleted.ConfigureAwait(false)】重回的是asyncWaiter也许另3个Delay的Task。假诺回去的不是asyncWaiter表达已经过期须求调用RemoveAsyncWaiter,然后回到
await
asyncWaiter.ConfigureAwait(false),如若回到的是asyncWaiter,那么就调用Cancel方法。那么那里的asyncWaiter.ConfigureAwait(false)哪一天退出了【或许说不阻塞】,那即将看Release中的QueueWaiterTask方法了。

   
 那种利用信号状态的同台基元相当适合在动态的fork,join的景观,它应用“信号计数”的法子,就比如那样,贰个麻将桌只可以包容几个

2) public void SignalAndWait();发出信号,表示参与者已落得屏障并等候全部别的加入者也实现屏障。

塞马phore可用于进程级交互。

QueueWaiterTask方法或调用TaskNode的ExecuteWorkItem方法。
那今后我们来看望Release方法,该办法会把currentCount加1,然后把等待线程转为就绪线程【Monitor.Pulse(m_lockObj)或
Monitor.PulseAll(m_lockObj)】,假使存在异步的话,看看还足以自由多少个异步task【
int maxAsyncToRelease = currentCount –
waitCount】,那里Release的表明很关键,只是没怎么通晓,现释同步的waiters,然后在出狱异步的waiters,可是自由同步后锁的财富没有自由,在刑释异步的waiters时候是把currentCount减1,那样感觉异步waiters优先获得资源。也不精晓自家的知情是不是科学?
1)当ConfigureAwait(true),代码由协助进行施行进入异步执行时,当前二头施行的线程上下文新闻(比如HttpConext.Current,Thread.CurrentThread.CurrentCulture)就会被捕获并保存至SynchronizationContext中,供异步执行中选用,并且供异步执行到位之后(await之后的代码)的一道实施中运用(纵然await之后是手拉手执行的,不过发生了线程切换,会在别的1个线程中实施「ASP.NET场景」)。那么些捕获当然是有代价的,当时我们误以为质量难题是其一地点的支出引起,但实则那么些费用不大,在大家的利用场景不至于会带来质量难题。

人打麻将,如若后来的人也想搓一把碰碰运气,那么她必须等待直到麻将桌上的人走掉一位。好,那正是简不难单的信号计数机制,从技术角

3) public bool SignalAndWait(int millisecondsTimeout); 假诺持有参加者都已在钦定时间内达到屏障,则为
true;不然为 false。

4858.com 34858.com 4

2)当Configurewait(flase),则不开展线程上下文音信的破获,async方法中与await之后的代码执行时就不能够获取await在此之前的线程的上下文音信,在ASP.NET中最直接的影响正是HttpConext.Current的值为null。

度上的话它是概念了最多能够进加入关贸总协定组织键代码的线程数。

4) public int ParticipantCount { get; } 获取屏障中加入者的总和。

using System;
using System.Diagnostics;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {

            Thread t1 = new Thread(Run1);
            t1.Start();

            Thread t2 = new Thread(Run2);
            t2.Start();

            Console.Read();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的Release才能得到信号
        static Semaphore sem = new Semaphore(3, 10, "命名Semaphore");

        static void Run1()
        {
            sem.WaitOne();

            Console.WriteLine("进程:" + Process.GetCurrentProcess().Id + "  我是Run1" + DateTime.Now.TimeOfDay);
        }

        static void Run2()
        {
            sem.WaitOne();

            Console.WriteLine("进程:" + Process.GetCurrentProcess().Id + "  我是Run2" + DateTime.Now.TimeOfDay);
        }
    }
}

   
 可是Countdown伊夫nt更牛X之处在于我们能够动态的改动“信号计数”的大小,比如一会儿力所能及容纳八个线程,一下又多少个,一下又10个,

5) public long CurrentPhaseNumber { get; internal set; }获取屏障的近日阶段编号。

Program

如此那般做有怎样利益吗?仍然承接上一篇文章所说的,比如2个职分急需加载1w条数据,那么大概出现那种境况。

6)public int ParticipantsRemaining {
get; }获取屏障中并未在时下阶段发出信号的到场者的数额。每当新阶段开首时,那几个值等于ParticipantCount ,每当有加入者调用这几个性格时,其减一。

4858.com 5

 

注意:

从来运转四回bin目录的exe文件,就能窥见最五只可以输出3个。

加载User表:         依照user表的数据量,大家必要开五个task。

1) 每当屏障(Barrier实例)接收到来自全数到场者的信号今后,屏障就会递增其等级数,运营构造函数中钦定的动作,并且化解阻塞每七个出席者。

Semaphore能够界定可同时做客某一能源或财富池的线程数。

加载Product表:    产品表数据相对比较多,总括之后供给开7个task。

2)Barrier使用完要调用Dispose()方法释放财富

       
Semaphore类在中间维护2个计数器,当一个线程调用Semaphore对象的Wait类别措施时,此计数器减一,只要计数器依旧三个正数,线程就不会卡住。当计数器减到0时,再调用Semaphore对象Wait连串措施的线程将被封堵,直到有线程调用Semaphore对象的Release()方法扩大计数器值时,才有可能打消阻塞状态。

加载order表:       由于本人的网站订单增加,总结之后须求开十二个task。

3.CountdownEvent

 

 

根本成员:

以身作则表明:
体育场面都安插有若干台公用总结机供读者查询新闻,当某日读者对比多时,必须排队等候。UseLibraryComputer实例用八线程模拟了几人选用多台计算机的经过

先前的稿子也说了,大家须求协调task在多阶段加载数据的协同难题,那么怎么样回应那里的5,8,12,幸亏,Countdown伊芙nt给大家提供了

1) public int InitialCount { get; } 获取设置事件时最初的信号数。

4858.com 64858.com 7

能够动态修改的缓解方案。

1) public CountdownEvent(int initialCount);

using System;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        //图书馆拥有的公用计算机  
        private const int ComputerNum = 3;
        private static Computer[] LibraryComputers;
        //同步信号量  
        public static Semaphore sp = new Semaphore(ComputerNum, ComputerNum);

        static void Main(string[] args)
        {
            //图书馆拥有ComputerNum台电脑  
            LibraryComputers = new Computer[ComputerNum];
            for (int i = 0; i < ComputerNum; i++)
                LibraryComputers[i] = new Computer("Computer" + (i + 1).ToString());
            int peopleNum = 0;
            Random ran = new Random();
            Thread user;
            System.Console.WriteLine("敲任意键模拟一批批的人排队使用{0}台计算机,ESC键结束模拟……", ComputerNum);
            //每次创建若干个线程,模拟人排队使用计算机  
            while (System.Console.ReadKey().Key != ConsoleKey.Escape)
            {
                peopleNum = ran.Next(0, 10);
                System.Console.WriteLine("\n有{0}人在等待使用计算机。", peopleNum);

                for (int i = 1; i <= peopleNum; i++)
                {
                    user = new Thread(UseComputer);
                    user.Start("User" + i.ToString());
                }
            }
        }

        //线程函数  
        static void UseComputer(Object UserName)
        {
            sp.WaitOne();//等待计算机可用  

            //查找可用的计算机  
            Computer cp = null;
            for (int i = 0; i < ComputerNum; i++)
                if (LibraryComputers[i].IsOccupied == false)
                {
                    cp = LibraryComputers[i];
                    break;
                }
            //使用计算机工作  
            cp.Use(UserName.ToString());

            //不再使用计算机,让出来给其他人使用  
            sp.Release();
        }
    }

    class Computer
    {
        public readonly string ComputerName = "";
        public Computer(string Name)
        {
            ComputerName = Name;
        }
        //是否被占用  
        public bool IsOccupied = false;
        //人在使用计算机  
        public void Use(String userName)
        {
            System.Console.WriteLine("{0}开始使用计算机{1}", userName, ComputerName);
            IsOccupied = true;
            Thread.Sleep(new Random().Next(1, 2000)); //随机休眠,以模拟人使用计算机  
            System.Console.WriteLine("{0}结束使用计算机{1}", userName, ComputerName);
            IsOccupied = false;
        }
    }
}

 

2) public bool Signal();向 Countdown伊芙nt 注册信号,同时减小CurrentCount的值。

Program

  1 using System.Collections.Concurrent;
  2 using System.Threading.Tasks;
  3 using System;
  4 using System.Diagnostics;
  5 using System.Collections.Generic;
  6 using System.Linq;
  7 using System.Threading;
  8 
  9 class Program
 10 {
 11     //默认的容纳大小为“硬件线程“数
 12     static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);
 13 
 14     static void Main(string[] args)
 15     {
 16         //加载User表需要5个任务
 17         var userTaskCount = 5;
 18 
 19         //重置信号
 20         cde.Reset(userTaskCount);
 21 
 22         for (int i = 0; i < userTaskCount; i++)
 23         {
 24             Task.Factory.StartNew((obj) =>
 25             {
 26                 LoadUser(obj);
 27             }, i);
 28         }
 29 
 30         //等待所有任务执行完毕
 31         cde.Wait();
 32 
 33         Console.WriteLine("\nUser表数据全部加载完毕!\n");
 34 
 35         //加载product需要8个任务
 36         var productTaskCount = 8;
 37 
 38         //重置信号
 39         cde.Reset(productTaskCount);
 40 
 41         for (int i = 0; i < productTaskCount; i++)
 42         {
 43             Task.Factory.StartNew((obj) =>
 44             {
 45                 LoadProduct(obj);
 46             }, i);
 47         }
 48 
 49         cde.Wait();
 50 
 51         Console.WriteLine("\nProduct表数据全部加载完毕!\n");
 52 
 53         //加载order需要12个任务
 54         var orderTaskCount = 12;
 55 
 56         //重置信号
 57         cde.Reset(orderTaskCount);
 58 
 59         for (int i = 0; i < orderTaskCount; i++)
 60         {
 61             Task.Factory.StartNew((obj) =>
 62             {
 63                 LoadOrder(obj);
 64             }, i);
 65         }
 66 
 67         cde.Wait();
 68 
 69         Console.WriteLine("\nOrder表数据全部加载完毕!\n");
 70 
 71         Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");
 72 
 73         Console.Read();
 74     }
 75 
 76     static void LoadUser(object obj)
 77     {
 78         try
 79         {
 80             Console.WriteLine("当前任务:{0}正在加载User部分数据!", obj);
 81         }
 82         finally
 83         {
 84             cde.Signal();
 85         }
 86     }
 87 
 88     static void LoadProduct(object obj)
 89     {
 90         try
 91         {
 92             Console.WriteLine("当前任务:{0}正在加载Product部分数据!", obj);
 93         }
 94         finally
 95         {
 96             cde.Signal();
 97         }
 98     }
 99 
100     static void LoadOrder(object obj)
101     {
102         try
103         {
104             Console.WriteLine("当前任务:{0}正在加载Order部分数据!", obj);
105         }
106         finally
107         {
108             cde.Signal();
109         }
110     }
111 }

3) public void Reset(int count);将
System.Threading.Countdown伊芙nt.InitialCount
属性重新设置为钦赐值。

 

 

注意:

2、SemaphoreSlim类

4858.com 8

肯定要保管各种参加工作的线程都调用了Signal,倘使有最少二个从未有过调用,那么职务会永远阻塞。所以一般在finally块中调用Signal是个好习惯。

     在.net
4.0从前,framework中有3个重量级的Semaphore,能够跨进度同步,SemaphoreSlim轻量级不行,msdn对它的诠释为:限制可同时做客某一财富或能源池的线程数。

咱俩看看有多少个基本点格局:Wait和Signal。每调用二次Signal也等于麻将桌上走了1人,直到全数人都搓过麻将wait才给放行,那里同样要

4.ManualResetEvent与ManualResetEventSlim

4858.com 94858.com 10

瞩目也正是“超时“难点的存在性,特别是在并行总结中,轻量级别给大家提供了”裁撤标记“的体制,那是在重量级别中不存在的,比如上边包车型客车

马努alReset伊芙nt:可实现跨进度或AppDomain的2只。

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);

        static void Main(string[] args)
        {
            for (int i = 0; i < 12; i++)
            {
                Task.Factory.StartNew((obj) =>
                {
                    Run(obj);
                }, i);
            }
            Console.Read();
        }

        static void Run(object obj)
        {
            slim.Wait();
            Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
            //这里busy3s中
            Thread.Sleep(3000);
            slim.Release();
        }
    }
}

重载public bool Wait(int millisecondsTimeout, CancellationToken
cancellationToken),具体使用能够看前一篇小说的牵线。

重中之重成员:

4858.com,Program

 

1)public bool
Reset();将事件意况设置为非终止状态,导致线程阻止,重回值指示操作是或不是成功。

同等,幸免死锁的情景,我们必要知道”超时和注销标记“的化解方案,像SemaphoreSlim那种定死的”线程请求范围“,其实是下降了增添性,使用需谨慎,在认为有要求的时候使用它

二:SemaphoreSlim

2)public bool
Set();将事件景况设置为停止情状,允许一个或四个等待线程继续,再次来到值提醒操作是还是不是成功。

注:Semaphore类是SemaphoreSlim类的老版本,该版本选择纯粹的根本时间(kernel-time)方式。

     在.net
4.0在此以前,framework中有2个重量级的塞马phore,人家能够跨进程同步,咋轻量级不行,msdn对它的解释为:限制可同时做客

马努alReset伊夫ntSlim:不可使用于跨进度的一块儿。

    SemaphoreSlim类不行使Windows内核信号量,而且也不援助进度间一块。所以在跨程序同步的场景下能够动用Semaphore

某一能源或能源池的线程数。关于它的份额级demo,我的上三个多元有示范,你也足以知晓为Countdown伊夫nt是
SemaphoreSlim的职能加

首要成员:

 

强版,好了,举一个轻量级使用的例证。

1) public bool IsSet { get; }获取是不是设置了轩然大波。

3、CountdownEvent类

 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 {
11     static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12);
12 
13     static void Main(string[] args)
14     {
15         for (int i = 0; i < 12; i++)
16         {
17             Task.Factory.StartNew((obj) =>
18             {
19                 Run(obj);
20             }, i);
21         }
22 
23         Console.Read();
24     }
25 
26     static void Run(object obj)
27     {
28         slim.Wait();
29 
30         Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
31 
32         //这里busy3s中
33         Thread.Sleep(3000);
34 
35         slim.Release();
36     }
37 }

2) public void
Reset();将事件意况设置为非终止状态,从而导致线程受阻,重临值提示操作是或不是成功。

   
 那种使用信号状态的联合基元10分适合在动态的fork,join的地方,它使用“信号计数”的法门,就比如那样,三个麻将桌只可以包容3位打麻将,要是后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉1个人。好,那正是不难的信号计数机制,从技术角度上来说它是概念了最多能够进加入关贸总协定协会键代码的线程数。

 

3)public bool
Set();将事件情状设置为平息景况,允许三个或八个等待线程继续,重返值提醒操作是或不是中标。

   
 不过Countdown伊夫nt更牛X之处在于我们能够动态的变动“信号计数”的轻重,比如一会儿力所能及容纳7个线程,一下又五个,一下又十个,那样做有怎样便宜呢?比如一个任务需求加载1w条数据,那么只怕出现那种景观。

4858.com 11

4)public void Wait();阻止当前线程,直到设置了现阶段 马努alReset伊芙ntSlim
截至。

例如:

一样,制止死锁的场馆,大家须要驾驭”超时和注销标记“的化解方案,像SemaphoreSlim那种定死的”线程请求范围“,其实是下跌了扩展性,

5) public void Dispose();释放能源。

加载User表:         根据user表的数据量,大家必要开四个task。

故此说,试水有危机,使用需谨慎,在认为有必不可少的时候利用它。

6)public ManualResetEventSlim(bool initialState, int spinCount);

加载Product表:    产品表数据相对相比多,总括之后必要开七个task。

 

5.Semaphore与SemaphoreSlim

加载order表:       由于本人的网站订单拉长,总结之后要求开11个task。

三: ManualResetEventSlim

Semaphore:可完毕跨进度或AppDomain的同台,可使用WaitHandle操作递减信号量的计数。

4858.com 124858.com 13

   
 相信它的份量级别大家都掌握是马努alReset,而那些轻量级别选取的是”自旋等待“+”内核等待“,也便是说先采纳”自旋等待的法子“等待,

首要成员:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    class Program
    {
        //默认的容纳大小为“硬件线程“数
        static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount);

        static void LoadUser(object obj)
        {
            try
            {
                Console.WriteLine("ThreadId={0};当前任务:{1}正在加载User部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
            }
            finally
            {
                cde.Signal();
            }
        }

        static void LoadProduct(object obj)
        {
            try
            {
                Console.WriteLine("ThreadId={0};当前任务:{1}正在加载Product部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
            }
            finally
            {
                cde.Signal();
            }
        }

        static void LoadOrder(object obj)
        {
            try
            {
                Console.WriteLine("ThreadId={0};当前任务:{1}正在加载Order部分数据!", Thread.CurrentThread.ManagedThreadId, obj);
            }
            finally
            {
                cde.Signal();
            }
        }

        static void Main(string[] args)
        {
            //加载User表需要5个任务
            var userTaskCount = 5;
            //重置信号
            cde.Reset(userTaskCount);
            for (int i = 0; i < userTaskCount; i++)
            {
                Task.Factory.StartNew((obj) =>
                {
                    LoadUser(obj);
                }, i);
            }
            //等待所有任务执行完毕
            cde.Wait();
            Console.WriteLine("\nUser表数据全部加载完毕!\n");

            //加载product需要8个任务
            var productTaskCount = 8;
            //重置信号
            cde.Reset(productTaskCount);
            for (int i = 0; i < productTaskCount; i++)
            {
                Task.Factory.StartNew((obj) =>
                {
                    LoadProduct(obj);
                }, i);
            }
            cde.Wait();
            Console.WriteLine("\nProduct表数据全部加载完毕!\n");

            //加载order需要12个任务
            var orderTaskCount = 12;
            //重置信号
            cde.Reset(orderTaskCount);
            for (int i = 0; i < orderTaskCount; i++)
            {
                Task.Factory.StartNew((obj) =>
                {
                    LoadOrder(obj);
                }, i);
            }
            cde.Wait();
            Console.WriteLine("\nOrder表数据全部加载完毕!\n");

            Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");
            Console.Read();
        }
    }
}

直到另三个职务调用set方法来刑释它。借使迟迟等不到自由,那么任务就会进去基于内核的等候,所以说若是我们领悟等待的时辰比较短,采

1)public Semaphore(int initialCount, int maximumCount);

Program

用轻量级的本子会具备更好的性质,原理大概就那样,下边举个小例子。

2)public int Release();退出信号量并再次来到前1个计数。

大家看出有三个根本方法:Wait和Signal。每调用二次Signal约等于麻将桌上走了一个人,直到全体人都搓过麻将wait才给放行,那里同样要留心也正是“超时“难点的存在性,尤其是在并行计算中,轻量级别给我们提供了”撤除标记“的编写制定,那是在重量级别中不设有的

 1 using System.Collections.Concurrent;
 2 using System.Threading.Tasks;
 3 using System;
 4 using System.Diagnostics;
 5 using System.Collections.Generic;
 6 using System.Linq;
 7 using System.Threading;
 8 
 9 class Program
10 {
11     //2047:自旋的次数
12     static ManualResetEventSlim mrs = new ManualResetEventSlim(false, 2047);
13 
14     static void Main(string[] args)
15     {
16 
17         for (int i = 0; i < 12; i++)
18         {
19             Task.Factory.StartNew((obj) =>
20             {
21                 Run(obj);
22             }, i);
23         }
24 
25         Console.WriteLine("当前时间:{0}我是主线程{1},你们这些任务都等2s执行吧:\n",
26         DateTime.Now,
27         Thread.CurrentThread.ManagedThreadId);
28         Thread.Sleep(2000);
29 
30         mrs.Set();
31 
32         Console.Read();
33     }
34 
35     static void Run(object obj)
36     {
37         mrs.Wait();
38 
39         Console.WriteLine("当前时间:{0}任务 {1}已经进入。", DateTime.Now, obj);
40     }
41 }

3)public virtual bool WaitOne(); 阻止当前线程,直到近来System.Threading.WaitHandle 收到信号。 如果当前实例收到信号,则为 true。
假诺当前实例永远收不到信号,则
System.Threading.WaitHandle.WaitOne(System.Int32,System.Boolean)永不重临。

注:假如调用Signal()没有到达内定的次数,那么Wait()将直接等候,请保管使用各类线程实现后都要调用Signal方法。

 

注意:

 

4858.com 14

选拔完Semaphore立即调用Dispose()方法释放能源。

 

SemaphoreSlim:不可完毕跨进度或AppDomain的一起,不可动用WaitHandle操作递减信号量的计数。

关键成员:

1)public SemaphoreSlim(int initialCount, int maxCount);

2)public int CurrentCount { get; } 获取将同意进入
System.Threading.SemaphoreSlim 的线程的多少。

3)public int Release();退出 System.Threading.SemaphoreSlim 一次。

4)public void Wait();阻止当前线程,直至它可进入
System.Threading.塞马phoreSlim 结束。

5)public WaitHandle AvailableWaitHandle { get;
}重返2个可用于在信号量上等候的 System.Threading.WaitHandle。

注意:

选用完SemaphoreSlim立即调用Dispose()方法释放财富。

6.SpinLock:自旋锁,对SpinWait的包装

重在成员:

1)public void Enter(ref bool lockTaken);
选取可信赖的章程获取锁,那样,即便在格局调用中生出特别的景况下,都能应用可相信的不二法门检查
lockTaken 以分明是否已获得锁。

2)public void Exit(bool useMemoryBarrier);释放锁

说明:

1)不要将SpinLock注解为只读字段。

2)确定保证每一次任务完结后都释放锁。

7.SpinWait:基于自旋的等候

最主要成员:

1)public static void SpinUntil(Func<bool>
condition);在内定条件获得满足此前自旋。

2)public static bool SpinUntil(Func<bool> condition, int
millisecondsTimeout);在内定条件获得满意或钦点超时过期事先自旋。

说明:

1)适用情况:等待有些条件满意急需的岁月非常短,并且不愿意发生昂贵的上下文切换。

2)内部存款和储蓄器开支非常的小。其是二个结构体,不会生出不须求的内部存款和储蓄器开支。

3)假使自旋时间过长,SpinWait会让出底层线程的时间片并触发上下文切换。

8.Look:互斥锁

说明:

1)通过应用lock关键字能够获得一个对象的互斥锁。

2)使用lock,这会调用System.Threading.Monitor.Enter(object
obj, ref bool lockTaken)和System.Threading.Monitor.Exit(object
obj)方法。

3)不要对值类型使用Lock

4)防止锁定类的外表对象,防止跨成员或类的境界获得和假释1个锁,制止获得锁的时候调用未知代码。

9.Monitor

根本成员:

1)public static void Enter(object obj, ref bool
lockTaken);获取钦定对象上的排他锁,并自行安装2个值,提醒是还是不是收获了该锁。

2)public static void Exit(object obj);释放钦赐对象上的排他锁。

3)public static void TryEnter(object obj, int millisecondsTimeout, ref
bool
lockTaken);在钦定的纳秒数中,尝试获得钦点对象上的排他锁,并自动安装贰个值,提示是或不是获得了该锁。

说明:

1)不要对值类型使用Monitor。

2)幸免锁定类的外表对象,制止跨成员或类的界线得到和刑满释放解除劳教多少个锁,制止获得锁的时候调用未知代码。

10.volatile修饰符

作用:

当共享变量被分化的线程访问和翻新且从未锁和原子操作的时候,最新的值总能在共享变量中展现出来。

注意:

1)可以将这几个修饰符用于类和struct的字段,但不可能宣称使用volatile关键字的一些变量。

2)Volatile可修饰的品类为:整型,bool,带有整型的枚举,引用类型,推到为引用类型的泛型类型,不安全上下文中的指针类型以及代表指针或许句柄的阳台相关项目。

11.Interlocked:为多义务或线程共享的变量提供原子操作

驷不及舌成员:

1)public static int Increment(ref int
location);以原子操作的样式递增钦点变量的值并存储结果。

2)public static int Add(ref int location1, int value);对八个 33个人整数实行求和并用和替换第一个整数,上述操作作为三个原子操作完毕。

3)public static float CompareExchange(ref float location1, float value,
float comparand);
比较多个单精度浮点数是不是等于,要是相等,则替换在那之中2个值。

4)public static int Decrement(ref int
location);以原子操作的样式递减钦定变量的值并存款和储蓄结果。

注意:

最大的利益:花费低,功效高。

 

12 使用格局

1)Barrier

 1 public static void BarrierTest1()
 2 {
 3             //构造函数的参数participantCount表示参与者的数量。
 4             //注意:父线程也是一个参与者,所以两个任务,但是Barrier的participantCount为3
 5             //注意:无法保证任务1和任务2完成的先后顺序。
 6             //Barrier(int participantCount, Action<Barrier> postPhaseAction);也可使 用此方法
 7             //当所有参与者都已到达屏障后,执行要处理的任务,即对两个任务产生的数据统一处理的过程可放在此处执行。
 8             using (Barrier bar = new Barrier(3))
 9             {
10                 Task.Factory.StartNew(() =>
11                 {
12 
13                     //具体业务
14 
15                     //当业务完成时,执行下面这行代码;发出信号,表明任务已完成,并等待其他参与者
16                     bar.SignalAndWait();
17 
18                 });
19 
20                 Task.Factory.StartNew(() =>
21                 {
22 
23                     //具体业务
24 
25                     //当业务完成时,执行下面这行代码;发出信号,表明任务已完成,并等待其他参与者
26                     bar.SignalAndWait();
27 
28                 });
29 
30                 //保证上面两个任务都能完成才执行bar.SignalAndWait();这一句之后的代码
31                 bar.SignalAndWait();
32                 //当上述两个任务完成后,对两个任务产生的数据进行统一处理。
33 
34             }
35 
36         }
37 
38         public static void BarrierTest2()
39         {
40             //构造函数的参数participantCount表示参与者的数量。
41             using (Barrier bar = new Barrier(3))
42             {
43                 Task.Factory.StartNew(() =>
44                 {
45 
46                     //具体业务
47 
48                     //当业务完成时,执行下面这行代码;移除一个参与者
49                     //注意:bar.SignalAndWait();与bar.RemoveParticipant();可以混用
50                     bar.RemoveParticipant();
51 
52                 });
53 
54                 Task.Factory.StartNew(() =>
55                 {
56 
57                     //具体业务
58 
59                     //当业务完成时,执行下面这行代码;移除一个参与者
60                     bar.RemoveParticipant();
61 
62                 });
63 
64                 bar.SignalAndWait();
65                 //当上述两个任务完成后,对两个任务产生的数据进行统一处理。
66             }
67 }

2)CountdownEvent

 1 public static void CountdownEventTest()
 2 {
 3             //注意初始化信号数等于并行的任务数
 4             int initialCount = N;
 5             using (CountdownEvent cd = new CountdownEvent(initialCount))
 6             {
 7                 //多个并行任务,完成一个减少一个信号
 8                 for (int i = 0; i < N; i++)
 9                 {
10                     Task.Factory.StartNew(() => 
11                     {
12                         try
13                         {
14                             //真正的业务
15                         }
16                         finally
17                         {
18                             //确保不论何种情况都能减少信号量,防止死循环
19                             cd.Signal();
20                         }
21                     });
22                 }
23 
24                 //等待上述多个任务执行完毕
25                 cd.Wait();
26             }
27 }

3)ManualResetEvent与ManualResetEventSlim

 1 public static void ManualResetEventTest()
 2 {
 3             ManualResetEvent mre = new ManualResetEvent(false);
 4             ManualResetEvent mre1 = new ManualResetEvent(false);
 5 
 6             try
 7             {
 8                 Task.Factory.StartNew(() =>
 9                 {
10                     //业务
11                     mre.Set();
12                 });
13 
14                 Task.Factory.StartNew(() =>
15                 {
16                     mre.WaitOne();
17 
18                     //使用任务1的数据
19                     
20                     mre1.Set();
21 
22                 });
23 
24                 //等待任务全部执行完
25                 mre1.WaitOne();
26             }
27             finally
28             {
29                 mre.Dispose();
30                 mre1.Dispose();
31             }
32         }
33         //注意:本示例并不是一个最佳实践,目的在于演示ManualResetEventSlim
34         //当没有更好的协调机制时,可考虑使用本示例
35         public static void ManualResetEventSlimTest1()
36         {
37             ManualResetEventSlim mreslim = new ManualResetEventSlim();
38             ManualResetEventSlim mreslim1 = new ManualResetEventSlim();
39             try
40             {
41                 Task.Factory.StartNew(() =>
42                 {
43                     mreslim.Set();
44 
45                     //业务
46 
47                     mreslim.Reset();
48 
49                 });
50 
51                 Task.Factory.StartNew(() =>
52                 {
53                     //当mreslim.Set()被调用时,mreslim.Wait()立即返回,即解除阻塞。
54                     mreslim.Wait();
55                     //直到mreslim.Reset()被调用,循环才会结束
56                     while (mreslim.IsSet)
57                     {
58                         //业务
59                     }
60                     mreslim1.Set();
61                 });
62 
63                 //等待第二个任务完成
64                 mreslim1.Wait();
65             }
66             finally
67             {
68                 mreslim.Dispose();
69                 mreslim1.Dispose();
70             }
71 }

4)Semaphore与SemaphoreSlim

 1 public static void SemaphoreSlimTest()
 2 {
 3             int initialCount = 10;//可以是其他值
 4             List<string> list = new List<string>();
 5             var tasks = new Task[initialCount];
 6        //如果使用Semaphore,实例化的时候,那么最少要传递两个参数,信号量的初始请求数和信号量的最大请求数
 7             using(SemaphoreSlim ssl = new SemaphoreSlim(initialCount))
 8             {
 9                 for (int i = 0; i < initialCount; i++)
10                 {
11                     int j = i;
12                     tasks[j] = Task.Factory.StartNew(() =>
13                     {
14                         try
15                         {
16                             //等待,直到进入SemaphoreSlim为止
17                  //如果使用Semaphore,应调用WaitOne
18                             ssl.Wait();
19                             //直到进入SemaphoreSlim才会执行下面的代码
20                             list.Add(""+j);//可将这部分替换成真实的业务代码
21                         }
22                         finally
23                         {
24                             ssl.Release();
25                         }
26                     });
27                 }
28                 //注意一定要在using块的最后阻塞线程,直到所有的线程均处理完任务
29                 //如果没有等待任务全部完成的语句,会导致SemaphoreSlim资源被提前释放。
30                 Task.WaitAll(tasks);
31             }          
32 }

5)SpinLock

 1 public static void SpinLockTest()
 2 {
 3             bool lockTaken = false;
 4             SpinLock sl = new SpinLock(true);
 5             try
 6             {
 7                 //获得锁
 8                 //如果不能获得锁,将会等待并不断检测锁是否可用
 9                 //获得锁后,lockTaken为true,此行代码之后的部分才会开始运行
10                 sl.Enter(ref lockTaken);
11 
12                 //或使用含有超时机制的TryEnter方法
13                 //sl.TryEnter(1000,ref lockTaken);
14                 //然后抛出超时异常
15                 //if (!lockTaken)
16                 //{
17                 //    throw new TimeoutException("超时异常");
18                 //}
19 
20                 //真正的业务。。。
21                 
22             }
23             finally
24             {
25                 if (lockTaken)
26                 {
27                     //释放锁
28                     //SpinLock没有使用内存屏障,因此设成false
29                     sl.Exit(false);
30                 }
31             }                        
32 }

6)SpinWait

 1 public static void SpinWaitTest()
 2 {
 3             bool isTrue = false;
 4             //任务一,处理业务,成功将isTrue设置为true
 5             Task.Factory.StartNew(() => 
 6             {
 7                 //处理业务,返回结果result指示是否成功
 8                 bool result = ...;
 9                 if (result)
10                 {
11                     isTrue = true;
12                 }
13             });
14 
15             //可设定等待时间,如果超时,则向下执行
16             Task.Factory.StartNew(() => {
17                 SpinWait.SpinUntil(()=>isTrue,10000);
18                 //真正的业务
19             });
20 }

7) Look

 1 下面两段代码是等价的。
 2 lock (Object)
 3 {
 4        //do something
 5 }
 6 
 7 //等价代码
 8 bool lockTaken = false;
 9 try
10 {
11     Monitor.Enter(object,lockTaken);
12      //do something
13 }
14 finally
15 {
16      if(lockTaken)
17      {
18             Monitor.Exit(object);
19        }
20 }

8)Interlocked

 1 public static void InterlockedTest()
 2 {
 3             Task[] tasks = new Task[10];
 4             long j = 0;
 5             for(int i=0;i<10;i++)
 6             {
 7                 int t = i;
 8                 tasks[t] = Task.Factory.StartNew(()=>
 9                 {
10                 //以安全的方式递增j
11                     Interlocked.Increment(ref j);
12                 });
13             }
14             Task.WaitAll(tasks);           
15 }

转发与引用请注脚出处。

时光匆忙,水平有限,如有不当之处,欢迎指正。

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图
Copyright @ 2010-2019 美高梅手机版4858 版权所有