@Override
    public synchronized void status(final PipelineExecutionEntity entity) throws Exception {
        String stormUIUrl= (String) entity.getPipeline().getCluster().getProperties().get(STORM_URL);
        entity.requireUpdate(true);
        if (LogStormConstants.DeployMode.LOCAL.equals(entity.getPipeline().getMode())) {
            if(!ExecutionManager.getInstance().contains(entity.getName())){
                LOG.info("Pipeline instance '{}' is not ready yet",entity.getName());
            }
            PipelineExecutionStatus currentStatus = entity.getStatus();
            PipelineExecutionStatus newStatus = ExecutionManager.getWorkerStatus(ExecutionManager.getInstance().get(entity.getName()).getState());
            if (!currentStatus.equals(newStatus)) {
                LOG.info("Status of pipeline: {} changed from {} to {}", entity, currentStatus, newStatus);
                entity.setStatus(newStatus);
                entity.setDescription(String.format("Status of pipeline: %s changed from %s to %s", entity, currentStatus, newStatus));
            }
        } else {
            try {
                Nimbus.Client client = getStormClient(entity);
                String id = entity.getProperties() == null? null:entity.getProperties().getProperty(topology_id_key);
                if(id == null || id.isEmpty()) {
                    for (TopologySummary topologySummary : client.getClusterInfo().get_topologies()) {
                        if (topologySummary.get_name().equals(entity.getName())) {
                            id = topologySummary.get_id();
                        }
                    }
                }

                if(id == null || id.isEmpty()){
                    throw new NotAliveException("Topology named "+entity.getName()+" is not found");
                } else {
                    TopologyInfo topologyInfo = client.getTopologyInfo(id);
                    entity.setProperty(topology_id_key, topologyInfo.get_id());
                    entity.setProperty("topology.name", topologyInfo.get_name());
                    entity.setProperty("topology.status", topologyInfo.get_status());
                    entity.setProperty("topology.uptime_secs", String.valueOf(topologyInfo.get_uptime_secs()));
                    entity.setProperty("topology.executors_size", String.valueOf(topologyInfo.get_executors_size()));
                    entity.setProperty("topology.errors_size", String.valueOf(topologyInfo.get_errors_size()));
                    entity.setUrl(String.format("%s/topology.html?id=%s",stormUIUrl,topologyInfo.get_id()));
                    Map<String,List<ErrorInfo>> errors = topologyInfo.get_errors();
                    StringBuilder sb = new StringBuilder();

                    int errorInfoSize = 0;
                    if(topologyInfo.get_errors_size()>0) {
                        for (Map.Entry<String, List<ErrorInfo>> entry : errors.entrySet()) {
                            sb.append(entry.getKey());
                            sb.append(": \n");
                            for (ErrorInfo errorInfo : entry.getValue()) {
                                errorInfoSize ++;
                                sb.append("\t");
                                sb.append(errorInfo.get_error());
                                sb.append("\n");
                            }
                        }
                        if(errorInfoSize>0) {
                            LOG.error(sb.toString());
                            entity.setDescription(sb.toString());
                        }
                    }
                    entity.setStatus(ExecutionManager.getTopologyStatus(topologyInfo.get_status()));
                }
            }catch (NotAliveException ex){
                LOG.info("{} not alive, change status as STOPPED",entity.getName(),ex);
                entity.setStatus(PipelineExecutionStatus.STOPPED);
                entity.setProperty("topology.status","NOT_ALIVE");
                entity.setDescription(ex.getMessage());
            } catch (Exception ex ){
                entity.setStatus(PipelineExecutionStatus.STOPPED);
                entity.setProperty("topology.status","UNKNOWN");
                entity.setDescription(ex.getMessage());
                LOG.error(ex.getMessage(), ex);
            }
        }
    }