Browsed by
分类:人行码路

Scala 语法:implicit

Scala 语法:implicit

Implicit是暗示的意思,这个关键字在Scala中给一个类增加一些方法,用于接收不同类型的对象。

Javascript逻辑运算符及优先级

Javascript逻辑运算符及优先级

首先可以看看《运算符优先级》和《逻辑运算符

然后重点说说Javascript逻辑运算符及优先级,基本的原则是
a && b
如果 a 为true,直接返回b,而不管b为true或者false 。如果 a 为false 那么直接返回a。
a || b
如果 a 为true,直接返回a,而不会继续往下执行。如果 a 为false,直接返回b,而不管b为true或者false 。

需要注意的是true可以为true、非零的数字、非空字符、对象,false可以为false、null、数字0、空字符串\”\”、数字NaN、undefined

Java IO: BIO, NIO, AIO

Java IO: BIO, NIO, AIO

BIO, NIO, AIO,本身的描述都是在Java语言的基础上的。
而描述IO,我们需要从三个层面:
1. 编程语言
2. 实现原理
3. 底层基础

从编程语言层面

BIO, NIO, AIO以Java的角度理解
– BIO,同步阻塞式IO,简单理解:一个连接一个线程
– NIO,同步非阻塞IO,简单理解:一个请求一个线程
– AIO,异步非阻塞IO,简单理解:一个有效请求一个线程

BIO

在JDK1.4之前,用Java编写网络请求,都是建立一个ServerSocket,然后,客户端建立Socket时就会询问是否有线程可以处理,如果没有,要么等待,要么被拒绝。即:一个连接,要求Server对应一个处理线程。

public class PlainEchoServer {
  public void serve(int port) throws IOException {
    final ServerSocket socket = new ServerSocket(port); //Bind server to port
    try {
      while (true) {
        //Block until new client connection is accepted
        final Socket clientSocket = socket.accept();
        System.out.println(\"Accepted connection from \" + clientSocket);
        //Create new thread to handle client connection
        new Thread(new Runnable() {
          @Override
          public void run() {
            try {
              BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
              PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
              //Read data from client and write it back
              while (true) {
                writer.println(reader.readLine());
                writer.flush();
              }
            } catch (IOException e) {
              e.printStackTrace();
              try {
                clientSocket.close();
              } catch (IOException ex) {
                // ignore on close
              }
            }
          }
        }).start();
        //Start thread
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

NIO

在Java里的由来,在JDK1.4及以后版本中提供了一套API来专门操作非阻塞I/O,我们可以在java.nio包及其子包中找到相关的类和接口。由于这套API是JDK新提供的I/O API,因此,也叫New I/O,这就是包名nio的由来。这套API由三个主要的部分组成:缓冲区(Buffers)、通道(Channels)和非阻塞I/O的核心类组成。在理解NIO的时候,需要区分,说的是New I/O还是非阻塞IO,New I/O是Java的包,NIO是非阻塞IO概念。这里讲的是后面一种。
NIO本身是基于事件驱动思想来完成的,其主要想解决的是BIO的大并发问题:在使用同步I/O的网络应用中,如果要同时处理多个客户端请求,或是在客户端要同时和多个服务器进行通讯,就必须使用多线程来处理。也就是说,将每一个客户端请求分配给一个线程来单独处理。这样做虽然可以达到我们的要求,但同时又会带来另外一个问题。由于每创建一个线程,就要为这个线程分配一定的内存空间(也叫工作存储器),而且操作系统本身也对线程的总数有一定的限制。如果客户端的请求过多,服务端程序可能会因为不堪重负而拒绝客户端的请求,甚至服务器可能会因此而瘫痪。
NIO基于Selector,当socket有流可读或可写入socket时,操作系统会相应的通知引用程序进行处理,应用再将流读取到缓冲区或写入操作系统。也就是说,这个时候,已经不是一个连接就要对应一个处理线程了,而是有效的请求,对应一个线程,当连接没有数据时,是没有工作线程来处理的。

public class PlainNioEchoServer {
  public void serve(int port) throws IOException {
    System.out.println(\"Listening for connections on port \" + port);
    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket ss = serverChannel.socket();
    InetSocketAddress address = new InetSocketAddress(port);
    //Bind server to port
    ss.bind(address);
    serverChannel.configureBlocking(false);
    Selector selector = Selector.open();
    //Register the channel with the selector to be interested in new Client connections that get accepted
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    while (true) {
      try {
        //Block until something is selected
        selector.select();
      } catch (IOException ex) {
        ex.printStackTrace();
        //handle in a proper way
        break;
      }
      //Get all SelectedKey instances
      Set readyKeys = selector.selectedKeys();
      Iterator iterator = readyKeys.iterator();
      while (iterator.hasNext()) {
        SelectionKey key = (SelectionKey) iterator.next();
        //Remove the SelectedKey from the iterator
        iterator.remove();
        try {
          if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            //Accept the client connection
            SocketChannel client = server.accept();
            System.out.println(\"Accepted connection from \" + client);
            client.configureBlocking(false);
            //Register connection to selector and set ByteBuffer
            client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));
          }
          //Check for SelectedKey for read
          if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer output = (ByteBuffer) key.attachment();
            //Read data to ByteBuffer
            client.read(output);
          }
          //Check for SelectedKey for write
          if (key.isWritable()) {
            SocketChannel client = (SocketChannel) key.channel();
            ByteBuffer output = (ByteBuffer) key.attachment();
            output.flip();
            //Write data from ByteBuffer to channel
            client.write(output);
            output.compact();
          }
        } catch (IOException ex) {
          key.cancel();
          try {
            key.channel().close();
          } catch (IOException cex) {
          }
        }
      }
    }
  }
}

AIO

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。
即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。
在JDK1.7中,这部分内容被称作NIO.2,主要在java.nio.channels包下增加了下面四个异步通道:
+ AsynchronousSocketChannel
+ AsynchronousServerSocketChannel
+ AsynchronousFileChannel
+ AsynchronousDatagramChannel
其中的read/write方法,会返回一个带回调函数的对象,当执行完读取/写入操作后,直接调用回调函数。

public class PlainNio2EchoServer {
  public void serve(int port) throws IOException {
    System.out.println(\"Listening for connections on port \" + port);
    final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress(port);
    // Bind Server to port
    serverChannel.bind(address);
    final CountDownLatch latch = new CountDownLatch(1);
    // Start to accept new Client connections. Once one is accepted the CompletionHandler will get called.
    serverChannel.accept(null, new CompletionHandler() {
      @Override
      public void completed(final AsynchronousSocketChannel channel, Object attachment) {
        // Again accept new Client connections
        serverChannel.accept(null, this);
        ByteBuffer buffer = ByteBuffer.allocate(100);
        // Trigger a read operation on the Channel, the given CompletionHandler will be notified once something was read
        channel.read(buffer, buffer, new EchoCompletionHandler(channel));
      }

      @Override
      public void failed(Throwable throwable, Object attachment) {
        try {
          // Close the socket on error
          serverChannel.close();
        } catch (IOException e) {
          // ingnore on close
        } finally {
          latch.countDown();
        }
      }
    });
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private final class EchoCompletionHandler implements CompletionHandler {
    private final AsynchronousSocketChannel channel;

    EchoCompletionHandler(AsynchronousSocketChannel channel) {
      this.channel = channel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
      buffer.flip();
      // Trigger a write operation on the Channel, the given CompletionHandler will be notified once something was written
      channel.write(buffer, buffer, new CompletionHandler() {
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
          if (buffer.hasRemaining()) {
            // Trigger again a write operation if something is left in the ByteBuffer
            channel.write(buffer, buffer, this);
          } else {
            buffer.compact();
            // Trigger a read operation on the Channel, the given CompletionHandler will be notified once something was read
            channel.read(buffer, buffer, EchoCompletionHandler.this);
          }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
          try {
            channel.close();
          } catch (IOException e) {
            // ingnore on close
          }
        }
      });
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
      try {
        channel.close();
      } catch (IOException e) {
        // ingnore on close
      }
    }
  }
}

实现原理

说道实现原理,还要从操作系统的IO模型上了解
按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。
如何区分呢?首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。

收到操作系统的IO模型,又不得不提select/poll/epoll/iocp。
可以理解的说明是:在Linux 2.6以后,java NIO的实现,是通过epoll来实现的,这点可以通过jdk的源代码发现。而AIO,在windows上是通过IOCP实现的,在linux上还是通过epoll来实现的。
这里强调一点:AIO,这是I/O处理模式,而epoll等都是实现AIO的一种编程模型;换句话说,AIO是一种接口标准,各家操作系统可以实现也可以不实现。在不同操作系统上在高并发情况下最好都采用操作系统推荐的方式。Linux上还没有真正实现网络方式的AIO。

底层基础

在windows上,AIO的实现是通过IOCP来完成的,看JDK的源代码,可以发现

WindowsAsynchronousSocketChannelImpl

看实现接口:

implements Iocp.OverlappedChannel

再看实现方法:里面的read0/write0方法是native方法,调用的jvm底层实现。

在linux上,AIO的实现是通过epoll来完成的,看JDK源码,可以发现,实现源码是:

UnixAsynchronousSocketChannelImpl

看实现接口:

implements Port.PollableChannel

这是与windows最大的区别,poll的实现,在linux2.6后,默认使用epoll。

Java 并发编程:AQS框架

Java 并发编程:AQS框架

1. 简介

Java内置的锁,其优势是可以花最小的空间开销创建锁(因为每个JAVA对象都可以作为锁使用)和最少的时间开销获得锁(单线程可以在最短时间内获得锁)。线程同步越来越多地被用在多处理器上,特别是在高并发的情况下,然而,JVM内置锁的表现一般,而且不支持任何公平策略。从Java 5开始在java.util.concurrent包中引入了有别于synchronized的同步框架。
设计一个同步器至少应该具以下有两种操作:一个获取方法,如果当前状态不允许,将一直阻塞这个线程;一个释放方法,修改状态,让其他线程有运行的机会。
并发包中并没有为同步器提供一个统一的API,获取和释放方法在不同的类中的名称不同,比如获取方法有:Lock.lock, Semaphore.acquire, CountDownLatch.await和FutureTask.get.这些方法一般都重载有多种版本:阻塞与非阻塞版本、支持超时、支持中断。
java.util.concurrent包中有很多同步类,比如互斥锁、读写锁、信号量等,这些同步类几乎都可以用不同方式来实现,但是JSR166建立了一个同步中心类AbstractQueuedSynchronizer(简称:AQS)的框架,其中提供了大量的同步操作,而且用户还可以在此类的基础上自定义自己的同步类。其设计目标主要有两点:
1、提高可扩展性,用户可以自定义自己的同步类
2、最大限度地提高吞吐量,提供自定义公平策略

2. 设计和实现

同步器的设计比较直接,前面提到包含获取和释放两个操作:
获取操作过程如下:

while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;

释放操作:

update synchronization state;
if (state may permit a blocked thread to acquire)
    unblock one or more queued threads;

要满足以上两个操作,需要以下3点来支持:
1、原子操作同步状态;
2、阻塞或者唤醒一个线程;
3、内部应该维护一个队列。

3. 同步状态

AQS用的是一个32位的整型来表示同步状态的,可以通过以下几个方法来设置和修改这个状态字段:getState(),setState(),compareAndSetState().这些方法都需要java.util.concurrent.atomic包的支持,采用CAS操作。
将state设置为32位整型是一个务实的决定,虽然JSR166提供了64位版本的原子操作,但它还是使用对象内部锁来实现的,如果采用64位的state会导致同步器表现不良好。32位同步器满足大部分应用,如果确实需要64位的状态,可以使用AbstractQueuedLongSynchronizer类。
AQS是一个抽象类,如果它的实现类想要拥有对获取和释放的控制权,那它必须实现tryAcquire和tryRelease两个方法。

4. 阻塞

JSR166以前还没有好的阻塞和解除阻塞线程的API可以使用,只有Thread.suspend() 和 Thread.resume(),但这两个方法已经被废弃了,原因是有可能导致死锁。如果一个线程拥有监视器然后调用 Thread.suspend() 使自已阻塞,另一个线程试图调用Thread.resume()去唤醒它,那么这个线程去获取监视器时即出现死锁。
前面提到的LockSupport解决了这个问题,LockSupport.park()可以阻塞一个线程,LockSupport.unpack()可以解除阻塞.调用一次park(),然后调用多次unpack()只会唤醒一个线程.阻塞针对线程而不是针对同步器。

5. 队列

同步框架最重要的是要有一个同步队列,在这里被严格限制为FIFO队列,因此这个同步框架不支持基于优先级的同步策略。同步队列采用非阻塞队列毋庸置疑,当时非阻塞队列只有两个可供选择CLH队列锁和MCS队列锁.原始的CLH Lock仅仅使用自旋锁,但是相对于MCS Lock它更容易处理cancel和timeout,所以选择了CLH Lock。
CLH队列锁的优点是:进出队快,无锁,畅通无阻(即使在有竞争的情况下,总有一个线程总是能够很快插入到队尾);检查是否有线程在等待也是很容易的(只需要检查头尾指针是否相同)。最后设计出来的变种CLH Lock和原始的CLH Lock有较大的差别:
1、为了可以处理timeout和cancel操作,每个node维护一个指向前驱的指针。如果一个node的前驱被cancel,这个node可以前向移动使用前驱的状态字段。
2、第二个变动是在每个node里使用一个状态字段去控制阻塞,而不是自旋。一个排队的线程调用acquire(),只有在通过了子类实现的tryAcquire()才能返回,确保只有队头线程才允许调用tryAcquire()。
3、另外还有一些微小的改动:head结点使用的是傀儡结点。
变种的CLH队列如下图所示:
\"java-concurrent-4\"
结点中有一个状态位,这个状态位与线程状态密切相关,这个状态位(waitStatus)是一个32位的整型常量,它的取值如下:

static final int CANCELLED =  1;  //因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收
static final int SIGNAL    = -1;  //表示这个结点的继任结点被阻塞了,到时需要通知它
static final int CONDITION = -2;  //表示这个结点在条件队列中,因为等待某个条件而被阻塞
static final int PROPAGATE = -3;  //使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播
0://新结点会处于这种状态。

同步框架提供了一个ConditionObject,一般和Lock接口配合来支持互斥模型,它提供类似JVM同步器的操作。条件对象可以和其他同步器有效的整合,它修复了JVM内置同步器的不足:一个锁可以有多个条件。条件结点内部也有一个状态字段,条件结点是通过nextWaiter指针串起来的一个独立的队列。条件队列中的线程在获取锁之前,必须先被transfer到同步队列中去。transfer先断开条件队列的第一个结点,然后插入到同步队列中,这个新插入到同步队列中的结点和同步队列中的结点一起排队等待获取锁。

6. 用法

AbstractQueuedSynchronizer是一个采用模板方法模式实现的同步器基类,子类只需要实现获取和释放方法。子类一般不直接用于同步控制。因为获取和释放方法一般是私有的,实现细节不必暴露出来,所以常用委派的方法来使用同步器类:在一个类的内部申请一个私有的AQS的子类,委派它的所有同步方法。
AbstractQueuedSynchronizer类还提供了其他一些同步控制方法,包括超时和中断版的获取方法,还集成了独占模式的同步器,如acquireShared(),tryReleaseShared()等方法。
虽然内部队列被设计为FIFO,但并不意味着这个同步器一定是公平的。前面谈到,在tryAcquire()检查之后再排队。因此,新线程完全可以偷偷排在第一个线程前面。之所以不采用FIFO,有时候是想获得更高的吞吐量,为了减少等待时间,新到的线程与队列头部的线程一起公平竞争,如果新来的线程比队头的线程快,那么这个新来的线程就获取锁。队头线程失去竞争会再次阻塞,它的继任也将会被阻塞,但这样能避免饥饿。
如果需要绝对公平,那很简单,只需要在tryAcquire()方法,不在队头返回false即可。检查是否在队头可以使用getFirstQueuedThread()方法。有一情况是,队列是空的,同时有多个线程一拥而入,谁先抢到锁就谁运行,这其实与公平并不冲突,是对公平的补充。

7. 获取与释放锁

AbstractQueuedSynchronizer中比较重要的两个操作是获取和释放,以下是各种获取操作:

public final void acquire(int arg);
public final void acquireInterruptibly(int arg);
public final void acquireShared(int arg);
public final void acquireSharedInterruptibly(int arg);
protected boolean tryAcquire(int arg); 
protected int tryAcquireShared(int arg);
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException;
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;

释放操作

public final boolean release(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);

AbstractQueuedSynchronizer的内部类Node类中有两个常量SHARE和EXCLUSIVE,顾名思义这两个常量用于表示这个结点支持共享模式还是独占模式,共享模式指的是允许多个线程获取同一个锁而且可能获取成功,独占模式指的是一个锁如果被一个线程持有,其他线程必须等待。多个线程读取一个文件可以采用共享模式,而当有一个线程在写文件时不会允许另一个线程写这个文件,这就是独占模式的应用场景。

1) 独占模式

AbstractQueuedSynchronizer类方法中方法名不含shared的默认是独占模式,在独占模式下,子类需要重写tryAcquire()方法。
线程首先通过tryAcquire()方法在独占模式下获取锁,如果获取成功就直接返回,否则通过acquireQueued()获取锁,如果仍然失败则selfInterrupt当前线程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //如果获取锁失败,那么就创建一个代表当前线程的结点加入到等待队列的尾部
            selfInterrupt();
    }

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) { //判断队列中是否有元素
            node.prev = pred; //如果有
            if (compareAndSetTail(pred, node)) { //就设置当前结点为队尾结点
                pred.next = node;
                return node;
            }
        }
        enq(node); //如果没有元素,表示队列为空,做入队操作
        return node;
    }

    private Node enq(final Node node) { //enq方法采用的是变种CLH算法
        for (;;) {
            Node t = tail;
            if (t == null) { //先看头结点是否为空,这一步只会在队列初始化时会执行
                if (compareAndSetHead(new Node())) //如果为空就创建一个傀儡结点
                    tail = head; //头尾指针都指向这个傀儡结点
            } else { //如果头结点非空
                node.prev = t;
                if (compareAndSetTail(t, node)) { //采用CAS操作将当前结点插入到头结点后面
                    t.next = node; //如果在插入的时候尾结点有变化,就将尾结点向后移动直到移动到最后一个结点为止,然后再把当前结点插入到尾结点后面,尾指针指向当前结点
                    return t; //入队成功
                }
            }
        }
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //判断新结点的前趋结点是否为头结点
                if (p == head && tryAcquire(arg)) { //如果它的前趋是头结点,让前趋获取锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL
                    parkAndCheckInterrupt()) //当前线程可以安全地挂起,整个过程结束
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

将新加入的结点放入队列之后,这个结点有两种状态,要么获取锁,要么就挂起,如果这个结点不是头结点,就看看这个结点是否应该挂起,如果应该挂起,就挂起当前结点,是否应该挂起是通过shouldParkAfterFailedAcquire()方法来判断的

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; //首先检查前趋结点的waitStatus位
        if (ws == Node.SIGNAL) //如果为SIGNAL,表示前趋结点会通知它,那么它可以放心大胆地挂起了
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { //如果前趋结点是一个被取消的结点
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); //就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止
            pred.next = node; //将找到的这个结点作为它的前趋结点
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don\'t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); //将找到的这个结点的waitStatus位设置为SIGNAL
        }
        return false; //返回false表示线程不应该被挂起
    }

独占模式下释放锁是通过方法release ()来实现的,首先调用子类的tryRelease()尝试释放锁,如果失败,直接返回;如果成功调用unparkSuccessor ()方法做后续处理。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

释放锁成功后需要唤醒继任结点,是通过方法unparkSuccessor实现的

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; //node参数传进来的是头结点,首先检查头结点的waitStatus位
        if (ws < 0) //如果为负,表示头结点还需要通知后继结点,后面会通知后续节点,因此将该该标志位清0.
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; //然后查看头结点的下一个结点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        } //如果下一个结点不为空且它的waitStatus<=0,表示后继结点没有被取消,是一个可以唤醒的结点,于是唤醒后继结点返回;如果后继结点为空或者被取消了,则寻找下一个可唤醒的结点,然后唤醒它返回。
        if (s != null)
            LockSupport.unpark(s.thread);
}

2) 共享模式

如果子类想支持共享模式,同样必须重写tryAcquireShared()方法,线程首先通过tryAcquireShared()方法在共享模式下获取锁,如果获取成功就直接返回,否则通过doAcquireShared()获取锁

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) //如果获取成功就直接返回
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); //创建一个新结点(共享模式),加入到队尾
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //判断新结点的前趋结点是否为头结点
                if (p == head) { //如果它的前趋是头结点 
                    int r = tryAcquireShared(arg); //让前趋在共享模式下获取锁
                    if (r >= 0) { //如果获取成功,把当前结点设置为头结点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,就将前趋结点的状态标志位设置为SIGNAL
                    parkAndCheckInterrupt()) //当前线程可以安全地挂起,整个过程结束
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

共享模式下释放锁是通过方法releaseShared()来实现的,首先调用子类的tryReleaseShared()尝试释放锁,如果失败,直接返回;如果成功调用doReleaseShared方法做后续处理。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
doReleaseShared()方法
    private void doReleaseShared() { //这个方法就一个目的,就是把当前结点设置为SIGNAL或者PROPAGATE
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { //如果当前结点不是头结点也不是尾结点
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先判断当前结点的状态位是否为SIGNAL,如果是就设置为0
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

因为共享模式下更多使用PROPAGATE来传播,SIGNAL会被经过两步改为PROPAGATE:

compareAndSetWaitStatus(h, Node.SIGNAL, 0)
compareAndSetWaitStatus(h, 0, Node.PROPAGATE)

为什么要经过两步呢?原因在unparkSuccessor()方法:如果直接从SIGNAL到PROPAGATE,那么到unparkSuccessor()方法里面又被设置为0:SIGNAL->PROPAGATE->0->PROPAGATE,对头结点相当于多做了一次compareAndSet操作。

Java 并发编程:锁

Java 并发编程:锁

1. Object上的锁

首先介绍一下Java中最基本的锁:Object.wait()、Object.notify()和Object.notifyAll()。
wait、notify和notifyAll方法是Object类的final native方法。所以这些方法不能被子类重写。
void notifyAll()解除所有那些在该对象上调用wait方法的线程的阻塞状态。该方法只能在同步方法或同步块内部调用。如果当前线程不是锁的持有者,该方法抛出一个IllegalMonitorStateException异常。
void notify()随机选择一个在该对象上调用wait方法的线程,解除其阻塞状态。该方法也只能在同步方法或同步块内部调用。如果当前线程不是锁的持有者,该方法抛出一个IllegalMonitorStateException异常。需要注意的是notify选择在该对象上调用wait方法的线程是随机的
void wait()、void wait(long millis)和void wait(long millis,int nanos)使线程进入等待状态,直到它被其他线程通过notify或者notifyAll唤醒。该方法只能在同步方法中调用。如果当前线程不是锁的持有者,该方法抛出一个IllegalMonitorStateException异常。
Object.wait()和Object.notify()和Object.notifyAll()必须写在synchronized方法内部或者synchronized块内部,这是因为:这几个方法要求当前正在运行Object.wait()方法的线程拥有object的对象锁。即使你确实知道当前上下文线程确实拥有了对象锁,也不能将Object.wait()这样的语句写在当前上下文中。
下面是一个简单的例子:
info.halo9pan.samples.java.thread.obj.ObjectNotify

        final Object lock = new Object();
        Thread waitThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Wait Thread was started.\");
                    synchronized (lock) {
                        lock.wait();
                    }
                    System.out.println(\"Wait Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread notifyThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Notify Thread was started.\");
                    System.out.println(\"Notify Thread sleep 1s.\");
                    Thread.sleep(1000L);
                    System.out.println(\"Notify Thread notify.\");
                    synchronized (lock) {
                        lock.notify();
                    }
                    System.out.println(\"Notify Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        waitThread.start();
        notifyThread.start();
Wait Thread was started.
Notify Thread was started.
Notify Thread sleep 1s.
Notify Thread notify.
Notify Thread was finished.
Wait Thread was finished.

这三个方法提供了最基本的锁机制,而且这三个方法是Java的超级父类Object类的final方法,因此所有的Java对象都可以做锁。

2. Thread常用方法

在Java的多线程操作中,用得最多的还是Thread类了。
Thread类中也提供了线程控制的方法,sleep()算是用得最多的了。join()也是一个很实用的方法。Thread.join()把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。比如在线程B中调用了线程A的join()方法,直到线程A执行完毕后,才会继续执行线程B。主线程生成并起动了子线程,而子线程里要进行大量的耗时的运算,当主线程处理完其他的事务后,需要用到子线程的处理结果,这个时候一般就要用到join()方法了。
Thread.interrupt()也是线程控制常用的方法。但是Thread.interrupt()生效的情况比较复杂,调用Thread.interrupt()时只会在Object.wait()、Thread.join()和Thread.sleep()几个方法会主动抛出InterruptedException异常。而在其它的情况下,只是通过设置了Thread的一个标志位信息,需要程序自我进行处理。下面是Thread.interrupt()的一段示例:
info.halo9pan.samples.java.thread.obj.ThreadInterrupt

        class WaitThread extends Thread {
            @Override
            public void run() {
                System.out.println(\"Wait Thread was started.\");
                int times = Integer.MIN_VALUE;
                while (times++ < Integer.MAX_VALUE) {
                }
                System.out.println(\"Wait Thread was finished.\");
            }
        }
        final WaitThread waitThread = new WaitThread();
        Thread interruptThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Interrupt Thread was started.\");
                    TimeUnit.MILLISECONDS.sleep(10L);
                    System.out.println(\"Wait Thread state.\" + waitThread.getState());
                    waitThread.interrupt();
                    System.out.println(\"Interrupt Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        waitThread.start();
        interruptThread.start();
    }
Wait Thread was started.
Interrupt Thread was started.
Wait Thread state.RUNNABLE
Interrupt Thread was finished.
Wait Thread was finished.

这里的Thread.interrupt()并不会打断线程的执行。在Object.wait()的情况下线程才会被打断:
info.halo9pan.samples.java.thread.obj.ThreadInterruptWait

        final Object lock = new Object();
        class WaitThread extends Thread {

            @Override
            public void run() {
                try {
                    System.out.println(\"Wait Thread was started.\");
                    synchronized (lock) {
                        lock.wait();
                    }
                    System.out.println(\"Wait Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        final WaitThread waitThread = new WaitThread();
        Thread interruptThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Interrupt Thread was started.\");
                    TimeUnit.SECONDS.sleep(1L);
                    System.out.println(\"Wait Thread state.\" + waitThread.getState());
                    waitThread.interrupt();
                    System.out.println(\"Interrupt Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        waitThread.start();
        interruptThread.start();
    }
Wait Thread was started.
Interrupt Thread was started.
Wait Thread state.WAITING
Interrupt Thread was finished.
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at info.halo9pan.samples.java.thread.obj.ThreadInterruptWait$1WaitThread.run(ThreadInterruptWait.java:16)

Thread.interrupt()设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持RUNNING状态。比如Thread.join()和一些支持interrupt的NIO channel设计:
info.halo9pan.samples.java.thread.obj.ThreadInterruptCheck

        class WaitThread extends Thread {
            @Override
            public void run() {
                System.out.println(\"Wait Thread was started.\");
                int times = Integer.MIN_VALUE;
                try {
                    while (times++ < Integer.MAX_VALUE) {
                        if (Thread.interrupted()) {
                            System.out.println(\"Wait Thread was interrupted.\");
                            throw new InterruptedException(Thread.currentThread().getName());
                        }
                    }
                } catch (InterruptedException e) {
                    new RuntimeException(e);
                }
                System.out.println(\"Wait Thread was finished.\");
            }
        }
        final WaitThread waitThread = new WaitThread();
        Thread interruptThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Interrupt Thread was started.\");
                    TimeUnit.MILLISECONDS.sleep(1000L);
                    System.out.println(\"Wait Thread state.\" + waitThread.getState());
                    waitThread.interrupt();
                    System.out.println(\"Interrupt Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        waitThread.start();
        interruptThread.start();
    }
Wait Thread was started.
Interrupt Thread was started.
Wait Thread state.RUNNABLE
Interrupt Thread was finished.
Wait Thread was interrupted.
Wait Thread was finished.

3. java.util.concurrent.locks.Lock

之前介绍的都是比较常规的Java锁,也是Java 5之前用得较多的锁实现。但是在Java 5中引入了全新的并发框架,在并发编程的时候,锁的选择也越来越多了。java.util.concurrent.locks.Lock就是在Java 5中引入的,主要有下面几个方法:

public interface Lock {
    void lock(); //常规地获得锁
    void lockInterruptibly() throws InterruptedException; //可中断地获得锁
    boolean tryLock(); //尝试性获得锁,非阻塞
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //尝试性获得锁,如果超时则返回
    void unlock(); //解锁
    Condition newCondition(); //生成和当前锁相关的条件(队列)对象
}

而java.util.concurrent.locks.ReentrantLock是Lock的主要实现,实现了Lock和Serializable接口。主要的属性只有内部类Sync的对象属性sync,ReentrantLock类的操作实际上都落在了sync身上。除此之外,ReentrantLock是可重入锁,还有一些支持可重入的方法,这里不细说。可以说ReetrantLock是基于它的内部类Sync的对象来实现的。在ReentrantLock中,Sync有FairSync和Nonfair两个子类,而父层有AbstactOwnableSynchronizer和AbstractQueuedSynchronizer。前者实现了当前同步器被哪个线程占有的逻辑,实现了get/setExclusiveOwnerThread()方法,确定获取当前互斥锁的Thread对象。后者则是java.util.concurrent包中非常重要的类,它为并发包中的其他synchronizers提供了一组公共的基础设施。AbstractQueuedSynchronizer会在下面的章节中介绍。这里还是主要介绍ReentrantLock。
ReentrantLock三个加锁的方法中,lockInterruptibly 与 lock比较区别在于:lockInterruptibly 优先考虑响应中断,而不是响应锁定的普通获取或重入获取,而tryLock是非阻塞的,只是尝试性的加锁。下面通过三个例子来看看它们的区别:
info.halo9pan.samples.java.thread.lock.BasicReentrantLock

    final Lock lock = new ReentrantLock();
        class WaitThread extends Thread{
            String token;
            public WaitThread(String token) {
                super();
                this.token = token;
            }
            @Override
            public void run() {
                try {
                    System.out.println(\"Wait Thread \" + token + \" was started.\");
                    lock.tryLock();
                    System.out.println(\"Wait Thread \" + token + \" got the lock.\");
                    TimeUnit.SECONDS.sleep(4L);
                    System.out.println(\"Wait Thread \" + token + \" was finished.\");
                    lock.unlock();
                } catch (InterruptedException e) {
                    System.out.println(\"Wait Thread \" + token + \" was interrupted.\");
                }
            }
        }
        final WaitThread oneThread = new WaitThread(\"one\");
        final WaitThread twoThread = new WaitThread(\"two\");
        Thread modifyThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Interrupt Thread was started.\");
                    TimeUnit.SECONDS.sleep(1L);
                    twoThread.interrupt();
                    System.out.println(\"Interrupt Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        oneThread.start();
        twoThread.start();
        modifyThread.start();
    }
Wait Thread one was started.
Interrupt Thread was started.
Wait Thread two was started.
Wait Thread one got the lock.
Interrupt Thread was finished.
Wait Thread one was finished.
Wait Thread two got the lock.
Wait Thread two was interrupted.

info.halo9pan.samples.java.thread.lock.BasicReentrantInterruptLock

    lock.lockInterruptibly();
Wait Thread one was started.
Wait Thread one got the lock.
Wait Thread two was started.
Interrupt Thread was started.
Interrupt Thread was finished.
Wait Thread two was interrupted.
Wait Thread one was finished.

可以看到,在调用ReentrantLock.lockInterruptibly()加锁时,Thread two优先被中断了,而ReentrantLock.lock ()加锁时,Thread two获得了锁之后才会被中断。
info.halo9pan.samples.java.thread.lock.BasicReentrantTryLock

    boolean success = lock.tryLock();
    System.out.println(\"Wait Thread \" + token + \" got the lock: \" + success);
Wait Thread one was started.
Wait Thread one got the lock: true
Interrupt Thread was started.
Wait Thread two was started.
Wait Thread two got the lock: false
Interrupt Thread was finished.
Wait Thread two was interrupted.
Wait Thread one was finished.

在调用ReentrantLock.tryLock()加锁时,Thread one获得了锁,但是Thread two并没有获得锁。

4. Condition条件变量

条件变量是线程同步对象中的一种,主要用来等待某种条件的发生,条件发生后,可以唤醒等待在该条件上的一个线程,或所有线程。条件变量要与锁一起协同工作。条件变量作用于前面提到的Object.wait()、Object.notify()和Object.notifyAll()是一致的。
条件变量调用Lock.newCondition()获得一个实例,通常的调用方式如下:
info.halo9pan.samples.java.thread.lock.BasicLockCondition

    final Lock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();
        class WaitThread extends Thread{
            String token;
            public WaitThread(String token) {
                super();
                this.token = token;
            }
            @Override
            public void run() {
                try {
                    System.out.println(\"Wait Thread \" + token + \" was started.\");
                    lock.lock();
                    System.out.println(\"Wait Thread \" + token + \" got the lock.\");
                    System.out.println(\"Wait Thread \" + token + \" start to wait.\");
                    condition.await();
                    System.out.println(\"Wait Thread \" + token + \" finish waiting.\");
                    lock.unlock();
                    System.out.println(\"Wait Thread \" + token + \" was finished.\");
                } catch (InterruptedException e) {
                    System.out.println(\"Wait Thread \" + token + \" was interrupted.\");
                }
            }
        }
        final WaitThread waitThread = new WaitThread(\"one\");
        Thread modifyThread = new Thread() {
            @Override
            public void run() {
                try {
                    System.out.println(\"Signal Thread was started.\");
                    TimeUnit.SECONDS.sleep(1L);
                    lock.lock();
                    System.out.println(\"Signal Thread got the lock.\");
                    condition.signal();
                    lock.unlock();
                    System.out.println(\"Signal Thread was finished.\");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        waitThread.start();
        modifyThread.start();
    }

值得注意的是,当Condition.await()时,隐式的将条件变量关联的Lock解锁,而使其他线程有机会获得Lock,而检查条件,并在条件满足时,等待在条件变量上。

5. java.util.concurrent.locks.ReentrantReadWriteLock

在使用某些种类的 Collection 时,可以使用 ReentrantReadWriteLock 来提高并发性。通常,在预期 collection很大,读取者线程访问它的次数多于写入者线程,并且读写操作的开销高于同步开销时,和适合使用ReadWriteLock。
而且Java 5中还提供了读写锁:ReadWriteLock。在多线程的环境下,对同一份数据进行读写,会涉及到线程安全的问题。比如在一个线程读取数据的时候,另外一个线程在写数据,而导致前后数据的不一致性;一个线程在写数据的时候,另一个线程也在写,同样也会导致线程前后看到的数据的不一致性。这时候可以在读写方法中加入互斥锁,任何时候只能允许一个线程的一个读或写操作,而不允许其他线程的读或写操作,这样是可以解决这样以上的问题,但是效率却大打折扣了。因为在真实的业务场景中,一份数据,读取数据的操作次数通常高于写入数据的操作,而线程与线程间的“读-读”操作是不涉及到线程安全的问题,没有必要加入互斥锁,只要在“读-写”,“写-写”期间上锁就行了。
对于这种情况,读写锁则最好的解决方案!读写锁的基本原则是:“读-读”不互斥,“读-写”互斥,“写-写”互斥,即在任何时候必须保证:只有一个线程在写入;线程正在读取的时候,写入操作等待;线程正在写入的时候,其他线程的写入操作和读取操作都要等待;

6. synchronized与ReentrantLock性能比较

前面提到过synchronized关键字,加上ReentrantLock,是Java里面主要的两种锁机制。ReentrantLock是Java 5的新特性,采用ReentrantLock可以完全替代替换synchronized传统的锁机制,而且采用ReentrantLock的方式更加面向对象,也更加灵活。也许有不少老文章认为ReentrantLock有比synchronized更好的性能,但是随着Java对synchronized的不断优化,synchronized在大部分的场景下已经有很好的性能,所以简单的写了一些例子测试一下,比较一下两者的性能。
info.halo9pan.samples.java.thread.lock.perf.InstantOperation

    public static void main(String[] args) {
        int[] numbers = {1, 2, 4, 8, 16, 2000};
        int times = 1;
        for (int i : numbers) {
            System.out.println(\"================================================================================\");
            SimpleDemo sync = new InstantOperation(i, new SyncRunner());
            System.out.print(String.format(\"%-20s\", \"Using synchronize:\"));
            sync.batchShow(times);
            SimpleDemo lock = new InstantOperation(i, new LockRunner());
            System.out.print(String.format(\"%-20s\", \"Using ReentrantLock:\"));
            lock.batchShow(times);
        }
    }
    public InstantOperation(int threadNumber, Runner runner) {
        super(threadNumber, runner);
    }
    static class LockRunner extends Runner {
        private Lock lock = new ReentrantLock();
        @Override
        public void doSomething(int invoker) {
            lock.lock();
            this.identifier++;
            lock.unlock();
        }
    }
    static class SyncRunner extends Runner {
        @Override
        public void doSomething(int invoker) {
            synchronized (this) {
                this.identifier++;
            }
        }
}
================================================================================
Using synchronize:  Used thread:    1. Run times: 1. Spent time:      7636137ns. Average time:  7636137ns. 
Using ReentrantLock:Used thread:    1. Run times: 1. Spent time:       741505ns. Average time:   741505ns. 
================================================================================
Using synchronize:  Used thread:    2. Run times: 1. Spent time:      1095153ns. Average time:   547576ns. 
Using ReentrantLock:Used thread:    2. Run times: 1. Spent time:      1124660ns. Average time:   562330ns. 
================================================================================
Using synchronize:  Used thread:    4. Run times: 1. Spent time:      1261072ns. Average time:   315268ns. 
Using ReentrantLock:Used thread:    4. Run times: 1. Spent time:       972424ns. Average time:   243106ns. 
================================================================================
Using synchronize:  Used thread:    8. Run times: 1. Spent time:      2028663ns. Average time:   253582ns. 
Using ReentrantLock:Used thread:    8. Run times: 1. Spent time:      1751133ns. Average time:   218891ns. 
================================================================================
Using synchronize:  Used thread:   16. Run times: 1. Spent time:      3870453ns. Average time:   241903ns. 
Using ReentrantLock:Used thread:   16. Run times: 1. Spent time:      3483023ns. Average time:   217688ns. 
================================================================================
Using synchronize:  Used thread: 2000. Run times: 1. Spent time:    489466690ns. Average time:   244733ns. 
Using ReentrantLock:Used thread: 2000. Run times: 1. Spent time:    381708507ns. Average time:   190854ns.

info.halo9pan.samples.java.thread.lock.perf.ProcessOperation

    public static void main(String[] args) {
        int[] numbers = {1, 2, 4, 8, 16, 2000};
        int times = 1;
        for (int i : numbers) {
            System.out.println(\"================================================================================\");
            SimpleDemo sync = new ProcessOperation(i, new SyncRunner());
            System.out.print(String.format(\"%-20s\", \"Using synchronize:\"));
            sync.batchShow(times);
            SimpleDemo lock = new ProcessOperation(i, new LockRunner());
            System.out.print(String.format(\"%-20s\", \"Using ReentrantLock:\"));
            lock.batchShow(times);
        }
    }
    public ProcessOperation(int threadNumber, Runner runner) {
        super(threadNumber, runner);
    }
    static class LockRunner extends Runner {
        private Lock lock = new ReentrantLock();
        @Override
        public void doSomething(int invoker) {
            lock.lock();
            try {
                TimeUnit.MICROSECONDS.sleep(2);
            } catch (InterruptedException e) {
            }
            this.identifier++;
            lock.unlock();
        }
    }
    static class SyncRunner extends Runner {
        @Override
        public void doSomething(int invoker) {
            synchronized (this) {
                try {
                    TimeUnit.MICROSECONDS.sleep(2);
                } catch (InterruptedException e) {
                }
                this.identifier++;
            }
        }
}
================================================================================
Using synchronize:  Used thread:    1. Run times: 1. Spent time:      6863843ns. Average time:  6863843ns. 
Using ReentrantLock:Used thread:    1. Run times: 1. Spent time:       722262ns. Average time:   722262ns. 
================================================================================
Using synchronize:  Used thread:    2. Run times: 1. Spent time:      2398133ns. Average time:  1199066ns. 
Using ReentrantLock:Used thread:    2. Run times: 1. Spent time:      1552714ns. Average time:   776357ns. 
================================================================================
Using synchronize:  Used thread:    4. Run times: 1. Spent time:      4536268ns. Average time:  1134067ns. 
Using ReentrantLock:Used thread:    4. Run times: 1. Spent time:      4403704ns. Average time:  1100926ns. 
================================================================================
Using synchronize:  Used thread:    8. Run times: 1. Spent time:      9560887ns. Average time:  1195110ns. 
Using ReentrantLock:Used thread:    8. Run times: 1. Spent time:      9407796ns. Average time:  1175974ns. 
================================================================================
Using synchronize:  Used thread:   16. Run times: 1. Spent time:     18672337ns. Average time:  1167021ns. 
Using ReentrantLock:Used thread:   16. Run times: 1. Spent time:     18420465ns. Average time:  1151279ns. 
================================================================================
Using synchronize:  Used thread: 2000. Run times: 1. Spent time:   2358474671ns. Average time:  1179237ns. 
Using ReentrantLock:Used thread: 2000. Run times: 1. Spent time:   2316398302ns. Average time:  1158199ns. 

InstantOperation只是做了简单的自增,而ProcessOperation模拟某些操作,中间暂停了两微秒,在Intel酷睿i3的处理器,Windows 7上做的测试。可以看出,ReentrantLock比synchronized有20%左右的性能提升。

7. synchronized与ReentrantReadWriteLock性能比较

info.halo9pan.samples.java.thread.lock.perf.InstantReadWrite

    public static void main(String[] args) {
        Runner sync = new SyncRunner();
        Runner lock = new LockRunner();
        int[] readNumber = { 2, 4, 1, 4, 2000, 1, 2000 };
        int[] writeNumber = { 2, 1, 4, 4, 1, 2000, 2000 };
        int times = 1;
        for (int i = 0 ; i < readNumber.length; i++) {
            int read = readNumber[i];
            int write = writeNumber[i];
            System.out.println(\"================================================================================\");
            ReadWriteDemo syncDemo = new InstantReadWrite(read, write, sync);
            System.out.print(String.format(\"%-32s\", \"Using synchronize:\"));
            syncDemo.batchShow(times);
            ReadWriteDemo lockDemo = new InstantReadWrite(read, write, lock);
            System.out.print(String.format(\"%-32s\", \"Using ReentrantReadWriteLock:\"));
            lockDemo.batchShow(times);
        }
    }
    public InstantReadWrite(int read, int write, Runner runner) {
        super(read, write);
        setReader(runner);
        setWriter(runner);
    }
    static class SyncRunner extends Runner{
        @Override
        protected String safeRead() {
            String key = getRandomKey();
            String s = null;
            synchronized (this){
                s = super.read(key);
            }
            return s;
        }
        @Override
        protected String safeWrite() {
            String key = getRandomKey();
            String value = randomString(32);
            String s = null;
            synchronized (this){
                s = super.write(key, value);
            }
            return s;
        }
    }
    static class LockRunner extends Runner{
        ReadWriteLock lock = new ReentrantReadWriteLock();
        Lock readLock = lock.readLock();
        Lock writeLock = lock.writeLock();
        @Override
        protected String safeRead() {
            String key = getRandomKey();
            String s = null;
            readLock.lock();
            s = super.read(key);
            readLock.unlock();
            return s;
        }
        @Override
        protected String safeWrite() {
            String key = getRandomKey();
            String value = randomString(32);
            String s = null;
            writeLock.lock();
            s = super.write(key, value);
            writeLock.unlock();
            return s;
        }
    }
================================================================================
Using synchronize:              Used thread:        2/2. Run times: 1. Spent time:      4665840ns. Average time:  1166460ns. 
Using ReentrantReadWriteLock:   Used thread:        2/2. Run times: 1. Spent time:      1255941ns. Average time:   313985ns. 
================================================================================
Using synchronize:              Used thread:        4/1. Run times: 1. Spent time:      1450083ns. Average time:   290016ns. 
Using ReentrantReadWriteLock:   Used thread:        4/1. Run times: 1. Spent time:      1222158ns. Average time:   244431ns. 
================================================================================
Using synchronize:              Used thread:        1/4. Run times: 1. Spent time:      1214889ns. Average time:   242977ns. 
Using ReentrantReadWriteLock:   Used thread:        1/4. Run times: 1. Spent time:     29544329ns. Average time:  5908865ns. 
================================================================================
Using synchronize:              Used thread:        4/4. Run times: 1. Spent time:      1789192ns. Average time:   223649ns. 
Using ReentrantReadWriteLock:   Used thread:        4/4. Run times: 1. Spent time:      1521069ns. Average time:   190133ns. 
================================================================================
Using synchronize:              Used thread:     2000/1. Run times: 1. Spent time:    494456670ns. Average time:   247104ns. 
Using ReentrantReadWriteLock:   Used thread:     2000/1. Run times: 1. Spent time:    383064085ns. Average time:   191436ns. 
================================================================================
Using synchronize:              Used thread:     1/2000. Run times: 1. Spent time:    559353787ns. Average time:   279537ns. 
Using ReentrantReadWriteLock:   Used thread:     1/2000. Run times: 1. Spent time:    456515882ns. Average time:   228143ns. 
================================================================================
Using synchronize:              Used thread:  2000/2000. Run times: 1. Spent time:   1158579953ns. Average time:   289644ns. 
Using ReentrantReadWriteLock:   Used thread:  2000/2000. Run times: 1. Spent time:   1044899135ns. Average time:   261224ns. 

info.halo9pan.samples.java.thread.lock.perf.ProcessReadWrite

    public static void main(String[] args) {
        Runner sync = new SyncRunner();
        Runner lock = new LockRunner();
        int[] readNumber = { 2, 4, 1, 4, 2000, 1, 2000 };
        int[] writeNumber = { 2, 1, 4, 4, 1, 2000, 2000 };
        int times = 1;
        for (int i = 0 ; i < readNumber.length; i++) {
            int read = readNumber[i];
            int write = writeNumber[i];
            System.out.println(\"================================================================================\");
            ReadWriteDemo syncDemo = new ProcessReadWrite(read, write, sync);
            System.out.print(String.format(\"%-32s\", \"Using synchronize:\"));
            syncDemo.batchShow(times);
            ReadWriteDemo lockDemo = new ProcessReadWrite(read, write, lock);
            System.out.print(String.format(\"%-32s\", \"Using ReentrantReadWriteLock:\"));
            lockDemo.batchShow(times);
        }
    }
    public ProcessReadWrite(int read, int write, Runner runner) {
        super(read, write);
        setReader(runner);
        setWriter(runner);
    }
    static class SyncRunner extends ProcessRunner{
        @Override
        protected String safeRead() {
            String key = getRandomKey();
            String s = null;
            synchronized (this){
                s = super.read(key);
            }
            return s;
        }
        @Override
        protected String safeWrite() {
            String key = getRandomKey();
            String value = randomString(32);
            String s = null;
            synchronized (this){
                s = super.write(key, value);
            }
            return s;
        }
    }
    static class LockRunner extends ProcessRunner{
        ReadWriteLock lock = new ReentrantReadWriteLock();
        Lock readLock = lock.readLock();
        Lock writeLock = lock.writeLock();
        @Override
        protected String safeRead() {
            String key = getRandomKey();
            String s = null;
            readLock.lock();
            s = super.read(key);
            readLock.unlock();
            return s;
        }
        @Override
        protected String safeWrite() {
            String key = getRandomKey();
            String value = randomString(32);
            String s = null;
            writeLock.lock();
            s = super.write(key, value);
            writeLock.unlock();
            return s;
        }
    }
================================================================================
Using synchronize:              Used thread:        2/2. Run times: 1. Spent time:      9807628ns. Average time:  2451907ns. 
Using ReentrantReadWriteLock:   Used thread:        2/2. Run times: 1. Spent time:      3200789ns. Average time:   800197ns. 
================================================================================
Using synchronize:              Used thread:        4/1. Run times: 1. Spent time:      6286118ns. Average time:  1257223ns. 
Using ReentrantReadWriteLock:   Used thread:        4/1. Run times: 1. Spent time:      2699610ns. Average time:   539922ns. 
================================================================================
Using synchronize:              Used thread:        1/4. Run times: 1. Spent time:     27321096ns. Average time:  5464219ns. 
Using ReentrantReadWriteLock:   Used thread:        1/4. Run times: 1. Spent time:      5425305ns. Average time:  1085061ns. 
================================================================================
Using synchronize:              Used thread:        4/4. Run times: 1. Spent time:      9596380ns. Average time:  1199547ns. 
Using ReentrantReadWriteLock:   Used thread:        4/4. Run times: 1. Spent time:      7441139ns. Average time:   930142ns. 
================================================================================
Using synchronize:              Used thread:     2000/1. Run times: 1. Spent time:   2367678917ns. Average time:  1183247ns. 
Using ReentrantReadWriteLock:   Used thread:     2000/1. Run times: 1. Spent time:    524961877ns. Average time:   262349ns. 
================================================================================
Using synchronize:              Used thread:     1/2000. Run times: 1. Spent time:   2300219030ns. Average time:  1149534ns. 
Using ReentrantReadWriteLock:   Used thread:     1/2000. Run times: 1. Spent time:   2381686271ns. Average time:  1190248ns. 
================================================================================
Using synchronize:              Used thread:  2000/2000. Run times: 1. Spent time:   4661729854ns. Average time:  1165432ns. 
Using ReentrantReadWriteLock:   Used thread:  2000/2000. Run times: 1. Spent time:   2915582988ns. Average time:   728895ns. 

InstantReadWrite只是做了简单的自增,ReentrantReadWriteLock与ReentrantLock比较接近,比synchronized有20%左右的性能提升。而ProcessReadWrite模拟某些操作,中间暂停了两微秒,在2000读线程,1个写线程的情景下,ReentrantReadWriteLock就充分体现了优势,花费的时间只有synchronized的10%!而且在2000读线程,2000写线程的情景下,平均时间也只有synchronized的60%。这个性能的提升是非常明显的。

8. java.util.concurrent.locks.LockSupport

最后再简单说说LockSupport,LockSupport主要有两个方法:park和unpark,它们主要是为了代替之前Thread中的Thread.suspend()和Thread.resume()的,这两个在Thread中的方法,Java已经不建议使用了,被标注为@Deprecated。
LockSupport的park()和unpark(),与Object.wait()和notify()的功能比较类似,但还是有区别的。首先,它们面向的主体不一样。LockSuport主要是针对Thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。Object.wait()是以对象为基础,阻塞当前的线程和唤醒单个或者所有线程。其次,实现机制不同。虽然LockSuport可以指定观察的object对象,但和Object.wait(),两者的阻塞队列并不交叉,Object.notifyAll()不能唤醒LockSupport的阻塞线程。
LockSupport还有一个方法:park(Object)可以传递一个blocker对象,对应的blcoker会记录在Thread的一个parkBlocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象。
LockSupport.park()能响应interrupt事件,但不会抛出InterruptedException异常。并且LockSupport.park()在任何时候都会“无原因”的返回,所以通常会把它放在无限循环中通过条件判断来退出,它提供了一个比自旋锁性能更好锁机制。
注意LockSupport的park()和unpark()是基于许可的,并且任何时候只有一个许可是有效的。这意味着如果就算LockSupport.park()之前被调用了多次,而LockSupport.unpark()只需要调用一次就能获得许可。同时如果LockSupport.unpark()先于LockSupport.park()调用,LockSupport同样能获得许可而不会被阻塞。
LockSupport往往被应用于线程的框架程序中,而在应用级别的并发程序中用得较少。如果你想开发自己的并发框架,才会较多的用到LockSupport,而编写一般的多线程程序,LockSupport就不是特别合适了。

Java 并发编程:Atomic操作

Java 并发编程:Atomic操作

1. CAS操作

在讨论Java的原子操作之前,首先要普及一下CAS的概念。CAS是compare and swap的缩写
在Java语言之前,并发就已经广泛存在并在服务器领域得到了大量的应用。所以硬件厂商老早就在芯片中加入了大量并发操作的原语,从而在硬件层面提升效率。在Java发展初期,Java是不能够利用硬件提供的这些便利来提升系统的性能的。而随着Java本地方法(JNI)的出现,为Java程序越过JVM直接调用本地方法提供了一种便捷的方式,因而java在并发的手段上也多了起来。而在Java提供的concurrent包中,CAS理论是实现整个Java并发包的基石。
CAS 操作包含三个操作数:内存位置(V)、预期原值(A)和新值(B)。 如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值 。否则,处理器不做任何操作。无论哪种情况,它都会返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不返回当前值。)
通常将 CAS 用于同步的方式是从地址V读取值A,执行多步计算来获得新值B,然后使用CAS将V的值从A改为B。如果V处的值没有被更改,则CAS操作成功。CAS指令允许算法执行读-修改-写操作,而无需害怕其它线程同时修改变量,因为如果其他线程修改变量,那么CAS会检测到,这是在CPU指令级别提供的保障。
可以看到,CAS的操作是一条CPU的指令,是没有锁的,因此也不存在堵塞,比较和交换的操作是在一条CPU指令中完成的,所以它的性能是非常非常的高的。

2. volatile关键字

在Java内存模型中,有主内存,每个线程也有自己的内存空间,每个线程的栈都是该线程私有的,当然堆则是所有线程共享的。为了性能,一个线程会在自己的线程栈中保持要访问的变量值或者引用的副本。这样就会出现同一个变量在某个瞬间,在一个线程中的值或者引用可能与另外一个线程中的值或者引用,或者主内存中的不一致的情况。
所以,Java引入了volatile关键字,如果一个变量声明为volatile,就意味着这个变量是随时会被其他线程修改的,它的值或者引用不会放在线程栈中,而是放在了主栈中,供所有线程访问修改。
用volatile修饰后的变量不允许有不同于主内存区域的变量拷贝。换句话说,一个变量经 volatile修饰后在所有线程中必须是同步的;任何线程中改变了它的值,所有其他线程立即获取到了相同的值。理所当然的,volatile修饰的变量存取时比一般变量消耗的资源要多一点,因为线程有它自己的变量拷贝时更为高效。
但是volatile一般情况下不能代替sychronized,因为volatile不能保证操作的原子性,即使只是i++,实际上也是由多个原子操作组成:读,加,写。假如多个线程同时执行i++,volatile只能保证他们操作的i是同一块内存,但依然可能出现写入脏数据的情况。
info.halo9pan.samples.java.thread.atom.VolatileField

    volatile private int number;

    @Override
    public void doSomething(int invoker) {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.5));
            int n = this.number;
            this.number = n + 1;
            Thread.sleep((long) (time * 0.5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
No synchronized key work,one field add volatile key work.
[1] finished, spent time: 1000
[0] finished, spent time: 1000
.
.
.
.
[658] finished, spent time: 1005
[698] finished, spent time: 1000
Max spent time: 1030
Runner Identifier: 992

这个例子可以看出来,虽然成员变量添加了volatile修饰,但是在多线程环境下,其自增的操作仍然是线程不安全的。

3. java.util.concurrent.atomic

JDK在java.util.concurrent.atomic的包里面封装了大部分的原子操作类,下面简单介绍一下。

1) AtomicBoolean, AtomicInteger, AtomicLong

AtomicBoolean, AtomicInteger, AtomicLong这三个类比较类似,提供了boolean,int和long型的原子操作。以AtomicInteger为例。

int get()

获取当前值。

void set(int newValue)

设置为给定值。直接修改原始值,也就是i=newValue操作。

int getAndIncrement()

线程安全版本的i++操作。

int incrementAndGet()

线程安全版本的++i操作。

int getAndDecrement()

线程安全版本的i–操作。

int decrementAndGet()

线程安全版本的–i操作。

int addAndGet(int delta)

以原子方式将给定值与当前值相加。线程安全版本的i=i+delta操作。

int getAndAdd(int delta)

以原子方式将给定值与当前值相加。线程安全版本的n=i;i+=delta;return n;操作。

boolean compareAndSet(int expect, int update)

如果当前值等于预期值,则以原子方式将该值设置为给定的更新值。如果成功就返回true,否则返回false,并且不修改原值。
int getAndSet(int newValue)
以原子方式设置为给定值,并返回旧值。相当于线程安全版本的n=i;i=newValue;return n;操作。

void lazySet(int newValue)

延时设置变量值,这个区别于set()方法是,由于AtomicInteger中实际的值是volatile类型的,值是放在主内存栈中的,而不是线程栈,因此set()方法修改值时会比非volatile的值有稍微的性能延时,尽管可以忽略。所以如果不需要立即读取设置的新值,那么就可以用这个方法,延迟在主内存中设值,而且可以立即返回而不需要等待。
boolean weakCompareAndSet(int expect, int update)
按照JSR规范中来说说,调用这个方法时可能存在操作失败。但是从Java源码来看,其实这个方法并没有实现JSR规范的要求,最后效果和compareAndSet是等效的,都调用了unsafe.compareAndSwapInt()完成操作。
还是来看一个简单的例子
info.halo9pan.samples.java.thread.atom.FieldIncrement

    private AtomicInteger atom = new AtomicInteger();

    @Override
    public void doSomething(int invoker) {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.5));
            this.atom.getAndIncrement();
            Thread.sleep((long) (time * 0.5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.identifier = this.atom.get();
    }
AtomicInteger++
[2] finished, spent time: 1000
[0] finished, spent time: 1000
.
.
.
.
[630] finished, spent time: 1021
[649] finished, spent time: 1004
Max spent time: 1040
Runner Identifier: 1000

可以看到,最终结果和我们预期的一样,而且比较运行时间,可以发现,耗时和不用AtomicInteger几乎是很有差别的,性能上的损耗微乎其微。

2) AtomicReference

Java对原子操作的支持当然不会局限于boolean,int和long这些原生类型,java.util.concurrent.atomic包也提供了对对象的支持, AtomicReference类就封装了对象的原子操作,实现了在多线程的环境下对象引用的原子操作。
info.halo9pan.samples.java.thread.atom.ReferenceSet

    private AtomicReference atom = new AtomicReference();

    @Override
    public void doSomething(int invoker) {
        ReferenceSetTarget target = new ReferenceSetTarget();
        target.number = invoker;
        this.atom.set(target);
    }

3) AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray

大部分的场景中,数据操作除了对原始对象赋值,对对象传递引用,还有就是对数组的操作了。java.util.concurrent.atomic包也提供了对数组做原子操作的对象封装:AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray。需要注意的是,这里做原子操作的是数组里面的值或者对象,而不是数组本身。如果需要对数组本身进行原子操作,用AtomicReference就可以了。
很强大了,对吧。基本上所有的原子操作都有了封装对象,AtomicBoolean, AtomicInteger, AtomicLong和AtomicReference,也有了对数组原子操作的封装,AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray。但仅仅有这些还是不够的。考虑到具体的业务场景,这些封装的原子操作类对值或者引用的修改都封装到了Atomic对象内部,在小规模的系统中不会有什么问题,因为简单,易用性反而更好。但是到了大规模的系统中,这样的结构,数据和操作就耦合得太紧了。如果需要对Atomic对象操作,要么需要把Atomic对象再封装,暴露出它的一些基本方法,要么就需要把对Atomic对象相关操作的所有方法都封装到一个类中。无论是哪一种,都不是良好的设计。因此Java中还提供了其他的原子操作的类:AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater
还是先拿举AtomicIntegerFieldUpdater例子,假设有一个业务对象:

class IntegerTarget {
    volatile int number;

    protected int getNumber() {
        return number;
    }
}

然后还有一个控制器,这里IntegerFieldUpdaterRunner就是一个控制器的角色,由它来改变业务对象的属性,很典型的MVC的结构:

class IntegerFieldUpdaterRunner extends Runner {
    private AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater(IntegerTarget.class, \"number\");
    private IntegerTarget integerTarget = new IntegerTarget();

    @Override
    public void doSomething(int invoker) {
        long time = getRunTime();
        this.updater.getAndIncrement(this.integerTarget);
    }
}

主要的方法是AtomicIntegerFieldUpdater.newUpdater(IntegerTarget.class, \”number\”);通过AtomicIntegerFieldUpdater.newUpdater创建了一个对IntegerTarget的number的成员变量做原子操作的Updater,然后所有的原子操作都是通过Updater来做的。
AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater是通过反射来原子更新成员变量的值或者引用的,相应的API也是非常简单的,但是也是有一些约束的:
1. 成员变量必须是volatile类型的,保证了多线程条件下值的唯一性。
2. 成员变量的类型(public/protected/default/private)需要能被调用者访问到。也就是说调用者能够直接操作成员变量,那么就可以反射进行原子操作。但是对于父类的成员变量,子类是不能直接操作的,尽管子类可以访问父类的成员变量。
3. 只能是实例变量,不能是类变量,也就是说不能加static关键字。
4. 只能是可修改变量,不能使final变量,因为final的语义就是不可修改。
理解了AtomicIntegerFieldUpdater,再来看AtomicIntegerFieldUpdater就很简单了
info.halo9pan.samples.java.thread.atom.ReferenceFieldUpdater

class ReferenceTarget {
    volatile String updateTime;

    protected String getUpdateTime() {
        return updateTime;
    }
}

class ReferenceFieldUpdaterRunner extends Runner {

    private AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater
            .newUpdater(ReferenceTarget.class, String.class, \"updateTime\");
    private ReferenceTarget referenceTarget = new ReferenceTarget();

    @Override
    public void doSomething(int invoker) {
        String now = Calendar.getInstance().getTime().toString();
        this.updater.set(this.referenceTarget, now);
    }

    @Override
    public Object getIdentifier() {
        return this.referenceTarget.getUpdateTime();
    }
}

4) AtomicMarkableReference, AtomicStampedReference

现在已经差不多介绍完java.util.concurrent.atomic包里面所有的类了,最后再介绍两个比较实用的类AtomicMarkableReference, AtomicStampedReference。在某些编程场景中,我们往往会用到一些标志位,用来判断状态,然后根据状态选择不同的操作。单线程时,简单的boolean值就行了。在多线程的环境下,就要用到AtomicMarkableReference或者AtomicStampedReference了。
来看看AtomicMarkableReference的例子
info.halo9pan.samples.java.thread.atom.MarkableReference

class MarkableReferenceKey {
    volatile String key;
    public MarkableReferenceKey(String key) {
        this.key = key;
    }
    @Override
    public String toString() {
        return \"[\" + key + \"]\";
    }
}

class MarkableReferenceRunner extends Runner {
    private MarkableReferenceKey odd = new MarkableReferenceKey(\"odd\");
    private MarkableReferenceKey even = new MarkableReferenceKey(\"even\");
    private AtomicMarkableReference reference = new AtomicMarkableReference(even, false);

    @Override
    public void doSomething(int invoker) {
        boolean mark = (invoker % 2 == 0) ? true : false;
        MarkableReferenceKey key = mark ? this.even : this.odd;
        this.reference.set(key, true);
        int tryTime = invoker;
        StringBuffer buffer = new StringBuffer();
        buffer.append(invoker).append(\":\");
        int i = 0;
        for (; (i < tryTime); i++) {
            boolean success = this.reference.attemptMark(this.even, true);
            if (success) {
                break;
            } else {
                try {
                    TimeUnit.NANOSECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                buffer.append(\'.\');
            }
        }
        System.out.println(buffer);
    }

    @Override
    public Object getIdentifier() {
        return this.reference.getReference().toString() + \":\" + this.reference.isMarked();
    }

    @Override
    public void close() {
        this.reference.compareAndSet(this.odd, this.even, true, false);
    }

}

Runner运行的时候根据Invoker的ID的奇偶性,在AtomicMarkableReference中选择用不同的MarkableReferenceKey对象作为reference,java.util.concurrent.atomic.AtomicMarkableReference.set(V, boolean)不比较旧值和新值,直接赋值,然后调用java.util.concurrent.atomic.AtomicMarkableReference.attemptMark(V, boolean)设置新的reference和mark。如果AtomicMarkableReference中的reference和新的reference相同,则设置成功,返回true,否则返回false。偶数线程会将AtomicMarkableReference的reference变成even 的MarkableReferenceKey,因此奇数线程在尝试几次之后会发现attemptMark(even, boolean)返回了true。
之前的例子都比较简单,一般都直接调用了set方法赋值,而java.util.concurrent.atomic包的真正的魅力在于compareAndSet的方法。仔细看看上面稍微复杂的例子,你会发现它实现了基于CAS的无锁自旋的同步机制,在此基础之上,可以扩展出更复杂的高性能同步机制。
AtomicStampedReference的用法和AtomicMarkableReference类似,不过它用了一个int变量来存储对应的值,int变量可以存储比boolean型更多的信息,可以安位取标志位。

5) 小结

这次介绍了CAS,volatile关键字和java.util.concurrent.atomic包,这些是Java 1.5之后Java并发框架的基础,涉及到了底层硬件的层面。对于了解Java并发框架的基础是很重要的。后面会逐渐涉及到在实际中经常用到的一些类。

Java 并发编程:synchronized关键字

Java 并发编程:synchronized关键字

1. synchronized的用法

synchronized关键字在Java中就代表了锁机制,在Java的多线程并发操作时,需要随时考虑到资源的并发访问,在多个线程都能访问到的资源上必须要加上锁。
synchronized的用法有两种:
一种是加在方法之前,synchronized method(){}; 当这个类被实例化之后,可以防止多个线程同时访问这个对象的synchronized方法。而且需要注意的是,如果一个对象有多个synchronized方法,只要一个线程访问了其中的一个synchronized方法,其它线程不能同时访问这个对象中任何一个synchronized方法。可以认为在方法前添加synchronized关键字,是锁住了整个对象。当然,不同的对象实例的synchronized方法是不相干扰的。也就是说,其它线程照样可以同时访问相同类的另一个对象实例中的synchronized方法。
也可以在静态方法前加synchronized关键字,synchronized static staticMethod{};
另外,synchronized关键字还可以用于方法中的某个区块中,表示只对这个区块的资源加锁。用法是: synchronized(lock){/区块/},它的作用域是当前对象。其中,lock的选择是很有重要的。对于不同业务流程的不同方法,一定要选择不同的对象做锁,避免没有必要的线程等待。
synchronized关键字是不能继承的,也就是说,基类的方法synchronized f(){} 在继承类中并不自动是synchronized f(){},而是变成了f(){}。继承类需要显式的指定它的某个方法为synchronized方法。

2. synchronized示例

下面是一段基本代码,模拟某个操作,运行一段时间之后,将自己的一个成员变量+1。

    public void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.4));
            int number = this.number;
            Thread.sleep((long) (time * 0.2));
            this.number = number + 1;
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

定义了这个方法之后,会用一个线程池来创建多个线程并启动执行

    protected void show() {
        this.threadPool = Executors.newFixedThreadPool(THREAD_NUMBER);
        this.runner = createRunner();
        for (int i = 0; i < THREAD_NUMBER; i++) {
            Invoker invoker = createInvoker(i);
            list.add(invoker);
            this.threadPool.execute(invoker);
        }
        this.threadPool.shutdown();
        this.printNumber();
    }

下面来看看各种不同的情况。

1) 不添加synchronized关键字

就是直接运行上面的基本代码,假设每个操作执行1000毫秒。
info.halo9pan.samples.java.thread.sync.demo.NoneLock

    public void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.4));
            int number = this.number;
            Thread.sleep((long) (time * 0.2));
            this.number = number + 1;
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
没有synchronized关键字
[1]结束运行,耗时1000
[0]结束运行,耗时1000
[2]结束运行,耗时1001
[3]结束运行,耗时1001
最大耗时:1001
运行计数:1

可以看到,在输出中,线程的顺序并不是一致的,因为没有synchronized关键字加锁,速度很快,没有堵塞,所以每个线程执行1000毫秒就结束了。但是我们可以发现运行计数只有1,我们初始化了4个线程,每个线程将成员变量+1,最后结果期待的是4。所以我们可以看到,在多线程的情况下,如果在操作成员变量这些资源的时候,如果不加锁,会产生一些非预期的错误。而且这些错误还不是每次都会发生,而是随机发生的。所以我们必须养成一个良好的习惯,在多线程编程的时候,对资源的操作,首先就要考虑到锁机制。

2) 在方法前添加synchronized关键字

info.halo9pan.samples.java.thread.sync.demo.MethodLock

    public synchronized void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.4));
            int number = this.number;
            Thread.sleep((long) (time * 0.2));
            this.number = number + 1;
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized关键字加在方法前
[0]结束运行,耗时1000
[3]结束运行,耗时2000
[2]结束运行,耗时3000
[1]结束运行,耗时4000
最大耗时:4000
运行计数:4

这次我们可以看到,在输出中,线程的顺序也不是一致的,但是因为在方法前添加了synchronized关键字,每个线程访问这个方法的时候,如果之前有其他线程访问了,会把这个方法锁住,使其他的线程不能访问这个方法,只能等到线程访问结束,才会释放掉锁,允许其他线程访问。所以这次速度很慢。但是方法加锁,保证了成员变量操作的原子性,最后的结果是4。
之前提到过,如果一个对象有多个synchronized方法,只要一个线程访问了其中的一个synchronized方法,其它线程不能同时访问这个对象中任何一个synchronized方法。
info.halo9pan.samples.java.thread.sync.demo.DoubleMethodLock

    public synchronized void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.4));
            int number = this.number;
            Thread.sleep((long) (time * 0.2));
            this.number = number + 1;
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public synchronized void doAnother() {
        long time = getRunTime() * 2;
        try {
            Thread.sleep((long) (time * 0.8));
            int number = this.number;
            Thread.sleep((long) (time * 0.1));
            this.number = number + 1;
            Thread.sleep((long) (time * 0.1));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized关键字加在方法前,多个方法前添加synchronized关键字
[0]结束运行,耗时1000
[3]结束运行,耗时3000
[2]结束运行,耗时4000
[1]结束运行,耗时6000
最大耗时:6000
运行计数:4运行计数:4

这次4个线程,两个线程会调用doSomething,两个线程会调用doAnother。我们可以看到,虽然是两个方法,但是在同一个对象中,线程访问其中的一个方法,会把其他的添加synchronized关键字的方法也锁住。

3) 在静态方法前添加synchronized关键字

info.halo9pan.samples.java.thread.sync.demo.DoubleStaticLock

    private static int number;

    public static synchronized void doSomethingStatic() {
        long time = 1000L;
        try {
            Thread.sleep((long) (time * 0.4));
            int n = number;
            Thread.sleep((long) (time * 0.2));
            number = n + 1;
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static synchronized void doAnotherStatic() {
        long time = 2000L;
        try {
            Thread.sleep((long) (time * 0.8));
            int n = number;
            Thread.sleep((long) (time * 0.1));
            number = n + 1;
            Thread.sleep((long) (time * 0.1));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized关键字加在静态方法前,多个静态方法前添加synchronized关键字
[0]结束运行,耗时1000
[3]结束运行,耗时3000
[2]结束运行,耗时4000
[1]结束运行,耗时6000
最大耗时:6000
运行计数:4

synchronized加在静态方法前,虽然是两个方法,但是属于同一个类的静态方法,线程访问其中的一个方法,会把其他的添加synchronized关键字的静态方法也锁住。

4) 方法前添加synchronized关键字小结

从前面的例子可以看出来,方法前添加synchronized关键字可以保证多线程操作的原子性,使每个方法都是线程安全的,可以得到预期正确的结果。但是代价是巨大的,整个方法的加锁带来了长时间的等待。后面可以看到,线程安全有很多种方法,方法前添加synchronized关键字是最简单也是最低效的方法。新手可以用一用,但是如果你想写出更高效的多线程程序,就放弃这种方法吧。

5) 代码块添加synchronized关键字

info.halo9pan.samples.java.thread.sync.demo.SelfLock

    public void doSomething() {
        long time = getRunTime();
        try {
            synchronized (this) {
                Thread.sleep((long) (time * 0.4));
                int number = this.number;
                Thread.sleep((long) (time * 0.2));
                this.number = number + 1;
                Thread.sleep((long) (time * 0.4));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized语句块,this作为锁
[0]结束运行,耗时1000
[3]结束运行,耗时1999
[2]结束运行,耗时2999
[1]结束运行,耗时4001
最大耗时:4001
运行计数:4

info.halo9pan.samples.java.thread.sync.demo.ObjectLock

    public void doSomething() {
        long time = getRunTime();
        try {
            synchronized (this.lock) {
                Thread.sleep((long) (time * 0.4));
                int number = this.number;
                Thread.sleep((long) (time * 0.2));
                this.number = number + 1;
                Thread.sleep((long) (time * 0.4));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized语句块锁,this.lock加锁
[0]结束运行,耗时1000
[1]结束运行,耗时2000
[2]结束运行,耗时3001
[3]结束运行,耗时4001
最大耗时:4001
运行计数:4

代码块加锁的时候,锁的选择有两种,一种是简单的用this做锁,另一种是单独创建一个对象new Object()或者其他对象做锁。性能上并没有什么区别,但是如果在一个对象中存在多个代码块加锁的情况时,最好不要用this做锁。前面已经提到过,this做锁,当有线程访问其他的代码块时,也会被堵塞住。所以在选择锁的时候,基本原则是一个资源对应一个锁,对相应的资源访问的时候,就使用相应的锁。

6) 部分代码块添加synchronized关键字

info.halo9pan.samples.java.thread.sync.demo.LessBlockLock

    public void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.4));
            synchronized (this) {
                int number = this.number;
                Thread.sleep((long) (time * 0.2));
                this.number = number + 1;
            }
            Thread.sleep((long) (time * 0.4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
synchronized语句块,this作为锁,局部加锁
[0]结束运行,耗时1000
[2]结束运行,耗时1200
[1]结束运行,耗时1400
[3]结束运行,耗时1600
最大耗时:1600
运行计数:4

前面的一些例子都是整块的代码块加锁,带来的阻塞时间是非常长的。但是实际中是绝对不会这么做的。我们只需要在对资源(大部分情况就是成员变量)操作的时候才需要加锁,因此我们可以只在这部分地方加上synchronized关键字。我们可以看到,这个过程耗时少了很多,而且返回的运行计数也是预期的。被锁住的代码块应该尽可能的少。

7) 成员变量自增

之前的例子都是模拟首先获得成员变量的值,然后用这个值运算一段时间,获得新值,再把这个值赋给原来的成员变量。但是如果我们只是对成员变量自操作呢?比如自增,number++?这时还需要加锁吗?
info.halo9pan.samples.java.thread.sync.demo.FieldPlus

    public void doSomething() {
        long time = getRunTime();
        try {
            Thread.sleep((long) (time * 0.5));
            this.number++;
            Thread.sleep((long) (time * 0.5));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
没有synchronized关键字,number++
[3]结束运行,耗时1000
[2]结束运行,耗时1000
.
.
.
.
[71]结束运行,耗时1013
[692]结束运行,耗时1002
最大耗时:1034
运行计数:996

这个时候4个线程一般已经发现不了问题了,这次创建了1000个线程,运行一下,结果的运行计数并不是预期的1000,而是一个小于或者等于1000的随机值。说明在整个过程中,还是发生了不同现场重复写的问题。这也可以看出,Java里面的number++是一个两步的过程,首先是n=number+1;然后number=n; 因此成员变量的自增操作在多线程的场景下一样是需要加锁的。

8) 小结

自增,自减的操作是很普遍的场景,在多线程编程的时候,每次都加锁是一件很麻烦的事情。API的重要作用就是封装复杂的操作,只留给我们简单的接口。因此JDK已经考虑到了这点,从1.5开始,JDK提供了一个java.util.concurrent.atomic包,封装了很多简单的原子高性能操作。