Locality scheduling redesign
Assumptions
- A batch consists of a sequence of files F1 ... Fn and a set of jobs J1 ... Jm operating on these files.
- Each job uses a contiguous set of files.
- A given file may be used by many jobs.
- The density of jobs in the file sequence may be variable.
- Several batches may be in progress concurrently.
Goals
- To complete the batch quickly.
- To minimize the amount of data transfer to hosts.
Scheduling policy
A policy that simply dispatches the jobs in order, to whatever host asks for work, essentially sends every file to every host, so it fails to achieve the second goal.
The ideal policy would start each host at a different point in the job space, separated according to their speeds. This would potentially send each file to a single host. However, it's impractical for various reasons: replication, unreliability of hosts, and unpredictability of their speed.
Instead, we use a policy in which the set of hosts is divided into teams, and each team starts at a different point in the job space. Teams should have these properties:
- The fastest host in a team should be no faster than the total of the other hosts. Otherwise, it could get unboundedly far ahead of the others.
- Subject to the above, teams should be as small as possible. A good size might be 10 or 20.
- The hosts in a team should belong to different users.
Because of host churn, team membership is dynamic; e.g. a team may be empty for a period.
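As an illustration of the first property above, here is a minimal sketch (the type and function names are hypothetical, not existing server code) that checks whether a team is balanced, using each host's expavg_credit as a rough speed estimate:
{{{
#include <vector>

// Hypothetical per-host record; expavg_credit serves as a speed estimate.
struct TeamHost {
    int host_id;
    double expavg_credit;
};

// A team is "balanced" if its fastest host is no faster than the
// rest of the team combined, so it can't get unboundedly far ahead.
bool team_is_balanced(const std::vector<TeamHost>& team) {
    double fastest = 0, total = 0;
    for (const TeamHost& h : team) {
        if (h.expavg_credit > fastest) fastest = h.expavg_credit;
        total += h.expavg_credit;
    }
    return fastest <= total - fastest;
}
}}}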
Design
A cursor consists of
- a dynamic team of hosts
- a range of jobs
- status information (see below)
Note: we discussed having a separate notion of "job range", allowing cursors to move from one job range to another, and allowing job ranges to be subdivided. I think this is needless complexity.
Database
New tables:
{{{
batch_host
    host_id                     integer
    batch_id                    integer
    cursor_id                   integer

locality_cursor
    expavg_credit               double   // sum of expavg_credit of hosts in the team
    first_job_num               integer
    last_job_num                integer
    first_unfinished_job_num    integer  // all jobs before this have been completed
    first_ungenerated_job_num   integer  // all jobs before this have workunit records

workunit (new fields)
    cursor_id                   integer
    job_num                     integer

result (new fields)
    cursor_id                   integer
}}}
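For reference, the locality_cursor fields map naturally onto an in-memory struct; the sketch below is hypothetical (not the actual BOINC DB layer) and records the ordering invariant the daemons rely on:
{{{
// Hypothetical in-memory mirror of a locality_cursor row.
struct LocalityCursor {
    int id;
    double expavg_credit;           // sum of expavg_credit of hosts in the team
    int first_job_num;              // first job in this cursor's range
    int last_job_num;               // last job in this cursor's range
    int first_unfinished_job_num;   // all jobs before this have been completed
    int first_ungenerated_job_num;  // all jobs before this have workunit records

    // Jobs must be generated before they can finish, so the two
    // progress pointers are ordered within the cursor's job range.
    bool sane() const {
        return first_job_num <= first_unfinished_job_num
            && first_unfinished_job_num <= first_ungenerated_job_num
            && first_ungenerated_job_num <= last_job_num + 1;
    }
};
}}}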
Initialization
To initialize a batch:
- create batch record
- based on # of hosts, # of jobs, and job density, create locality_cursor records
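A minimal sketch of how the cursors might be laid out, assuming a hypothetical target team size and an even split of the job range among cursors:
{{{
#include <algorithm>
#include <vector>

struct CursorInit {
    int first_job_num;
    int last_job_num;
};

// Decide how many cursors to create and which job range each covers.
// num_hosts / team_size gives the number of teams; each cursor starts
// at a different point in the job space.
std::vector<CursorInit> make_cursors(int num_hosts, int num_jobs, int team_size) {
    int ncursors = std::max(1, num_hosts / team_size);
    ncursors = std::min(ncursors, num_jobs);    // at least one job per cursor
    std::vector<CursorInit> cursors(ncursors);
    for (int i = 0; i < ncursors; i++) {
        cursors[i].first_job_num = (int)((long long)num_jobs * i / ncursors);
        cursors[i].last_job_num  = (int)((long long)num_jobs * (i + 1) / ncursors) - 1;
        // first_unfinished_job_num and first_ungenerated_job_num
        // would both start at first_job_num
    }
    return cursors;
}
}}}
Splitting the range evenly is the simplest choice; if job density varies a lot, the split could instead be weighted so that each cursor covers roughly the same amount of work.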
Feeder/scheduler
Assigning hosts to cursors
For each batch:
If this is a new host (i.e. no batch_host record) then
- assign host to cursor with least expavg_credit
- create batch_host record
- add host's expavg_credit to cursor's expavg_credit
Otherwise, consider moving this host to a different cursor. Let C be the host's current cursor. If C.expavg_credit is more than twice the lowest expavg_credit among the batch's cursors, move the host to that lowest-credit cursor. (This policy may need to be refined a bit.)
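A sketch of both cases, assuming hypothetical in-memory records for the batch's cursors (the real code would read and update the corresponding DB rows):
{{{
#include <vector>

struct Cursor {
    int id;
    double expavg_credit;   // sum over hosts currently in the team
};

// Pick the cursor with the least accumulated credit.
Cursor* least_loaded(std::vector<Cursor>& cursors) {
    Cursor* best = nullptr;
    for (Cursor& c : cursors) {
        if (!best || c.expavg_credit < best->expavg_credit) best = &c;
    }
    return best;
}

// Return the cursor the host should use for this batch.
// host_cursor is null if there's no batch_host record yet.
Cursor* assign_host(Cursor* host_cursor, double host_credit, std::vector<Cursor>& cursors) {
    Cursor* lowest = least_loaded(cursors);
    if (!host_cursor) {
        // new host: join the least-loaded team
        lowest->expavg_credit += host_credit;
        return lowest;
    }
    // existing host: move only if its team is much heavier than the lightest
    if (host_cursor->expavg_credit > 2 * lowest->expavg_credit) {
        host_cursor->expavg_credit -= host_credit;
        lowest->expavg_credit += host_credit;
        return lowest;
    }
    return host_cursor;
}
}}}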
Assigning jobs
For each batch:
Enumerate unsent results for this host's cursor in order of increasing job num (i.e. finish old jobs before starting new ones). Send as many as the host can handle.
Some type of synchronization is needed, since multiple scheduler instances may serve hosts on the same cursor concurrently; possibilities include:
- use a DB transaction (how much would this lock?)
- explicitly lock the locality_cursor record (is this possible?)
- use a semaphore per cursor
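A sketch of the dispatch step under the third option; a std::mutex stands in here for whatever per-cursor lock (DB transaction, row lock, or semaphore) is actually chosen, and the result type is a simplified stand-in for the scheduler's structures:
{{{
#include <algorithm>
#include <mutex>
#include <vector>

struct UnsentResult {
    int id;
    int job_num;
};

// One lock per cursor; guards enumeration and marking of unsent results.
std::mutex cursor_mutex;

// Pick up to max_results unsent results from this cursor for the host,
// lowest job numbers first, so old jobs finish before new ones start.
std::vector<UnsentResult> pick_results(std::vector<UnsentResult>& unsent, int max_results) {
    std::lock_guard<std::mutex> lock(cursor_mutex);
    std::sort(unsent.begin(), unsent.end(),
              [](const UnsentResult& a, const UnsentResult& b) {
                  return a.job_num < b.job_num;
              });
    std::vector<UnsentResult> chosen;
    for (const UnsentResult& r : unsent) {
        if ((int)chosen.size() >= max_results) break;
        chosen.push_back(r);
    }
    // in the real scheduler, the chosen results would be marked as sent
    // here, inside the same critical section
    return chosen;
}
}}}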
Deleting files
If the host has a file that's used only by finished jobs, tell client to delete it.
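Since each job uses a contiguous set of files, a file is used only by finished jobs once the last job that uses it falls before first_unfinished_job_num. A minimal sketch, assuming a hypothetical per-file record of which jobs use it (a file whose job range spans two cursors would need the check against both):
{{{
// Hypothetical description of which jobs use a given file.
struct FileUsage {
    int first_job_num;   // first job that uses the file
    int last_job_num;    // last job that uses the file
};

// A file can be deleted from the host once every job that uses it
// has been completed, i.e. its last user precedes the cursor's
// first_unfinished_job_num.
bool can_delete_file(const FileUsage& f, int first_unfinished_job_num) {
    return f.last_job_num < first_unfinished_job_num;
}
}}}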
Work generator
Try to maintain a cushion of N unsent jobs per cursor. Start generating jobs at cursor.first_ungenerated_job_num.
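A sketch of the per-cursor cushion logic; count_unsent_jobs and generate_workunit are hypothetical helpers standing in for the corresponding DB queries and workunit creation:
{{{
// Hypothetical helpers; the real work generator would query and update the DB.
int count_unsent_jobs(int cursor_id);            // unsent results for this cursor
void generate_workunit(int cursor_id, int job_num);

// Keep roughly 'cushion' unsent jobs available per cursor, generating new
// workunits starting at first_ungenerated_job_num.
void top_up_cursor(int cursor_id, int& first_ungenerated_job_num,
                   int last_job_num, int cushion) {
    int unsent = count_unsent_jobs(cursor_id);
    while (unsent < cushion && first_ungenerated_job_num <= last_job_num) {
        generate_workunit(cursor_id, first_ungenerated_job_num);
        first_ungenerated_job_num++;
        unsent++;
    }
}
}}}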
Transitioner
When a workunit is completed (successfully or not):
{{{
if (workunit.job_num == cursor.first_unfinished_job_num) {
    cursor.first_unfinished_job_num++;
}
}}}
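Jobs can complete out of order, so when the pointer does advance it may also be able to skip over later jobs that have already finished. A sketch, with a hypothetical predicate for whether a given job is done:
{{{
#include <functional>

// Advance the cursor's completion pointer past every job that is already
// finished; job_finished is a hypothetical lookup (e.g. a DB query).
void advance_first_unfinished(int& first_unfinished_job_num, int last_job_num,
                              const std::function<bool(int)>& job_finished) {
    while (first_unfinished_job_num <= last_job_num
           && job_finished(first_unfinished_job_num)) {
        first_unfinished_job_num++;
    }
}
}}}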
locality_daemon
This runs periodically (every hour or day) and recomputes locality_cursor.expavg_credit. It doesn't have to be a separate program; it could be added to an existing daemon, like the feeder.
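The recomputation is just a per-cursor sum of its members' credit. A sketch with hypothetical stand-ins for the batch_host rows and per-host credit values:
{{{
#include <map>
#include <vector>

struct BatchHostRow {
    int host_id;
    int cursor_id;
};

// Recompute each cursor's expavg_credit as the sum of its members' credit.
// host_credit maps host_id -> that host's current expavg_credit.
std::map<int, double> recompute_cursor_credit(
    const std::vector<BatchHostRow>& batch_hosts,
    const std::map<int, double>& host_credit
) {
    std::map<int, double> cursor_credit;
    for (const BatchHostRow& bh : batch_hosts) {
        auto it = host_credit.find(bh.host_id);
        if (it != host_credit.end()) {
            cursor_credit[bh.cursor_id] += it->second;
        }
    }
    return cursor_credit;
}
}}}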