Created
March 19, 2024 18:49
-
-
Save marcellobenigno/59c56ed98c64db0d2f9e3c71facd76f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from osgeo import ogr | |
from shapely.geometry import MultiLineString | |
from shapely.wkt import loads | |
ogr.UseExceptions() | |
class Demanda: | |
def __init__(self, datasource_entrada, layer='layer_demandas'): | |
self.datasource_entrada = datasource_entrada | |
self.layer = layer | |
self.demandas_ordenadas = self.cria_demandas_ordenadas_por_arruamento() | |
def __str__(self): | |
return self.get_layer().GetName() | |
def get_layer(self): | |
return self.datasource_entrada.GetLayer(self.layer) | |
def get_srs(self): | |
return self.get_layer().GetSpatialRef() | |
def associa_streetcode_demanda(self, layer_com_street_code): | |
self.get_layer().CreateField(ogr.FieldDefn('StreetCode', ogr.OFTInteger)) | |
lyr_com_street_code = self.datasource_entrada.GetLayer(layer_com_street_code.GetName()) | |
lyr_alvo_index = self.get_indice_espacial(lyr_com_street_code) | |
for feat in self.get_layer(): | |
nearest_feature_fid = self.encontra_feicao_mais_proxima(feat, lyr_alvo_index) | |
if nearest_feature_fid is not None: | |
nearest_feature = lyr_com_street_code.GetFeature(nearest_feature_fid) | |
street_code = nearest_feature.GetField('StreetCode') | |
feat.SetField('StreetCode', street_code) | |
self.get_layer().SetFeature(feat) | |
return self.get_layer() | |
def recupera_streetcodes_com_demanda(self): | |
return list(set([feature['StreetCode'] for feature in self.get_layer()])) | |
def get_indice_espacial(self, layer_com_street_code): | |
spatial_index = {} | |
for feature in layer_com_street_code: | |
spatial_index[feature.GetFID()] = (feature.GetGeometryRef().GetX(), feature.GetGeometryRef().GetY()) | |
return spatial_index | |
def encontra_feicao_mais_proxima(self, feature, spatial_index): | |
min_dist = float('inf') | |
nearest_fid = None | |
feature_geometry = feature.GetGeometryRef() | |
feature_point = (feature_geometry.GetX(), feature_geometry.GetY()) | |
for fid, point in spatial_index.items(): | |
distance = ((point[0] - feature_point[0]) ** 2 + (point[1] - feature_point[1]) ** 2) ** 0.5 | |
if distance < min_dist: | |
nearest_fid = fid | |
min_dist = distance | |
return nearest_fid | |
def cria_demandas_ordenadas_por_arruamento(self): | |
layer = self.datasource_entrada.CreateLayer( | |
'layer_demandas_ordenadas', srs=self.get_srs(), geom_type=ogr.wkbPoint | |
) | |
layer.CreateField(ogr.FieldDefn('id', ogr.OFTInteger)) | |
layer.CreateField(ogr.FieldDefn('id_demanda', ogr.OFTInteger)) | |
layer.CreateField(ogr.FieldDefn('StreetCode', ogr.OFTInteger)) | |
layer.CreateField(ogr.FieldDefn('market-index', ogr.OFTReal)) | |
layer.CreateField(ogr.FieldDefn('dist_arruamento', ogr.OFTReal)) | |
layer.CreateField(ogr.FieldDefn('id_caixa', ogr.OFTString)) | |
layer.CreateField(ogr.FieldDefn('associado', ogr.OFTInteger)) | |
return layer | |
def gera_demandas_ordenadas_por_arruamento(self, streetcode): | |
sql = f''' | |
SELECT | |
ROW_NUMBER() OVER () AS id, | |
id_demanda, | |
StreetCode, | |
"market-index", | |
dist_arruamento, | |
geometry | |
FROM ( | |
SELECT | |
d.id_demanda, | |
d.StreetCode, | |
d."market-index", | |
ST_Distance(d.geometry, a.geometry) AS dist_arruamento, | |
d.geometry | |
FROM layer_arruamento a, | |
layer_demandas d | |
WHERE a.StreetCode = {streetcode} | |
AND d.StreetCode = {streetcode} | |
ORDER BY st_line_locate_point(a.geometry, st_closestpoint(a.geometry, d.geometry)) | |
) AS ordered_data | |
''' | |
query = self.datasource_entrada.ExecuteSQL(sql, dialect="SQLite") | |
id_caixa = 1 | |
acumulador = 0 | |
if query: | |
for row in query: | |
self.get_layer().StartTransaction() | |
feature = ogr.Feature(self.demandas_ordenadas.GetLayerDefn()) | |
feature.SetGeometry(row['geometry']) | |
feature.SetField('id', row['id']) | |
feature.SetField('id_demanda', row['id_demanda']) | |
feature.SetField('StreetCode', row['StreetCode']) | |
feature.SetField('market-index', row['market-index']) | |
feature.SetField('dist_arruamento', row['dist_arruamento']) | |
feature.SetField('associado', 0) | |
if acumulador + row['market-index'] <= 8: | |
acumulador += row['market-index'] | |
else: | |
id_caixa += 1 | |
acumulador = row['market-index'] | |
feature.SetField('id_caixa', f'{row["StreetCode"]}.{id_caixa}') | |
self.demandas_ordenadas.SetFeature(feature) | |
self.demandas_ordenadas.CommitTransaction() | |
self.datasource_entrada.ReleaseResultSet(query) | |
return self.datasource_entrada.GetLayer('layer_demandas_ordenadas') | |
def get_maior_distancia_arruamento(self, demandas_ordenadas, street_code): | |
sql = f''' | |
SELECT max(dist_arruamento) AS max_dist_arruamento | |
FROM {demandas_ordenadas.GetName()} | |
WHERE "StreetCode" = {street_code} | |
''' | |
query = self.datasource_entrada.ExecuteSQL(sql, dialect="SQLite") | |
max_dist_arruamento = 0 | |
for row in query: | |
max_dist_arruamento = row['max_dist_arruamento'] | |
self.datasource_entrada.ReleaseResultSet(query) | |
return max_dist_arruamento | |
def gera_pnt_inicial_final_id_caixas(self, lyr_demanda_ordenada, street_code): | |
aux = [] | |
lyr_demanda_ordenada.SetAttributeFilter(f"StreetCode = {street_code}") | |
for feature in lyr_demanda_ordenada: | |
id = feature['id'] | |
id_demanda = feature['id_demanda'] | |
street_code = feature['StreetCode'] | |
id_caixa = feature['id_caixa'] | |
geometry = feature.GetGeometryRef().ExportToWkt() | |
aux.append({ | |
'id': id, | |
'id_caixa': id_caixa, | |
'id_demanda': id_demanda, | |
'street_code': street_code, | |
'geometry': geometry, | |
}) | |
result = defaultdict(dict) | |
for item in aux: | |
id_caixa = item['id_caixa'] | |
geometry = item['geometry'] | |
if id_caixa not in result: | |
result[id_caixa]['id_caixa'] = id_caixa | |
result[id_caixa]['pnt_inicial'] = geometry | |
else: | |
result[id_caixa]['pnt_final'] = geometry | |
lyr_demanda_ordenada.SetAttributeFilter(None) | |
return list(result.values()) | |
def atualiza_campo_associado(self): | |
lyr_demandas_ordenadas = self.datasource_entrada.GetLayer('layer_demandas_ordenadas') | |
lyr_area_caixa = self.datasource_entrada.GetLayer('areas_de_caixa') | |
lyr_demandas_ordenadas.StartTransaction() | |
for feat_demandas in lyr_demandas_ordenadas: | |
geom_demandas = feat_demandas.GetGeometryRef() | |
is_disjoint = 0 | |
for feat_caixa in lyr_area_caixa: | |
geom_caixa = feat_caixa.GetGeometryRef() | |
if not geom_demandas.Disjoint(geom_caixa): | |
is_disjoint = 1 | |
break | |
feat_demandas.SetField('associado', is_disjoint) | |
lyr_demandas_ordenadas.SetFeature(feat_demandas) | |
lyr_demandas_ordenadas.CommitTransaction() | |
return lyr_demandas_ordenadas | |
def linhas_demandas(self): | |
lyr_demandas = self.datasource_entrada.GetLayer('layer_demandas_ordenadas') | |
lyr_caixa = self.datasource_entrada.GetLayer('areas_de_caixa') | |
caixa_list = list(set([feat['id_caixa'] for feat in lyr_demandas])) | |
lyr_linhas_demandas = self.datasource_entrada.CreateLayer( | |
'layer_linhas_demandas', srs=self.get_srs(), geom_type=ogr.wkbLineString | |
) | |
lyr_linhas_demandas.CreateField(ogr.FieldDefn('id_caixa', ogr.OFTString)) | |
for caixa in caixa_list: | |
linestring = ogr.Geometry(ogr.wkbLineString) | |
lyr_demandas.SetAttributeFilter(f"associado = 0 AND id_caixa = '{caixa}'") | |
lista_fids = [fc.GetFID() for fc in lyr_demandas] | |
for i, val in enumerate(lista_fids): | |
if i + 1 < len(lista_fids): | |
feature_current = lyr_demandas.GetFeature(lista_fids[i]) | |
feature_next = lyr_demandas.GetFeature(lista_fids[i + 1]) | |
geom_current = feature_current.GetGeometryRef() | |
geom_next = feature_next.GetGeometryRef() | |
linestring.AddPoint(geom_current.GetX(), geom_current.GetY()) | |
linestring.AddPoint(geom_next.GetX(), geom_next.GetY()) | |
linestring.FlattenTo2D() | |
if not linestring.IsEmpty(): | |
feature = ogr.Feature(lyr_linhas_demandas.GetLayerDefn()) | |
feature.SetGeometry(linestring) | |
feature.SetField("id_caixa", caixa) | |
lyr_linhas_demandas.CreateFeature(feature) | |
lyr_demandas.SetAttributeFilter(None) | |
lyr_diferenca = self.datasource_entrada.CreateLayer( | |
'layer_linhas_demandas_diff', srs=self.get_srs(), geom_type=ogr.wkbLineString | |
) | |
lyr_diferenca.CreateField(ogr.FieldDefn('id_caixa', ogr.OFTString)) | |
feature = ogr.Feature(lyr_diferenca.GetLayerDefn()) | |
# Iterate over features in lyr_linhas_demandas | |
for feat1 in lyr_linhas_demandas: | |
geom1 = feat1.GetGeometryRef() | |
for feat2 in lyr_caixa: | |
geom2 = feat2.GetGeometryRef() | |
feature = ogr.Feature(lyr_diferenca.GetLayerDefn()) | |
if geom1.Intersects(geom2): | |
i = 1 | |
result = geom1.Difference(geom2) | |
multi = MultiLineString(loads(result.ExportToWkt())) | |
linestrings = multi.geoms | |
for linestring in linestrings: | |
line = ogr.CreateGeometryFromWkt(linestring.wkt) | |
feature.SetGeometry(line) | |
feature.SetField("id_caixa", f"{feat1['id_caixa']}.{i}") | |
lyr_diferenca.CreateFeature(feature) | |
i += 1 | |
else: | |
feature.SetGeometry(geom1) | |
feature.SetField("id_caixa", f"{feat1['id_caixa']}") | |
lyr_diferenca.CreateFeature(feature) | |
feicoes_para_excluir = [] | |
for feat1 in lyr_diferenca: | |
pai = feat1['id_caixa'].count('.') == 1 | |
if feat1.GetFID() in feicoes_para_excluir: | |
continue | |
for feat2 in lyr_diferenca: | |
if feat1.GetFID() == feat2.GetFID(): | |
continue | |
if feat2['id_caixa'].startswith( | |
feat1['id_caixa'] | |
) and feat2['id_caixa'].count('.') >= 2 and pai: | |
feicoes_para_excluir.append(feat1.GetFID()) | |
break | |
elif feat2['id_caixa'] == feat1['id_caixa']: | |
feicoes_para_excluir.append(feat2.GetFID()) | |
for fid in feicoes_para_excluir: | |
lyr_diferenca.DeleteFeature(fid) | |
print('lista: ', feicoes_para_excluir) | |
return lyr_diferenca |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment