Created
October 19, 2019 04:41
-
-
Save zew13/5414d904090576724896d58abf64b233 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S node -r ./livescript-transform-implicit-async/register | |
| PG = require("./_db") | |
| {timestamp, END_DAY} = require(\./day.ls) | |
| sync_city_product = require(\./sync_city_product.ls) | |
| 测试城市 = 666 | |
| li2array = (li)-> | |
| r = [] | |
| if li.length | |
| for i in li[0] | |
| r.push [] | |
| for i in li | |
| for j,pos in i | |
| r[pos].push j | |
| t = [] | |
| for i in r | |
| t.push 'unnest(array[' + i.join(',') + '])' | |
| t.join(',') | |
| li_insert = (table, column, li, conflict)~> | |
| if not li.length | |
| return | |
| await PG.$raw( | |
| """INSERT INTO "#{table}" (#{column}) SELECT #{li2array(li)} ON CONFLICT (#{conflict}) DO NOTHING""" | |
| ) | |
| var STORE | |
| CITY_PRODUCT = new Set() | |
| PRODUCT_UNIT2 = new Set() | |
| LIMIT = 100000 | |
| SYNC = { | |
| erp_purchase_order_detail:(db, table, id)-> | |
| li = (await db.raw("select id,city_product_id,adjust_price,price,collect_weight,purchase_order_no,create_time,purchase_weight,adjust_purchase_weight from #{table} WHERE id>? order by id asc limit #{LIMIT}", id))[0] | |
| if not (li and li.length) | |
| return | |
| order_set = new Set() | |
| for i in li | |
| order_set.add(i.purchase_order_no) | |
| rli = [] | |
| key2order = {} | |
| purchase_order_time = {} | |
| for {purchase_order_no,city_id,seq,create_time,create_user_id} in ( | |
| await db.raw( | |
| "select purchase_order_no,city_id,seq,create_time,create_user_id from erp_purchase_order where purchase_order_no in ('#{[...order_set].join("','")}')" | |
| ) | |
| )[0] | |
| if city_id == 测试城市 | |
| continue | |
| purchase_order_time[purchase_order_no] = time = timestamp create_time | |
| rli.push [city_id,time,seq,create_user_id] | |
| key2order["#{city_id},#{time},#{seq}"] = purchase_order_no | |
| await li_insert("cost_order", "city_id,time,seq,worker_id", rli, "city_id,time,seq") | |
| values = [] | |
| for k,v of key2order | |
| values.push k | |
| order_dict = {} | |
| for [id,city_id,time,seq] in (await PG.$li( | |
| """SELECT id,city_id,time,seq FROM cost_order JOIN (VALUES (#{values.join('),(')})) AS t (c,t,s) ON c=city_id AND t=time AND s=seq""" | |
| )) | |
| if city_id == 测试城市 | |
| continue | |
| order_dict[key2order["#{city_id},#{time},#{seq}"]] = id | |
| rli = [] | |
| for {id,city_product_id,price,adjust_price,collect_weight,purchase_order_no,create_time,adjust_purchase_weight,purchase_weight} in li | |
| weight = collect_weight or adjust_purchase_weight or purchase_weight | |
| if not weight | |
| continue | |
| price = parseInt(adjust_price*100) or parseInt(price*100) | |
| if not ( price and (price > 0) ) | |
| continue | |
| rli.push [ | |
| city_product_id | |
| price | |
| weight - 0 | |
| order_dict[purchase_order_no] or 0 | |
| purchase_order_time[purchase_order_no] or timestamp(create_time) | |
| ] | |
| await li_insert(\cost_bill, "city_product_id,price,weight,cost_order_id,time", rli, "city_product_id,time") | |
| return id | |
| tbl_order_detail_js: (db, table, id)~> | |
| try | |
| li = (await db.raw( | |
| "select id,create_time,order_id,city_product_id,price,actual_price,actual_weight,weight,create_user_id from #{table} where id>? order by id asc limit #{LIMIT}" | |
| id | |
| ))[0] | |
| catch err | |
| console.trace! | |
| console.log err | |
| console.log db.client.config.connection | |
| return | |
| if not li.length | |
| return | |
| order_product = {} | |
| order_worker = {} | |
| for i in li | |
| { | |
| id,create_time,order_id,city_product_id, | |
| price,actual_price,actual_weight,create_user_id | |
| } = i | |
| order_worker[order_id] = create_user_id | |
| city_product_id = city_product_id - 0 | |
| product_id = CITY_PRODUCT[city_product_id] | |
| if not product_id | |
| {prod_show_name} = (await db.raw("select prod_show_name from #{table} where id=?",id))[0][0] | |
| q = { | |
| name:prod_show_name | |
| } | |
| o = (await db.raw( | |
| "SELECT unit_type FROM caicai.erp_purchase_order_detail WHERE city_product_id=? ORDER BY id DESC LIMIT 1" | |
| city_product_id | |
| ))[0] | |
| if o and o[0] | |
| q.unit= o[0].unit_type | |
| await PG.product.upsert( | |
| q | |
| "name" | |
| ) | |
| product = await PG.product.get(q,"id") | |
| CITY_PRODUCT[city_product_id] = product_id = product.id | |
| if q.unit == 2 | |
| PRODUCT_UNIT2.add(product_id) | |
| await PG.city_product.upsert({id:city_product_id, product_id},"id") | |
| if PRODUCT_UNIT2.has(product_id) | |
| actual_weight = (weight or 1)*1000 | |
| order = order_product[order_id] | |
| if not order | |
| order_product[order_id] = order = {} | |
| product = order[city_product_id] | |
| if product | |
| product[0] += actual_weight | |
| product[1] += actual_price | |
| product[2] += (price - actual_price) | |
| else | |
| product = order[city_product_id] = [ | |
| actual_weight, | |
| actual_price, | |
| price - actual_price, | |
| timestamp create_time | |
| ] | |
| rli = [] | |
| for order_id, product_dict of order_product | |
| for city_product_id, [weight, price, change, time] of product_dict | |
| rli.push [order_id, city_product_id, weight, price, change, time] | |
| await li_insert(\bill, "order_id, city_product_id, weight, pay, change, time", rli, "order_id, city_product_id") | |
| rli = [] | |
| for k,v of order_worker | |
| rli.push "(#{k},#{v or 0})" | |
| await PG.$raw( | |
| """update "order" as t set worker_id = c.worker_id from (values #{rli.join(',')}) as c(order_id, worker_id) where c.order_id = t.order_id""" | |
| ) | |
| return id | |
| tbl_order: (db, table, id)~> | |
| # (status=4 or status=6 or status=11) and | |
| li = (await db.raw("select id,city_id,store_id,order_id,user_id,total_coupon,total_full,total_vip,total_price,create_time,status from #{table} where id>? order by id asc limit #{LIMIT}", id))[0] | |
| if not li.length | |
| return | |
| rli = [] | |
| for i in li | |
| { | |
| id,city_id,store_id,order_id,user_id,total_coupon, | |
| total_full,total_vip,total_price,create_time,status | |
| } = i | |
| if city_id == 测试城市 | |
| continue | |
| store_id = store_id - 0 | |
| if not STORE.has(store_id) | |
| await PG.store.upsert({id:store_id, city_id},"id") | |
| STORE.add(store_id) | |
| rli.push [ | |
| user_id | |
| order_id - 0 | |
| store_id | |
| timestamp create_time | |
| parseInt(total_price or 0) | |
| parseInt(total_coupon or 0) | |
| parseInt(total_vip or 0) | |
| parseInt(total_full or 0) | |
| status | |
| ] | |
| await li_insert("order", "user_id,order_id,store_id,time,pay,coupon,vip,fulloff,state", rli, "order_id") | |
| return id | |
| } | |
| sync = (connection)~> | |
| database = connection.database | |
| db = require('knex')({ | |
| client: 'mysql2' | |
| connection:{ | |
| host:connection.host | |
| port:connection.port | |
| user:connection.user | |
| password:connection.password | |
| database | |
| } | |
| }) | |
| db.id = connection.id | |
| if database == \erp | |
| table_li = [ | |
| \erp_purchase_order_detail | |
| ] | |
| else | |
| table_li = [ | |
| \tbl_order_detail_js | |
| \tbl_order | |
| ] | |
| if database == \trade_his | |
| suffix = "_his" | |
| todo = [] | |
| for table in table_li | |
| todo.push _sync(db, SYNC[table], table+(suffix or '')) | |
| await Promise.all(todo) | |
| if database == \caicai | |
| await sync_city_product(db) | |
| _sync = (db, func, table)~> | |
| connection = db.client.config.connection | |
| _where = {table, db_id:db.id} | |
| {sync_id} = (await PG.sync.get(_where, "sync_id")) or {sync_id:0} | |
| # sync_id = sync_id - 500000 | |
| while 1 | |
| console.log ( | |
| new Date().toLocaleTimeString() | |
| ), connection.host.split(".")[0], connection.database, table, sync_id | |
| try | |
| sync_id = await func(db, table, sync_id) | |
| catch e | |
| console.log db.client.config.connection.database, table, func | |
| console.log e | |
| console.trace() | |
| if sync_id | |
| _where.sync_id = sync_id | |
| await PG.sync.upsert(_where,"db_id,table") | |
| else | |
| break | |
| PG.$ -> | |
| STORE := new Set(await PG.$li1("select id from store")) | |
| for [id,product_id] in await PG.$li("select id,product_id from city_product") | |
| CITY_PRODUCT[id] = product_id | |
| for id in await PG.$li1("select id from product where unit=2") | |
| PRODUCT_UNIT2.add(id) | |
| li = [] | |
| for i in await PG.db.select("*") | |
| li.push sync(i) | |
| await Promise.all(li) | |
| process.exit() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment