# Twitter demographics crawler.
#
# When run top to bottom this script:
#   1. reloads Twitter credentials and brand records from ../data into MongoDB,
#   2. fetches follower ids of each brand into the `xmatrix` collection,
#   3. fetches friend ids of each follower into that follower's `follows` set,
#   4. counts how often each account appears in processed followers' `follows`.
#
# Output is written with single-argument print(...) / sys.stderr.write(...)
# so it behaves the same under Python 2 and Python 3.
import json
import sys
import time
from collections import Counter, defaultdict

from pymongo import MongoClient
from TwitterAPI import TwitterAPI

dbconn = MongoClient('localhost', 27017)
db = dbconn.twitter_demographics
print(db)

# Insert Twitter credentials into the DB, replacing whatever was there before.
twitter_cred = db['twitter_cred']
twitter_cred.delete_many({})
with open('../data/twitter-cred.json', 'r') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        user = json.loads(line)
        user['is_taken'] = False
        twitter_cred.insert_one(user)
print('Inserted %d users credentails' % twitter_cred.count_documents({}))

# Insert brand details into the DB, replacing whatever was there before.
brands = db['brands']
brands.delete_many({})
with open('../data/brands.json', 'r') as f:
    for line in f:
        if not line.strip():
            continue
        # NOTE(review): eval() executes arbitrary code from this file. It is kept
        # because the file may use Python literal syntax (True/False) rather than
        # strict JSON; consider ast.literal_eval once the format is confirmed.
        brands.insert_one(eval(line))
print('Inserted %d brands' % brands.count_documents({}))


# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------

def get_credentials():
    """Claim one unused Twitter credential from the DB.

    Finds a credential with ``is_taken`` False and sets it to True in a single
    find-and-modify operation.

    Returns:
        The credential document as it was before the update, or None if no
        untaken credential exists.
    """
    return db['twitter_cred'].find_one_and_update(
        {'is_taken': False}, {'$set': {'is_taken': True}})


def get_brand_to_process():
    """Claim one unprocessed, untaken brand from the DB.

    Returns:
        The brand dict with its ``_id`` renamed to ``brand_id``, or None if
        no brand matches.
    """
    brand = db['brands'].find_one_and_update(
        {'is_processed': False, 'is_taken': False},
        {'$set': {'is_taken': True}})
    if brand:
        brand['brand_id'] = brand.pop('_id')
    return brand


def get_follower_to_process():
    """Claim one unprocessed, untaken follower from ``xmatrix``.

    Returns:
        The follower dict with its ``_id`` renamed to ``follower_id``, or None
        if no follower matches.
    """
    follower = db['xmatrix'].find_one_and_update(
        {'is_processed': False, 'is_taken': False},
        {'$set': {'is_taken': True}})
    if follower:
        follower['follower_id'] = follower.pop('_id')
    return follower


def add_followers_to_db(brand_id, followers_ids):
    """Upsert followers of a brand into ``xmatrix``.

    Each follower gets ``brand_id`` added to its ``follows`` set and is
    (re)flagged as unprocessed and untaken.

    Args:
        brand_id: id of the brand the users follow.
        followers_ids: ids of the brand's followers; used verbatim as ``_id``.
    """
    xmatrix = db['xmatrix']
    for follower_id in followers_ids:
        xmatrix.update_one(
            {'_id': follower_id},
            {'$addToSet': {'follows': brand_id},
             '$set': {'is_processed': False, 'is_taken': False}},
            upsert=True)


def add_friends_to_dB(follower_id, friends_list):
    """Add a follower's friends to its ``follows`` set in ``xmatrix``.

    Args:
        follower_id: ``_id`` of the follower document.
        friends_list: list of friend ids to add (duplicates are ignored).
    """
    db['xmatrix'].update_one(
        {'_id': follower_id},
        {'$addToSet': {'follows': {'$each': friends_list}}})


def update_processed_flag(user_id):
    """Mark a user in ``xmatrix`` as processed and release it (``is_taken`` False).

    Args:
        user_id: ``_id`` of the user document (must match the stored type).
    """
    db['xmatrix'].update_one(
        {'_id': user_id},
        {'$set': {'is_taken': False, 'is_processed': True}})


def remove_user_from_x(user_id):
    """Delete a user from ``xmatrix``.

    Args:
        user_id: ``_id`` of the user document (must match the stored type).
    """
    # The original signature had a stray `self` parameter although this is a
    # module-level function; every call with one argument raised TypeError.
    db['xmatrix'].delete_one({'_id': user_id})


def robust_request(twitter, resource, params, max_tries=5):
    """Make a Twitter request, sleeping 15 minutes between failed attempts.

    Requests answered with error code 34 (user does not exist) or
    'Not authorized.' are skipped immediately. Other failures are retried up
    to ``max_tries`` times; there is no sleep after the final attempt.

    Args:
        twitter: a TwitterAPI object.
        resource: resource string to request.
        params: parameter dictionary for the request.
        max_tries: maximum number of attempts.

    Returns:
        The TwitterResponse on HTTP 200, or None on failure.
    """
    for attempt in range(max_tries):
        request = twitter.request(resource, params)
        if request.status_code == 200:
            return request
        # Only the first error payload is inspected; an empty body is
        # treated as a generic (retryable) error instead of an IndexError.
        first = next(iter(request), {})
        if isinstance(first, dict) and (
                first.get('code') == 34 or first.get('error') == 'Not authorized.'):
            # 34 == user does not exist.
            sys.stderr.write('skipping bad request %s %s\n' % (resource, params))
            return None
        if attempt == max_tries - 1:
            sys.stderr.write('Got error: %s \ngiving up after %d tries.\n'
                             % (request.text, max_tries))
            break
        sys.stderr.write('Got error: %s \nsleeping for 15 minutes.\n' % request.text)
        sys.stderr.flush()
        time.sleep(60 * 15)
    return None


# Get a Twitter API object. twitterObj stays None when no credential is
# available, and the crawling stages below are then skipped.
twitter = get_credentials()
if twitter:
    twitterObj = TwitterAPI(twitter['api_key'],
                            twitter['api_secret'],
                            twitter['access_token_key'],
                            twitter['access_token_secret'])
else:
    twitterObj = None
    sys.stderr.write('Twitter credits not available\n')


def _fetch_ids(resource, user_id, count):
    """Collect every ``ids`` list from one page of a Twitter ids endpoint.

    No cursor is sent, so only the first page is requested.
    Returns an empty list if the request failed.
    """
    ids = []
    response = robust_request(twitterObj, resource,
                              {'user_id': user_id, 'count': count})
    if response is not None:
        for item in response:
            if 'ids' in item:
                ids += item['ids']
    return ids


def get_followers(user_id, count=300):
    """Return follower ids of a Twitter user (one page, ``count`` requested)."""
    return _fetch_ids('followers/ids', user_id, count)


def get_friends(user_id, count=5000):
    """Return friend ids of a Twitter user (one page, ``count`` requested)."""
    return _fetch_ids('friends/ids', user_id, count)


# For testing purposes: caps how many items each stage processes.
# Set to -1 to process everything.
cut_off = 3


def process_followers():
    """Fetch followers of unprocessed brands and add them to the DB.

    Halts when no untaken, unprocessed brand remains or ``cut_off`` is
    reached. Decrements the module-level ``cut_off`` once per iteration.
    """
    global cut_off
    while True:
        if cut_off == 0:
            sys.stderr.write('cut off reached\n')
            break
        cut_off -= 1
        brand = get_brand_to_process()
        if not brand:
            sys.stderr.write('No brands to process\n')
            break
        followers = get_followers(brand['brand_id'])
        if followers:
            add_followers_to_db(brand['brand_id'], followers)
            print('added %d followers of %s to DB'
                  % (len(followers), brand['brand_id']))


if twitterObj is not None:
    process_followers()

# For testing purposes: caps how many items each stage processes.
# Set to -1 to process everything.
cut_off = 3


def process_friends():
    """Fetch friends of unprocessed followers and add them to the DB.

    A follower whose friend list comes back empty is removed from
    ``xmatrix``. Halts when no untaken, unprocessed follower remains or
    ``cut_off`` is reached. Decrements the module-level ``cut_off`` once per
    iteration.
    """
    global cut_off
    while True:
        if cut_off == 0:
            sys.stderr.write('cut off reached\n')
            break
        cut_off -= 1
        follower = get_follower_to_process()
        if not follower:
            sys.stderr.write('No follower to process\n')
            break
        follower_id = follower['follower_id']
        friends = get_friends(str(follower_id))
        # DB updates use the raw id: it is the stored _id verbatim, whereas a
        # str() copy would not match a non-string _id and would update nothing.
        if friends:
            add_friends_to_dB(follower_id, friends)
            print('added %d friends of %s to DB' % (len(friends), follower_id))
            update_processed_flag(follower_id)
        else:
            remove_user_from_x(follower_id)
            sys.stderr.write('removed user , unable to fetch friends list for %s\n'
                             % follower_id)


if twitterObj is not None:
    process_friends()

# Get all brands, indexed by id and by lower-cased brand name.
id2brand = dict()
username2brand = dict()
for brand in db.brands.find():
    id2brand[brand['_id']] = brand
    username2brand[brand['brand_name'].lower()] = brand
print('read %d brands' % len(id2brand))

# Count how often each account appears in processed followers' `follows`
# lists, so that rarely occurring accounts can be removed below.
friend_counts = Counter()
count = 0
for follower in db.xmatrix.find({'is_processed': True}):
    friend_counts.update(follower['follows'])
    count += 1
    if count % 1000 == 0:
        print('read %d' % count)

# Filter accounts not appearing a minimum number of times.
count_thresh = 100  # keep accounts appearing in at least this many followers' lists
friend_set = set(f for f, v in friend_counts.items() if v >= count_thresh)
print('%d of %d appear at least %d times'
      % (len(friend_set), len(friend_counts), count_thresh))

# Now construct friend counts for each brand, using the filtered set of accounts.
brand2counts = defaultdict(Counter)
count = 0
for follower in db.xmatrix.find({'is_processed': True}):
    count += 1
    if count % 1000 == 0:
        print('read %d' % count)
    follows = follower['follows']
    # `follows` mixes brand ids and friend ids; pick out the brands.
    brandids = set(f for f in follows if f in id2brand)
    friends = set(follows) & friend_set
    for b in brandids:
        brand2counts[b].update(friends)


def _print_top_accounts(counts_by_brand, k=5):
    """Print the first brand id and its k most frequent accounts.

    Prints a notice instead of raising when there are no counts.
    """
    if not counts_by_brand:
        print('no brand counts to show')
        return
    first = next(iter(counts_by_brand))
    top = sorted(counts_by_brand[first].items(), key=lambda x: -x[1])[:k]
    print('%s %s' % (first, top))


# Print the top accounts for the first brand.
_print_top_accounts(brand2counts)

# Read the demographics data for each brand (one JSON object per line).
import json
username2demo = dict()
with open('../data/demo.json', 'rt') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines
        js = json.loads(line)
        username2demo[js['twitter'].lower()] = js
print('read %d demographics' % len(username2demo))

# Add demographics to each brand dict. A brand with no demographics record
# gets an empty dict (instead of a KeyError) and is reported below.
for username, brand in username2brand.items():
    brand['demo'] = username2demo.get(username, {})
    if 'Female' not in brand['demo']:
        print(brand['brand_name'])

# Set self-reference counts to 0 (the key is kept, with a zero count).
for brand in brand2counts:
    brand2counts[brand][brand] = 0.
# Now the brand's own id should have a zero count in its count dict.
_print_top_accounts(brand2counts)

# Pickle everything.
from functools import partial
import pickle
with open('username2brand.pkl', 'wb') as f:
    pickle.dump(username2brand, f)
with open('id2brand.pkl', 'wb') as f:
    pickle.dump(id2brand, f)
# Stored as a plain dict (not a defaultdict), as before.
with open('brand2counts.pkl', 'wb') as f:
    pickle.dump(dict(brand2counts), f)