之前一篇博客介绍了使用redis过期键监听事件处理过期未支付订单,下面介绍另一种实现方式。
阻塞队列
阻塞队列是一个在队列基础上又支持了两个附加操作的队列。
2个附加操作:
支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。
阻塞队列的应用场景
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。
java中的阻塞队列
①:ArrayBlockingQueue 数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。
②:LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
此队列按照先出先进的原则对元素进行排序
③:PriorityBlockingQueue支持优先级的无界阻塞队列
④:DelayQueue支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素
⑤:SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。
⑥:LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法
transfer方法,如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法,用来试探生产者传入的元素能否直接传给消费者。如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
⑦:LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。
DelayQueue
一个使用优先级队列实现的无界阻塞队列。支持延时获取元素的阻塞队列,元素必须实现Delay接口,可以实现自己的缓存系统,订单到期处理,提供限时支持。
###延时处理订单场景代码实现
ItemVo实现Delayed接口,存放订单对象
package com.enjoy.demo.p1.ch5.bq;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author: BillYu
* @Description:
* @Date: Created in 09:13 2019-03-12.
*/
public class ItemVo<T> implements Delayed {
/**
* 到期时间 单位毫秒
*/
private long activeTime;
/**
* 存储对象
*/
private T obj;
public ItemVo(long activeTime, T obj) {
super();
//将传入的时间转换为超时的时刻
this.activeTime = TimeUnit.NANOSECONDS.convert(activeTime,TimeUnit.MILLISECONDS)+System.nanoTime();
this.obj = obj;
}
public long getActiveTime() {
return activeTime;
}
public T getObj() {
return obj;
}
/**
* 返回元素的剩余时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.activeTime-System.nanoTime(),TimeUnit.NANOSECONDS);
}
/**
* 按照剩余时间排序
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
long d = getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS);
return (d==0)?0:(d>0?1:-1);
}
}
Order订单实体类
/**
* @Author: BillYu
* @Description:订单实体类
* @Date: Created in 09:24 2019-03-12.
*/
public class Order {
/**
* 订单编号
*/
private final String orderNo;
/**
* 订单金额
*/
private final double orderMoney;
public Order(String orderNo, double orderMoney) {
this.orderNo = orderNo;
this.orderMoney = orderMoney;
}
public String getOrderNo() {
return orderNo;
}
public double getOrderMoney() {
return orderMoney;
}
}
PutOrder向队列放入Order对象
import java.util.concurrent.DelayQueue;
/**
* @Author: BillYu
* @Description:将订单放入队列的线程
* @Date: Created in 09:26 2019-03-12.
*/
public class PutOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public PutOrder(DelayQueue<ItemVo<Order>> queue) {
this.queue = queue;
}
@Override
public void run() {
//5秒到期
Order orderTb = new Order("Tb12345",3666);
ItemVo itemTb = new ItemVo(5000,orderTb);
queue.offer(itemTb);
System.out.println("订单5秒后到期:"+orderTb.getOrderNo());
//8秒到期
Order orderJd = new Order("Jd98765",3666);
ItemVo itemJd = new ItemVo(8000,orderJd);
queue.offer(itemJd);
System.out.println("订单8秒后到期:"+orderJd.getOrderNo());
}
}
FetchOrder:从队列中取出对象
import java.util.concurrent.DelayQueue;
/**
* @Author: BillYu
* @Description:
* @Date: Created in 09:31 2019-03-12.
*/
public class FetchOrder implements Runnable {
private DelayQueue<ItemVo<Order>> queue;
public FetchOrder(DelayQueue<ItemVo<Order>> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true){
try {
ItemVo<Order> item = queue.take();
Order order = (Order) item.getObj();
System.out.println("get from queue:"+order.getOrderNo());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Test测试类
import java.util.concurrent.DelayQueue;
/**
* @Author: BillYu
* @Description:测试延时订单
* @Date: Created in 09:35 2019-03-12.
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
DelayQueue<ItemVo<Order>> queue = new DelayQueue<>();
new Thread(new PutOrder(queue)).start();
new Thread(new FetchOrder(queue)).start();
for (int i = 0;i<15;i++){
Thread.sleep(500);
System.out.println(i*500);
}
}
}