Skip to content

Instantly share code, notes, and snippets.

@zew13
Created October 19, 2019 04:41
Show Gist options
  • Select an option

  • Save zew13/5414d904090576724896d58abf64b233 to your computer and use it in GitHub Desktop.

Select an option

Save zew13/5414d904090576724896d58abf64b233 to your computer and use it in GitHub Desktop.
#!/usr/bin/env -S node -r ./livescript-transform-implicit-async/register
PG = require("./_db")
{timestamp, END_DAY} = require(\./day.ls)
sync_city_product = require(\./sync_city_product.ls)
# Sentinel city id (the identifier means "test city"): the sync handlers in this
# file skip source rows whose city_id equals this value.
测试城市 = 666
# Transpose a list of equally sized rows into columns and render every column
# as a Postgres `unnest(array[...])` expression, joined by commas, so that
# `SELECT <result>` yields one output row per input row.
#
# li      - array of rows; each row is an array of cell values. The number of
#           columns is taken from the first row.
# returns - the comma separated `unnest(...)` list, or '' when `li` is empty.
#
# Cells are interpolated verbatim (no quoting or escaping), so callers must
# pass only values that are already valid SQL literals. null/undefined cells
# are rendered as NULL: Array::join would otherwise turn them into an empty
# string and produce invalid SQL such as `array[1,,3]`.
li2array = (li)->
  columns = []
  if li.length
    for cell in li[0]
      columns.push []
    for row in li
      for cell, pos in row
        columns[pos].push(if cell? then cell else 'NULL')
  parts = []
  for column in columns
    parts.push 'unnest(array[' + column.join(',') + '])'
  parts.join(',')
# Bulk-insert `li` into the Postgres table `table`, ignoring rows that collide
# on the `conflict` column list.
#
# table    - target table name (wrapped in double quotes in the statement)
# column   - comma separated column list, in the same order as the row cells
# li       - array of rows (arrays of cell values); nothing is executed when empty
# conflict - comma separated column list for the ON CONFLICT clause
# returns  - whatever PG.$raw resolves to, or undefined when `li` is empty
li_insert = (table, column, li, conflict)~>
  return unless li.length
  sql = """INSERT INTO "#{table}" (#{column}) SELECT #{li2array(li)} ON CONFLICT (#{conflict}) DO NOTHING"""
  await PG.$raw(sql)
# Set of store ids already present in Postgres; assigned by the start-up
# callback at the bottom of the file (via `STORE :=`).
var STORE
# Dictionary city_product id -> product id. It is only ever read and written
# with `[]`, so it is a plain object rather than a Set.
CITY_PRODUCT = {}
# Ids of products whose `unit` is 2.
PRODUCT_UNIT2 = new Set()
# Maximum number of source rows fetched per sync batch.
LIMIT = 100000
# Sync handlers keyed by the (un-suffixed) MySQL source table name.
#
# Every handler has the signature (db, table, id):
#   db    - knex instance connected to the MySQL source
#   table - actual source table name to read from
#   id    - last source row id that was already synced
# and reads up to LIMIT rows with id > `id` in ascending id order, writes the
# derived rows to Postgres, and returns the id of the last row of the batch.
# A handler returns undefined when there was nothing to read, which callers
# use as the signal to stop.
SYNC = {
  # Copies purchase order details into `cost_bill`, creating the referenced
  # `cost_order` rows first.
  erp_purchase_order_detail: (db, table, id)->
    li = (await db.raw("select id,city_product_id,adjust_price,price,collect_weight,purchase_order_no,create_time,purchase_weight,adjust_purchase_weight from #{table} WHERE id>? order by id asc limit #{LIMIT}", id))[0]
    if not (li and li.length)
      return

    order_set = new Set()
    for row in li
      order_set.add(row.purchase_order_no)
    # Array.from is used explicitly because the order numbers live in a Set.
    # NOTE(review): the order numbers are interpolated into the statement
    # unescaped; they come from the source database itself.
    order_no_list = Array.from(order_set).join("','")
    order_rows = (
      await db.raw(
        "select purchase_order_no,city_id,seq,create_time,create_user_id from erp_purchase_order where purchase_order_no in ('#{order_no_list}')"
      )
    )[0]

    rli = []
    key2order = {}           # "city_id,time,seq" -> purchase_order_no
    purchase_order_time = {} # purchase_order_no -> timestamp(create_time)
    for {purchase_order_no,city_id,seq,create_time,create_user_id} in order_rows
      if city_id == 测试城市
        continue
      purchase_order_time[purchase_order_no] = time = timestamp create_time
      rli.push [city_id,time,seq,create_user_id]
      key2order["#{city_id},#{time},#{seq}"] = purchase_order_no
    await li_insert("cost_order", "city_id,time,seq,worker_id", rli, "city_id,time,seq")

    # Resolve the Postgres ids of the cost orders referenced by this batch.
    # The lookup is skipped when there is no key, because `VALUES ()` is not
    # valid SQL (happens when every order of the batch belongs to the test city
    # or no order row was found).
    values = Object.keys(key2order)
    order_dict = {}          # purchase_order_no -> cost_order.id
    if values.length
      cost_orders = await PG.$li(
        """SELECT id,city_id,time,seq FROM cost_order JOIN (VALUES (#{values.join('),(')})) AS t (c,t,s) ON c=city_id AND t=time AND s=seq"""
      )
      for [cost_order_id, city_id, time, seq] in cost_orders
        order_dict[key2order["#{city_id},#{time},#{seq}"]] = cost_order_id

    rli = []
    for {city_product_id,price,adjust_price,collect_weight,purchase_order_no,create_time,adjust_purchase_weight,purchase_weight} in li
      weight = collect_weight or adjust_purchase_weight or purchase_weight
      if not weight
        continue
      # Price is stored multiplied by 100 as an integer. Math.round is used
      # instead of parseInt so that floating point error (19.99*100 ==
      # 1998.9999999999998) does not lose a unit.
      price = Math.round(adjust_price*100) or Math.round(price*100)
      if not ( price and (price > 0) )
        continue
      rli.push [
        city_product_id
        price
        weight - 0
        order_dict[purchase_order_no] or 0
        purchase_order_time[purchase_order_no] or timestamp(create_time)
      ]
    await li_insert(\cost_bill, "city_product_id,price,weight,cost_order_id,time", rli, "city_product_id,time")
    # Rows are ordered by id, so the last row carries the highest synced id.
    return li[li.length - 1].id

  # Aggregates order detail rows per (order_id, city_product_id) into `bill`
  # and records the creating user of each order as `"order".worker_id`.
  # City products that are not cached in CITY_PRODUCT yet are registered in
  # `product` / `city_product` on the fly.
  tbl_order_detail_js: (db, table, id)~>
    try
      li = (await db.raw(
        "select id,create_time,order_id,city_product_id,price,actual_price,actual_weight,weight,create_user_id from #{table} where id>? order by id asc limit #{LIMIT}"
        id
      ))[0]
    catch err
      # A failing read is logged and ends the sync of this table.
      console.trace()
      console.log err
      console.log db.client.config.connection
      return
    if not (li and li.length)
      return

    order_product = {} # order_id -> city_product_id -> [weight, pay, change, time]
    order_worker = {}  # order_id -> create_user_id
    for row in li
      # `weight` must be read from the row: it feeds the unit-2 conversion below.
      {id: row_id, create_time, order_id, city_product_id, price, actual_price, actual_weight, weight, create_user_id} = row
      order_worker[order_id] = create_user_id
      city_product_id = city_product_id - 0

      product_id = CITY_PRODUCT[city_product_id]
      if not product_id
        {prod_show_name} = (await db.raw("select prod_show_name from #{table} where id=?", row_id))[0][0]
        q = {
          name:prod_show_name
        }
        o = (await db.raw(
          "SELECT unit_type FROM caicai.erp_purchase_order_detail WHERE city_product_id=? ORDER BY id DESC LIMIT 1"
          city_product_id
        ))[0]
        if o and o[0]
          q.unit = o[0].unit_type
        await PG.product.upsert(
          q
          "name"
        )
        saved = await PG.product.get(q,"id")
        CITY_PRODUCT[city_product_id] = product_id = saved.id
        if q.unit == 2
          PRODUCT_UNIT2.add(product_id)
        await PG.city_product.upsert({id:city_product_id, product_id},"id")

      # For products with unit 2 the weight is derived from the row's `weight`
      # column (defaulting to 1) multiplied by 1000.
      if PRODUCT_UNIT2.has(product_id)
        actual_weight = (weight or 1)*1000

      order = order_product[order_id]
      if not order
        order_product[order_id] = order = {}
      totals = order[city_product_id]
      if totals
        totals[0] += actual_weight
        totals[1] += actual_price
        totals[2] += (price - actual_price)
      else
        order[city_product_id] = [
          actual_weight,
          actual_price,
          price - actual_price,
          timestamp create_time
        ]

    rli = []
    for order_id, product_dict of order_product
      for city_product_id, totals of product_dict
        [total_weight, pay, change, time] = totals
        rli.push [order_id, city_product_id, total_weight, pay, change, time]
    await li_insert(\bill, "order_id, city_product_id, weight, pay, change, time", rli, "order_id, city_product_id")

    worker_values = []
    for order_id, worker_id of order_worker
      worker_values.push "(#{order_id},#{worker_id or 0})"
    await PG.$raw(
      """update "order" as t set worker_id = c.worker_id from (values #{worker_values.join(',')}) as c(order_id, worker_id) where c.order_id = t.order_id"""
    )
    # Rows are ordered by id, so the last row carries the highest synced id.
    return li[li.length - 1].id

  # Copies orders into the Postgres "order" table, registering stores that are
  # not in STORE yet. Orders of the test city are skipped.
  tbl_order: (db, table, id)~>
    # Disabled filter kept from the original: (status=4 or status=6 or status=11) and
    li = (await db.raw("select id,city_id,store_id,order_id,user_id,total_coupon,total_full,total_vip,total_price,create_time,status from #{table} where id>? order by id asc limit #{LIMIT}", id))[0]
    if not li.length
      return

    rli = []
    for row in li
      {city_id, store_id, order_id, user_id, total_coupon, total_full, total_vip, total_price, create_time, status} = row
      if city_id == 测试城市
        continue
      store_id = store_id - 0
      if not STORE.has(store_id)
        await PG.store.upsert({id:store_id, city_id},"id")
        STORE.add(store_id)
      rli.push [
        user_id
        order_id - 0
        store_id
        timestamp create_time
        parseInt(total_price or 0)
        parseInt(total_coupon or 0)
        parseInt(total_vip or 0)
        parseInt(total_full or 0)
        status
      ]
    await li_insert("order", "user_id,order_id,store_id,time,pay,coupon,vip,fulloff,state", rli, "order_id")
    # Rows are ordered by id, so the last row carries the highest synced id
    # (skipped test-city rows included).
    return li[li.length - 1].id
}
# Sync every relevant table of one MySQL source into Postgres.
#
# connection - record with host, port, user, password, database and id
#
# The `erp` database contributes erp_purchase_order_detail; every other
# database contributes tbl_order_detail_js and tbl_order, read from the
# `_his`-suffixed tables when the database is `trade_his`. All tables of the
# connection are synced concurrently; for the `caicai` database
# sync_city_product runs afterwards.
sync = (connection)~>
  {host, port, user, password, database} = connection
  db = require('knex')({
    client: 'mysql2'
    connection: {host, port, user, password, database}
  })
  db.id = connection.id

  suffix = ''
  if database == 'erp'
    table_li = ['erp_purchase_order_detail']
  else
    table_li = ['tbl_order_detail_js', 'tbl_order']
    if database == 'trade_his'
      suffix = '_his'

  # Handlers are looked up by the un-suffixed name; the suffix only affects
  # the table that is actually queried.
  todo = [_sync(db, SYNC[table], table + suffix) for table in table_li]
  await Promise.all(todo)

  if database == 'caicai'
    await sync_city_product(db)
