Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions openjob-common/src/main/java/io/openjob/common/util/IpUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
Expand Down Expand Up @@ -90,6 +91,17 @@ public static String getLocalAddress() {
}
}

/**
* Get ip by host
*
* @param host host
* @return String
* @throws UnknownHostException UnknownHostException
*/
public static String getIpByHost(String host) throws UnknownHostException {
return InetAddress.getByName(host).getHostAddress();
}

/**
* Normalize address.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.openjob.common.util;

import org.checkerframework.checker.units.qual.A;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* @author stelin swoft@qq.com
* @since 1.0.0
Expand All @@ -16,4 +20,22 @@ public void testGetLocalIp() {
String formatAddress = IpUtil.getFormatAddress();
Assertions.assertNotNull(formatAddress);
}

@Test
public void testGetIpByHost() throws UnknownHostException {
String ip = IpUtil.getIpByHost("localhost");
Assertions.assertEquals(ip, "127.0.0.1");

String ip2 = IpUtil.getIpByHost("127.0.0.1");
Assertions.assertEquals(ip2, "127.0.0.1");

String ip3 = IpUtil.getIpByHost("github.com");
Assertions.assertNotNull(ip3);

String ip4 = IpUtil.getIpByHost("20.205.243.166");
Assertions.assertEquals(ip4, "20.205.243.166");

String ip5 = IpUtil.getIpByHost("172.20.1.166");
Assertions.assertEquals(ip5, "172.20.1.166");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,37 @@
import lombok.Getter;

/**
* user: 100+
* namespace: 200+
* application: 300+
* job: 400+
* delay: 500+
* User: 100+
* Namespace: 200+
* Application: 300+
* Job: 400+
* Delay: 500+
*
* @author stelin swoft@qq.com
* @since 1.0.0
*/
@Getter
@AllArgsConstructor
public enum CodeEnum implements CodeExceptionAssert {
// Code list
USER_EXIST(100, "User is exist!"),

/**
* App name not exist
*/
NAME_EXIST(100, "App name must be globally unique!"),
// Namespace
NAMESPACE_DELETE_INVALID(200, "Namespace can not be delete!"),

// Application
APP_NAME_EXIST(300, "App name must be globally unique!"),
APP_DELETE_INVALID(301, "Application can not be deleted!"),

// Job
TIME_EXPRESSION_INVALID(400, "Time expression is invalid"),
JOB_DELETE_INVALID(401, "Job can not be deleted!"),

// Delay
DELAY_TOPIC_EXIST(500, "Topic is exist!"),
DELAY_DELETE_INVALID(501, "Delay can not be deleted!"),
;

TIME_EXPRESSION_INVALID(400, "Time expression is invalid");

/**
* Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.DelayDAO;
import io.openjob.server.repository.dao.JobDAO;
import io.openjob.server.repository.dao.NamespaceDAO;
import io.openjob.server.repository.entity.App;
import io.openjob.server.repository.entity.Delay;
import io.openjob.server.repository.entity.Job;
import io.openjob.server.repository.entity.Namespace;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -37,17 +41,22 @@ public class AppServiceImpl implements AppService {

private final AppDAO appDAO;
private final NamespaceDAO namespaceDAO;
private final JobDAO jobDAO;
private final DelayDAO delayDAO;


@Autowired
public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO) {
public AppServiceImpl(AppDAO appDAO, NamespaceDAO namespaceDAO, JobDAO jobDAO, DelayDAO delayDAO) {
this.appDAO = appDAO;
this.namespaceDAO = namespaceDAO;
this.jobDAO = jobDAO;
this.delayDAO = delayDAO;
}

@Override
public AddAppVO add(AddAppRequest addRequest) {
App app = this.appDAO.getAppByName(addRequest.getName());
CodeEnum.NAME_EXIST.assertIsTrue(Objects.isNull(app));
CodeEnum.APP_NAME_EXIST.assertIsTrue(Objects.isNull(app));

Long id = this.appDAO.save(BeanMapperUtil.map(addRequest, App.class));

Expand All @@ -61,7 +70,7 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) {
// App name is exist and not self!
App nameApp = this.appDAO.getAppByName(updateRequest.getName());
if (Objects.nonNull(nameApp) && !nameApp.getId().equals(updateRequest.getId())) {
CodeEnum.NAME_EXIST.throwException();
CodeEnum.APP_NAME_EXIST.throwException();
}

App app = BeanMapperUtil.map(BeanMapperUtil.map(updateRequest, App.class), App.class);
Expand All @@ -71,9 +80,16 @@ public UpdateAppVO update(UpdateAppRequest updateRequest) {

@Override
public DeleteAppVO delete(DeleteAppRequest deleteAppRequest) {
App app = BeanMapperUtil.map(deleteAppRequest, App.class);
app.setDeleted(CommonConstant.YES);
this.appDAO.update(app);
App byId = this.appDAO.getById(deleteAppRequest.getId());

// Job/delay/workflow
Job firstJob = this.jobDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId());
Delay firstDelay = this.delayDAO.getFirstByNamespaceAndAppid(byId.getNamespaceId(), byId.getId());
if (Objects.nonNull(firstJob) || Objects.nonNull(firstDelay)) {
CodeEnum.APP_DELETE_INVALID.throwException();
}

this.appDAO.deleteById(deleteAppRequest.getId());
return new DeleteAppVO();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public StopDelayInstanceVO stop(StopDelayInstanceRequest request) {
DelayInstanceStopResponseDTO stop = this.delayInstanceScheduler.stop(delayInstanceStopRequestDTO);

// Update status
this.delayInstanceDAO.updateStatus(request.getTaskId(), TaskStatusEnum.STOP.getStatus());

StopDelayInstanceVO stopDelayInstanceVO = new StopDelayInstanceVO();
stopDelayInstanceVO.setResult(stop.getResult());
return stopDelayInstanceVO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.openjob.common.constant.CommonConstant;
import io.openjob.common.constant.TaskStatusEnum;
import io.openjob.common.util.DelayUtil;
import io.openjob.server.admin.constant.CodeEnum;
import io.openjob.server.admin.request.delay.AddDelayRequest;
import io.openjob.server.admin.request.delay.DeleteDelayRequest;
import io.openjob.server.admin.request.delay.ListDelayRequest;
Expand Down Expand Up @@ -65,6 +66,9 @@ public DelayServiceImpl(DelayDAO delayDAO, AppDAO appDAO, DelayInstanceDAO delay
@Override
@Transactional(rollbackFor = Exception.class)
public AddDelayVO add(AddDelayRequest addRequest) {
Delay byTopic = this.delayDAO.findByTopic(addRequest.getTopic());
CodeEnum.DELAY_TOPIC_EXIST.assertIsTrue(Objects.isNull(byTopic));

// Delay
Delay delay = BeanMapperUtil.map(addRequest, Delay.class);
delay.setPid(0L);
Expand Down Expand Up @@ -139,8 +143,13 @@ public PageVO<ListDelayVO> list(ListDelayRequest listDelayRequest) {
@Override
@Transactional(rollbackFor = Exception.class)
public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) {
this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getId(), null, CommonConstant.YES);
this.delayDAO.updateStatusOrDeleted(deleteDelayRequest.getCid(), null, CommonConstant.YES);
if (Objects.nonNull(this.delayInstanceDAO.getFirstByDelayId(deleteDelayRequest.getId()))) {
CodeEnum.DELAY_DELETE_INVALID.throwException();
}

// Delete
this.delayDAO.deleteById(deleteDelayRequest.getId());
this.delayDAO.deleteById(deleteDelayRequest.getCid());

// Refresh delay version
this.delayScheduler.refreshDelayVersion();
Expand All @@ -150,6 +159,11 @@ public DeleteDelayVO delete(DeleteDelayRequest deleteDelayRequest) {
@Override
@Transactional(rollbackFor = Exception.class)
public UpdateDelayVO update(UpdateDelayRequest updateDelayRequest) {
Delay byTopic = this.delayDAO.findByTopic(updateDelayRequest.getTopic());
if (Objects.nonNull(byTopic) && !byTopic.getId().equals(updateDelayRequest.getId())) {
CodeEnum.DELAY_TOPIC_EXIST.throwException();
}

// Delay
Delay delay = BeanMapperUtil.map(updateDelayRequest, Delay.class);
this.delayDAO.update(delay);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
import io.openjob.server.repository.constant.JobStatusEnum;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.JobDAO;
import io.openjob.server.repository.dao.JobInstanceDAO;
import io.openjob.server.repository.dto.JobPageDTO;
import io.openjob.server.repository.entity.App;
import io.openjob.server.repository.entity.Job;
import io.openjob.server.scheduler.dto.JobExecuteRequestDTO;
import io.openjob.server.scheduler.service.JobSchedulingService;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.apache.bcel.classfile.Code;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
Expand All @@ -56,12 +58,14 @@ public class JobServiceImpl implements JobService {

private final JobDAO jobDAO;
private final AppDAO appDAO;
private final JobInstanceDAO jobInstanceDAO;
private final JobSchedulingService jobSchedulingService;

@Autowired
public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobSchedulingService jobSchedulingService) {
public JobServiceImpl(JobDAO jobDAO, AppDAO appDAO, JobInstanceDAO jobInstanceDAO, JobSchedulingService jobSchedulingService) {
this.jobDAO = jobDAO;
this.appDAO = appDAO;
this.jobInstanceDAO = jobInstanceDAO;
this.jobSchedulingService = jobSchedulingService;
}

Expand Down Expand Up @@ -107,6 +111,10 @@ public UpdateJobVO update(UpdateJobRequest updateJobRequest) {

@Override
public DeleteJobVO delete(DeleteJobRequest deleteJobRequest) {
if (Objects.nonNull(this.jobInstanceDAO.getFirstByJobId(deleteJobRequest.getId()))) {
CodeEnum.JOB_DELETE_INVALID.throwException();
}

this.jobDAO.updateByStatusOrDeleted(deleteJobRequest.getId(), null, CommonConstant.YES, null);
return new DeleteJobVO();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.openjob.server.admin.service.impl;

import io.openjob.common.constant.CommonConstant;
import io.openjob.server.admin.constant.CodeEnum;
import io.openjob.server.admin.request.namespace.AddNamespaceRequest;
import io.openjob.server.admin.request.namespace.DeleteNamespaceRequest;
import io.openjob.server.admin.request.namespace.ListNamespaceRequest;
Expand All @@ -14,11 +15,13 @@
import io.openjob.server.common.util.BeanMapperUtil;
import io.openjob.server.common.util.PageUtil;
import io.openjob.server.common.vo.PageVO;
import io.openjob.server.repository.dao.AppDAO;
import io.openjob.server.repository.dao.NamespaceDAO;
import io.openjob.server.repository.entity.Namespace;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.UUID;

/**
Expand All @@ -28,10 +31,12 @@
@Service
public class NamespaceServiceImpl implements NamespaceService {
private final NamespaceDAO namespaceDAO;
private final AppDAO appDAO;

@Autowired
public NamespaceServiceImpl(NamespaceDAO namespaceDAO) {
public NamespaceServiceImpl(NamespaceDAO namespaceDAO, AppDAO appDAO) {
this.namespaceDAO = namespaceDAO;
this.appDAO = appDAO;
}

@Override
Expand All @@ -53,6 +58,10 @@ public UpdateNamespaceVO update(UpdateNamespaceRequest updateRequest) {

@Override
public DeleteNamespaceVO delete(DeleteNamespaceRequest deleteNamespaceRequest) {
if (Objects.nonNull(this.appDAO.getFirstByNamespaceId(deleteNamespaceRequest.getId()))) {
CodeEnum.NAMESPACE_DELETE_INVALID.throwException();
}

Namespace namespace = BeanMapperUtil.map(deleteNamespaceRequest, Namespace.class);
namespace.setDeleted(CommonConstant.YES);
this.namespaceDAO.update(namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public Boolean doJoin(String hostname, Integer port) {

// Refresh current slots.
this.refreshManager.refreshCurrentSlots();

// Refresh app workers;
this.refreshManager.refreshAppWorkers();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void workerCheck() {
workerStartRequest.setAppName(w.getAppName());
workerStartRequest.setWorkerKey(w.getWorkerKey());

log.info("Scheduling worker start begin!");
log.info("Scheduling worker start begin! address={}", w.getAddress());
this.workerStart(workerStartRequest);
}
});
Expand All @@ -193,7 +193,7 @@ public void workerCheck() {
workerStopRequest.setAddress(w.getAddress());
workerStopRequest.setAppName(w.getAppName());

log.info("Scheduling worker stop begin!");
log.info("Scheduling worker stop begin! address={}", w.getAddress());
this.workerStop(workerStopRequest);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public static void refreshAppWorkers(List<Worker> workers) {
Map<Long, List<WorkerDTO>> appWorkers = workers.stream()
.map(w -> {
WorkerDTO workerDTO = new WorkerDTO();
workerDTO.setNamespaceId(w.getNamespaceId());
workerDTO.setAppId(w.getAppId());
workerDTO.setWorkerKey(w.getWorkerKey());
workerDTO.setAddress(w.getAddress());
Expand All @@ -67,12 +68,13 @@ public static void refreshAppWorkers(List<Worker> workers) {
.collect(Collectors.groupingBy(WorkerDTO::getAppId));

ClusterContext.refreshAppWorkers(appWorkers);
log.info("Refresh app workers {}", appWorkers);
}

/**
* Online workers.
*
* @param appId appId
* @param appId appId
* @return Set
*/
public static Set<String> getOnlineWorkers(Long appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@
* @since 1.0.0
*/
public class ClusterUtilTest {

@Test
public void testGetKnowServersByOnlyOne() {
Map<Long, Node> nodesMap = new HashMap<>(16);
Node currentNode = null;
Node node = new Node();
node.setStatus(1);
node.setServerId(1L);
node.setIp("127.0.0.1");
node.setAkkaAddress(String.format("127.0.0.1:%d", 1L));
nodesMap.put(1L, node);

currentNode = node;
List<Long> knowServers = ClusterUtil.getKnowServers(nodesMap, currentNode, 5);
List<Long> expect = new ArrayList<>();
Assertions.assertEquals(knowServers, expect);
}

@Test
public void testGetKnowServersByTwo() {
Map<Long, Node> nodesMap = new HashMap<>(16);
Expand Down
Loading