Skip to content

Instantly share code, notes, and snippets.

@zew13
Created January 12, 2021 07:53
Show Gist options
  • Select an option

  • Save zew13/554057637b3bcf0e605423f85bb47b53 to your computer and use it in GitHub Desktop.

Select an option

Save zew13/554057637b3bcf0e605423f85bb47b53 to your computer and use it in GitHub Desktop.
#!/usr/bin/env coffee
import chunk from 'lodash/chunk'
import R from './lib/redis/r'
import ID from './lib/redis/id'
import {ftruncateSync} from 'fs'
import {join,dirname} from 'path'
import BASE64 from 'urlsafe-base64'
import fs from 'fs/promises'
import b64dir from './lib/b64dir'
import redis from './redis'
import DIR from './const/dir/rmw'
import uniq from 'lodash/uniq'
import * as B64 from './base'
import * as time from './lib/time'
import {hex,binary} from './const.mjs'
import * as url_offset from './lib/url_offset'
import site_or_hash from './util/site_or_hash'
import int2bin from './lib/int2bin'
import {MB} from '@rmw/fsrv/const'
import {FLAG_URL as _FLAG_URL} from './flag.mjs'
# Bytes per gigabyte, derived from the MB constant imported from @rmw/fsrv/const.
GB = MB*1024
# Single-byte protocol flag marking a URL-request frame (see ./flag.mjs).
FLAG_URL = Buffer.from [_FLAG_URL]
# Temporary download directory under the rmw data dir.
DIR_TMP = join DIR, "tmp"
# Every minute, tally the requests sent and responses received during the previous
# minute; if sent < received, send an additional max(received*0.1, 1) requests.
# Factory entry point: forwards all arguments to the Down constructor
# (db, conn, split). Indentation restored — the arrow-function body must be
# indented in CoffeeScript; the flattened source did not parse.
export default ->
  new Down(...arguments)
# Chunked downloader: schedules ranged URL requests over `conn`, tracks
# in-flight requests in @_ing (key = task_id+int2bin(addr_id)+int2bin(offset)),
# persists task state in redis and assembles received chunks on disk.
# NOTE(review): indentation reconstructed from a whitespace-stripped paste —
# block nesting chosen to keep every statement reachable; confirm against the
# original gist where ambiguous.
class Down
  # db: project database accessor; conn: message transport with send();
  # split: chunk size (bytes) per ranged request.
  constructor:(@db, @conn, @split)->
    @_ing = ing = {}
    self = @
    {_send} = self
    _send = _send.bind self
    # Watchdog: every 3s count in-flight entries; while any remain, keep the
    # throttled sender (tsend) installed, otherwise restore the plain sender.
    timer = =>
      setTimeout(
        =>
          now = time.now()
          count = 0
          resend = []
          # NOTE(review): `resend` is filled but never used, and entries are
          # counted/pushed before the 60s staleness check — looks unfinished
          # (cf. the resend TODO comments at the bottom of this file).
          for task_addr_offset of ing
            # key layout: task_id+int2bin(addr_id)+int2bin(offset)
            ++count
            cost = now - ing[task_addr_offset]
            resend.push task_addr_offset
            # insertion order is roughly chronological; stop at the first
            # entry younger than 60s
            if cost < 60
              break
          console.log "count", count, ing
          if count
            timer()
          else
            self._send = tsend
          return
        3000
      )
    # tsend: first send (re)arms the watchdog, then swaps in the real sender.
    @_send = tsend = ->
      timer()
      self._send = _send
      _send.apply @,arguments
    @_boot()

  # Send a ranged request given binary-string addr/key (converts addr to hex,
  # key to Buffer, then delegates to @_send).
  _send_bin:(addr,key,url,offset)->
    @_send(
      Buffer.from(
        addr,binary
      ).toString(hex)
      Buffer.from(key, binary)
      url
      offset
    )

  # Resume all persisted tasks from redis: for each task pick one pending
  # offset (srandmember) and rotate its address list (rpoplpush), mark the
  # request in-flight and re-send it.
  _boot:->
    {db} = @
    li = chunk await redis.zrevrangebyscore("task",'+inf',0,'withscores'),2
    if not li.length
      return
    p = redis.pipeline()
    for [key_url, id],pos in li
      id = int2bin parseInt id
      p.srandmember(R.task.todo+id)
      r_addr = R.task.addr+id
      # rotate: pop tail, push back to head — round-robins addresses
      p.rpoplpush(r_addr, r_addr)
      # key_url = 6-byte big-endian key id + url
      li[pos] = [id, Buffer.from(key_url[...6],binary).readUIntBE(0,6), key_url[6..]]
    todo = []
    now = time.now()
    for [offset, addr_id],pos in chunk((parseInt(i[1]) for i in await p.exec()),2)
      [task_id, key_id, url] = li[pos]
      {key} = await db[site_or_hash(url)].findOne(selector:id:key_id).exec()
      # NOTE(review): looks like a bug — the addr lookup uses key_id, but
      # addr_id was just parsed from the pipeline result; verify it should
      # be `selector:id:addr_id`.
      {addr} = await db.addr.findOne(selector:id:key_id).exec()
      console.log [offset, addr, key, url]
      @_ing[task_id+int2bin(addr_id)+int2bin(offset)] = now
      todo.push @_send_bin(addr, key, url, offset)
    Promise.all todo

  # Task key = int2bin(per-site key id) + url.
  _task_key:(key, url)->
    {id} = await @db[site_or_hash(url)].id {key}
    int2bin(id)+url

  # Task id = int2bin of the global ID allocated for the task key.
  _task_id:(key, url)->
    int2bin await ID.task(await @_task_key(key,url))

  # Start (or join) a download task for (key, url) with candidate addresses.
  # Ranks known addresses by bytes received, registers the task in redis if
  # new, then sends one request to the top-ranked address.
  get:(dir, key, url, addr_li)->
    if not addr_li.length
      return
    {db} = @
    m = await db.addr.findByIds addr_li
    addr_id_rank = new Map()
    addr_id_li = []
    id_addr = new Map()
    for addr from addr_li
      o = m.get addr
      if o
        id = o.id
        if o.recv
          addr_id_rank.set id, o.recv
      else
        # unknown address: allocate an id for it
        {id} = await db.addr.id {addr}
      id_addr.set(id, addr)
      addr_id_li.push id
    # descending by recv rank; unranked addresses sort last
    addr_id_li.sort (a,b)=>
      (addr_id_rank.get(b) or 0) - (addr_id_rank.get(a) or 0)
    task_id = await @_task_id key, url
    r_todo = R.task.todo+task_id
    r_addr = R.task.addr+task_id
    ctime = time.now()
    p = redis.pipeline()
    if await redis.zscore R.task.ctime, task_id
      # task already exists: pick a pending offset, append only new addresses
      offset = await redis.srandmember r_todo
      addr_li = await redis.lrange r_addr,0,-1
      exist = new Set addr_li.map((n)=>parseInt(n))
      new_addr = []
      for i from addr_id_li
        if not exist.has i
          new_addr.push i
      if new_addr.length
        p.rpush r_addr,...new_addr
    else
      # new task: register ctime, seed todo with offset 0 and the address list
      offset = 0
      p.zadd(
        R.task.ctime
        ctime
        task_id
      )
      .sadd(r_todo, offset)
      .rpush(
        r_addr,...addr_id_li
      )
    # NOTE(review): takes the LAST element after a descending sort, i.e. the
    # lowest-ranked address — confirm this is intentional (tail of the rpush
    # list is what rpoplpush rotates first in _boot).
    addr_id = addr_id_li[addr_id_li.length-1]
    @_ing[task_id+int2bin(addr_id)+int2bin(offset)] = ctime
    await p.exec()
    await @_send_bin(
      id_addr.get(addr_id)
      key
      url
      offset
    )
    return

  # Handle one received chunk.
  # src: hex sender address; key/url identify the resource; offset: byte
  # offset of this chunk; more: truthy when the payload is prefixed with a
  # 6-byte total-size header (multi-chunk transfer); payload: chunk bytes.
  recv:(src, key, url, offset, more, payload)->
    {db} = @
    keybuf = Buffer.from key, binary
    dir = join DIR_TMP, site_or_hash(url), b64dir(keybuf)
    outpath = join dir,url
    task_key = await @_task_key key,url
    task_id = int2bin(await ID.task(task_key))
    addr = await db.addr.id({addr:Buffer.from(src,hex).toString binary})
    task_src = task_id+int2bin(addr.id)
    console.log "recv",url,offset,dir
    p = redis.pipeline()
    # clear the in-flight marker for this (task, addr, offset)
    delete @_ing[task_src+int2bin(offset)]
    r_todo = R.task.todo+task_id
    [[_,ismember],[_,scard]] = await redis.pipeline().sismember(r_todo,offset).scard(r_todo).exec()
    if not ismember
      # duplicate / stale chunk: this offset is no longer pending
      return
    # queue removal of all redis task state (executed by a later p.exec())
    _done = =>
      p.zrem("task", task_key)
      .del(r_todo)
      .del(R.task.addr+task_id)
      .zrem(R.task.ctime,task_id)
    await fs.mkdir(dirname(outpath), { recursive: true })
    if offset == 0
      if more
        # first chunk of a multi-chunk file: 6-byte size header + data
        size = payload.readUIntBE 0,6
        payload = payload[6..]
        n = payload.length
        size += n
        # remaining offsets, one per @split-sized chunk
        todo = []
        while n < size
          todo.push n
          n += @split
        fh = await fs.open(outpath, "w")
        await fh.truncate(size)
        await fh.write(payload)
        await fh.close()
        next = todo[0]
        @_ing[task_src+int2bin(next)] = time.now()
        p.srem(
          r_todo
          0
        ).sadd(
          r_todo, ...todo
        )
        await @_send src, keybuf, url, next
      else
        # whole file fits in one chunk
        await fs.writeFile outpath, payload
        _done()
      # NOTE(review): placement assumed — executes the queued pipeline ops for
      # both the `more` and single-chunk branches; confirm against original.
      await p.exec()
    else
      # middle/final chunk: write in place at its offset
      fh = await fs.open(outpath,"r+")
      await fh.write(
        payload
        0
        payload.length
        offset
      )
      await fh.close()
      if 1 == scard
        # this was the last pending offset: task complete
        _done()
        await p.exec()
      else
        # mark this offset done and fetch another pending one
        p.srem(
          r_todo
          offset
        )
        p.srandmember(r_todo)
        next = parseInt (await p.exec()).pop()[1]
        await @_send src, keybuf, url, next
    # credit the sender: payload + 33-byte envelope + url length, in GB
    addr.update({$inc:recv:(payload.length+33+url.length)/GB})
    # TODO: update rank
    # besides rank, also check whether the entry is still in `ing`
    console.log [
      outpath
      BASE64.encode(Buffer.from(src,hex))
      BASE64.encode(Buffer.from(key,binary))
      url
      offset
      more
      next
      payload.length
      payload.toString('utf8')[..32]
    ]
    return
    # else
    #   if offset == 0
    #     size = payload.readUIntBE(pos,6)
    #     payload = payload[pos+6..]
    #     size += payload.length
    #   else
    #     #TODO
    #     payload = payload[pos..]
    # console.log src, payload

  # Low-level send: frame = FLAG_URL byte + key + url/offset encoding,
  # fire-and-forget (noReply).
  _send: (addr, key, url, offset)->
    console.log "send", BASE64.encode(Buffer.from(addr,hex)), key.length, url, offset
    @conn.send(
      addr
      Buffer.concat [
        FLAG_URL
        key
        url_offset.dump(url, offset)
      ]
      noReply:true
    )

# Check every 10 seconds; for entries not updated within those 10s, resend one
# request. After 7 consecutive failures give up on that address; if no usable
# address remains, the task fails for now.
# Reset counters after 30 minutes; after 9999 retries the task fails.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment