/**
 * Refreshes the status, properties and description of the given pipeline execution.
 *
 * <p>The entity is always flagged via {@code requireUpdate(true)}. When the pipeline's deploy
 * mode is {@code LOCAL}, the status is derived from the worker registered in
 * {@link ExecutionManager} under the entity name. Otherwise the topology is queried through the
 * Storm Nimbus client and its id, name, status, uptime, executor count, error count and UI URL
 * are copied onto the entity.
 *
 * <p>In the non-local path, failures are not propagated: a {@link NotAliveException} marks the
 * entity {@code STOPPED} with {@code topology.status=NOT_ALIVE}, and any other exception marks
 * it {@code STOPPED} with {@code topology.status=UNKNOWN}. In both cases the exception message
 * becomes the entity description.
 *
 * @param entity the pipeline execution whose state is refreshed in place
 * @throws Exception if reading the cluster properties or the local worker state fails
 */
@Override
public synchronized void status(final PipelineExecutionEntity entity) throws Exception {
    // Looked up eagerly (before the mode check), exactly as before, so lookup failures surface
    // at the same point for callers.
    final String stormUIUrl = (String) entity.getPipeline().getCluster().getProperties().get(STORM_URL);
    entity.requireUpdate(true);

    if (LogStormConstants.DeployMode.LOCAL.equals(entity.getPipeline().getMode())) {
        refreshLocalWorkerStatus(entity);
    } else {
        refreshStormTopologyStatus(entity, stormUIUrl);
    }
}

/** Updates the entity from the state of the local worker registered under the entity name. */
private void refreshLocalWorkerStatus(final PipelineExecutionEntity entity) throws Exception {
    if (!ExecutionManager.getInstance().contains(entity.getName())) {
        LOG.info("Pipeline instance '{}' is not ready yet", entity.getName());
        // No worker is registered under this name, so there is no state to read; keep the
        // current status instead of dereferencing the missing worker.
        return;
    }

    final PipelineExecutionStatus currentStatus = entity.getStatus();
    final PipelineExecutionStatus newStatus =
            ExecutionManager.getWorkerStatus(ExecutionManager.getInstance().get(entity.getName()).getState());

    // Null-tolerant comparison: an entity without a status yet must not fail the refresh.
    final boolean changed = currentStatus == null ? newStatus != null : !currentStatus.equals(newStatus);
    if (changed) {
        LOG.info("Status of pipeline: {} changed from {} to {}", entity, currentStatus, newStatus);
        entity.setStatus(newStatus);
        entity.setDescription(String.format("Status of pipeline: %s changed from %s to %s", entity, currentStatus, newStatus));
    }
}

/** Updates the entity from the topology reported by Nimbus, mapping failures to STOPPED. */
private void refreshStormTopologyStatus(final PipelineExecutionEntity entity, final String stormUIUrl) {
    try {
        final Nimbus.Client client = getStormClient(entity);
        final String id = resolveTopologyId(client, entity);
        if (id == null || id.isEmpty()) {
            throw new NotAliveException("Topology named " + entity.getName() + " is not found");
        }

        final TopologyInfo topologyInfo = client.getTopologyInfo(id);
        entity.setProperty(topology_id_key, topologyInfo.get_id());
        entity.setProperty("topology.name", topologyInfo.get_name());
        entity.setProperty("topology.status", topologyInfo.get_status());
        entity.setProperty("topology.uptime_secs", String.valueOf(topologyInfo.get_uptime_secs()));
        entity.setProperty("topology.executors_size", String.valueOf(topologyInfo.get_executors_size()));
        entity.setProperty("topology.errors_size", String.valueOf(topologyInfo.get_errors_size()));
        entity.setUrl(String.format("%s/topology.html?id=%s", stormUIUrl, topologyInfo.get_id()));

        final String errorReport = describeTopologyErrors(topologyInfo);
        if (errorReport != null) {
            LOG.error(errorReport);
            entity.setDescription(errorReport);
        }
        entity.setStatus(ExecutionManager.getTopologyStatus(topologyInfo.get_status()));
    } catch (NotAliveException ex) {
        LOG.info("{} not alive, change status as STOPPED", entity.getName(), ex);
        entity.setStatus(PipelineExecutionStatus.STOPPED);
        entity.setProperty("topology.status", "NOT_ALIVE");
        entity.setDescription(ex.getMessage());
    } catch (Exception ex) {
        entity.setStatus(PipelineExecutionStatus.STOPPED);
        entity.setProperty("topology.status", "UNKNOWN");
        entity.setDescription(ex.getMessage());
        LOG.error(ex.getMessage(), ex);
    }
}

/**
 * Returns the topology id stored in the entity properties, or, when none is stored, the id of
 * the cluster topology whose name equals the entity name; {@code null} when neither exists.
 */
private String resolveTopologyId(final Nimbus.Client client, final PipelineExecutionEntity entity) throws Exception {
    String id = entity.getProperties() == null ? null : entity.getProperties().getProperty(topology_id_key);
    if (id == null || id.isEmpty()) {
        // If several summaries share the name, the last one listed wins (same as before).
        for (TopologySummary topologySummary : client.getClusterInfo().get_topologies()) {
            if (topologySummary.get_name().equals(entity.getName())) {
                id = topologySummary.get_id();
            }
        }
    }
    return id;
}

/**
 * Formats the topology errors as one "key: " header line per map entry followed by one
 * tab-indented line per error; returns {@code null} when there is no error to report.
 */
private String describeTopologyErrors(final TopologyInfo topologyInfo) {
    final Map<String, List<ErrorInfo>> errors = topologyInfo.get_errors();
    if (topologyInfo.get_errors_size() <= 0) {
        return null;
    }

    final StringBuilder report = new StringBuilder();
    int errorInfoSize = 0;
    for (Map.Entry<String, List<ErrorInfo>> entry : errors.entrySet()) {
        report.append(entry.getKey());
        report.append(": \n");
        for (ErrorInfo errorInfo : entry.getValue()) {
            errorInfoSize++;
            report.append("\t");
            report.append(errorInfo.get_error());
            report.append("\n");
        }
    }
    // Entries with empty error lists produce only header lines; those are not reported.
    return errorInfoSize > 0 ? report.toString() : null;
}