Python crawler tutorial: batch grab QQ group information

Python crawler tutorial: batch grab QQ group information

Preface

This article explains how to use Python to grab QQ group information in batches, including group name, group number, member count, member limit, group owner, region, category, labels, group introduction, etc., and how to return the results as an XLS/CSV/JSON file.

Basic environment configuration

Version: Python2.7

Related modules:

  • bottle
  • requests
  • simplejson
  • pyexcel-xls
  • unicodecsv

Screenshot of code implementation

P.S. I recommend my Python learning and discussion QQ group for beginners: 322795889. If you are unsure how to approach Python (learning methods, learning path, how to study efficiently), you are welcome to join; the group shares good tutorials, development tools, and e-books, and professional teachers answer questions.

Source code sharing:

#!/usr/bin/env python
# -*- coding:utf-8 -*
import os
import sys
app_root = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.join(app_root,'vendor'))
from bottle import *
import requests
from time import time, sleep
from random import random
try:
    import ujson as json
except ImportError:
    import simplejson as json
from io import BytesIO
import pyexcel as pe
import unicodecsv as csv
import re
import zipfile
from uuid import uuid4
#import sae

# Finished search results held in memory: maps the result id returned by a
# search to the BytesIO buffer containing the zip archive for that search.
attachments = {}
# Group-finder page URL; sent as the post-login target (s_url / u1) during
# login and as the Referer header for group searches.
sourceURL ='http://find.qq.com/index.html?version=1&im_version=5533&width=910&height=610&search_target=0'


