dht 磁力源代码共享, python 语言(不了解请参看磁力百科) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
3023369823
V2EX    Python

dht 磁力源代码共享, python 语言(不了解请参看磁力百科)

  •  
  •   3023369823 2016-08-22 09:46:35 +08:00 5352 次点击
    这是一个创建于 3346 天前的主题,其中的信息可能已经有所发展或是发生改变。

    之前我在写百度网盘爬虫百度图片爬虫的时候答应网友说,抽时间要把ok 搜搜的的源码公开,如今是时候兑现诺言了,下面就是爬虫的所有代码,完全,彻底的公开,你会不会写程序都可以使用,不过请先装个 linux 系统,具备公网条件,然后运行:

    python startCrawler.py 

    有必要提醒你,数据库字段代码中都有,请你自己建张表格,这个太简单了,就不多说了。同时我也提供一下下载地址,源码都在:下载地址 1 下载地址 2

    #!/usr/bin/env python # encoding: utf-8 """ author:haoning create time:2015.8.1 """ import hashlib import os import time import datetime import traceback import sys import random import json import socket import threading from hashlib import sha1 #进行 hash 加密 from random import randint from struct import unpack from socket import inet_ntoa from threading import Timer, Thread from time import sleep from collections import deque from Queue import Queue import MySQLdb as mdb #数据库连接器 import metautils import downloadTorrent from bencode import bencode, bdecode import pygeoip DB_HOST = '127.0.0.1' DB_USER = 'root' DB_PASS = 'root' BOOTSTRAP_NODES = ( ("67.215.246.10", 6881), ("82.221.103.244", 6881), ("23.21.224.150", 6881) ) RATE = 1 #调控速率 TID_LENGTH = 2 RE_JOIN_DHT_INTERVAL = 3 TOKEN_LENGTH = 2 INFO_HASH_LEN = 500000 #50w 数据很小,限制内存不至于消耗太大 CACHE_LEN = 100 #更新数据库缓存 WAIT_DOWNLOAD = 80 geoip = pygeoip.GeoIP('GeoIP.dat') def is_ip_allowed(ip): country = geoip.country_code_by_addr(ip) if country in ('CN','TW','JP','HK', 'KR'): return True return False def entropy(length): return "".join(chr(randint(0, 255)) for _ in xrange(length)) def random_id(): h = sha1() h.update(entropy(20)) return h.digest() def decode_nodes(nodes): n = [] length = len(nodes) if (length % 26) != 0: return n for i in range(0, length, 26): nid = nodes[i:i+20] ip = inet_ntoa(nodes[i+20:i+24]) port = unpack("!H", nodes[i+24:i+26])[0] n.append((nid, ip, port)) return n def timer(t, f): Timer(t, f).start() def get_neighbor(target, nid, end=10): return target[:end]+nid[end:] class KNode(object): def __init__(self, nid, ip, port): self.nid = nid self.ip = ip self.port = port class DHTClient(Thread): def __init__(self, max_node_qsize): Thread.__init__(self) self.setDaemon(True) self.max_node_qsize = max_node_qsize self.nid = random_id() self.nodes = deque(maxlen=max_node_qsize) def send_krpc(self, msg, address): try: self.ufd.sendto(bencode(msg), address) except Exception: pass def send_find_node(self, address, nid=None): nid = get_neighbor(nid, self.nid) if nid else self.nid tid = entropy(TID_LENGTH) msg = { "t": tid, "y": "q", "q": "find_node", "a": { "id": nid, "target": random_id() } } self.send_krpc(msg, address) def join_DHT(self): for address in BOOTSTRAP_NODES: self.send_find_node(address) def re_join_DHT(self): if len(self.nodes) == 0: self.join_DHT() timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) def auto_send_find_node(self): wait = 1.0 / self.max_node_qsize while True: try: node = self.nodes.popleft() self.send_find_node((node.ip, node.port), node.nid) except IndexError: pass try: sleep(wait) except KeyboardInterrupt: os._exit(0) def process_find_node_response(self, msg, address): nodes = decode_nodes(msg["r"]["nodes"]) for node in nodes: (nid, ip, port) = node if len(nid) != 20: continue if ip == self.bind_ip: continue n = KNode(nid, ip, port) self.nodes.append(n) class DHTServer(DHTClient): #获得 info_hash def __init__(self, master, bind_ip, bind_port, max_node_qsize): DHTClient.__init__(self, max_node_qsize) self.master = master self.bind_ip = bind_ip self.bind_port = bind_port self.speed=0 self.process_request_actiOns= { "get_peers": self.on_get_peers_request, "announce_peer": self.on_announe_peer_request, } self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) self.ufd.bind((self.bind_ip, self.bind_port)) timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT) def run(self): self.re_join_DHT() while True: try: (data, address) = self.ufd.recvfrom(65536) msg = bdecode(data) self.on_message(msg, address) except Exception: pass def on_message(self, msg, address): global RATE #设为全局量 try: if msg["y"] == "r": if msg["r"].has_key("nodes"): self.process_find_node_response(msg, address) #发现节点 elif msg["y"] == "q": try: self.speed+=1 if self.speed % 10000 ==0: RATE=random.randint(1,3) if RATE==2: RATE=1 if RATE==3: RATE=10 if self.speed>100000: self.speed=0 if self.speed % RATE==0: #数据过多,占用 cpu 太多,划分限速,1,1,10 self.process_request_actions[msg["q"]](msg, address) #处理其他节点的请求,这个过程获取 info_hash #self.process_request_actions[msg["q"]](msg, address) #处理其他节点的请求,这个过程获取 info_hash except KeyError: self.play_dead(msg, address) except KeyError: pass def on_get_peers_request(self, msg, address): try: infohash = msg["a"]["info_hash"] tid = msg["t"] nid = msg["a"]["id"] token = infohash[:TOKEN_LENGTH] msg = { "t": tid, "y": "r", "r": { "id": get_neighbor(infohash, self.nid), "nodes": "", "token": token } } self.master.log(infohash, address) self.send_krpc(msg, address) except KeyError: pass def on_announce_peer_request(self, msg, address): try: infohash = msg["a"]["info_hash"] token = msg["a"]["token"] nid = msg["a"]["id"] tid = msg["t"] if infohash[:TOKEN_LENGTH] == token: if msg["a"].has_key("implied_port ") and msg["a"]["implied_port "] != 0: port = address[1] else: port = msg["a"]["port"] self.master.log_announce(infohash, (address[0], port)) except Exception: print 'error' pass finally: self.ok(msg, address) def play_dead(self, msg, address): try: tid = msg["t"] msg = { "t": tid, "y": "e", "e": [202, "Server Error"] } self.send_krpc(msg, address) except KeyError: pass def ok(self, msg, address): try: tid = msg["t"] nid = msg["a"]["id"] msg = { "t": tid, "y": "r", "r": { "id": get_neighbor(nid, self.nid) } } self.send_krpc(msg, address) except KeyError: pass class Master(Thread): #解析 info_hash def __init__(self): Thread.__init__(self) self.setDaemon(True) self.queue = Queue() self.cache = Queue() self.count=0 self.mutex = threading.RLock() #可重入锁,使单线程可以再次获得已经获得的? self.waitDownload = Queue() self.metadata_queue = Queue() self.dbcOnn= mdb.connect(DB_HOST, DB_USER, DB_PASS, 'oksousou', charset='utf8') self.dbconn.autocommit(False) self.dbcurr = self.dbconn.cursor() self.dbcurr.execute('SET NAMES utf8') self.visited = set() def lock(self): #加锁 self.mutex.acquire() def unlock(self): #解锁 self.mutex.release() def work(self,item): print "start thread",item while True: self.prepare_download_metadata() self.lock() self.download_metadata() self.unlock() self.lock() self.got_torrent() self.unlock() def start_work(self,max): for item in xrange(max): t = threading.Thread(target=self.work, args=(item,)) t.setDaemon(True) t.start() #入队的种子效率更高 def log_announce(self, binhash, address=None): if self.queue.qsize() < INFO_HASH_LEN : #大于 INFO_HASH_LEN 就不要入队,否则后面来不及处理 if is_ip_allowed(address[0]): self.queue.put([address, binhash]) #获得 info_hash def log(self, infohash, address=None): if self.queue.qsize() < INFO_HASH_LEN: #大于 INFO_HASH_LEN/2 就不要入队,否则后面来不及处理 if is_ip_allowed(address[0]): self.queue.put([address, infohash]) def prepare_download_metadata(self): if self.queue.qsize() == 0: sleep(2) #从 queue 中获得 info_hash 用来下载 address, binhash= self.queue.get() if binhash in self.visited: return if len(self.visited) > 100000: #大于 100000 重置队列,认为已经访问过了 self.visited = set() self.visited.add(binhash) #跟新已经访问过的 info_hash info_hash = binhash.encode('hex') utcnow = datetime.datetime.utcnow() self.cache.put((address,binhash,utcnow)) #装入缓存队列 def download_metadata(self): if self.cache.qsize() > CACHE_LEN/2: #出队更新下载 while self.cache.qsize() > 0: #排空队列 address,binhash,utcnow = self.cache.get() info_hash = binhash.encode('hex') self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,)) y = self.dbcurr.fetchone() if y: # 更新最近发现时间,请求数 self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash)) else: self.waitDownload.put((address, binhash)) self.dbconn.commit() if self.waitDownload.qsize() > WAIT_DOWNLOAD: while self.waitDownload.qsize() > 0: address,binhash = self.waitDownload.get() t = threading.Thread(target=downloadTorrent.download_metadata, args=(address, binhash, self.metadata_queue)) t.setDaemon(True) t.start() def decode(self, s): if type(s) is list: s = ';'.join(s) u = s for x in (self.encoding, 'utf8', 'gbk', 'big5'): try: u = s.decode(x) return u except: pass return s.decode(self.encoding, 'ignore') def decode_utf8(self, d, i): if i+'.utf-8' in d: return d[i+'.utf-8'].decode('utf8') return self.decode(d[i]) def parse_metadata(self, data): #解析种子 info = {} self.encoding = 'utf8' try: torrent = bdecode(data) #编码后解析 if not torrent.get('name'): return None except: return None detail = torrent info['name'] = self.decode_utf8(detail, 'name') if 'files' in detail: info['files'] = [] for x in detail['files']: if 'path.utf-8' in x: v = {'path': self.decode('/'.join(x['path.utf-8'])), 'length': x['length']} else: v = {'path': self.decode('/'.join(x['path'])), 'length': x['length']} if 'filehash' in x: v['filehash'] = x['filehash'].encode('hex') info['files'].append(v) info['length'] = sum([x['length'] for x in info['files']]) else: info['length'] = detail['length'] info['data_hash'] = hashlib.md5(detail['pieces']).hexdigest() return info def got_torrent(self): if self.metadata_queue.qsize() == 0: return binhash, address, data,start_time = self.metadata_queue.get() if not data: return try: info = self.parse_metadata(data) if not info: return except: traceback.print_exc() return temp = time.time() x = time.localtime(float(temp)) utcnow = time.strftime("%Y-%m-%d %H:%M:%S",x) # get time now info_hash = binhash.encode('hex') #磁力 info['info_hash'] = info_hash # need to build tags info['tagged'] = False info['classified'] = False info['requests'] = 1 info['last_seen'] = utcnow info['create_time'] = utcnow info['source_ip'] = address[0] if info.get('files'): files = [z for z in info['files'] if not z['path'].startswith('_')] if not files: files = info['files'] else: files = [{'path': info['name'], 'length': info['length']}] files.sort(key=lambda z:z['length'], reverse=True) bigfname = files[0]['path'] info['extension'] = metautils.get_extension(bigfname).lower() info['category'] = metautils.get_category(info['extension']) try: try: print '\n', 'Saved', info['info_hash'], info['name'], (time.time()-start_time), 's', address[0] except: print '\n', 'Saved', info['info_hash'] ret = self.dbcurr.execute('INSERT INTO search_hash(info_hash,category,data_hash,name,extension,classified,source_ip,tagged,' + 'length,create_time,last_seen,requests) VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', (info['info_hash'], info['category'], info['data_hash'], info['name'], info['extension'], info['classified'], info['source_ip'], info['tagged'], info['length'], info['create_time'], info['last_seen'], info['requests'])) if self.count %50 ==0: self.dbconn.commit() if self.count>100000: self.count=0 except: print self.name, 'save error', self.name, info traceback.print_exc() return if __name__ == "__main__": #启动客户端 master = Master() master.start_work(150) #启动服务器 dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200) dht.start() dht.auto_send_find_node() 

    注:不好意思, V2EX 主题字数不能超过 20000 ,我没法写下去了,此贴首发博客园,我把链接贴出来,可以过去看看,不想过去把源码下下来慢慢看吧,开篇我已经给了地址,谢谢。 博客园地址:http://www.cnblogs.com/huangxie/p/5550680.html 这个爬虫还是耗费了本人和其他网上高手的很多时间的,请看到这篇博客的朋友保持钻研精神,开源精神,多多交流,秉承分享。本人建立个 qq 群作为去转盘网的官方群,人数现在也不多,如果有兴趣的话来逛逛吧,多个粉丝去转盘多一份热闹, qq 群号: 512245829

    6 条回复    2016-08-22 21:41:14 +08:00
    snsd
        1
    snsd  
       2016-08-22 10:24:12 +08:00 via iPhone
    楼主方便的话能不能做个爱奇艺 1080p 视频的在线解析?
    3023369823
        2
    3023369823  
    OP
       2016-08-22 10:44:02 +08:00
    @snsd 这个没有研究过,你有好的思路可以说下
    Allianzcortex
        3
    Allianzcortex  
       2016-08-22 11:18:01 +08:00
    我最近也在看 DHT 协议,准备实时监听。不过 LZ 的代码里可以优化一下 global 全局变量和裸 except 这块( no offense 哈)。
    3023369823
        4
    3023369823  
    OP
       2016-08-22 11:21:40 +08:00
    @Allianzcortex 是的,代码写了好久了,有时间在看看,谢谢你的指出
    hmlbr
        5
    hmlbr  
       2016-08-22 17:34:07 +08:00
    有空研究下,赞 lz
    3023369823
        6
    3023369823  
    OP
       2016-08-22 21:41:14 +08:00
    @hmlbr 好的
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     1012 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 26ms UTC 22:36 PVG 06:36 LAX 15:36 JFK 18:36
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86