RabbitMQ客户端源码阅读(03)_Utility中的copy方法有什么用途?

/ 默认分类 / 0 条评论 / 989浏览

rabbitmq java客户端源码阅读(01)

com.rabbitmq.utility.Utility中的copy方法

在阅读rabbitmq java客户端源码的时候,在开始的建立连接的过过程中会为当前连接添加很多的监听器

public class AutorecoveringConnection implements RecoverableConnection, NetworkConnection {

    public static final Predicate<ShutdownSignalException> DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION =
        cause -> !cause.isInitiatedByApplication() || (cause.getCause() instanceof MissedHeartbeatException);

    private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringConnection.class);

    private final RecoveryAwareAMQConnectionFactory cf;
    private final Map<Integer, AutorecoveringChannel> channels;
    private final ConnectionParams params;
    private volatile RecoveryAwareAMQConnection delegate;

    private final List<ShutdownListener> shutdownHooks  = Collections.synchronizedList(new ArrayList<>());
    private final List<RecoveryListener> recoveryListeners = Collections.synchronizedList(new ArrayList<>());
    private final List<BlockedListener> blockedListeners = Collections.synchronizedList(new ArrayList<>());

比如上面的建立可自动恢复的连接

这里想记录的是在遍历监听器的过程中用到的一个简单的工具类

    private void notifyRecoveryListenersStarted() {
        for (RecoveryListener f : Utility.copy(this.recoveryListeners)) {
            f.handleRecoveryStarted(this);
        }
    }

重点看Utility.copy(this.recoveryListeners)

方法的实现是这样的:

 /**
     * Synchronizes on the list and then returns a copy of the list that is safe to iterate over. Useful when wanting to do thread-safe iteration over
     * a List wrapped in {@link Collections#synchronizedList(List)}.
     *
     * @param list
     *            The list, which may not be {@code null}
     * @return ArrayList copy of the list
     */
    public static <E> List<E> copy(final List<E> list) {
        // No Sonar: this very list instance can be synchronized in other places of its owning class
        synchronized (list) { //NOSONAR
            return new ArrayList<E>(list);
        }
    }
    
    /**
     * Synchronizes on the map and then returns a copy of the map that is safe to iterate over. Useful when wanting to do thread-safe iteration over a
     * Map wrapped in {@link Collections#synchronizedMap(Map)}
     *
     * @param map
     *            The map, which may not be {@code null}
     * @return LinkedHashMap copy of the map
     */
    public static <K, V> Map<K, V> copy(final Map<K, V> map) {
        // No Sonar: this very map instance can be synchronized in other places of its owning class
        synchronized (map) { //NOSONAR
            return new LinkedHashMap<K, V>(map);
        }
    }

从注释中可以看出,这个方法可以在迭代list集合的时保证线程安全

好的,首先要解释这个方法你至少需要看过java中的ArrayList的源码,不过没有看过或者不记得了也没有关系,这里推荐大家看一下我之前写的一篇java中集合 框架的源码阅读笔记点我跳转

因为ArrayList是中的添加修改等操作都是线程不安全的,所以如果需要保证线程安全需要为这样的操作加锁,这个在java中也提供了支持,可以使用Collections的:

public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }

这样集合就被封装在了同步集合中,同步集合中的操作方法都被加锁了,这样保证了线程安全,就像下面一样:

   public E get(int index) {
            synchronized (mutex) {return list.get(index);}
        }
        public E set(int index, E element) {
            synchronized (mutex) {return list.set(index, element);}
        }
        public void add(int index, E element) {
            synchronized (mutex) {list.add(index, element);}
        }
        public E remove(int index) {
            synchronized (mutex) {return list.remove(index);}
        }

        public int indexOf(Object o) {
            synchronized (mutex) {return list.indexOf(o);}
        }
        public int lastIndexOf(Object o) {
            synchronized (mutex) {return list.lastIndexOf(o);}
        }

        public boolean addAll(int index, Collection<? extends E> c) {
            synchronized (mutex) {return list.addAll(index, c);}
        }

        public ListIterator<E> listIterator() {
            return list.listIterator(); // Must be manually synched by user
        }

        public ListIterator<E> listIterator(int index) {
            return list.listIterator(index); // Must be manually synched by user
        }

但是这里有一个明显的注释,在迭代器操作中,说明了同步处理需要用户自己处理,如果你看过ArrayList的源码你就会知道,ArrayList中使用了fail-fast机制,在迭代器中,如果出现多线程 修改数据的情况,那么会立即抛出异常,这里可以参考我的笔记文章

所以如果我们使用下面的代码,在遍历的时候,开启另一个线程来修改数据,那么这样就会触发fail-fast机制抛出异常:

public static void main(String[] args) throws InterruptedException {
        Student student1 = new Student("1","11","111");
        Student student2 = new Student("2","22","222");
        List<Student> list = new ArrayList<>();
        list.add(student1);
        list.add(student2);

        new Thread(() -> {
            int i = 0;
            for (; ; ) {
                i++;
                System.out.println("模拟多线程条件下的数据修改"+i);
                list.add(new Student("new student"+i));
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("开始迭代");
        for (Student student : list) {
            System.out.println(student);
            TimeUnit.SECONDS.sleep(1);
        }
    }

运行结果:

开始迭代
Student{name='1', id='11', sex='111'}
模拟多线程条件下的数据修改1
模拟多线程条件下的数据修改2
Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
	at java.util.ArrayList$Itr.next(ArrayList.java:859)
	at com.wondershare.test.CommonTest.main(CommonTest.java:1957)
模拟多线程条件下的数据修改3
模拟多线程条件下的数据修改4

可以看到,由于迭代器中的expectModCoun和modCount值不一致,所以抛出ConcurrentModificationException,证明当前出现线程不安全的情况,所以直接执行fail-fast机制

那我们使用线程安全的List是不是就可以了呢?答案肯定是不行,因为,这里的fore属于语法糖,实际上还是使用的迭代器,从上面的介绍可以知道,即使是包装过的同步的synchronizedList,它的 迭代器的线程安全的控制也需要自己实现,所以也就出现了rabbitmq团队写的这个方法,其实这个方法原理很简单,就是让迭代器迭代的集合是我们当前集合的副本即可,这样父集合修改了modCount 不会影响副本的modCount,所以其实我们只要再次new一个副本集合出来就可以了:

public static void main(String[] args) throws InterruptedException {
        Student student1 = new Student("1","11","111");
        Student student2 = new Student("2","22","222");
        List<Student> list = new ArrayList<>();
        list.add(student1);
        list.add(student2);
        List<Student> copy = new ArrayList<>(list);

        new Thread(() -> {
            int i = 0;
            for (; ; ) {
                i++;
                System.out.println("模拟多线程条件下的数据修改"+i);
                list.add(new Student("new student"+i));
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("开始迭代");
        for (Student student : copy) {
            System.out.println(student);
            TimeUnit.SECONDS.sleep(1);
        }
    }

运行结果:

开始迭代
Student{name='1', id='11', sex='111'}
模拟多线程条件下的数据修改1
模拟多线程条件下的数据修改2
Student{name='2', id='22', sex='222'}
模拟多线程条件下的数据修改3
模拟多线程条件下的数据修改4

只不过,Utility中的copy方法对此进行了封装,使在rabbitmq客户端代码中可以很方便的使用这样的功能,并且做得更严谨,保证线程安全:

    public static <E> List<E> copy(final List<E> list) {
        synchronized (list) { 
            return new ArrayList<E>(list);
        }
    }

所以如果我们这样:

public static void main(String[] args) throws InterruptedException {
        Student student1 = new Student("1","11","111");
        Student student2 = new Student("2","22","222");
        List<Student> list = Collections.synchronizedList(new ArrayList<>());
        list.add(student1);
        list.add(student2);
        //或者List<Student> copy = new ArrayList(list)
        List<Student> copy = Utility.copy(list);

        new Thread(() -> {
            int i = 0;
            for (; ; ) {
                i++;
                System.out.println("模拟多线程条件下的数据修改"+i);
//                list.get(1).setName("ahdjkashdka");
                list.add(new Student("new student"+i));
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("开始迭代");
        for (Student student : copy) {
            System.out.println(student);
            TimeUnit.SECONDS.sleep(1);
        }
    }

那么这就是一个完全的线程安全操作的集合了.

另外,这里贴一下ArrayList这个构造器的源码:

    public ArrayList(Collection<? extends E> c) {
        elementData = c.toArray();
        if ((size = elementData.length) != 0) {
            // c.toArray might (incorrectly) not return Object[] (see 6260652)
            if (elementData.getClass() != Object[].class)
                elementData = Arrays.copyOf(elementData, size, Object[].class);
        } else {
            // replace with empty array.
            this.elementData = EMPTY_ELEMENTDATA;
        }
    }

可以看出,如果集合中存储的是引用数据类型(对象),那么在并发情况下,遍历的过程中数据是可能被修改的,因为这只是浅拷贝,关于深浅拷贝可以查看我的这篇文章笔记

public static void main(String[] args) throws InterruptedException {
        Student student1 = new Student("1","11","111");
        Student student2 = new Student("2","22","222");
        List<Student> list = Collections.synchronizedList(new ArrayList<>());
        list.add(student1);
        list.add(student2);
        List<Student> copy = new ArrayList<>(list);

        new Thread(() -> {
            int i = 0;
            for (; ; ) {
                i++;
                System.out.println("模拟多线程条件下的数据修改"+i);
                list.get(1).setName("54zh.cn");
//                list.add(new Student("new student"+i));
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("开始迭代");
        for (Student student : copy) {
            System.out.println(student);
            TimeUnit.SECONDS.sleep(1);
        }
    }

运行结果:

开始迭代
Student{name='1', id='11', sex='111'}
模拟多线程条件下的数据修改1
模拟多线程条件下的数据修改2
Student{name='54zh.cn', id='22', sex='222'}
模拟多线程条件下的数据修改3
模拟多线程条件下的数据修改4