class QQGroups(object):
    """QQ Groups Spider.

    Owns a single ``requests`` session.  ``getQRCode`` and ``qrLogin`` drive
    the QR-code login against ptlogin2.qq.com; ``qqunSearch`` then uses the
    session cookies to query the group-search endpoint and packs the results
    for every keyword into an in-memory zip archive.
    """

    # Timeout (seconds) passed to every outgoing HTTP request.
    REQUEST_TIMEOUT = 1000
    # At most this many keywords are searched per request.
    MAX_KEYWORDS = 10
    # First row of the XLS/CSV output; column order matches _groupRow().
    HEADER = (u'group name', u'group number', u'member count',
              u'group upper limit', u'group owner', u'region', u'category',
              u'label', u'group introduction')
    # (marker, status) pairs checked in order against the ptqrlogin reply.
    # NOTE(review): these markers are matched literally against the raw
    # response body -- confirm they match the text the server really sends.
    LOGIN_MARKERS = (
        ('The QR code has not expired', 0),
        ('QR code authentication in progress', 1),
        ('login successful', 2),
        ('QR code has expired', 3),
    )

    def __init__(self):
        super(QQGroups, self).__init__()
        # Default script version; overwritten by getQRCode() when the login
        # page advertises a different one.
        self.js_ver = '10226'
        self.newSession()

    def newSession(self):
        """Replace ``self.sess`` with a fresh session using a QQ client User-Agent."""
        self.sess = requests.Session()
        headers = {
            'User-Agent':'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.59 QQ/8.9.3.21169 Safari/537.36'
        }
        self.sess.headers.update(headers)

    def getQRCode(self):
        """Start a new login attempt and fetch the login QR code.

        Resets the session, loads the login page (updating ``self.js_ver``
        when the page exposes a version), then requests the QR image.  On
        success the bottle ``response`` headers are set for a non-cacheable
        PNG.

        Returns:
            The ``requests`` response of the QR-image request, or ``None``
            if any step raised.
        """
        self.newSession()
        try:
            url ='http://ui.ptlogin2.qq.com/cgi-bin/login'
            params = {
                'appid': '715030901',
                'daid': '73',
                'pt_no_auth': '1',
                's_url': sourceURL
            }
            resp = self.sess.get(url, params=params,
                                 timeout=self.REQUEST_TIMEOUT)
            # Keep the previous js_ver when the page does not mention one.
            match = re.search(r'imgcache\.qq\.com/ptlogin/ver/(\d+)/js',
                              resp.content)
            if match:
                self.js_ver = match.group(1)
            self.sess.headers.update({'Referer': url})
            url ='http://ptlogin2.qq.com/ptqrshow'
            params = {
                'appid': '715030901',
                'e': '2',
                'l':'M',
                's': '3',
                'd': '72',
                'v': '4',
                't':'%.17f'% (random()),
                'daid': '73'
            }
            resp = self.sess.get(url, params=params,
                                 timeout=self.REQUEST_TIMEOUT)
            response.set_header('Content-Type','image/png')
            response.add_header('Cache-Control','no-cache, no-store')
            response.add_header('Pragma','no-cache')
        except Exception:
            # Best effort: any failure is reported to the caller as None.
            resp = None
        return resp

    def qrLogin(self):
        """Poll the QR-login endpoint once and report the login state.

        The poll is only attempted when both the ``pt_login_sig`` and
        ``qrsig`` cookies are present in the session.

        Returns:
            A JSON string with ``status`` (-1 when unknown or not polled,
            otherwise the code of the first matching entry in
            ``LOGIN_MARKERS``), ``time`` (current timestamp) and
            ``errorMsg``.
        """
        cookies = self.sess.cookies.get_dict()
        login_sig = cookies.get('pt_login_sig','')
        qrsig = cookies.get('qrsig','')
        status = -1
        errorMsg =''
        if all([login_sig, qrsig]):
            url ='http://ptlogin2.qq.com/ptqrlogin'
            params = {
                'u1': sourceURL,
                'ptqrtoken': self.genqrtoken(qrsig),
                'ptredirect': '1',
                'h': '1',
                't': '1',
                'g': '1',
                'from_ui': '1',
                'ptlang': '2052',
                'action': '0-0-%d'% (time() * 1000),
                'js_ver': self.js_ver,
                'js_type': '1',
                'login_sig': login_sig,
                'pt_uistyle': '40',
                'aid': '715030901',
                'daid': '73'
            }
            resp = None
            try:
                resp = self.sess.get(url, params=params,
                                     timeout=self.REQUEST_TIMEOUT)
                result = resp.content
                for marker, code in self.LOGIN_MARKERS:
                    if marker in result:
                        status = code
                        break
                else:
                    # Unrecognised reply: pass the raw body back to the
                    # client.  ``result`` is already the body string, so it
                    # has no ``.text`` attribute.
                    errorMsg = str(result)
            except Exception:
                # Report the HTTP status when a response was received;
                # otherwise leave the message empty.
                if resp is not None:
                    errorMsg = str(resp.status_code)
        loginResult = {
            'status': status,
            'time': time(),
            'errorMsg': errorMsg,
        }
        body = json.dumps(loginResult)
        response.set_header('Content-Type','application/json; charset=UTF-8')
        response.add_header('Cache-Control','no-cache; must-revalidate')
        response.add_header('Expires','-1')
        return body

    def qqunSearch(self, request):
        """Search groups for the submitted keywords and store a zip of results.

        Reads the form fields ``sort``, ``pn`` (number of pages per
        keyword), ``ft`` (``xls``, ``csv`` or ``json``) and ``kws``
        (keywords separated by tabs or line breaks).  Only the first
        ``MAX_KEYWORDS`` keywords are searched, and the archive contains
        one file per searched keyword.

        Returns:
            The id under which the zip buffer was stored in ``attachments``.

        Raises:
            Redirects to ``/qqun`` when no keyword was submitted and aborts
            with HTTP 500 when searching or packaging fails (both via
            bottle's ``redirect`` / ``abort``).
        """
        sort = request.forms.get('sort')
        pn = int(request.forms.get('pn'))
        ft = request.forms.get('ft')
        # A missing field is treated like an empty one.
        kws = (request.forms.get('kws') or '').strip()
        if not kws:
            redirect('/qqun')
        kws = re.sub(r'[\r\n]','\t', kws)
        kws = [k.strip() for k in kws.split('\t') if k.strip()]
        # Truncate once so the archive never holds entries for keywords
        # that were not searched.
        kws = kws[:self.MAX_KEYWORDS]
        self.sess.headers.update({'Referer': sourceURL})
        skey = self.sess.cookies.get_dict().get('skey','')
        try:
            buff = BytesIO()
            zip_archive = zipfile.ZipFile(buff, mode='w')
            for kw in kws:
                rows, raw = self._searchKeyword(kw, sort, pn, skey)
                payload = self._renderResult(ft, rows, raw)
                zip_archive.writestr(kw.decode('utf-8') +'.' + ft, payload)
            zip_archive.close()
            resultId = uuid4().hex
            attachments.update({resultId: buff})
            response.set_header('Content-Type','text/html; charset=UTF-8')
            response.add_header('Cache-Control','no-cache; must-revalidate')
            response.add_header('Expires','-1')
            return resultId
        except Exception as e:
            print(e)
            abort(500,)

    def _searchKeyword(self, kw, sort, pn, skey):
        """Fetch up to ``pn`` result pages for one keyword.

        Returns:
            ``(rows, raw)`` where ``rows`` is the header followed by one
            flattened tuple per group, and ``raw`` is the list of group
            dicts exactly as returned by the endpoint.
        """
        rows = [self.HEADER]
        raw = []
        bkn = self.genbkn(skey)  # identical for every page
        url ='http://qun.qq.com/cgi-bin/group_search/pc_group_search'
        for page in xrange(0, pn):
            # sort type: 0 default, 1 member, 2 active
            data = {
                'k': u'make friends',
                'n': '8',
                'st': '1',
                'iso': '1',
                'src': '1',
                'v': '4903',
                'bkn': bkn,
                'isRecommend':'false',
                'city_id': '0',
                'from': '1',
                'keyword': kw,
                'sort': sort,
                'wantnum': '24',
                'page': page,
                'ldw': bkn
            }
            resp = self.sess.post(url, data=data,
                                  timeout=self.REQUEST_TIMEOUT)
            if resp.status_code != 200:
                print('%s\n%s'% (resp.status_code, resp.text))
            gList = json.loads(resp.content)['group_list']
            raw.extend(gList)
            rows.extend(self._groupRow(g) for g in gList)
            # NOTE(review): paging stops only when a page holds exactly one
            # group; an empty page keeps paging -- confirm this is intended.
            if len(gList) == 1:
                break
            # Pause between page requests.
            sleep(2.5)
        return rows, raw

    def _groupRow(self, g):
        """Flatten one group dict into a tuple ordered like ``HEADER``."""
        try:
            gcate = '|'.join(g['gcate'])
        except Exception:
            # Category is optional/best effort: fall back to empty.
            gcate =''
        try:
            labels = self.rmWTS(
                ' |'.join([l.get('label','') for l in g['labels']]))
        except Exception:
            # Labels are optional/best effort: fall back to empty.
            labels =''
        return (self.rmWTS(g['name']), g['code'], g['member_num'],
                g['max_member_num'], g['owner_uin'], ''.join(g['qaddr']),
                gcate, labels, self.rmWTS(g['memo']))

    def _renderResult(self, ft, rows, raw):
        """Serialise one keyword's results in format ``ft`` and return the bytes.

        ``xls`` and ``csv`` use the flattened ``rows``; ``json`` dumps the
        ``raw`` group dicts.  Any other ``ft`` yields empty content.
        """
        out = BytesIO()
        if ft =='xls':
            pe.Sheet(rows).save_to_memory('xls', out)
        elif ft =='csv':
            writer = csv.writer(out, dialect='excel', encoding='utf-8')
            writer.writerows(rows)
        elif ft =='json':
            json.dump(raw, out, indent=4, sort_keys=True)
        return out.getvalue()

    @staticmethod
    def _shiftAddHash(text, seed):
        """Fold ``text`` into ``seed`` with ``acc += (acc << 5) + ord(ch)``.

        Returns the result masked to 31 bits, as a decimal string.
        """
        acc = seed
        for ch in text:
            acc += (acc << 5) + ord(ch)
        return str(acc & 2147483647)

    def genqrtoken(self, qrsig):
        """Return the ``ptqrtoken`` value derived from the ``qrsig`` cookie."""
        return self._shiftAddHash(qrsig, 0)

    def genbkn(self, skey):
        """Return the ``bkn`` value derived from the ``skey`` cookie."""
        return self._shiftAddHash(skey, 5381)

    def rmWTS(self, content):
        """Strip emoticon codes, ``<br>`` tags and whitespace from ``content``.

        Also unescapes ``&amp;`` to ``&`` and trims the result.
        """
        # NOTE(review): the bare-space alternative removes every space,
        # including those between words -- confirm this is intended (it may
        # originally have been an HTML entity such as &nbsp;).
        pattern = r'\[em\]e\d{4}\[/em\]| |<br>|[\r\n\t]'
        content = re.sub(pattern, '', content)
        content = content.replace('&amp;','&').strip()
        return content


# Bottle application on which the route handlers below are registered.
app = Bottle()
# Single spider instance shared by every route handler in this module.
q = QQGroups()


@app.route('/static/<path:path>')
def server_static(path):
    """Serve the requested file from the ``static`` directory."""
    static_root = 'static'
    return static_file(path, root=static_root)


@app.route('/')
def home():
    """Send visitors of the site root to the search page."""
    search_page = '/qqun'
    redirect(search_page)


@app.route('/qqun', method='ANY')
@view('qqun')
def qqun():
    """Render the search page on GET; run a group search on POST.

    For any other method nothing is returned, exactly as for GET but without
    touching the response headers.
    """
    method = request.method
    if method =='POST':
        return q.qqunSearch(request)
    if method =='GET':
        response.set_header('Content-Type','text/html; charset=UTF-8')
        response.add_header('Cache-Control','no-cache')
    return None


@app.route('/getqrcode')
def getQRCode():
    """Return whatever the shared spider's ``getQRCode`` produces."""
    qr_result = q.getQRCode()
    return qr_result


@app.route('/qrlogin')
def qrLogin():
    """Return the login-state payload produced by the shared spider."""
    login_state = q.qrLogin()
    return login_state


@app.route('/download')
def download():
    """Send the stored zip archive whose id is given in the ``rid`` query field.

    Aborts with HTTP 404 when no archive is stored under that id.
    """
    rid = request.query.rid or''
    archive = attachments.get(rid,'')
    if not archive:
        abort(404)
    response.set_header('Content-Type','application/zip')
    response.add_header('Content-Disposition',
                        'attachment; filename="results.zip"')
    return archive.getvalue()


### SAE ###
# debug(True)
# application = sae.create_wsgi_app(app)


### Local ###
if __name__ =='__main__':
    # https://bottlepy.org/docs/dev/deployment.html#switching-the-server-backend
    # Options common to both attempts; only the server backend differs.
    run_options = dict(host='localhost', port=8080, debug=True, reloader=True)
    try:
        # First choice: the 'paste' backend.
        run(app, server='paste', **run_options)
    except:
        # If that raised, start again with bottle's default backend.
        run(app, **run_options)

All right! That is everything I wanted to share with readers in this article.

Finally, if you find it helpful, remember to follow, forward, and favorite

·END·

Reference: https://cloud.tencent.com/developer/article/1481352 Python crawler tutorial: Grab QQ group information in batches-Cloud + Community-Tencent Cloud