# Repeatedly run one sync handler until the source table is exhausted,
# persisting the progress pointer in the Postgres `sync` table after every
# successful batch.
#
# db    - knex instance; `db.id` identifies the source connection
# func  - handler called as func(db, table, sync_id); resolves to the id of the
#         last synced row, or a falsy value when there is nothing left
# table - source table name (also the key of the progress row)
#
# A throwing handler is logged and the same batch is retried, but only up to
# `max_failures` consecutive times: retrying without a bound would spin
# forever on a persistent error, because the progress pointer never advances.
_sync = (db, func, table)~>
  connection = db.client.config.connection
  _where = {table, db_id:db.id}
  {sync_id} = (await PG.sync.get(_where, "sync_id")) or {sync_id:0}
  # sync_id = sync_id - 500000
  max_failures = 3
  failures = 0
  while failures < max_failures
    console.log(
      new Date().toLocaleTimeString(),
      connection.host.split(".")[0],
      connection.database,
      table,
      sync_id
    )
    next_id = void
    failed = false
    try
      next_id = await func(db, table, sync_id)
    catch e
      failed = true
      console.log connection.database, table, func
      console.log e
      console.trace()
    if failed
      failures += 1
      continue
    failures = 0
    if not next_id
      break
    sync_id = next_id
    _where.sync_id = sync_id
    await PG.sync.upsert(_where,"db_id,table")
  return
# Entry point, passed to PG.$: warm the in-memory caches from Postgres, sync
# every source connection returned by PG.db.select concurrently, then exit.
PG.$ ->
  store_ids = await PG.$li1("select id from store")
  STORE := new Set(store_ids)

  city_product_rows = await PG.$li("select id,product_id from city_product")
  for [city_product_id, product_id] in city_product_rows
    CITY_PRODUCT[city_product_id] = product_id

  unit2_ids = await PG.$li1("select id from product where unit=2")
  for unit2_id in unit2_ids
    PRODUCT_UNIT2.add(unit2_id)

  sources = await PG.db.select("*")
  pending = [sync(source) for source in sources]
  await Promise.all(pending)
  process.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment