Created
January 12, 2021 07:53
-
-
Save zew13/554057637b3bcf0e605423f85bb47b53 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env coffee | |
| import chunk from 'lodash/chunk' | |
| import R from './lib/redis/r' | |
| import ID from './lib/redis/id' | |
| import {ftruncateSync} from 'fs' | |
| import {join,dirname} from 'path' | |
| import BASE64 from 'urlsafe-base64' | |
| import fs from 'fs/promises' | |
| import b64dir from './lib/b64dir' | |
| import redis from './redis' | |
| import DIR from './const/dir/rmw' | |
| import uniq from 'lodash/uniq' | |
| import * as B64 from './base' | |
| import * as time from './lib/time' | |
| import {hex,binary} from './const.mjs' | |
| import * as url_offset from './lib/url_offset' | |
| import site_or_hash from './util/site_or_hash' | |
| import int2bin from './lib/int2bin' | |
| import {MB} from '@rmw/fsrv/const' | |
| import {FLAG_URL as _FLAG_URL} from './flag.mjs' | |
# 1 GiB in bytes, derived from the shared MB constant.
GB = 1024 * MB
# One-byte frame flag marking URL-request messages (value from ./flag.mjs).
FLAG_URL = Buffer.from [_FLAG_URL]
# Scratch directory for in-progress downloads.
DIR_TMP = join(DIR, "tmp")
# Every minute, tally the requests sent and the responses received during the previous minute; if requests sent < responses received, issue max(responses received * 0.1, 1) additional requests.
# Module entry point: construct a Down downloader, forwarding all arguments
# (db, conn, split) straight through to the constructor.
export default (args...) ->
  new Down args...
# Down drives chunked peer-to-peer downloads. A file is split into
# @split-byte ranges; outstanding range requests live both in redis
# (set R.task.todo+task_id of pending offsets, list R.task.addr+task_id of
# candidate peer address ids, zsets "task" and R.task.ctime) and in the
# in-memory @_ing map, keyed by task_id+int2bin(addr_id)+int2bin(offset).
# Received payloads are reassembled under DIR_TMP.
class Down
  # @db    : database facade (collections: db.addr, db[site_or_hash(url)])
  # @conn  : transport used by _send to push request frames to a peer
  # @split : size in bytes of one requested chunk
  constructor:(@db, @conn, @split)->
    @_ing = ing = {}
    self = @
    {_send} = self
    _send = _send.bind self
    # Watchdog: reschedules itself every 3s while @_ing has entries, and
    # when @_ing drains it swaps @_send back to the lazy "tsend" stub so
    # the timer stops running while the downloader is idle.
    timer = =>
      setTimeout(
        =>
          now = time.now()
          count = 0
          resend = []
          for task_addr_offset of ing
            # key layout: task_id+int2bin(addr_id)+int2bin(offset)]
            ++count
            cost = now - ing[task_addr_offset]
            resend.push task_addr_offset
            # NOTE(review): stale entries are collected into `resend` but
            # never actually re-sent; the `break` on the first entry
            # younger than 60 also assumes iteration order tracks age —
            # confirm intent (see the retry-policy comments at file end).
            if cost < 60
              break
          console.log "count", count, ing
          if count
            timer()
          else
            self._send = tsend
          return
        3000
      )
    # Lazy trampoline: the first real send starts the watchdog, replaces
    # itself with the bound _send, then forwards the call.
    @_send = tsend = ->
      timer()
      self._send = _send
      _send.apply @,arguments
    @_boot()

  # Convenience wrapper: addr arrives as a binary string and key as a
  # binary string; re-encode addr to hex and key to a Buffer for @_send.
  _send_bin:(addr,key,url,offset)->
    @_send(
      Buffer.from(
        addr,binary
      ).toString(hex)
      Buffer.from(key, binary)
      url
      offset
    )

  # Startup recovery: re-issue one request for every task persisted in the
  # "task" zset. For each task, pick one pending offset (srandmember) and
  # rotate the peer list (rpoplpush onto itself) to choose the next peer.
  _boot:->
    {db} = @
    li = chunk await redis.zrevrangebyscore("task",'+inf',0,'withscores'),2
    if not li.length
      return
    p = redis.pipeline()
    for [key_url, id],pos in li
      id = int2bin parseInt id
      p.srandmember(R.task.todo+id)
      r_addr = R.task.addr+id
      p.rpoplpush(r_addr, r_addr)
      # key_url = 6-byte big-endian per-site key id followed by the url.
      li[pos] = [id, Buffer.from(key_url[...6],binary).readUIntBE(0,6), key_url[6..]]
    todo = []
    now = time.now()
    # p.exec() yields [err, value] pairs in pipeline order: pairs of
    # (todo offset, rotated address id) per task.
    for [offset, addr_id],pos in chunk((parseInt(i[1]) for i in await p.exec()),2)
      [task_id, key_id, url] = li[pos]
      {key} = await db[site_or_hash(url)].findOne(selector:id:key_id).exec()
      # NOTE(review): this looks up db.addr by key_id rather than addr_id —
      # likely a bug; confirm against the addr collection's schema.
      {addr} = await db.addr.findOne(selector:id:key_id).exec()
      console.log [offset, addr, key, url]
      @_ing[task_id+int2bin(addr_id)+int2bin(offset)] = now
      todo.push @_send_bin(addr, key, url, offset)
    Promise.all todo

  # Task key = int2bin(per-site key id) + url; used as the member stored in
  # the "task" zset.
  _task_key:(key, url)->
    {id} = await @db[site_or_hash(url)].id {key}
    int2bin(id)+url

  # Binary-packed global task id derived from the task key via ID.task.
  _task_id:(key, url)->
    int2bin await ID.task(await @_task_key(key,url))

  # Start (or join) a download of url from the candidate peers in addr_li.
  # Resolves/creates address ids, ranks known peers by observed receive
  # volume (o.recv, descending), registers the task in redis (or merges new
  # peers into an existing task), then requests one offset from the
  # last-ranked peer.
  # NOTE(review): the `dir` parameter is unused in this method — confirm.
  get:(dir, key, url, addr_li)->
    if not addr_li.length
      return
    {db} = @
    m = await db.addr.findByIds addr_li
    addr_id_rank = new Map()
    addr_id_li = []
    id_addr = new Map()
    for addr from addr_li
      o = m.get addr
      if o
        id = o.id
        if o.recv
          addr_id_rank.set id, o.recv
      else
        # Unknown peer: allocate a fresh address id.
        {id} = await db.addr.id {addr}
      id_addr.set(id, addr)
      addr_id_li.push id
    # Highest recv volume first; unranked peers (rank 0) sink to the end.
    addr_id_li.sort (a,b)=>
      (addr_id_rank.get(b) or 0) - (addr_id_rank.get(a) or 0)
    task_id = await @_task_id key, url
    r_todo = R.task.todo+task_id
    r_addr = R.task.addr+task_id
    ctime = time.now()
    p = redis.pipeline()
    if await redis.zscore R.task.ctime, task_id
      # Task already registered: reuse a pending offset and append only the
      # peers not already in the task's address list.
      offset = await redis.srandmember r_todo
      addr_li = await redis.lrange r_addr,0,-1
      exist = new Set addr_li.map((n)=>parseInt(n))
      new_addr = []
      for i from addr_id_li
        if not exist.has i
          new_addr.push i
      if new_addr.length
        p.rpush r_addr,...new_addr
    else
      # Fresh task: record creation time, seed todo with offset 0 and store
      # the full ranked peer list.
      offset = 0
      p.zadd(
        R.task.ctime
        ctime
        task_id
      )
        .sadd(r_todo, offset)
        .rpush(
          r_addr,...addr_id_li
        )
    addr_id = addr_id_li[addr_id_li.length-1]
    @_ing[task_id+int2bin(addr_id)+int2bin(offset)] = ctime
    await p.exec()
    await @_send_bin(
      id_addr.get(addr_id)
      key
      url
      offset
    )
    return

  # Handle an incoming payload for (url, offset) from peer `src` (hex).
  #   offset == 0, more true : payload starts with a 6-byte big-endian total
  #     size; preallocate the file, seed r_todo with the remaining offsets
  #     (stepping by @split) and request the next chunk.
  #   offset == 0, more false: payload is the whole file; finish the task.
  #   offset > 0             : write in place; if this was the last pending
  #     offset (scard == 1) tear down the task's redis state, otherwise
  #     request another random pending offset from the same peer.
  # Stale/duplicate packets (offset no longer in r_todo) are ignored.
  recv:(src, key, url, offset, more, payload)->
    {db} = @
    keybuf = Buffer.from key, binary
    dir = join DIR_TMP, site_or_hash(url), b64dir(keybuf)
    outpath = join dir,url
    task_key = await @_task_key key,url
    task_id = int2bin(await ID.task(task_key))
    addr = await db.addr.id({addr:Buffer.from(src,hex).toString binary})
    task_src = task_id+int2bin(addr.id)
    console.log "recv",url,offset,dir
    p = redis.pipeline()
    # This request is no longer outstanding.
    delete @_ing[task_src+int2bin(offset)]
    r_todo = R.task.todo+task_id
    [[_,ismember],[_,scard]] = await redis.pipeline().sismember(r_todo,offset).scard(r_todo).exec()
    if not ismember
      return
    # Queue full task teardown onto pipeline p (executed later).
    _done = =>
      p.zrem("task", task_key)
        .del(r_todo)
        .del(R.task.addr+task_id)
        .zrem(R.task.ctime,task_id)
    await fs.mkdir(dirname(outpath), { recursive: true })
    if offset == 0
      if more
        # First chunk of a multi-chunk file: strip the 6-byte size prefix.
        size = payload.readUIntBE 0,6
        payload = payload[6..]
        n = payload.length
        size += n
        # NOTE(review): size is prefix value + first-chunk length — confirm
        # the sender encodes "remaining bytes" rather than total size.
        todo = []
        while n < size
          todo.push n
          n += @split
        # Preallocate the full file, write the first chunk at position 0.
        fh = await fs.open(outpath, "w")
        await fh.truncate(size)
        await fh.write(payload)
        await fh.close()
        next = todo[0]
        @_ing[task_src+int2bin(next)] = time.now()
        p.srem(
          r_todo
          0
        ).sadd(
          r_todo, ...todo
        )
        await @_send src, keybuf, url, next
      else
        # Single-packet file: write it out and tear the task down.
        await fs.writeFile outpath, payload
        _done()
      await p.exec()
    else
      # Continuation chunk: write payload at its offset within the file.
      fh = await fs.open(outpath,"r+")
      await fh.write(
        payload
        0
        payload.length
        offset
      )
      await fh.close()
      if 1 == scard
        # This offset was the last one pending: task complete.
        _done()
        await p.exec()
      else
        # Remove this offset and ask the same peer for another pending one.
        p.srem(
          r_todo
          offset
        )
        p.srandmember(r_todo)
        next = parseInt (await p.exec()).pop()[1]
        await @_send src, keybuf, url, next
    # Credit the peer's receive volume in GB (payload + 33-byte frame
    # overhead + url length); this feeds the ranking in get().
    addr.update({$inc:recv:(payload.length+33+url.length)/GB})
    # TODO: update rank
    # Besides rank, also check whether the entry is still in _ing.
    console.log [
      outpath
      BASE64.encode(Buffer.from(src,hex))
      BASE64.encode(Buffer.from(key,binary))
      url
      offset
      more
      next
      payload.length
      payload.toString('utf8')[..32]
    ]
    return
    # else
    # if offset == 0
    # size = payload.readUIntBE(pos,6)
    # payload = payload[pos+6..]
    # size += payload.length
    # else
    # #TODO
    # payload = payload[pos..]
    # console.log src, payload

  # Low-level send: frame = FLAG_URL byte + key bytes + packed (url, offset),
  # fire-and-forget (noReply) to the hex-encoded peer address.
  _send: (addr, key, url, offset)->
    console.log "send", BASE64.encode(Buffer.from(addr,hex)), key.length, url, offset
    @conn.send(
      addr
      Buffer.concat [
        FLAG_URL
        key
        url_offset.dump(url, offset)
      ]
      noReply:true
    )
# Check every 10 seconds: for each entry not updated within those 10 seconds, resend one request; after 7 consecutive failures give up on that address, and if no usable address remains the task fails temporarily.
# Reset the failure counter after 30 minutes; after 9999 retries the task fails permanently.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment