Skip to content

Instantly share code, notes, and snippets.

@jinhuicheng
Last active July 5, 2020 16:03
Show Gist options
  • Save jinhuicheng/25a7cad4edee992e7f8a2165155e5f53 to your computer and use it in GitHub Desktop.
Save jinhuicheng/25a7cad4edee992e7f8a2165155e5f53 to your computer and use it in GitHub Desktop.
weibo_like
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the "log" module.
  Versions of artifact families that must move together (Spring, Log4j 2) are
  declared once in <properties> so they cannot drift apart again.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cjh.log</groupId>
    <artifactId>log</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>4.3.13.RELEASE</spring.version>
        <!-- 2.17.1 still runs on Java 8 and contains the fixes for CVE-2021-44228 and its follow-ups. -->
        <log4j2.version>2.17.1</log4j2.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!-- Was 4.1.2.RELEASE: mixing Spring module versions on one classpath is unsupported. -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <!--
          Was aspectj:aspectjweaver:1.5.4 (legacy groupId). spring-aspects already pulls in
          org.aspectj:aspectjweaver, so the legacy artifact put a second, much older weaver
          on the classpath.
        -->
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.8.13</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <!-- Imported by RedisQueryStrategyImpl (org.apache.commons.lang3.StringUtils) but previously undeclared. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.5</version>
            <optional>true</optional>
        </dependency>
        <!--
          NOTE(review): log4j:log4j and log4j-1.2-api both provide the org.apache.log4j classes,
          and slf4j-log4j12 binds SLF4J to Log4j 1.x while the shipped configuration is Log4j 2.
          Confirm which backend is intended and drop the other one.
        -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-jcl</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <!-- Was 2.4.1 while the other Log4j 2 modules were 2.8; they must share one version. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-1.2-api</artifactId>
            <version>${log4j2.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <!-- <dependency>-->
        <!-- <groupId>com.alibaba.toolkit.common</groupId>-->
        <!-- <artifactId>toolkit-common-lang</artifactId>-->
        <!-- <version>1.1.5</version>-->
        <!-- </dependency>-->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!--
              The "attach-sources" execution was bound to maven-resources-plugin, which has no
              "jar" goal, so the build could not resolve it. Source jars come from
              maven-source-plugin; jar-no-fork avoids forking a second lifecycle.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.0.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar-no-fork</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.cjh.common.log.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * {@link ThreadFactory} whose threads are named
 * {@code <prefix>-<factory sequence>-thread-<thread sequence>}.
 *
 * <p>The factory sequence is shared by all instances in the JVM; the thread
 * sequence is per factory. Both start at 1.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/6/11 9:40 PM
 */
public class NamedThreadFactory implements ThreadFactory {
    /** JVM-wide counter used to number factories. */
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    /** Per-factory counter used to number the threads it creates. */
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final String namePrefix;
    final boolean isDaemon;

    /**
     * Creates a factory that produces non-daemon threads.
     *
     * @param name prefix for the thread names
     */
    public NamedThreadFactory(String name) {
        this(name, false);
    }

    /**
     * Creates a factory.
     *
     * @param prefix prefix for the thread names
     * @param daemon whether created threads are daemon threads
     */
    public NamedThreadFactory(String prefix, boolean daemon) {
        SecurityManager securityManager = System.getSecurityManager();
        if (securityManager == null) {
            // No security manager installed: use the creating thread's group.
            group = Thread.currentThread().getThreadGroup();
        } else {
            group = securityManager.getThreadGroup();
        }
        isDaemon = daemon;
        namePrefix = new StringBuilder()
                .append(prefix)
                .append("-")
                .append(poolNumber.getAndIncrement())
                .append("-thread-")
                .toString();
    }

    /**
     * Creates a named thread for {@code r} with the configured daemon flag and
     * normal priority.
     *
     * @param r task the new thread will run
     * @return the new, not yet started thread
     */
    @Override
    public Thread newThread(Runnable r) {
        String threadName = namePrefix + threadNumber.getAndIncrement();
        Thread created = new Thread(group, r, threadName, 0);
        created.setDaemon(isDaemon);
        // A new thread starts with its creator's priority; reset it only when it differs.
        if (created.getPriority() != Thread.NORM_PRIORITY) {
            created.setPriority(Thread.NORM_PRIORITY);
        }
        return created;
    }
}
package com.cjh.common.log.util;
import java.util.Random;
/**
 * Random-number helper.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:02 PM
 */
public class NumUtil {
    /**
     * Shared generator. It is deliberately created without an explicit seed:
     * a fixed seed makes every JVM produce the same sequence after each start.
     */
    private static final Random RANDOM = new Random();

    /**
     * Returns a pseudo-random {@code int} drawn from the full {@code int} range.
     *
     * @return any {@code int} value; it may be negative, so callers that need a
     *         bounded or non-negative value must reduce it themselves
     */
    public static int getRandomNum() {
        return RANDOM.nextInt();
    }
}
package com.cjh.common.log.weibo.like.common;
/**
 * Checked exception for business-level failures that the caller is expected to handle.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:44 PM
 */
public class BusinessException extends Exception {
    private static final long serialVersionUID = 1L;

    /**
     * Constructs a new exception with the specified detail message. The cause is not
     * initialized, and may subsequently be initialized by a call to {@link #initCause}.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public BusinessException(String message) {
        super(message);
    }

    /**
     * Constructs a new exception with the specified detail message and cause, so the
     * original failure is kept in the stack trace.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     * @param cause   the underlying failure, later available through {@link #getCause()};
     *                may be {@code null}
     */
    public BusinessException(String message, Throwable cause) {
        super(message, cause);
    }
}
package com.cjh.common.log.weibo.like.common;
/**
 * Cache key prefixes and key builders for the like feature.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 5:44 PM
 */
public class Constants {
    /** Prefix of the key that marks "user liked status". */
    public static final String USER_LIKE_STATUS_PREFIX = "u_like_s_";
    /** Prefix of the key that holds a status's like counter. */
    public static final String STATUS_LIKED_PREFIX = "s_liked_";

    /**
     * Builds the like-counter key of a status.
     *
     * @param statusId status id
     * @return {@code s_liked_<statusId>}
     */
    public static String builderStatusLikedPrefix(int statusId) {
        return STATUS_LIKED_PREFIX.concat(Integer.toString(statusId));
    }

    /**
     * Builds the key that marks that a user liked a status.
     *
     * @param uid      user id
     * @param statusId status id
     * @return {@code u_like_s_<uid>_<statusId>}
     */
    public static String builderUserLikePrefix(int uid, int statusId) {
        StringBuilder key = new StringBuilder(USER_LIKE_STATUS_PREFIX);
        key.append(uid).append("_").append(statusId);
        return key.toString();
    }
}
package com.cjh.common.log.weibo.like.common;
/**
 * Mutable envelope for the outcome of an internal call: a payload, a result
 * code and a success flag.
 *
 * @param <T> payload type
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:35 PM
 */
public class InnerResult<T> {
    private T result;
    private String resultCode;
    private boolean isSuccess;

    /**
     * Creates a successful result: the code is {@code "success"} and the flag is {@code true}.
     *
     * @param result payload to carry
     */
    public InnerResult(T result) {
        this.isSuccess = true;
        this.resultCode = "success";
        this.result = result;
    }

    /** @return whether the call is marked successful */
    public boolean isSuccess() {
        return this.isSuccess;
    }

    /** @param success new value of the success flag */
    public void setSuccess(boolean success) {
        this.isSuccess = success;
    }

    /** @return the result code */
    public String getResultCode() {
        return this.resultCode;
    }

    /** @param resultCode new result code */
    public void setResultCode(String resultCode) {
        this.resultCode = resultCode;
    }

    /** @return the payload */
    public T getResult() {
        return this.result;
    }

    /** @param result new payload */
    public void setResult(T result) {
        this.result = result;
    }
}
package com.cjh.common.log.weibo.like;
import com.cjh.common.log.weibo.like.common.BusinessException;
/**
* Contract for liking a Weibo status and for asking whether a user has liked one.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 4:36 PM
*/
public interface LikeService {
/**
* Records that a user likes a status.
*
* @param uid user id
* @param statusId Weibo status id
*/
void like(int uid, int statusId);
/**
* Tells whether a user has already liked a status.
*
* @param uid user id
* @param statusId Weibo status id
* @return whether the user has liked the status
* @throws BusinessException declared for implementations that cannot answer the query
*/
boolean isLiked(int uid, int statusId) throws BusinessException;
}
package com.cjh.common.log.weibo.like;
import static com.cjh.common.log.util.NumUtil.getRandomNum;
import static com.cjh.common.log.weibo.like.common.Constants.builderStatusLikedPrefix;
import static com.cjh.common.log.weibo.like.common.Constants.builderUserLikePrefix;
import com.cjh.common.log.weibo.like.common.BusinessException;
import com.cjh.common.log.weibo.like.common.InnerResult;
import com.cjh.common.log.weibo.like.msg.EventPublisher;
import com.cjh.common.log.weibo.like.msg.EventUtil;
import com.cjh.common.log.weibo.like.query.QueryContext;
import com.cjh.common.log.weibo.like.query.QueryService;
import com.cjh.common.log.weibo.like.route.DataStrategy;
import com.cjh.common.log.weibo.like.route.DataStrategyUtil;
import javax.annotation.Resource;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
/**
 * {@link LikeService} implementation: likes are written to Redis first and then
 * published as a RocketMQ transactional message; queries are delegated to the
 * {@link QueryService}.
 *
 * <p>NOTE(review): a single {@link Jedis} instance is injected and used from
 * whatever thread calls this service. Jedis instances are not meant to be shared
 * between threads; confirm how the bean is provided (e.g. via a pool).
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 4:41 PM
 */
@Service
public class LikeServiceImpl implements LikeService {
    /** Base lifetime of a "user liked status" marker: 60*60*24*30*6 seconds (about six months). */
    private static final int LIKE_TTL_SECONDS = 60 * 60 * 24 * 30 * 6;
    /** Exclusive upper bound of the random jitter added to the lifetime: one day, in seconds. */
    private static final int LIKE_TTL_JITTER_SECONDS = 60 * 60 * 24;

    /* Cache client. */
    @Resource
    private Jedis redisClient;
    /* Message publishing client. */
    @Resource
    private EventPublisher eventPublisher;
    @Resource
    private QueryService queryService;

    /**
     * Records a like.
     *
     * <p>The marker key is written with {@code SET ... NX EX}; only the caller whose
     * write actually created the key increments the status counter and publishes the
     * event, so concurrent or repeated likes of the same status by the same user are
     * counted once.
     *
     * @param uid      user id
     * @param statusId Weibo status id
     * @throws IllegalStateException if the like event cannot be published; the cache
     *                               writes made by this call are undone first
     */
    @Override
    public void like(int uid, int statusId) {
        String redisKey = builderUserLikePrefix(uid, statusId);
        // getRandomNum() spans the whole int range; floorMod folds it into [0, jitter)
        // so the expiry can neither go negative nor overflow.
        int ttlSeconds = LIKE_TTL_SECONDS + Math.floorMod(getRandomNum(), LIKE_TTL_JITTER_SECONDS);
        String reply = redisClient.set(redisKey, "1", "nx", "ex", ttlSeconds);
        if (!"OK".equals(reply)) {
            // NX refused the write: the user has already liked this status.
            return;
        }
        // Bump the like counter of the status.
        String counterKey = builderStatusLikedPrefix(statusId);
        redisClient.incr(counterKey);
        try {
            // Publish the event that triggers persistence; sent as a transactional message.
            Message msg = EventUtil.builderTransactionMsg(uid, statusId);
            eventPublisher.sendMessageInTransaction(msg, null);
        } catch (MQClientException e) {
            // The event never left this process: undo the cache writes so the cache
            // does not claim a like that will never be persisted.
            redisClient.decr(counterKey);
            redisClient.del(redisKey);
            throw new IllegalStateException(
                    "Failed to publish like event, uid=" + uid + ", statusId=" + statusId, e);
        }
    }

    /**
     * Tells whether a user has already liked a status.
     *
     * @param uid      user id
     * @param statusId Weibo status id
     * @return {@code true} only if the query result carries {@code Boolean.TRUE}
     * @throws BusinessException if the query returns no result or an unsuccessful one
     */
    @Override
    public boolean isLiked(int uid, int statusId) throws BusinessException {
        // Derive the routing strategy from the status id.
        DataStrategy strategyTag = DataStrategyUtil.parseDataStrategyByStatusId(statusId);
        // Obtain the query context bound to the strategy's implementation.
        QueryContext context = queryService.getQueryContext(strategyTag, uid, statusId);
        // Run the query.
        InnerResult<Boolean> result = context.queryIsLiked();
        if (result == null || !result.isSuccess()) {
            throw new BusinessException("抱歉,网络繁忙请稍后重试");
        }
        // A successful result may still carry a null payload; treat that as "not liked"
        // instead of failing on unboxing.
        return Boolean.TRUE.equals(result.getResult());
    }
}
package com.cjh.common.log.weibo.like.msg;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
/**
 * Consumes like events and persists them inside a database transaction.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 4:52 PM
 */
public class EventMessageListener implements MessageListenerConcurrently {
    private TransactionTemplate transactionTemplate;

    /**
     * Supplies the transaction template. Without this setter the private field had
     * no way of being populated and every consumed message hit a null reference.
     *
     * @param transactionTemplate template used to wrap the persistence steps
     */
    public void setTransactionTemplate(TransactionTemplate transactionTemplate) {
        this.transactionTemplate = transactionTemplate;
    }

    /**
     * Processes a batch of messages in one transaction.
     *
     * @param list                       messages delivered by the broker
     * @param consumeConcurrentlyContext consume context (unused)
     * @return {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS} when the transaction
     *         callback completes without throwing
     * @throws IllegalStateException if no transaction template has been configured
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (transactionTemplate == null) {
            throw new IllegalStateException("transactionTemplate has not been configured");
        }
        transactionTemplate.execute(new TransactionCallback<Void>() {
            @Override
            public Void doInTransaction(TransactionStatus transactionStatus) {
                // step1: insert the user's like record
                // step2: update the Redis Bloom filter, to prevent cache penetration once cached entries expire
                // step3: update the like count of the status
                // ....
                return null;
            }
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}
package com.cjh.common.log.weibo.like.msg;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
 * Message producer used to publish like events.
 *
 * <p>It extends {@link TransactionMQProducer} because callers publish through
 * {@code sendMessageInTransaction(Message, Object)}, which the plain
 * {@code DefaultMQProducer} base class does not implement. {@code TransactionMQProducer}
 * is itself a {@code DefaultMQProducer}, so existing usages remain valid.
 *
 * <p>NOTE(review): a transaction listener has to be registered on this producer
 * before transactional sends work; that wiring is not visible here.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 4:49 PM
 */
public class EventPublisher extends TransactionMQProducer {
    /**
     * Sends a normal (non-transactional) message by delegating to the superclass.
     *
     * @param msg message to send
     * @return the send result reported by the superclass
     */
    @Override
    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return super.send(msg);
    }
}
package com.cjh.common.log.weibo.like.msg;
import javax.annotation.Resource;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
/**
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 6:48 下午
* @description 事务消息回查接口
*/
@Service
public class EventTransactionListenerImpl implements TransactionListener {
/*缓存客户端*/
@Resource
private Jedis redisClient;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String uid=message.getUserProperty("uid");
String statusId=message.getUserProperty("statusId");
//分支逻辑1 : 查询缓存中是否有对应记录,如果缓存中数据正常,说明可以继续投递 ,return LocalTransactionState.COMMIT_MESSAGE;
//分支逻辑2 : 否则抛弃消息 return LocalTransactionState.ROLLBACK_MESSAGE
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
}
package com.cjh.common.log.weibo.like.msg;
import org.apache.rocketmq.common.message.Message;
/**
 * Builds the messages published for like events.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 6:47 PM
 */
public class EventUtil {
    /**
     * Builds the transactional message for "user {@code uid} liked status {@code statusId}".
     *
     * <p>The ids are carried twice: in the body, in the form
     * {@code {uid:<uid>;statusId:<statusId>}}, and as the user properties
     * {@code uid} and {@code statusId}.
     *
     * @param uid      user id
     * @param statusId Weibo status id
     * @return a message for topic {@code "topic"} carrying both ids
     */
    public static Message builderTransactionMsg(int uid, int statusId) {
        String payload = "{uid:" + uid + ";statusId:" + statusId + "}";
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        Message message = new Message("topic", payload.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.putUserProperty("uid", String.valueOf(uid));
        message.putUserProperty("statusId", String.valueOf(statusId));
        return message;
    }
}
package com.cjh.common.log.weibo.like.query;
import com.cjh.common.log.weibo.like.common.InnerResult;
/**
* Query strategy intended for older, rarely accessed ("cold") data, served directly
* from the backing store. The implementation is currently a placeholder.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 10:46 PM
*/
public class HbaseQueryStrategyImpl implements QueryStrategy{
/**
* Placeholder: no query is performed yet.
*
* @param uid user id
* @param statusId status id
* @return currently always {@code null}
*/
@Override
public InnerResult<Boolean> queryIsLiked(int uid, int statusId) {
// Implementation omitted.
return null;
}
}
package com.cjh.common.log.weibo.like.query;
import com.cjh.common.log.weibo.like.common.InnerResult;
/**
 * Holds a {@link QueryStrategy} together with the arguments of one query and
 * runs the query on demand.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:37 PM
 */
public class QueryContext {
    private int uid;
    private int statusId;
    private QueryStrategy queryStrategy;

    /**
     * Runs the configured strategy with the stored ids.
     *
     * @return whatever the strategy returns
     */
    public InnerResult<Boolean> queryIsLiked() {
        final InnerResult<Boolean> outcome = this.queryStrategy.queryIsLiked(this.uid, this.statusId);
        return outcome;
    }

    /**
     * Stores the strategy and the ids for the next {@link #queryIsLiked()} call.
     *
     * @param queryStrategy strategy to delegate to
     * @param uid           user id
     * @param statusId      status id
     */
    public void setQueryContext(QueryStrategy queryStrategy, int uid, int statusId) {
        this.statusId = statusId;
        this.uid = uid;
        this.queryStrategy = queryStrategy;
    }
}
package com.cjh.common.log.weibo.like.query;
import com.cjh.common.log.weibo.like.route.DataStrategy;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ApplicationObjectSupport;
/**
 * Looks up the {@link QueryStrategy} bean for a routing strategy and hands it
 * back wrapped in a configured {@link QueryContext}.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:33 PM
 */
public class QueryService extends ApplicationObjectSupport {
    /**
     * Prepares a query context for one lookup.
     *
     * <p>The strategy bean is resolved by the enum constant's name and the context
     * by the bean name {@code "queryContext"}.
     *
     * <p>NOTE(review): the returned context is mutated on every call. If the
     * {@code queryContext} bean is a singleton, concurrent callers would overwrite
     * each other's ids; confirm it is declared with prototype scope.
     *
     * @param strategy routing strategy; its {@code name()} is the strategy bean name
     * @param uid      user id
     * @param statusId status id
     * @return the context bean, configured with the strategy and ids
     */
    public QueryContext getQueryContext(DataStrategy strategy, int uid, int statusId) {
        final ApplicationContext beans = getApplicationContext();
        final String strategyBeanName = strategy.name();
        final QueryStrategy resolvedStrategy = beans.getBean(strategyBeanName, QueryStrategy.class);
        final QueryContext queryContext = beans.getBean("queryContext", QueryContext.class);
        queryContext.setQueryContext(resolvedStrategy, uid, statusId);
        return queryContext;
    }
}
package com.cjh.common.log.weibo.like.query;
import com.cjh.common.log.weibo.like.common.InnerResult;
/**
* Strategy for answering "has this user liked this status?" against one data source.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 10:35 PM
*/
public interface QueryStrategy {
/**
* Queries whether a user has liked a status.
*
* @param uid user id
* @param statusId status id
* @return the query outcome wrapped in an {@link InnerResult}
*/
InnerResult<Boolean> queryIsLiked(int uid ,int statusId);
}
package com.cjh.common.log.weibo.like.query;
import com.cjh.common.log.weibo.like.common.Constants;
import com.cjh.common.log.weibo.like.common.InnerResult;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
/**
 * Query strategy for recent ("hot", roughly the last half year) data, answered
 * from the cache first.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/7/5 10:46 PM
 */
@Service
public class RedisQueryStrategyImpl implements QueryStrategy {
    /* Cache client. */
    @Resource
    private Jedis redisClient;

    /**
     * Answers in three steps: cache marker, then Bloom filter, then database.
     *
     * @param uid      user id
     * @param statusId status id
     * @return a successful result carrying {@code true} when the cache marker is
     *         {@code "1"} or the database reports at least one row, otherwise
     *         {@code false}
     */
    @Override
    public InnerResult<Boolean> queryIsLiked(int uid, int statusId) {
        // Step 1: the cache marker written when the user liked the status.
        String cacheKey = Constants.builderUserLikePrefix(uid, statusId);
        String cached = redisClient.get(cacheKey);
        if ("1".equals(cached)) {
            return new InnerResult<Boolean>(true);
        }
        // Step 2: the Bloom filter guards the database against cache penetration.
        if (!queryBooleanFilter(uid, statusId)) {
            return new InnerResult<Boolean>(false);
        }
        // Step 3: fall back to the database.
        boolean foundInDb = queryNumFromDB() >= 1;
        return new InnerResult<Boolean>(foundInDb);
    }

    /**
     * Bloom filter lookup (placeholder).
     *
     * @param uid      user id
     * @param statusId status id
     * @return currently always {@code false}
     */
    private boolean queryBooleanFilter(int uid, int statusId) {
        return false;
    }

    /**
     * Database lookup (placeholder).
     *
     * @return currently always {@code 1}
     */
    private int queryNumFromDB() {
        return 1;
    }
}
package com.cjh.common.log.weibo.like.route;
/**
* Routing tag that selects which query implementation serves a status.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 10:25 PM
*/
public enum DataStrategy {
// COLD: older data; HOT: recent data.
COLD, HOT
}
package com.cjh.common.log.weibo.like.route;
/**
* Derives the query routing strategy for a status.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 11:44 PM
*/
public class DataStrategyUtil {
/**
* Resolves the query routing strategy from the timestamp embedded in the status id.
* The parsing is not implemented yet: every id currently maps to {@link DataStrategy#HOT}.
*
* @param statusId Weibo status id
* @return the routing strategy; currently always {@code HOT}
*/
public static DataStrategy parseDataStrategyByStatusId(int statusId) {
// Intended logic: parse the id, extract its timestamp, and pick the cold or hot service.
// Older than half a year: return DataStrategy.COLD;
// Within the last half year: return DataStrategy.HOT;
return DataStrategy.HOT;
}
}
package com.cjh.common.log.weibo.like.route;
/**
* Client that routes and forwards user requests.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 11:22 AM
*/
public interface RouteClient {
/**
* Initializes the routing information and calls back the business side so that
* it can refresh its local copy of the routing information.
*
* @param ruleConfig routing configuration
*/
void init(RuleConfig ruleConfig);
/**
* Computes the route for a request.
*
* @param routeRequest routing request
* @return the target logical data center or the target machine
*/
String route(RouteRequest routeRequest);
}
package com.cjh.common.log.weibo.like.route;
/**
* Routing entry describing one data center and the uid range it serves.
* The class declares fields only; no accessors are defined here.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 11:50 AM
*/
public class RouteInfo {
/**
* Data center name.
*/
private String roomName;
/**
* Data center type.
*/
private String roomType;
/**
* Lower bound of the uid range.
*/
private Integer uidMin;
/**
* Upper bound of the uid range.
*/
private Integer uidMax;
/**
* Whether this entry is for the pre-release environment.
*/
private boolean isPre;
}
package com.cjh.common.log.weibo.like.route;
import java.util.Map;
/**
* Input of a route computation. The class declares fields only; no accessors
* are defined here.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 4:32 PM
*/
public class RouteRequest {
// presumably the uid-derived value the route is computed from; confirm with the RouteClient implementation
private int uidRouteValue;
// raw request payload
private byte[] requestBody;
// name of the application the request is addressed to
private String targetAppName;
// request headers, by name
private Map<String, String> headerMap;
// presumably marks pre-release traffic, mirroring RouteInfo.isPre; TODO confirm
private boolean isPre;
}
package com.cjh.common.log.weibo.like.route;
/**
* Routing rule configuration handed to {@link RouteClient#init(RuleConfig)}.
*
* @author chengjinhui
* @version 1.0.0
* @date 2020/7/5 4:13 PM
*/
public class RuleConfig {
/**
* Per logical data center configuration, e.g.
* room01A:100;room01B:0;room02A:50;room02B:50. Used for switching user traffic,
* staged releases and disaster recovery.
*/
private String roomListConfigStr;
/**
* @return the raw data center configuration string
*/
public String getRoomListConfigStr() {
return roomListConfigStr;
}
/**
* @param roomListConfigStr the raw data center configuration string
*/
public void setRoomListConfigStr(String roomListConfigStr) {
this.roomListConfigStr = roomListConfigStr;
}
}
package com.cjh.test.common.log;
/**
 * Small allocation demo: creates four objects of 2 MB each and prints a line
 * as each one is constructed.
 *
 * @author chengjinhui
 * @version 1.0.0
 * @date 2020/6/20 8:44 PM
 */
public class Test1 {
    private static final int _1MB = 1024 * 1024;

    /**
     * Allocates the four objects, keeping all of them referenced, then prints an empty line.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final int payloadSize = 2 * _1MB;
        MyObject[] holders = new MyObject[4];
        for (int i = 0; i < holders.length; i++) {
            holders[i] = new MyObject("ob" + (i + 1), payloadSize);
        }
        System.out.println();
    }

    /** Named holder of a byte array of the requested size. */
    private static class MyObject {
        private String name;
        private byte[] space;

        /**
         * @param name label printed on construction
         * @param size number of bytes to allocate
         */
        public MyObject(String name, int size) {
            this.space = new byte[size];
            this.name = name;
            System.out.println(name + " init ~");
        }
    }
}
logging.pattern.console=%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr(%-80.80logger{79}){cyan} %clr(:){faint} %m%n
<?xml version="1.0" encoding="UTF-8"?>
<!-- Log4j 2 configuration: one console appender attached to the root logger. -->
<!-- status="OFF" turns off Log4j's own internal status output. -->
<configuration status="OFF">
<appenders>
<!-- Writes to standard output. Pattern: date [level] logger [method] [line] : message. -->
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d [%p] %c [%M] [%L] : %m%n"/>
</Console>
</appenders>
<loggers>
<!-- Root logger at trace level, sending events to the Console appender. -->
<root level="trace">
<appender-ref ref="Console"/>
</root>
</loggers>
</configuration>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment