之前一篇博客介绍了使用redis过期键监听事件处理过期未支付订单,下面介绍另一种实现方式。

阻塞队列

阻塞队列是一个在队列基础上又支持了两个附加操作的队列。
2个附加操作:
支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。

阻塞队列的应用场景

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。

java中的阻塞队列

①:ArrayBlockingQueue 数组结构组成的有界阻塞队列。

此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。

②:LinkedBlockingQueue一个由链表结构组成的有界阻塞队列

此队列按照先出先进的原则对元素进行排序

③:PriorityBlockingQueue支持优先级的无界阻塞队列

④:DelayQueue支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素

⑤:SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。

⑥:LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法

transfer方法,如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法,用来试探生产者传入的元素能否直接传给消费者。如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
⑦:LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。

DelayQueue

一个使用优先级队列实现的无界阻塞队列。支持延时获取元素的阻塞队列,元素必须实现Delay接口,可以实现自己的缓存系统,订单到期处理,提供限时支持。

###延时处理订单场景代码实现
ItemVo实现Delayed接口,存放订单对象

package com.enjoy.demo.p1.ch5.bq;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Author: BillYu
 * @Description:
 * @Date: Created in 09:13 2019-03-12.
 */
public class ItemVo<T> implements Delayed {
    /**
     * 到期时间 单位毫秒
     */
    private long activeTime;

    /**
     * 存储对象
     */
    private T obj;

    public ItemVo(long activeTime, T obj) {
        super();
        //将传入的时间转换为超时的时刻
        this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,TimeUnit.MILLISECONDS)+System.nanoTime();
        this.obj = obj;
    }

    public long getActiveTime() {
        return activeTime;
    }

    public T getObj() {
        return obj;
    }

    /**
     * 返回元素的剩余时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.activeTime-System.nanoTime(),TimeUnit.NANOSECONDS);
    }

    /**
     * 按照剩余时间排序
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        long d  = getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS);
        return (d==0)?0:(d>0?1:-1);
    }
}
Order订单实体类

/**
 * @Author: BillYu
 * @Description:订单实体类
 * @Date: Created in 09:24 2019-03-12.
 */
public class Order {
    /**
     * 订单编号
     */
    private final String orderNo;

    /**
     * 订单金额
     */
    private final double orderMoney;

    public Order(String orderNo, double orderMoney) {
        this.orderNo = orderNo;
        this.orderMoney = orderMoney;
    }

    public String getOrderNo() {
        return orderNo;
    }

    public double getOrderMoney() {
        return orderMoney;
    }
}

PutOrder向队列放入Order对象

import java.util.concurrent.DelayQueue;

/**
 * @Author: BillYu
 * @Description:将订单放入队列的线程
 * @Date: Created in 09:26 2019-03-12.
 */
public class PutOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;


    public PutOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        //5秒到期
        Order orderTb = new Order("Tb12345",3666);
        ItemVo itemTb = new ItemVo(5000,orderTb);
        queue.offer(itemTb);
        System.out.println("订单5秒后到期:"+orderTb.getOrderNo());
        //8秒到期
        Order orderJd = new Order("Jd98765",3666);
        ItemVo itemJd = new ItemVo(8000,orderJd);
        queue.offer(itemJd);
        System.out.println("订单8秒后到期:"+orderJd.getOrderNo());
        
    }
}

FetchOrder:从队列中取出对象

import java.util.concurrent.DelayQueue;

/**
 * @Author: BillYu
 * @Description:
 * @Date: Created in 09:31 2019-03-12.
 */
public class FetchOrder implements Runnable {
    private DelayQueue<ItemVo<Order>> queue;

    public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true){
            try {
                ItemVo<Order> item = queue.take();
                Order order = (Order) item.getObj();
                System.out.println("get from queue:"+order.getOrderNo());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Test测试类

import java.util.concurrent.DelayQueue;

/**
 * @Author: BillYu
 * @Description:测试延时订单
 * @Date: Created in 09:35 2019-03-12.
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
        new Thread(new PutOrder(queue)).start();
        new Thread(new FetchOrder(queue)).start();

        for (int i = 0;i<15;i++){
            Thread.sleep(500);
            System.out.println(i*500);
        }
    }
}